feat: unify after-market updates across environments

This commit is contained in:
wanghep
2026-04-27 19:31:13 +08:00
parent 2c1fc0da1f
commit a492c73cc6
5 changed files with 211 additions and 80 deletions

View File

@ -42,6 +42,10 @@
- 如果数据库最新交易日已经是当天,则跳过。 - 如果数据库最新交易日已经是当天,则跳过。
- 如果发现最近工作日数据缺失,会尝试自动补齐并重新生成预警。 - 如果发现最近工作日数据缺失,会尝试自动补齐并重新生成预警。
- 检查失败不会阻塞 API 启动,避免服务不可用。 - 检查失败不会阻塞 API 启动,避免服务不可用。
- 后端服务内置收盘后调度:
- 工作日 17:00 之后会自动尝试执行收盘后更新。
- 如果 17:00 时数据源尚未准备好,会按固定间隔重试,直到当天数据补齐。
- 本地和服务器使用同一套调度逻辑,不再依赖本机 Codex 自动任务。
## 环境要求 ## 环境要求

View File

@ -2,20 +2,12 @@ from __future__ import annotations
import argparse import argparse
from datetime import datetime from datetime import datetime
from zoneinfo import ZoneInfo
from _bootstrap import add_src_to_path from _bootstrap import add_src_to_path
add_src_to_path() add_src_to_path()
from lhbfx.config import load_config from lhbfx.after_market import SHANGHAI_TZ, default_trade_date, run_after_market_update
from lhbfx.mailer import send_email
from lhbfx.pdf_export import generate_daily_report_pdf
from lhbfx.pipeline import generate_warnings, import_daily, rematch_traders
from lhbfx.reporting import build_daily_report, build_email_body, default_report_output_path
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
def parse_args() -> argparse.Namespace: def parse_args() -> argparse.Namespace:
@ -25,91 +17,28 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--force", action="store_true", help="Run even if the target date is not a weekday") parser.add_argument("--force", action="store_true", help="Run even if the target date is not a weekday")
return parser.parse_args() return parser.parse_args()
def default_trade_date() -> str:
return datetime.now(SHANGHAI_TZ).date().isoformat()
def is_weekday(trade_date: str) -> bool: def is_weekday(trade_date: str) -> bool:
return datetime.fromisoformat(trade_date).weekday() < 5 return datetime.fromisoformat(trade_date).weekday() < 5
def build_update_email_body(
*,
trade_date: str,
overview_count: int,
detail_count: int,
rematch_updated: int,
warning_total: int,
report_body: str,
) -> str:
lines = [
f"lhbfx 收盘后更新完成 - {trade_date}",
"",
f"龙虎榜概览更新数: {overview_count}",
f"席位明细更新数: {detail_count}",
f"游资重新匹配数: {rematch_updated}",
f"预警总数: {warning_total}",
"",
]
lines.append(report_body)
return "\n".join(lines)
def main() -> None: def main() -> None:
args = parse_args() args = parse_args()
config = load_config()
trade_date = args.trade_date or default_trade_date() trade_date = args.trade_date or default_trade_date()
if not args.force and not is_weekday(trade_date): if not args.force and not is_weekday(trade_date):
print(f"Skip {trade_date}: not a weekday, assumed non-trading day.") print(f"Skip {trade_date}: not a weekday, assumed non-trading day.")
return return
import_result = import_daily(trade_date, config=config) result = run_after_market_update(
if import_result.overview_count <= 0: trade_date=trade_date,
print(f"Skip {trade_date}: no overview rows imported, source data may not be ready or market may be closed.") send_report_email=args.send_email,
force=args.force,
)
if result["status"] == "skipped":
print(f"Skip {trade_date}: {result['reason']}")
return return
rematch_result = rematch_traders(config=config) print("After-market update finished:", result)
warning_result = generate_warnings(config=config)
report = build_daily_report(config=config, trade_date=trade_date)
pdf_path = default_report_output_path(trade_date)
generate_daily_report_pdf(report, pdf_path)
print(
"After-market update finished:",
{
"trade_date": trade_date,
"overview_count": import_result.overview_count,
"detail_count": import_result.detail_count,
"rematch_updated": rematch_result["updated"],
"warning_total": warning_result["total"],
"pdf_path": str(pdf_path),
},
)
if args.send_email:
if config.mail is None:
raise RuntimeError("Mail config is missing")
report_body = build_email_body(report)
body_text = build_update_email_body(
trade_date=trade_date,
overview_count=import_result.overview_count,
detail_count=import_result.detail_count,
rematch_updated=rematch_result["updated"],
warning_total=warning_result["total"],
report_body=report_body,
)
subject = f"lhbfx 收盘后更新完成 - {trade_date}"
send_email(
mail_config=config.mail,
subject=subject,
body_text=body_text,
attachments=[pdf_path],
)
print(f"Email sent to: {', '.join(config.mail.recipients)}")
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -0,0 +1,113 @@
from __future__ import annotations
from datetime import datetime
from typing import Any
from zoneinfo import ZoneInfo
from .config import AppConfig, load_config
from .mailer import send_email
from .pdf_export import generate_daily_report_pdf
from .pipeline import generate_warnings, import_daily, rematch_traders
from .reporting import build_daily_report, build_email_body, default_report_output_path
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
def default_trade_date() -> str:
return datetime.now(SHANGHAI_TZ).date().isoformat()
def is_weekday(trade_date: str) -> bool:
return datetime.fromisoformat(trade_date).weekday() < 5
def build_update_email_body(
*,
trade_date: str,
overview_count: int,
detail_count: int,
rematch_updated: int,
warning_total: int,
report_body: str,
) -> str:
lines = [
f"lhbfx 收盘后更新完成 - {trade_date}",
"",
f"龙虎榜概览更新数: {overview_count}",
f"席位明细更新数: {detail_count}",
f"游资重新匹配数: {rematch_updated}",
f"预警总数: {warning_total}",
"",
]
lines.append(report_body)
return "\n".join(lines)
def run_after_market_update(
*,
config: AppConfig | None = None,
trade_date: str | None = None,
send_report_email: bool = False,
force: bool = False,
) -> dict[str, Any]:
cfg = config or load_config()
effective_trade_date = trade_date or default_trade_date()
if not force and not is_weekday(effective_trade_date):
return {
"status": "skipped",
"reason": "weekend",
"trade_date": effective_trade_date,
}
import_result = import_daily(effective_trade_date, config=cfg)
if import_result.overview_count <= 0:
return {
"status": "skipped",
"reason": "no_data",
"trade_date": effective_trade_date,
"overview_count": import_result.overview_count,
"detail_count": import_result.detail_count,
}
rematch_result = rematch_traders(config=cfg)
warning_result = generate_warnings(config=cfg)
report = build_daily_report(config=cfg, trade_date=effective_trade_date)
pdf_path = default_report_output_path(effective_trade_date)
generate_daily_report_pdf(report, pdf_path)
emailed = False
if send_report_email:
if cfg.mail is None:
raise RuntimeError("Mail config is missing")
report_body = build_email_body(report)
body_text = build_update_email_body(
trade_date=effective_trade_date,
overview_count=import_result.overview_count,
detail_count=import_result.detail_count,
rematch_updated=rematch_result["updated"],
warning_total=warning_result["total"],
report_body=report_body,
)
subject = f"lhbfx 收盘后更新完成 - {effective_trade_date}"
send_email(
mail_config=cfg.mail,
subject=subject,
body_text=body_text,
attachments=[pdf_path],
)
emailed = True
return {
"status": "completed",
"trade_date": effective_trade_date,
"overview_count": import_result.overview_count,
"detail_count": import_result.detail_count,
"rematch_updated": rematch_result["updated"],
"warning_total": warning_result["total"],
"pdf_path": str(pdf_path),
"emailed": emailed,
}

