feat: check lhb data on api startup

This commit is contained in:
wanghep
2026-04-20 20:41:18 +08:00
parent 001ef0dfde
commit 9e0ad43999
3 changed files with 114 additions and 1 deletions

View File

@ -38,6 +38,10 @@
- 优先读取数据库中的股票元数据。
- 外部快照失败时增加备用行情源兜底。
- 收盘后更新流程会同步补全行业、市值、流通市值等字段。
- 后端 API 启动时会自动检查最近龙虎榜数据:
- 如果数据库最新交易日已经是当天,则跳过。
- 如果发现最近工作日数据缺失,会尝试自动补齐并重新生成预警。
- 检查失败不会阻塞 API 启动,避免服务不可用。
## 环境要求

View File

@ -1,5 +1,6 @@
from __future__ import annotations
import logging
from pathlib import Path
from fastapi import FastAPI, HTTPException
@ -8,6 +9,7 @@ from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from .pipeline import (
check_and_repair_recent_data,
generate_warnings,
get_pipeline_status,
import_daily,
@ -32,6 +34,7 @@ BASE_DIR = Path(__file__).resolve().parents[2]
PROJECT_DIR = BASE_DIR.parent
FRONTEND_DIST_DIR = PROJECT_DIR / "frontend" / "dist"
FRONTEND_ASSETS_DIR = FRONTEND_DIST_DIR / "assets"
logger = logging.getLogger(__name__)
app = FastAPI(title="lhbfx API", version="0.1.0")
@ -47,6 +50,15 @@ if FRONTEND_ASSETS_DIR.exists():
app.mount("/assets", StaticFiles(directory=str(FRONTEND_ASSETS_DIR)), name="assets")
@app.on_event("startup")
def startup_check_recent_data() -> None:
try:
result = check_and_repair_recent_data()
logger.info("startup data check result: %s", result)
except Exception:
logger.exception("startup data check failed")
@app.get("/api/summary")
def api_summary():
return fetch_summary()

View File

@ -3,8 +3,9 @@ from __future__ import annotations
import json
import time
from dataclasses import dataclass
from datetime import date, timedelta
from datetime import date, datetime, timedelta
from typing import Any
from zoneinfo import ZoneInfo
from .config import AppConfig, load_config
from .db import db_cursor
@ -22,6 +23,9 @@ class ImportResult:
detail_count: int
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
def build_seat_matchers(config: AppConfig) -> list[tuple[str, str, str]]:
matchers: list[tuple[str, str, str]] = []
for trader in config.traders:
@ -228,6 +232,99 @@ def build_schedule(year: int, start: str | None = None, end: str | None = None)
return dates
def get_latest_imported_trade_date(*, config: AppConfig | None = None) -> str | None:
cfg = config or load_config()
with db_cursor(config=cfg) as (_, cursor):
cursor.execute("SELECT MAX(trade_date) AS latest_trade_date FROM lhb_overview")
row = cursor.fetchone()
latest_trade_date = row["latest_trade_date"] if row else None
if latest_trade_date is None:
return None
if hasattr(latest_trade_date, "isoformat"):
return latest_trade_date.isoformat()
return str(latest_trade_date)
def check_and_repair_recent_data(
*,
config: AppConfig | None = None,
today: str | None = None,
max_backfill_days: int = 7,
) -> dict[str, Any]:
cfg = config or load_config()
today_date = date.fromisoformat(today) if today else datetime.now(SHANGHAI_TZ).date()
today_text = today_date.isoformat()
if today_date.weekday() >= 5:
return {
"checked": True,
"skipped": True,
"reason": "weekend",
"today": today_text,
"latest_trade_date": get_latest_imported_trade_date(config=cfg),
"imported": [],
"failed": [],
}
latest_trade_date = get_latest_imported_trade_date(config=cfg)
if latest_trade_date and latest_trade_date >= today_text:
return {
"checked": True,
"skipped": True,
"reason": "up_to_date",
"today": today_text,
"latest_trade_date": latest_trade_date,
"imported": [],
"failed": [],
}
start_date = today_date - timedelta(days=max_backfill_days)
if latest_trade_date:
latest_date = date.fromisoformat(latest_trade_date)
start_date = max(start_date, latest_date + timedelta(days=1))
imported: list[dict[str, Any]] = []
failed: list[dict[str, Any]] = []
current = start_date
while current <= today_date:
if current.weekday() >= 5:
current += timedelta(days=1)
continue
trade_date = current.isoformat()
try:
result = import_daily(trade_date, config=cfg)
if result.overview_count > 0:
imported.append(
{
"trade_date": result.trade_date,
"overview_count": result.overview_count,
"detail_count": result.detail_count,
}
)
except Exception as exc:
failed.append({"trade_date": trade_date, "error": repr(exc)})
current += timedelta(days=1)
warning_result = None
rematch_result = None
if imported:
rematch_result = rematch_traders(config=cfg)
warning_result = generate_warnings(config=cfg)
return {
"checked": True,
"skipped": False,
"today": today_text,
"latest_trade_date": latest_trade_date,
"imported": imported,
"failed": failed,
"rematch_result": rematch_result,
"warning_result": warning_result,
}
def import_range(
*,
year: int | None = None,