"""Alert rules evaluated against monitoring snapshots.

Two rules are implemented here: cumulative net-flow threshold steps and a
five-minute change spike guarded by a cooldown. Per-trade-date state is read
from and written to ``MonitoringRepository`` so alerts are not repeated.
"""

from __future__ import annotations

from datetime import datetime, timedelta
from typing import Any
from zoneinfo import ZoneInfo

from app.repositories.monitoring_repository import MonitoringRepository
from app.services.alert_service import AlertService


class AlertEngine:
    """Evaluate alert rules for a snapshot and dispatch them via ``AlertService``."""

    def __init__(self) -> None:
        self.repository = MonitoringRepository()
        # Zone used for "now" when checking and recording the cooldown.
        self.tz = ZoneInfo("Asia/Shanghai")
        self.alert_service = AlertService()

    def evaluate(self, snapshot: dict) -> list[dict]:
        """Run every rule against *snapshot* and return the alert records sent.

        Args:
            snapshot: Must contain ``"trade_date"``; the rules additionally
                read ``"total_net_inflow"`` and ``"five_min_change"``.

        Returns:
            The records returned by ``AlertService.send_snapshot_alert`` for
            each alert dispatched during this call (possibly empty).

        Raises:
            KeyError: If ``snapshot`` has no ``"trade_date"`` key.
        """
        trade_date = snapshot["trade_date"]
        state = self._load_state(trade_date)
        config = self.repository.get_system_config()

        results: list[dict] = []
        try:
            results.extend(self._evaluate_threshold_break(snapshot, config, state))
            results.extend(self._evaluate_five_minute(snapshot, config, state))
        finally:
            # Persist even when a rule raises: alerts that were already sent
            # must be remembered, otherwise the next run sends them again.
            self._save_state(trade_date, state)
        return results

    def _load_state(self, trade_date: str) -> dict[str, Any]:
        """Return the stored alert state for *trade_date*, filling in defaults."""
        payload = self.repository.get_alert_state(trade_date)
        if not payload:
            return {
                "trade_date": trade_date,
                "thresholds_triggered": [],
                "last_five_minute_alert_at": None,
            }

        payload.setdefault("trade_date", trade_date)
        # setdefault would keep a stored null, which later breaks iteration,
        # so normalize a missing or null list explicitly.
        if payload.get("thresholds_triggered") is None:
            payload["thresholds_triggered"] = []
        payload.setdefault("last_five_minute_alert_at", None)
        return payload

    def _save_state(self, trade_date: str, state: dict[str, Any]) -> None:
        """Write the de-duplicated, sorted state for *trade_date* to the repository."""
        triggered = state.get("thresholds_triggered") or []
        payload = {
            "trade_date": trade_date,
            "thresholds_triggered": sorted(set(triggered)),
            "last_five_minute_alert_at": state.get("last_five_minute_alert_at"),
        }
        self.repository.save_alert_state(trade_date, payload)

    def _evaluate_threshold_break(
        self, snapshot: dict, config: dict, state: dict[str, Any]
    ) -> list[dict]:
        """Send one alert per not-yet-triggered threshold step that has been crossed.

        Steps are multiples of ``threshold_step_hkd_billion`` (default 40),
        signed by the direction of ``total_net_inflow``. Every step up to the
        current magnitude that is not already in
        ``state["thresholds_triggered"]`` produces an alert and is recorded.

        Returns:
            The records of the alerts sent; empty when the total is missing,
            the step is non-positive, or no step has been reached.
        """
        total = snapshot.get("total_net_inflow")
        if total is None:
            return []

        threshold_step = float(config.get("threshold_step_hkd_billion", 40))
        if threshold_step <= 0 or abs(total) < threshold_step:
            return []

        results: list[dict] = []
        triggered_values = {
            float(value) for value in state.get("thresholds_triggered") or []
        }
        direction = 1 if total > 0 else -1
        # Every threshold in this call has the sign of `total`, so the label
        # is the same for all of them.
        flow_label = "净流入" if direction > 0 else "净流出"
        current_step = int(abs(total) // threshold_step)

        try:
            for step in range(1, current_step + 1):
                threshold_value = direction * step * threshold_step
                if threshold_value in triggered_values:
                    continue

                threshold_abs = abs(threshold_value)
                amount_text = AlertService._format_amount(abs(total))
                record = self.alert_service.send_snapshot_alert(
                    snapshot=snapshot,
                    rule_code="threshold_break",
                    subject=f"[南向资金监控] {snapshot['trade_date']} 南向{flow_label}突破 {threshold_abs:.0f} 亿港元",
                    summary=f"当前累计{flow_label} {amount_text} 亿港元,突破 {threshold_abs:.0f} 亿港元档位。",
                    description=f"累计{flow_label}突破 {threshold_abs:.0f} 亿港元。",
                    trigger_value=total,
                    body_note=f"当前南向资金累计{flow_label} {amount_text} 亿港元,已突破 {threshold_abs:.0f} 亿港元。",
                )
                results.append(record)
                triggered_values.add(threshold_value)
        finally:
            # Record whatever was sent, even if a later send raised, so the
            # already-delivered steps are not alerted a second time.
            state["thresholds_triggered"] = sorted(triggered_values)
        return results

    def _parse_alert_time(self, value: Any) -> datetime | None:
        """Parse a stored ISO timestamp into an aware datetime, or return None.

        Naive values are assumed to be in ``self.tz``; values that cannot be
        parsed are treated as "no previous alert" rather than raising.
        """
        if not value:
            return None
        try:
            parsed = datetime.fromisoformat(value)
        except (TypeError, ValueError):
            return None
        if parsed.tzinfo is None:
            # Subtracting a naive datetime from an aware one raises TypeError.
            parsed = parsed.replace(tzinfo=self.tz)
        return parsed

    def _evaluate_five_minute(
        self, snapshot: dict, config: dict, state: dict[str, Any]
    ) -> list[dict]:
        """Send an alert when the five-minute change reaches the configured size.

        The alert is suppressed while the previous one is younger than
        ``five_minute_cooldown_minutes`` (default 5). On a successful send the
        current time is stored in ``state["last_five_minute_alert_at"]``.

        Returns:
            A one-element list with the alert record, or an empty list.
        """
        change = snapshot.get("five_min_change")
        if change is None:
            return []

        threshold = float(config.get("five_minute_flow_alert_hkd_billion", 15))
        if threshold <= 0 or abs(change) < threshold:
            return []

        now = datetime.now(self.tz)
        cooldown_minutes = int(config.get("five_minute_cooldown_minutes", 5))
        last_at = self._parse_alert_time(state.get("last_five_minute_alert_at"))
        if last_at is not None and now - last_at < timedelta(minutes=cooldown_minutes):
            return []

        direction = "流入" if change > 0 else "流出"
        # NOTE(review): description/body_note print the signed value next to
        # the direction label (e.g. "流出 -20.0000") while the subject uses
        # abs(change); left unchanged here — confirm which form is intended.
        record = self.alert_service.send_snapshot_alert(
            snapshot=snapshot,
            rule_code="five_minute_flow",
            subject=f"[南向资金监控] {snapshot['trade_date']} 5分钟快速{direction} {abs(change):.4f} 亿港元",
            summary=f"5分钟净变化 {abs(change):.4f} 亿港元,超过阈值 {threshold:.4f} 亿港元。",
            description=f"5分钟净变化{direction} {change:.4f} 亿港元。",
            trigger_value=change,
            body_note=f"5分钟净变化{direction} {change:.4f} 亿港元,阈值为 {threshold:.4f} 亿港元。",
        )
        state["last_five_minute_alert_at"] = now.isoformat(timespec="seconds")
        return [record]


# Module-level instance, constructed at import time.
alert_engine = AlertEngine()