from datetime import datetime
from zoneinfo import ZoneInfo

from app.api.schemas import (
    BenchmarkTimelinePoint,
    BenchmarkTimelineSeries,
    HistoryResponse,
    HistorySummary,
    MetaResponse,
    OverviewResponse,
    OverviewSnapshot,
    PushRecord,
    PushRecordsResponse,
    RecentTradeDay,
    RuleItem,
    RulesResponse,
    SourceDiagnosticsResponse,
    StatPoint,
    TimelinePoint,
    ValueWithStatus,
)
from app.core.config import HISTORY_START_DATE
from app.repositories.monitoring_repository import MonitoringRepository
from app.services.market_clock import get_market_state


class MonitoringService:
    """Build API response models from data served by ``MonitoringRepository``."""

    # Number of push records embedded in the overview response.
    _RECENT_PUSH_RECORD_LIMIT = 5

    def __init__(self) -> None:
        self.repository = MonitoringRepository()

    @staticmethod
    def _current_trade_date(timezone_name: str) -> str:
        """Return today's date in *timezone_name* as an ISO ``YYYY-MM-DD`` string."""
        return datetime.now(ZoneInfo(timezone_name)).date().isoformat()

    @staticmethod
    def _empty_snapshot(trade_date: str, source_name: str) -> dict:
        """Return a placeholder snapshot dict for a date with no stored snapshot.

        Every amount is ``None``, precision is ``"unavailable"`` and both
        timelines are empty. ``unavailable_reason`` carries a user-facing
        message saying that today's snapshot has not synced yet and that
        yesterday's data will not be shown in its place.
        """
        return {
            "trade_date": trade_date,
            "snapshot_time": None,
            "market_state": get_market_state(),
            "total_net_inflow": None,
            "cumulative_net_inflow": None,
            "shanghai_net_inflow": None,
            "shenzhen_net_inflow": None,
            "buy_amount": None,
            "sell_amount": None,
            "net_buy_amount": None,
            "one_min_change": None,
            "five_min_change": None,
            "precision": "unavailable",
            "source_name": source_name,
            "source_url": None,
            "updated_at": None,
            "unavailable_reason": "今天的实时快照尚未同步成功,系统不会继续展示昨天的数据。",
            "threshold_progress": 0.0,
            "next_threshold_hkd_billion": 50,
            "minute_timeline": [],
            "benchmark_series": [],
        }

    @staticmethod
    def _build_snapshot(payload: dict, fallback_trade_date: str) -> OverviewSnapshot:
        """Convert a snapshot dict into an ``OverviewSnapshot`` model.

        Args:
            payload: Snapshot dict (stored or produced by ``_empty_snapshot``).
            fallback_trade_date: Date used when *payload* has no
                ``"trade_date"`` key, so a stored snapshot missing that key
                does not raise ``KeyError``.
        """
        precision = payload.get("precision", "unavailable")

        def metric(label: str, key: str) -> ValueWithStatus:
            # Every amount shares the snapshot-level precision flag.
            return ValueWithStatus(
                amount_hkd_billion=payload.get(key),
                precision=precision,
                label=label,
            )

        return OverviewSnapshot(
            trade_date=payload.get("trade_date", fallback_trade_date),
            snapshot_time=payload.get("snapshot_time"),
            market_state=payload.get("market_state", get_market_state()),
            total_net_inflow=metric("当日总净流入", "total_net_inflow"),
            cumulative_net_inflow=metric("当日累计净流入", "cumulative_net_inflow"),
            shanghai_net_inflow=metric("港股通(沪)净流入", "shanghai_net_inflow"),
            shenzhen_net_inflow=metric("港股通(深)净流入", "shenzhen_net_inflow"),
            buy_amount=metric("买入额", "buy_amount"),
            sell_amount=metric("卖出额", "sell_amount"),
            net_buy_amount=metric("净买额", "net_buy_amount"),
            one_min_change=metric("1 分钟变化", "one_min_change"),
            five_min_change=metric("5 分钟变化", "five_min_change"),
            threshold_progress=payload.get("threshold_progress", 0),
            next_threshold_hkd_billion=payload.get("next_threshold_hkd_billion", 50),
            source_name=payload.get("source_name", "东方财富"),
            source_url=payload.get("source_url"),
            updated_at=payload.get("updated_at"),
            unavailable_reason=payload.get("unavailable_reason"),
        )

    @staticmethod
    def _build_benchmark_series(payload: dict) -> list:
        """Convert ``payload["benchmark_series"]`` into ``BenchmarkTimelineSeries`` models.

        A missing ``"benchmark_series"`` key yields an empty list; within an
        entry, ``detail_url`` and ``points`` are optional while ``key``,
        ``label`` and ``unit`` are required.
        """
        return [
            BenchmarkTimelineSeries(
                key=series["key"],
                label=series["label"],
                unit=series["unit"],
                detail_url=series.get("detail_url"),
                points=[BenchmarkTimelinePoint(**point) for point in series.get("points", [])],
            )
            for series in payload.get("benchmark_series", [])
        ]

    def get_meta(self) -> MetaResponse:
        """Return product metadata built from the stored system config."""
        config = self.repository.get_system_config()
        return MetaResponse(
            product_name=config["product_name"],
            version="0.1.0",
            timezone=config["timezone"],
            market_state=get_market_state(),
            current_trade_date=self._current_trade_date(config["timezone"]),
            source_name=config["source_name"],
            source_strategy=config["source_strategy"],
            note="当前版本已接入真实数据同步链路,前端展示结果来自已持久化的 JSON 数据。",
        )

    def get_overview(self) -> OverviewResponse:
        """Return today's snapshot, its timelines and the latest push records.

        When the repository returns nothing for today's date, the response is
        built from ``_empty_snapshot`` rather than from an earlier day.
        """
        config = self.repository.get_system_config()
        current_trade_date = self._current_trade_date(config["timezone"])

        payload = self.repository.get_snapshot_by_trade_date(current_trade_date)
        if not payload:
            payload = self._empty_snapshot(current_trade_date, config["source_name"])

        snapshot = self._build_snapshot(payload, current_trade_date)
        minute_timeline = [TimelinePoint(**item) for item in payload.get("minute_timeline", [])]
        benchmark_series = self._build_benchmark_series(payload)

        stored_records = self.repository.get_push_records().get("records", [])
        recent_push_records = [
            PushRecord(**item) for item in stored_records[: self._RECENT_PUSH_RECORD_LIMIT]
        ]

        return OverviewResponse(
            snapshot=snapshot,
            minute_timeline=minute_timeline,
            benchmark_series=benchmark_series,
            recent_push_records=recent_push_records,
        )

    def get_history(self) -> HistoryResponse:
        """Return the stored history statistics as a ``HistoryResponse``.

        Raises:
            KeyError: If the stored history payload has no ``"summary"`` key.
        """
        payload = self.repository.get_history()
        # NOTE(review): "summary" is the only required key here; every other
        # section falls back to an empty collection or HISTORY_START_DATE.
        summary = HistorySummary(**payload["summary"])
        return HistoryResponse(
            start_date=payload.get("start_date", HISTORY_START_DATE),
            daily=[StatPoint(**item) for item in payload.get("daily", [])],
            weekly=[StatPoint(**item) for item in payload.get("weekly", [])],
            monthly=[StatPoint(**item) for item in payload.get("monthly", [])],
            cumulative=[StatPoint(**item) for item in payload.get("cumulative", [])],
            benchmark_history={
                key: [StatPoint(**item) for item in points]
                for key, points in payload.get("benchmark_history", {}).items()
            },
            recent_trade_days=[
                RecentTradeDay(**item) for item in payload.get("recent_trade_days", [])
            ],
            summary=summary,
        )

    def get_push_records(self) -> PushRecordsResponse:
        """Return every stored push record."""
        stored_records = self.repository.get_push_records().get("records", [])
        return PushRecordsResponse(records=[PushRecord(**item) for item in stored_records])

    def get_rules(self) -> RulesResponse:
        """Return the system config rendered as a list of display-ready rule items.

        Raises:
            KeyError: If any of the config keys read below is missing.
        """
        config = self.repository.get_system_config()
        items = [
            RuleItem(
                key="realtime_collection_interval_seconds",
                label="实时采集间隔",
                value=f'{config["realtime_collection_interval_seconds"]} 秒',
                description="分钟级快照拉取周期。",
            ),
            RuleItem(
                key="history_backfill_start_date",
                label="历史回补起始日",
                value=config["history_backfill_start_date"],
                description="历史统计的回补起点。",
            ),
            RuleItem(
                key="threshold_step_hkd_billion",
                label="阈值突破档位",
                value=f'{config["threshold_step_hkd_billion"]} 亿港元',
                description="累计净流入按该步长触发突破提醒。",
            ),
            RuleItem(
                key="five_minute_flow_alert_hkd_billion",
                label="5 分钟异动阈值",
                value=f'{config["five_minute_flow_alert_hkd_billion"]} 亿港元',
                description="5 分钟净变化绝对值超过该阈值触发告警。",
            ),
            RuleItem(
                key="email_enabled",
                label="邮件开关",
                value="开启" if config["email_enabled"] else "关闭",
                description="是否发送邮件推送。",
            ),
            RuleItem(
                key="sender_email",
                label="发件邮箱",
                value=config["sender_email"],
                description="SMTP 发件人邮箱。",
            ),
            RuleItem(
                key="smtp",
                label="SMTP 地址与端口",
                value=f'{config["smtp_host"]}:{config["smtp_port"]}',
                description="邮件服务配置。",
            ),
            RuleItem(
                key="recipients",
                label="收件人列表",
                value=", ".join(config["recipients"]),
                description="告警接收对象。",
            ),
            RuleItem(
                key="source_name",
                label="当前主数据源",
                value=config["source_name"],
                description="正式口径唯一主源。",
            ),
        ]
        return RulesResponse(items=items)

    def get_source_diagnostics(self) -> SourceDiagnosticsResponse:
        """Return the repository's source diagnostics as a response model."""
        return SourceDiagnosticsResponse(**self.repository.get_source_diagnostics())


# Shared instance, created when this module is imported.
monitoring_service = MonitoringService()