Files
zjjk/backend/app/services/monitoring_service.py

210 lines
8.7 KiB
Python
Raw Permalink Normal View History

2026-03-20 21:47:30 +08:00
from datetime import datetime
from zoneinfo import ZoneInfo
from app.api.schemas import (
BenchmarkTimelinePoint,
BenchmarkTimelineSeries,
HistoryResponse,
HistorySummary,
MetaResponse,
OverviewResponse,
OverviewSnapshot,
PushRecord,
PushRecordsResponse,
RecentTradeDay,
RuleItem,
RulesResponse,
SourceDiagnosticsResponse,
StatPoint,
TimelinePoint,
ValueWithStatus,
)
from app.core.config import HISTORY_START_DATE
from app.repositories.monitoring_repository import MonitoringRepository
from app.services.market_clock import get_market_state
class MonitoringService:
    """Turn payloads read from ``MonitoringRepository`` into API response models.

    Each public ``get_*`` method fetches one payload from the repository and
    wraps it in the matching schema imported from ``app.api.schemas``.
    """

    # (payload key, display label) for every amount shown on the overview.
    # The payload key doubles as the ``OverviewSnapshot`` field name.
    _AMOUNT_FIELDS = (
        ("total_net_inflow", "当日总净流入"),
        ("cumulative_net_inflow", "当日累计净流入"),
        ("shanghai_net_inflow", "港股通(沪)净流入"),
        ("shenzhen_net_inflow", "港股通(深)净流入"),
        ("buy_amount", "买入额"),
        ("sell_amount", "卖出额"),
        ("net_buy_amount", "净买额"),
        ("one_min_change", "1 分钟变化"),
        ("five_min_change", "5 分钟变化"),
    )

    def __init__(self) -> None:
        """Create the service together with its own ``MonitoringRepository``."""
        self.repository = MonitoringRepository()

    @staticmethod
    def _today_iso(timezone_name: str) -> str:
        """Return today's calendar date in *timezone_name* as an ISO string.

        Only the wall clock is consulted; no trading-calendar check happens
        here.
        """
        return datetime.now(ZoneInfo(timezone_name)).date().isoformat()

    @staticmethod
    def _empty_snapshot(trade_date: str, source_name: str) -> dict:
        """Build a placeholder snapshot for a day that has no stored data.

        Args:
            trade_date: Date string copied into the ``trade_date`` key.
            source_name: Data-source name copied into the ``source_name`` key.

        Returns:
            A dict with the keys ``get_overview`` reads: every amount is
            ``None``, ``precision`` is ``"unavailable"``, both timeline lists
            are empty and ``unavailable_reason`` carries a user-facing message.
        """
        placeholder: dict = {
            "trade_date": trade_date,
            "snapshot_time": None,
            "market_state": get_market_state(),
        }
        # Every amount stays unknown until a real snapshot has been stored.
        placeholder.update(
            dict.fromkeys(key for key, _ in MonitoringService._AMOUNT_FIELDS)
        )
        placeholder.update(
            precision="unavailable",
            source_name=source_name,
            source_url=None,
            updated_at=None,
            unavailable_reason="今天的实时快照尚未同步成功,系统不会继续展示昨天的数据。",
            threshold_progress=0.0,
            next_threshold_hkd_billion=50,
            minute_timeline=[],
            benchmark_series=[],
        )
        return placeholder

    def get_meta(self) -> MetaResponse:
        """Return product metadata assembled from the system config.

        Raises:
            KeyError: If the config lacks any of the keys read below.
        """
        settings = self.repository.get_system_config()
        return MetaResponse(
            product_name=settings["product_name"],
            version="0.1.0",
            timezone=settings["timezone"],
            market_state=get_market_state(),
            current_trade_date=self._today_iso(settings["timezone"]),
            source_name=settings["source_name"],
            source_strategy=settings["source_strategy"],
            note="当前版本已接入真实数据同步链路,前端展示结果来自已持久化的 JSON 数据。",
        )

    def get_overview(self) -> OverviewResponse:
        """Return today's snapshot, its timelines and the latest push records.

        The snapshot is looked up by today's date in the configured timezone.
        When the repository returns nothing truthy for that date, a
        placeholder from ``_empty_snapshot`` is used instead, so data stored
        under another date is never shown.
        """
        settings = self.repository.get_system_config()
        today = self._today_iso(settings["timezone"])

        data = self.repository.get_snapshot_by_trade_date(today)
        if not data:
            data = self._empty_snapshot(today, settings["source_name"])

        # One precision flag is shared by every amount in the snapshot.
        precision = data.get("precision", "unavailable")

        trade_date = data["trade_date"]
        snapshot_time = data.get("snapshot_time")
        market_state = data.get("market_state", get_market_state())
        amounts = {
            key: ValueWithStatus(
                amount_hkd_billion=data.get(key),
                precision=precision,
                label=label,
            )
            for key, label in self._AMOUNT_FIELDS
        }
        snapshot = OverviewSnapshot(
            trade_date=trade_date,
            snapshot_time=snapshot_time,
            market_state=market_state,
            **amounts,
            threshold_progress=data.get("threshold_progress", 0),
            next_threshold_hkd_billion=data.get("next_threshold_hkd_billion", 50),
            source_name=data.get("source_name", "东方财富"),
            source_url=data.get("source_url"),
            updated_at=data.get("updated_at"),
            unavailable_reason=data.get("unavailable_reason"),
        )

        timeline = [TimelinePoint(**row) for row in data.get("minute_timeline", [])]

        benchmarks = []
        for series in data.get("benchmark_series", []):
            series_points = [
                BenchmarkTimelinePoint(**row) for row in series.get("points", [])
            ]
            benchmarks.append(
                BenchmarkTimelineSeries(
                    key=series["key"],
                    label=series["label"],
                    unit=series["unit"],
                    detail_url=series.get("detail_url"),
                    points=series_points,
                )
            )

        # Only the first five stored records are surfaced here.
        # NOTE(review): presumably the repository returns newest first - confirm.
        stored_pushes = self.repository.get_push_records().get("records", [])
        latest_pushes = [PushRecord(**row) for row in stored_pushes[:5]]

        return OverviewResponse(
            snapshot=snapshot,
            minute_timeline=timeline,
            benchmark_series=benchmarks,
            recent_push_records=latest_pushes,
        )

    def get_history(self) -> HistoryResponse:
        """Return the stored history statistics as a ``HistoryResponse``.

        List sections missing from the payload become empty lists and a
        missing ``start_date`` falls back to ``HISTORY_START_DATE``.

        Raises:
            KeyError: If the payload has no ``"summary"`` entry.
        """
        data = self.repository.get_history()
        summary = HistorySummary(**data["summary"])

        def points(section: str) -> list[StatPoint]:
            # A section absent from the payload yields an empty series.
            return [StatPoint(**row) for row in data.get(section, [])]

        return HistoryResponse(
            start_date=data.get("start_date", HISTORY_START_DATE),
            daily=points("daily"),
            weekly=points("weekly"),
            monthly=points("monthly"),
            cumulative=points("cumulative"),
            benchmark_history={
                name: [StatPoint(**row) for row in rows]
                for name, rows in data.get("benchmark_history", {}).items()
            },
            recent_trade_days=[
                RecentTradeDay(**row) for row in data.get("recent_trade_days", [])
            ],
            summary=summary,
        )

    def get_push_records(self) -> PushRecordsResponse:
        """Return every stored push record."""
        stored = self.repository.get_push_records().get("records", [])
        return PushRecordsResponse(records=[PushRecord(**row) for row in stored])

    def get_rules(self) -> RulesResponse:
        """Describe the current system config as a list of display rules.

        Raises:
            KeyError: If the config lacks any of the keys read below.
        """
        settings = self.repository.get_system_config()

        def rule(key: str, label: str, value: str, description: str) -> RuleItem:
            return RuleItem(key=key, label=label, value=value, description=description)

        entries = [
            rule(
                "realtime_collection_interval_seconds",
                "实时采集间隔",
                f"{settings['realtime_collection_interval_seconds']}",
                "分钟级快照拉取周期。",
            ),
            rule(
                "history_backfill_start_date",
                "历史回补起始日",
                settings["history_backfill_start_date"],
                "历史统计的回补起点。",
            ),
            rule(
                "threshold_step_hkd_billion",
                "阈值突破档位",
                f"{settings['threshold_step_hkd_billion']} 亿港元",
                "累计净流入按该步长触发突破提醒。",
            ),
            rule(
                "five_minute_flow_alert_hkd_billion",
                "5 分钟异动阈值",
                f"{settings['five_minute_flow_alert_hkd_billion']} 亿港元",
                "5 分钟净变化绝对值超过该阈值触发告警。",
            ),
            rule(
                "email_enabled",
                "邮件开关",
                "开启" if settings["email_enabled"] else "关闭",
                "是否发送邮件推送。",
            ),
            rule(
                "sender_email",
                "发件邮箱",
                settings["sender_email"],
                "SMTP 发件人邮箱。",
            ),
            rule(
                "smtp",
                "SMTP 地址与端口",
                f"{settings['smtp_host']}:{settings['smtp_port']}",
                "邮件服务配置。",
            ),
            rule(
                "recipients",
                "收件人列表",
                ", ".join(settings["recipients"]),
                "告警接收对象。",
            ),
            rule(
                "source_name",
                "当前主数据源",
                settings["source_name"],
                "正式口径唯一主源。",
            ),
        ]
        return RulesResponse(items=entries)

    def get_source_diagnostics(self) -> SourceDiagnosticsResponse:
        """Return the repository's source diagnostics as a response model."""
        diagnostics = self.repository.get_source_diagnostics()
        return SourceDiagnosticsResponse(**diagnostics)
# Module-level MonitoringService instance, constructed when this module is
# imported (which also constructs its MonitoringRepository).
# NOTE(review): presumably shared by the API route handlers - confirm against callers.
monitoring_service = MonitoringService()