From 9e0ad439997da33b3f8c2b28dc03d084731448d7 Mon Sep 17 00:00:00 2001 From: wanghep Date: Mon, 20 Apr 2026 20:41:18 +0800 Subject: [PATCH] feat: check lhb data on api startup --- README.md | 4 ++ backend/src/lhbfx/app.py | 12 +++++ backend/src/lhbfx/pipeline.py | 99 ++++++++++++++++++++++++++++++++++- 3 files changed, 114 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 9485b34..2410247 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,10 @@ - 优先读取数据库中的股票元数据。 - 外部快照失败时增加备用行情源兜底。 - 收盘后更新流程会同步补全行业、市值、流通市值等字段。 +- 后端 API 启动时会自动检查最近龙虎榜数据: + - 如果数据库最新交易日已经是当天,则跳过。 + - 如果发现最近工作日数据缺失,会尝试自动补齐并重新生成预警。 + - 检查失败不会阻塞 API 启动,避免服务不可用。 ## 环境要求 diff --git a/backend/src/lhbfx/app.py b/backend/src/lhbfx/app.py index 094912d..aabd4fa 100644 --- a/backend/src/lhbfx/app.py +++ b/backend/src/lhbfx/app.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from pathlib import Path from fastapi import FastAPI, HTTPException @@ -8,6 +9,7 @@ from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from .pipeline import ( + check_and_repair_recent_data, generate_warnings, get_pipeline_status, import_daily, @@ -32,6 +34,7 @@ BASE_DIR = Path(__file__).resolve().parents[2] PROJECT_DIR = BASE_DIR.parent FRONTEND_DIST_DIR = PROJECT_DIR / "frontend" / "dist" FRONTEND_ASSETS_DIR = FRONTEND_DIST_DIR / "assets" +logger = logging.getLogger(__name__) app = FastAPI(title="lhbfx API", version="0.1.0") @@ -47,6 +50,15 @@ if FRONTEND_ASSETS_DIR.exists(): app.mount("/assets", StaticFiles(directory=str(FRONTEND_ASSETS_DIR)), name="assets") +@app.on_event("startup") +def startup_check_recent_data() -> None: + try: + result = check_and_repair_recent_data() + logger.info("startup data check result: %s", result) + except Exception: + logger.exception("startup data check failed") + + @app.get("/api/summary") def api_summary(): return fetch_summary() diff --git a/backend/src/lhbfx/pipeline.py b/backend/src/lhbfx/pipeline.py index 4f977d1..dcb8ebd 100644 --- a/backend/src/lhbfx/pipeline.py +++ b/backend/src/lhbfx/pipeline.py @@ -3,8 +3,9 @@ from __future__ import annotations import json import time from dataclasses import dataclass -from datetime import date, timedelta +from datetime import date, datetime, timedelta from typing import Any +from zoneinfo import ZoneInfo from .config import AppConfig, load_config from .db import db_cursor @@ -22,6 +23,9 @@ class ImportResult: detail_count: int +SHANGHAI_TZ = ZoneInfo("Asia/Shanghai") + + def build_seat_matchers(config: AppConfig) -> list[tuple[str, str, str]]: matchers: list[tuple[str, str, str]] = [] for trader in config.traders: @@ -228,6 +232,99 @@ def build_schedule(year: int, start: str | None = None, end: str | None = None) return dates +def get_latest_imported_trade_date(*, config: AppConfig | None = None) -> str | None: + cfg = config or load_config() + with db_cursor(config=cfg) as (_, cursor): + cursor.execute("SELECT MAX(trade_date) AS latest_trade_date FROM lhb_overview") + row = cursor.fetchone() + + latest_trade_date = row["latest_trade_date"] if row else None + if latest_trade_date is None: + return None + if hasattr(latest_trade_date, "isoformat"): + return latest_trade_date.isoformat() + return str(latest_trade_date) + + +def check_and_repair_recent_data( + *, + config: AppConfig | None = None, + today: str | None = None, + max_backfill_days: int = 7, +) -> dict[str, Any]: + cfg = config or load_config() + today_date = date.fromisoformat(today) if today else datetime.now(SHANGHAI_TZ).date() + today_text = today_date.isoformat() + + if today_date.weekday() >= 5: + return { + "checked": True, + "skipped": True, + "reason": "weekend", + "today": today_text, + "latest_trade_date": get_latest_imported_trade_date(config=cfg), + "imported": [], + "failed": [], + } + + latest_trade_date = get_latest_imported_trade_date(config=cfg) + if latest_trade_date and latest_trade_date >= today_text: + return { + "checked": True, + "skipped": True, + "reason": "up_to_date", + "today": today_text, + "latest_trade_date": latest_trade_date, + "imported": [], + "failed": [], + } + + start_date = today_date - timedelta(days=max_backfill_days) + if latest_trade_date: + latest_date = date.fromisoformat(latest_trade_date) + start_date = max(start_date, latest_date + timedelta(days=1)) + + imported: list[dict[str, Any]] = [] + failed: list[dict[str, Any]] = [] + current = start_date + while current <= today_date: + if current.weekday() >= 5: + current += timedelta(days=1) + continue + + trade_date = current.isoformat() + try: + result = import_daily(trade_date, config=cfg) + if result.overview_count > 0: + imported.append( + { + "trade_date": result.trade_date, + "overview_count": result.overview_count, + "detail_count": result.detail_count, + } + ) + except Exception as exc: + failed.append({"trade_date": trade_date, "error": repr(exc)}) + current += timedelta(days=1) + + warning_result = None + rematch_result = None + if imported: + rematch_result = rematch_traders(config=cfg) + warning_result = generate_warnings(config=cfg) + + return { + "checked": True, + "skipped": False, + "today": today_text, + "latest_trade_date": latest_trade_date, + "imported": imported, + "failed": failed, + "rematch_result": rematch_result, + "warning_result": warning_result, + } + + def import_range( *, year: int | None = None,