Files
zjjk/backend/app/services/alert_engine.py

121 lines
5.1 KiB
Python
Raw Permalink Normal View History

2026-03-20 21:47:30 +08:00
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Any
from zoneinfo import ZoneInfo
from app.repositories.monitoring_repository import MonitoringRepository
from app.services.alert_service import AlertService
class AlertEngine:
def __init__(self) -> None:
self.repository = MonitoringRepository()
self.tz = ZoneInfo("Asia/Shanghai")
self.alert_service = AlertService()
def evaluate(self, snapshot: dict) -> list[dict]:
trade_date = snapshot["trade_date"]
state = self._load_state(trade_date)
config = self.repository.get_system_config()
results: list[dict] = []
results.extend(self._evaluate_threshold_break(snapshot, config, state))
results.extend(self._evaluate_five_minute(snapshot, config, state))
self._save_state(trade_date, state)
return results
def _load_state(self, trade_date: str) -> dict[str, Any]:
payload = self.repository.get_alert_state(trade_date)
if not payload:
return {
"trade_date": trade_date,
"thresholds_triggered": [],
"last_five_minute_alert_at": None,
}
payload.setdefault("trade_date", trade_date)
payload.setdefault("thresholds_triggered", [])
payload.setdefault("last_five_minute_alert_at", None)
return payload
def _save_state(self, trade_date: str, state: dict[str, Any]) -> None:
payload = {
"trade_date": trade_date,
"thresholds_triggered": sorted(set(state.get("thresholds_triggered", []))),
"last_five_minute_alert_at": state.get("last_five_minute_alert_at"),
}
self.repository.save_alert_state(trade_date, payload)
def _evaluate_threshold_break(
self, snapshot: dict, config: dict, state: dict[str, Any]
) -> list[dict]:
total = snapshot.get("total_net_inflow")
if total is None:
return []
threshold_step = float(config.get("threshold_step_hkd_billion", 40))
if threshold_step <= 0 or abs(total) < threshold_step:
return []
results: list[dict] = []
triggered_values = {float(value) for value in state.get("thresholds_triggered", [])}
direction = 1 if total > 0 else -1
current_step = int(abs(total) // threshold_step)
for step in range(1, current_step + 1):
threshold_value = direction * step * threshold_step
if threshold_value in triggered_values:
continue
flow_label = "净流入" if threshold_value > 0 else "净流出"
threshold_abs = abs(threshold_value)
record = self.alert_service.send_snapshot_alert(
snapshot=snapshot,
rule_code="threshold_break",
subject=f"[南向资金监控] {snapshot['trade_date']} 南向{flow_label}突破 {threshold_abs:.0f} 亿港元",
summary=f"当前累计{flow_label} {AlertService._format_amount(abs(total))} 亿港元,突破 {threshold_abs:.0f} 亿港元档位。",
description=f"累计{flow_label}突破 {threshold_abs:.0f} 亿港元。",
trigger_value=total,
body_note=f"当前南向资金累计{'净流入' if total > 0 else '净流出'} {AlertService._format_amount(abs(total))} 亿港元,已突破 {threshold_abs:.0f} 亿港元。",
)
results.append(record)
triggered_values.add(threshold_value)
state["thresholds_triggered"] = sorted(triggered_values)
return results
def _evaluate_five_minute(
self, snapshot: dict, config: dict, state: dict[str, Any]
) -> list[dict]:
change = snapshot.get("five_min_change")
if change is None:
return []
threshold = float(config.get("five_minute_flow_alert_hkd_billion", 15))
if threshold <= 0 or abs(change) < threshold:
return []
now = datetime.now(self.tz)
last_trigger_at = state.get("last_five_minute_alert_at")
cooldown_minutes = int(config.get("five_minute_cooldown_minutes", 5))
if last_trigger_at:
last_at = datetime.fromisoformat(last_trigger_at)
if now - last_at < timedelta(minutes=cooldown_minutes):
return []
direction = "流入" if change > 0 else "流出"
record = self.alert_service.send_snapshot_alert(
snapshot=snapshot,
rule_code="five_minute_flow",
subject=f"[南向资金监控] {snapshot['trade_date']} 5分钟快速{direction} {abs(change):.4f} 亿港元",
summary=f"5分钟净变化 {abs(change):.4f} 亿港元,超过阈值 {threshold:.4f} 亿港元。",
description=f"5分钟净变化{direction} {change:.4f} 亿港元。",
trigger_value=change,
body_note=f"5分钟净变化{direction} {change:.4f} 亿港元,阈值为 {threshold:.4f} 亿港元。",
)
state["last_five_minute_alert_at"] = now.isoformat(timespec="seconds")
return [record]
alert_engine = AlertEngine()