from __future__ import annotations

from datetime import datetime
from uuid import uuid4
from zoneinfo import ZoneInfo

from app.repositories.monitoring_repository import MonitoringRepository
from app.services.email_notification_service import email_notification_service


class AlertService:
    """Build, send and record e-mail alerts derived from monitoring snapshots.

    Snapshots are plain dicts read with ``.get``; every numeric field may be
    missing or ``None`` and is then rendered as ``"-"`` in the e-mail body.
    """

    def __init__(self) -> None:
        """Create the repository handle and the timezone used for timestamps."""
        self.repository = MonitoringRepository()
        self.tz = ZoneInfo("Asia/Shanghai")

    @staticmethod
    def _format_amount(value: float | None) -> str:
        """Return *value* with four decimals, or ``"-"`` when it is ``None``."""
        if value is None:
            return "-"
        return f"{value:.4f}"

    @staticmethod
    def _format_signed_amount(value: float | None) -> str:
        """Return *value* with an explicit sign and four decimals, or ``"-"``."""
        if value is None:
            return "-"
        return f"{value:+.4f}"

    @staticmethod
    def _format_points(value: float | None) -> str:
        """Return *value* with two decimals, or ``"-"`` when it is ``None``."""
        if value is None:
            return "-"
        return f"{value:.2f}"

    @staticmethod
    def _build_benchmark_lookup(snapshot: dict) -> dict[str, float | None]:
        """Map each benchmark label to the value of its last point.

        Reads ``snapshot["benchmark_series"]``; a missing or ``None`` series,
        or a series without points, yields ``None`` for that label. Items
        without a ``label`` are stored under the empty string.
        """
        lookup: dict[str, float | None] = {}
        for item in snapshot.get("benchmark_series", []) or []:
            points = item.get("points", []) or []
            lookup[item.get("label", "")] = points[-1].get("value") if points else None
        return lookup

    def _build_enhanced_body(
        self,
        *,
        snapshot: dict,
        title: str,
        summary: str,
        note: str | None = None,
        operation_advice: str | None = None,
        market_context: list[str] | None = None,
    ) -> str:
        """Render the plain-text e-mail body for *snapshot*.

        The body always contains the title, the snapshot header fields and the
        summary. The market-context list (numbered from 1), the operation
        advice and the note are appended, in that order, only when truthy.

        Returns:
            The body lines joined with ``"\\n"``.
        """
        benchmarks = self._build_benchmark_lookup(snapshot)
        lines = [
            title,
            "",
            f"交易日期: {snapshot.get('trade_date')}",
            f"快照时间: {snapshot.get('snapshot_time') or snapshot.get('updated_at') or '-'}",
            f"南向总净流入: {self._format_signed_amount(snapshot.get('total_net_inflow'))} 亿港元",
            f"1分钟净变化: {self._format_signed_amount(snapshot.get('one_min_change'))} 亿港元",
            f"5分钟净变化: {self._format_signed_amount(snapshot.get('five_min_change'))} 亿港元",
            f"恒生指数: {self._format_points(benchmarks.get('恒生指数'))}",
            f"恒生科技指数: {self._format_points(benchmarks.get('恒生科技指数'))}",
            "",
            "摘要",
            summary,
        ]
        if market_context:
            lines.extend(["", "市场要点"])
            lines.extend(
                [f"{index}. {item}" for index, item in enumerate(market_context, start=1)]
            )
        if operation_advice:
            lines.extend(["", "操作建议", operation_advice])
        if note:
            lines.extend(["", "备注", note])
        return "\n".join(lines)

    def _send_email(self, subject: str, body: str) -> None:
        """Send *body* as a plain-text e-mail using the stored system config.

        ``dict.get(key, default)`` only applies the default when the key is
        absent, so a config that stores ``null`` or ``""`` (e.g. an unset
        ``smtp_port``) used to reach ``int()`` and raise. Falsy stored values
        now fall back to the same defaults as missing keys.

        Raises:
            Whatever ``email_notification_service.send`` raises, and
            ``ValueError`` if ``smtp_port`` is a non-numeric string.
        """
        config = self.repository.get_system_config() or {}
        email_notification_service.send(
            smtp_host=config.get("smtp_host") or "",
            smtp_port=int(config.get("smtp_port") or 465),
            smtp_username=config.get("smtp_username") or "",
            smtp_password=config.get("smtp_password") or "",
            sender_email=config.get("sender_email") or "",
            recipients=config.get("recipients") or [],
            subject=subject,
            text_body=body,
        )

    def _build_record(
        self,
        *,
        snapshot: dict,
        rule_code: str,
        subject: str,
        summary: str,
        description: str,
        trigger_value: float | None,
    ) -> dict:
        """Create a push-history record in the initial ``"pending"`` state.

        ``snapshot`` is accepted for interface compatibility but is not read.
        The record id is ``"push-"`` plus 12 hex characters of a random UUID,
        and ``triggered_at`` is the current time in ``self.tz`` at second
        precision.
        """
        return {
            "id": f"push-{uuid4().hex[:12]}",
            "triggered_at": datetime.now(self.tz).isoformat(timespec="seconds"),
            "push_type": "email",
            "rule_code": rule_code,
            "trigger_value_hkd_billion": trigger_value,
            "description": description,
            "email_subject": subject,
            "email_summary": summary,
            "status": "pending",
            "error_message": None,
        }

    def send_snapshot_alert(
        self,
        *,
        snapshot: dict,
        rule_code: str,
        subject: str,
        summary: str,
        description: str,
        trigger_value: float | None,
        body_note: str | None = None,
        body_text: str | None = None,
        operation_advice: str | None = None,
        market_context: list[str] | None = None,
    ) -> dict:
        """Send an alert e-mail for *snapshot* and persist the outcome.

        When ``body_text`` is given it is used verbatim (an empty string
        included); otherwise the body is rendered by ``_build_enhanced_body``
        from the remaining arguments.

        Sending is best-effort: a failure does not propagate. The record's
        ``status`` becomes ``"sent"`` or ``"failed"`` (with ``error_message``
        set to the exception text) and the record is appended to the
        repository in both cases.

        Returns:
            The push record that was stored.
        """
        if body_text is not None:
            body = body_text
        else:
            body = self._build_enhanced_body(
                snapshot=snapshot,
                title="南向资金监控自动告警",
                summary=summary,
                note=body_note,
                operation_advice=operation_advice,
                market_context=market_context,
            )
        record = self._build_record(
            snapshot=snapshot,
            rule_code=rule_code,
            subject=subject,
            summary=summary,
            description=description,
            trigger_value=trigger_value,
        )
        try:
            self._send_email(subject=subject, body=body)
        except Exception as exc:  # deliberate boundary: failure is recorded, not raised
            record["status"] = "failed"
            record["error_message"] = str(exc)
        else:
            record["status"] = "sent"
        self.repository.append_push_record(record)
        return record

    def send_test_alert(self) -> dict:
        """Send a manually triggered sample alert based on the latest snapshot.

        The market-context lines and operation advice are fixed sample text,
        not derived from the snapshot.

        Returns:
            The push record produced by ``send_snapshot_alert``.

        Raises:
            ValueError: If there is no latest snapshot, or it has no
                ``total_net_inflow`` value.
        """
        snapshot = self.repository.get_latest_snapshot()
        history = self.repository.get_history()
        if not snapshot:
            raise ValueError("未找到可用于测试的快照数据")
        total_net_inflow = snapshot.get("total_net_inflow")
        if total_net_inflow is None:
            raise ValueError("快照缺少 total_net_inflow,无法发送测试告警")

        five_min_change = snapshot.get("five_min_change")
        # History may be absent, or carry an explicit null summary; the
        # cumulative figure is optional and is rendered as "-" in that case.
        history_summary = (history or {}).get("summary") or {}
        cumulative = history_summary.get("cumulative_net_inflow_hkd_billion")

        summary = (
            f"{snapshot.get('trade_date')} 南向资金当前净流入 {self._format_signed_amount(total_net_inflow)} 亿港元,"
            f"5分钟净变化 {self._format_signed_amount(five_min_change)} 亿港元,"
            f"统计区间累计 {self._format_amount(cumulative)} 亿港元。"
        )
        body = self._build_enhanced_body(
            snapshot=snapshot,
            title="南向资金监控提醒",
            summary=summary,
            market_context=[
                "南向资金尾盘仍以净流出为主。",
                "恒生科技指数仍处在弱修复阶段。",
            ],
            operation_advice="南向资金仍偏弱,短线先控制仓位,等待南向重新转正后再考虑分批回补恒生科技方向。",
        )
        return self.send_snapshot_alert(
            snapshot=snapshot,
            rule_code="manual_test_alert",
            subject=f"[南向资金监控] {snapshot.get('trade_date')} 港股操作建议",
            summary=summary,
            description="用于发送简版正式样式邮件。",
            trigger_value=total_net_inflow,
            body_text=body,
        )


alert_service = AlertService()