196 lines
7.0 KiB
Python
196 lines
7.0 KiB
Python
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
from datetime import datetime
|
|||
|
|
from uuid import uuid4
|
|||
|
|
from zoneinfo import ZoneInfo
|
|||
|
|
|
|||
|
|
from app.repositories.monitoring_repository import MonitoringRepository
|
|||
|
|
from app.services.email_notification_service import email_notification_service
|
|||
|
|
|
|||
|
|
|
|||
|
|
class AlertService:
    """Build, send and record e-mail alerts derived from monitoring snapshots.

    Snapshots and system configuration are read through ``MonitoringRepository``;
    messages are handed to ``email_notification_service``. Every send attempt is
    persisted as a push record whose ``triggered_at`` timestamp is expressed in
    the ``Asia/Shanghai`` timezone.
    """

    def __init__(self) -> None:
        """Create the repository handle and the timezone used for timestamps."""
        self.repository = MonitoringRepository()
        self.tz = ZoneInfo("Asia/Shanghai")

    @staticmethod
    def _format_number(value: float | None, spec: str) -> str:
        """Format ``value`` with format spec ``spec``; return "-" when it is None."""
        if value is None:
            return "-"
        return format(value, spec)

    @staticmethod
    def _format_amount(value: float | None) -> str:
        """Render an amount with four decimals, or "-" when missing."""
        return AlertService._format_number(value, ".4f")

    @staticmethod
    def _format_signed_amount(value: float | None) -> str:
        """Render an amount with an explicit sign and four decimals, or "-"."""
        return AlertService._format_number(value, "+.4f")

    @staticmethod
    def _format_points(value: float | None) -> str:
        """Render an index level with two decimals, or "-" when missing."""
        return AlertService._format_number(value, ".2f")

    @staticmethod
    def _build_benchmark_lookup(snapshot: dict) -> dict[str, float | None]:
        """Map each benchmark label to the ``value`` of its last point.

        Reads ``snapshot["benchmark_series"]``; a missing or empty series yields
        an empty mapping, and a series without points maps its label to None.
        Entries without a ``label`` are stored under the empty string.
        """
        lookup: dict[str, float | None] = {}
        for series in snapshot.get("benchmark_series") or []:
            points = series.get("points") or []
            lookup[series.get("label", "")] = points[-1].get("value") if points else None
        return lookup

    def _build_enhanced_body(
        self,
        *,
        snapshot: dict,
        title: str,
        summary: str,
        note: str | None = None,
        operation_advice: str | None = None,
        market_context: list[str] | None = None,
    ) -> str:
        """Compose the plain-text e-mail body for a snapshot.

        Args:
            snapshot: Snapshot dict; the keys read here are ``trade_date``,
                ``snapshot_time``/``updated_at``, ``total_net_inflow``,
                ``one_min_change``, ``five_min_change`` and ``benchmark_series``.
            title: First line of the body.
            summary: Text placed under the summary heading.
            note: Optional remark appended as the last section.
            operation_advice: Optional advice section.
            market_context: Optional bullet points, rendered as a numbered list.

        Returns:
            The body as a single newline-joined string. Missing values are
            rendered as "-".
        """
        benchmarks = self._build_benchmark_lookup(snapshot)
        lines = [
            title,
            "",
            # Fall back to "-" like every other field instead of printing "None".
            f"交易日期: {snapshot.get('trade_date') or '-'}",
            f"快照时间: {snapshot.get('snapshot_time') or snapshot.get('updated_at') or '-'}",
            f"南向总净流入: {self._format_signed_amount(snapshot.get('total_net_inflow'))} 亿港元",
            f"1分钟净变化: {self._format_signed_amount(snapshot.get('one_min_change'))} 亿港元",
            f"5分钟净变化: {self._format_signed_amount(snapshot.get('five_min_change'))} 亿港元",
            f"恒生指数: {self._format_points(benchmarks.get('恒生指数'))}",
            f"恒生科技指数: {self._format_points(benchmarks.get('恒生科技指数'))}",
            "",
            "摘要",
            summary,
        ]

        if market_context:
            lines.extend(["", "市场要点"])
            lines.extend(f"{index}. {item}" for index, item in enumerate(market_context, start=1))

        if operation_advice:
            lines.extend(["", "操作建议", operation_advice])

        if note:
            lines.extend(["", "备注", note])

        return "\n".join(lines)

    def _send_email(self, subject: str, body: str) -> None:
        """Send ``body`` as a text e-mail using the stored system configuration.

        SMTP settings and recipients come from
        ``self.repository.get_system_config()``. Any exception raised by the
        repository or by ``email_notification_service.send`` propagates to the
        caller.
        """
        config = self.repository.get_system_config()
        email_notification_service.send(
            smtp_host=config.get("smtp_host", ""),
            # An empty/None port falls back to the same default as a missing key
            # instead of failing inside int().
            smtp_port=int(config.get("smtp_port") or 465),
            smtp_username=config.get("smtp_username", ""),
            smtp_password=config.get("smtp_password", ""),
            sender_email=config.get("sender_email", ""),
            recipients=config.get("recipients", []),
            subject=subject,
            text_body=body,
        )

    def _build_record(
        self,
        *,
        snapshot: dict,
        rule_code: str,
        subject: str,
        summary: str,
        description: str,
        trigger_value: float | None,
    ) -> dict:
        """Create a new push record in the "pending" state.

        ``snapshot`` is currently not read; it is kept so existing keyword
        callers keep working. The record id is ``push-`` plus 12 hex characters
        of a random UUID, and ``triggered_at`` is the current time in
        ``self.tz`` with second precision.
        """
        return {
            "id": f"push-{uuid4().hex[:12]}",
            "triggered_at": datetime.now(self.tz).isoformat(timespec="seconds"),
            "push_type": "email",
            "rule_code": rule_code,
            "trigger_value_hkd_billion": trigger_value,
            "description": description,
            "email_subject": subject,
            "email_summary": summary,
            "status": "pending",
            "error_message": None,
        }

    def send_snapshot_alert(
        self,
        *,
        snapshot: dict,
        rule_code: str,
        subject: str,
        summary: str,
        description: str,
        trigger_value: float | None,
        body_note: str | None = None,
        body_text: str | None = None,
        operation_advice: str | None = None,
        market_context: list[str] | None = None,
    ) -> dict:
        """Send an alert e-mail for ``snapshot`` and persist the outcome.

        When ``body_text`` is given it is used verbatim; otherwise the body is
        built with ``_build_enhanced_body`` from ``summary``, ``body_note``,
        ``operation_advice`` and ``market_context``.

        Returns:
            The push record, with ``status`` set to "sent" or "failed". On
            failure ``error_message`` holds ``str(exc)``. The record is appended
            to the repository in both cases.
        """
        if body_text is not None:
            body = body_text
        else:
            body = self._build_enhanced_body(
                snapshot=snapshot,
                title="南向资金监控自动告警",
                summary=summary,
                note=body_note,
                operation_advice=operation_advice,
                market_context=market_context,
            )

        record = self._build_record(
            snapshot=snapshot,
            rule_code=rule_code,
            subject=subject,
            summary=summary,
            description=description,
            trigger_value=trigger_value,
        )

        try:
            self._send_email(subject=subject, body=body)
        except Exception as exc:
            # Deliberate best effort: a delivery failure is recorded, not raised,
            # so the attempt is still persisted below.
            record["status"] = "failed"
            record["error_message"] = str(exc)
        else:
            record["status"] = "sent"

        self.repository.append_push_record(record)
        return record

    def send_test_alert(self) -> dict:
        """Send a manually triggered sample alert based on the latest snapshot.

        Returns:
            The push record produced by ``send_snapshot_alert``.

        Raises:
            ValueError: If no snapshot is available or it has no
                ``total_net_inflow`` value.
        """
        snapshot = self.repository.get_latest_snapshot()
        if not snapshot:
            raise ValueError("未找到可用于测试的快照数据")

        total_net_inflow = snapshot.get("total_net_inflow")
        if total_net_inflow is None:
            raise ValueError("快照缺少 total_net_inflow,无法发送测试告警")

        # History is only needed once the snapshot is known to be usable; a
        # missing history or summary simply renders the cumulative figure as "-".
        history = self.repository.get_history() or {}
        history_summary = history.get("summary") or {}
        cumulative = history_summary.get("cumulative_net_inflow_hkd_billion")
        five_min_change = snapshot.get("five_min_change")

        summary = (
            f"{snapshot.get('trade_date')} 南向资金当前净流入 {self._format_signed_amount(total_net_inflow)} 亿港元,"
            f"5分钟净变化 {self._format_signed_amount(five_min_change)} 亿港元,"
            f"统计区间累计 {self._format_amount(cumulative)} 亿港元。"
        )
        body = self._build_enhanced_body(
            snapshot=snapshot,
            title="南向资金监控提醒",
            summary=summary,
            market_context=[
                "南向资金尾盘仍以净流出为主。",
                "恒生科技指数仍处在弱修复阶段。",
            ],
            operation_advice="南向资金仍偏弱,短线先控制仓位,等待南向重新转正后再考虑分批回补恒生科技方向。",
        )
        return self.send_snapshot_alert(
            snapshot=snapshot,
            rule_code="manual_test_alert",
            subject=f"[南向资金监控] {snapshot.get('trade_date')} 港股操作建议",
            summary=summary,
            description="用于发送简版正式样式邮件。",
            trigger_value=total_net_inflow,
            body_text=body,
        )
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Module-level shared AlertService instance, created when this module is imported
# (its constructor instantiates MonitoringRepository).
alert_service = AlertService()
|