diff --git a/README.md b/README.md index 2410247..7d98364 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,10 @@ - 如果数据库最新交易日已经是当天,则跳过。 - 如果发现最近工作日数据缺失,会尝试自动补齐并重新生成预警。 - 检查失败不会阻塞 API 启动,避免服务不可用。 +- 后端服务内置收盘后调度: + - 工作日 17:00 之后会自动尝试执行收盘后更新。 + - 如果 17:00 时数据源尚未准备好,会按固定间隔重试,直到当天数据补齐。 + - 本地和服务器使用同一套调度逻辑,不再依赖本机 Codex 自动任务。 ## 环境要求 diff --git a/backend/scripts/after_market_update.py b/backend/scripts/after_market_update.py index 981a4b6..4c79596 100644 --- a/backend/scripts/after_market_update.py +++ b/backend/scripts/after_market_update.py @@ -2,20 +2,12 @@ from __future__ import annotations import argparse from datetime import datetime -from zoneinfo import ZoneInfo from _bootstrap import add_src_to_path add_src_to_path() -from lhbfx.config import load_config -from lhbfx.mailer import send_email -from lhbfx.pdf_export import generate_daily_report_pdf -from lhbfx.pipeline import generate_warnings, import_daily, rematch_traders -from lhbfx.reporting import build_daily_report, build_email_body, default_report_output_path - - -SHANGHAI_TZ = ZoneInfo("Asia/Shanghai") +from lhbfx.after_market import SHANGHAI_TZ, default_trade_date, run_after_market_update def parse_args() -> argparse.Namespace: @@ -25,91 +17,28 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--force", action="store_true", help="Run even if the target date is not a weekday") return parser.parse_args() - -def default_trade_date() -> str: - return datetime.now(SHANGHAI_TZ).date().isoformat() - - def is_weekday(trade_date: str) -> bool: return datetime.fromisoformat(trade_date).weekday() < 5 -def build_update_email_body( - *, - trade_date: str, - overview_count: int, - detail_count: int, - rematch_updated: int, - warning_total: int, - report_body: str, -) -> str: - lines = [ - f"lhbfx 收盘后更新完成 - {trade_date}", - "", - f"龙虎榜概览更新数: {overview_count}", - f"席位明细更新数: {detail_count}", - f"游资重新匹配数: {rematch_updated}", - f"预警总数: {warning_total}", - "", - ] - lines.append(report_body) - return "\n".join(lines) - - def main() -> None: args = parse_args() - config = load_config() trade_date = args.trade_date or default_trade_date() if not args.force and not is_weekday(trade_date): print(f"Skip {trade_date}: not a weekday, assumed non-trading day.") return - import_result = import_daily(trade_date, config=config) - if import_result.overview_count <= 0: - print(f"Skip {trade_date}: no overview rows imported, source data may not be ready or market may be closed.") + result = run_after_market_update( + trade_date=trade_date, + send_report_email=args.send_email, + force=args.force, + ) + if result["status"] == "skipped": + print(f"Skip {trade_date}: {result['reason']}") return - rematch_result = rematch_traders(config=config) - warning_result = generate_warnings(config=config) - - report = build_daily_report(config=config, trade_date=trade_date) - pdf_path = default_report_output_path(trade_date) - generate_daily_report_pdf(report, pdf_path) - - print( - "After-market update finished:", - { - "trade_date": trade_date, - "overview_count": import_result.overview_count, - "detail_count": import_result.detail_count, - "rematch_updated": rematch_result["updated"], - "warning_total": warning_result["total"], - "pdf_path": str(pdf_path), - }, - ) - - if args.send_email: - if config.mail is None: - raise RuntimeError("Mail config is missing") - - report_body = build_email_body(report) - body_text = build_update_email_body( - trade_date=trade_date, - overview_count=import_result.overview_count, - detail_count=import_result.detail_count, - rematch_updated=rematch_result["updated"], - warning_total=warning_result["total"], - report_body=report_body, - ) - subject = f"lhbfx 收盘后更新完成 - {trade_date}" - send_email( - mail_config=config.mail, - subject=subject, - body_text=body_text, - attachments=[pdf_path], - ) - print(f"Email sent to: {', '.join(config.mail.recipients)}") + print("After-market update finished:", result) if __name__ == "__main__": diff --git a/backend/src/lhbfx/after_market.py b/backend/src/lhbfx/after_market.py new file mode 100644 index 0000000..89e6e11 --- /dev/null +++ b/backend/src/lhbfx/after_market.py @@ -0,0 +1,113 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any +from zoneinfo import ZoneInfo + +from .config import AppConfig, load_config +from .mailer import send_email +from .pdf_export import generate_daily_report_pdf +from .pipeline import generate_warnings, import_daily, rematch_traders +from .reporting import build_daily_report, build_email_body, default_report_output_path + + +SHANGHAI_TZ = ZoneInfo("Asia/Shanghai") + + +def default_trade_date() -> str: + return datetime.now(SHANGHAI_TZ).date().isoformat() + + +def is_weekday(trade_date: str) -> bool: + return datetime.fromisoformat(trade_date).weekday() < 5 + + +def build_update_email_body( + *, + trade_date: str, + overview_count: int, + detail_count: int, + rematch_updated: int, + warning_total: int, + report_body: str, +) -> str: + lines = [ + f"lhbfx 收盘后更新完成 - {trade_date}", + "", + f"龙虎榜概览更新数: {overview_count}", + f"席位明细更新数: {detail_count}", + f"游资重新匹配数: {rematch_updated}", + f"预警总数: {warning_total}", + "", + ] + lines.append(report_body) + return "\n".join(lines) + + +def run_after_market_update( + *, + config: AppConfig | None = None, + trade_date: str | None = None, + send_report_email: bool = False, + force: bool = False, +) -> dict[str, Any]: + cfg = config or load_config() + effective_trade_date = trade_date or default_trade_date() + + if not force and not is_weekday(effective_trade_date): + return { + "status": "skipped", + "reason": "weekend", + "trade_date": effective_trade_date, + } + + import_result = import_daily(effective_trade_date, config=cfg) + if import_result.overview_count <= 0: + return { + "status": "skipped", + "reason": "no_data", + "trade_date": effective_trade_date, + "overview_count": import_result.overview_count, + "detail_count": import_result.detail_count, + } + + rematch_result = rematch_traders(config=cfg) + warning_result = generate_warnings(config=cfg) + + report = build_daily_report(config=cfg, trade_date=effective_trade_date) + pdf_path = default_report_output_path(effective_trade_date) + generate_daily_report_pdf(report, pdf_path) + + emailed = False + if send_report_email: + if cfg.mail is None: + raise RuntimeError("Mail config is missing") + + report_body = build_email_body(report) + body_text = build_update_email_body( + trade_date=effective_trade_date, + overview_count=import_result.overview_count, + detail_count=import_result.detail_count, + rematch_updated=rematch_result["updated"], + warning_total=warning_result["total"], + report_body=report_body, + ) + subject = f"lhbfx 收盘后更新完成 - {effective_trade_date}" + send_email( + mail_config=cfg.mail, + subject=subject, + body_text=body_text, + attachments=[pdf_path], + ) + emailed = True + + return { + "status": "completed", + "trade_date": effective_trade_date, + "overview_count": import_result.overview_count, + "detail_count": import_result.detail_count, + "rematch_updated": rematch_result["updated"], + "warning_total": warning_result["total"], + "pdf_path": str(pdf_path), + "emailed": emailed, + } diff --git a/backend/src/lhbfx/app.py b/backend/src/lhbfx/app.py index aabd4fa..16d3d25 100644 --- a/backend/src/lhbfx/app.py +++ b/backend/src/lhbfx/app.py @@ -28,6 +28,7 @@ from .queries import ( delete_watchlist_item, upsert_watchlist_item, ) +from .scheduler import start_after_market_scheduler BASE_DIR = Path(__file__).resolve().parents[2] @@ -58,6 +59,11 @@ def startup_check_recent_data() -> None: except Exception: logger.exception("startup data check failed") + try: + start_after_market_scheduler() + except Exception: + logger.exception("after-market scheduler startup failed") + @app.get("/api/summary") def api_summary(): diff --git a/backend/src/lhbfx/scheduler.py b/backend/src/lhbfx/scheduler.py new file mode 100644 index 0000000..87c12fd --- /dev/null +++ b/backend/src/lhbfx/scheduler.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import logging +import threading +import time +from datetime import datetime +from typing import Any + +from .after_market import SHANGHAI_TZ, run_after_market_update +from .config import AppConfig, load_config +from .pipeline import get_latest_imported_trade_date + + +logger = logging.getLogger(__name__) +_scheduler_thread: threading.Thread | None = None +_scheduler_started = False +_scheduler_lock = threading.Lock() + + +def _should_attempt_update(*, config: AppConfig, now: datetime) -> tuple[bool, str]: + if now.weekday() >= 5: + return False, "weekend" + if now.hour < 17: + return False, "before_after_market_window" + + latest_trade_date = get_latest_imported_trade_date(config=config) + if latest_trade_date and latest_trade_date >= now.date().isoformat(): + return False, "up_to_date" + + return True, "missing_today_data" + + +def _scheduler_loop(config: AppConfig, poll_interval_seconds: int, retry_interval_seconds: int) -> None: + last_attempt_at: float = 0 + while True: + try: + now = datetime.now(SHANGHAI_TZ) + should_attempt, reason = _should_attempt_update(config=config, now=now) + if should_attempt: + current_time = time.time() + if current_time - last_attempt_at >= retry_interval_seconds: + last_attempt_at = current_time + result = run_after_market_update( + config=config, + trade_date=now.date().isoformat(), + send_report_email=True, + ) + logger.info("after-market scheduler result: %s", result) + else: + logger.debug("after-market scheduler idle: %s", reason) + except Exception: + logger.exception("after-market scheduler iteration failed") + + time.sleep(poll_interval_seconds) + + +def start_after_market_scheduler( + *, + config: AppConfig | None = None, + poll_interval_seconds: int = 300, + retry_interval_seconds: int = 900, +) -> threading.Thread | None: + global _scheduler_thread, _scheduler_started + + with _scheduler_lock: + if _scheduler_started: + return _scheduler_thread + + cfg = config or load_config() + _scheduler_thread = threading.Thread( + target=_scheduler_loop, + args=(cfg, poll_interval_seconds, retry_interval_seconds), + name="lhbfx-after-market-scheduler", + daemon=True, + ) + _scheduler_thread.start() + _scheduler_started = True + logger.info("after-market scheduler started") + return _scheduler_thread