View File

@ -28,6 +28,7 @@ from .queries import (
delete_watchlist_item, delete_watchlist_item,
upsert_watchlist_item, upsert_watchlist_item,
) )
from .scheduler import start_after_market_scheduler
BASE_DIR = Path(__file__).resolve().parents[2] BASE_DIR = Path(__file__).resolve().parents[2]
@ -58,6 +59,11 @@ def startup_check_recent_data() -> None:
except Exception: except Exception:
logger.exception("startup data check failed") logger.exception("startup data check failed")
try:
start_after_market_scheduler()
except Exception:
logger.exception("after-market scheduler startup failed")
@app.get("/api/summary") @app.get("/api/summary")
def api_summary(): def api_summary():

View File

@ -0,0 +1,79 @@
from __future__ import annotations
import logging
import threading
import time
from datetime import datetime
from typing import Any
from .after_market import SHANGHAI_TZ, run_after_market_update
from .config import AppConfig, load_config
from .pipeline import get_latest_imported_trade_date
logger = logging.getLogger(__name__)
_scheduler_thread: threading.Thread | None = None
_scheduler_started = False
_scheduler_lock = threading.Lock()
def _should_attempt_update(*, config: AppConfig, now: datetime) -> tuple[bool, str]:
if now.weekday() >= 5:
return False, "weekend"
if now.hour < 17:
return False, "before_after_market_window"
latest_trade_date = get_latest_imported_trade_date(config=config)
if latest_trade_date and latest_trade_date >= now.date().isoformat():
return False, "up_to_date"
return True, "missing_today_data"
def _scheduler_loop(config: AppConfig, poll_interval_seconds: int, retry_interval_seconds: int) -> None:
last_attempt_at: float = 0
while True:
try:
now = datetime.now(SHANGHAI_TZ)
should_attempt, reason = _should_attempt_update(config=config, now=now)
if should_attempt:
current_time = time.time()
if current_time - last_attempt_at >= retry_interval_seconds:
last_attempt_at = current_time
result = run_after_market_update(
config=config,
trade_date=now.date().isoformat(),
send_report_email=True,
)
logger.info("after-market scheduler result: %s", result)
else:
logger.debug("after-market scheduler idle: %s", reason)
except Exception:
logger.exception("after-market scheduler iteration failed")
time.sleep(poll_interval_seconds)
def start_after_market_scheduler(
*,
config: AppConfig | None = None,
poll_interval_seconds: int = 300,
retry_interval_seconds: int = 900,
) -> threading.Thread | None:
global _scheduler_thread, _scheduler_started
with _scheduler_lock:
if _scheduler_started:
return _scheduler_thread
cfg = config or load_config()
_scheduler_thread = threading.Thread(
target=_scheduler_loop,
args=(cfg, poll_interval_seconds, retry_interval_seconds),
name="lhbfx-after-market-scheduler",
daemon=True,
)
_scheduler_thread.start()
_scheduler_started = True
logger.info("after-market scheduler started")
return _scheduler_thread