Files
zjjk/backend/app/services/alert_service.py
2026-03-20 21:47:30 +08:00

196 lines
7.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from datetime import datetime
from uuid import uuid4
from zoneinfo import ZoneInfo
from app.repositories.monitoring_repository import MonitoringRepository
from app.services.email_notification_service import email_notification_service
class AlertService:
def __init__(self) -> None:
self.repository = MonitoringRepository()
self.tz = ZoneInfo("Asia/Shanghai")
@staticmethod
def _format_amount(value: float | None) -> str:
if value is None:
return "-"
return f"{value:.4f}"
@staticmethod
def _format_signed_amount(value: float | None) -> str:
if value is None:
return "-"
return f"{value:+.4f}"
@staticmethod
def _format_points(value: float | None) -> str:
if value is None:
return "-"
return f"{value:.2f}"
@staticmethod
def _build_benchmark_lookup(snapshot: dict) -> dict[str, float | None]:
lookup: dict[str, float | None] = {}
for item in snapshot.get("benchmark_series", []) or []:
points = item.get("points", []) or []
lookup[item.get("label", "")] = points[-1].get("value") if points else None
return lookup
def _build_enhanced_body(
self,
*,
snapshot: dict,
title: str,
summary: str,
note: str | None = None,
operation_advice: str | None = None,
market_context: list[str] | None = None,
) -> str:
benchmarks = self._build_benchmark_lookup(snapshot)
lines = [
title,
"",
f"交易日期: {snapshot.get('trade_date')}",
f"快照时间: {snapshot.get('snapshot_time') or snapshot.get('updated_at') or '-'}",
f"南向总净流入: {self._format_signed_amount(snapshot.get('total_net_inflow'))} 亿港元",
f"1分钟净变化: {self._format_signed_amount(snapshot.get('one_min_change'))} 亿港元",
f"5分钟净变化: {self._format_signed_amount(snapshot.get('five_min_change'))} 亿港元",
f"恒生指数: {self._format_points(benchmarks.get('恒生指数'))}",
f"恒生科技指数: {self._format_points(benchmarks.get('恒生科技指数'))}",
"",
"摘要",
summary,
]
if market_context:
lines.extend(["", "市场要点"])
lines.extend([f"{index}. {item}" for index, item in enumerate(market_context, start=1)])
if operation_advice:
lines.extend(["", "操作建议", operation_advice])
if note:
lines.extend(["", "备注", note])
return "\n".join(lines)
def _send_email(self, subject: str, body: str) -> None:
config = self.repository.get_system_config()
email_notification_service.send(
smtp_host=config.get("smtp_host", ""),
smtp_port=int(config.get("smtp_port", 465)),
smtp_username=config.get("smtp_username", ""),
smtp_password=config.get("smtp_password", ""),
sender_email=config.get("sender_email", ""),
recipients=config.get("recipients", []),
subject=subject,
text_body=body,
)
def _build_record(
self,
*,
snapshot: dict,
rule_code: str,
subject: str,
summary: str,
description: str,
trigger_value: float | None,
) -> dict:
return {
"id": f"push-{uuid4().hex[:12]}",
"triggered_at": datetime.now(self.tz).isoformat(timespec="seconds"),
"push_type": "email",
"rule_code": rule_code,
"trigger_value_hkd_billion": trigger_value,
"description": description,
"email_subject": subject,
"email_summary": summary,
"status": "pending",
"error_message": None,
}
def send_snapshot_alert(
self,
*,
snapshot: dict,
rule_code: str,
subject: str,
summary: str,
description: str,
trigger_value: float | None,
body_note: str | None = None,
body_text: str | None = None,
operation_advice: str | None = None,
market_context: list[str] | None = None,
) -> dict:
body = (
body_text
if body_text is not None
else self._build_enhanced_body(
snapshot=snapshot,
title="南向资金监控自动告警",
summary=summary,
note=body_note,
operation_advice=operation_advice,
market_context=market_context,
)
)
record = self._build_record(
snapshot=snapshot,
rule_code=rule_code,
subject=subject,
summary=summary,
description=description,
trigger_value=trigger_value,
)
try:
self._send_email(subject=subject, body=body)
record["status"] = "sent"
except Exception as exc:
record["status"] = "failed"
record["error_message"] = str(exc)
self.repository.append_push_record(record)
return record
def send_test_alert(self) -> dict:
snapshot = self.repository.get_latest_snapshot()
history = self.repository.get_history()
if not snapshot:
raise ValueError("未找到可用于测试的快照数据")
if snapshot.get("total_net_inflow") is None:
raise ValueError("快照缺少 total_net_inflow无法发送测试告警")
total_net_inflow = snapshot.get("total_net_inflow")
five_min_change = snapshot.get("five_min_change")
cumulative = history.get("summary", {}).get("cumulative_net_inflow_hkd_billion")
summary = (
f"{snapshot.get('trade_date')} 南向资金当前净流入 {self._format_signed_amount(total_net_inflow)} 亿港元,"
f"5分钟净变化 {self._format_signed_amount(five_min_change)} 亿港元,"
f"统计区间累计 {self._format_amount(cumulative)} 亿港元。"
)
body = self._build_enhanced_body(
snapshot=snapshot,
title="南向资金监控提醒",
summary=summary,
market_context=[
"南向资金尾盘仍以净流出为主。",
"恒生科技指数仍处在弱修复阶段。",
],
operation_advice="南向资金仍偏弱,短线先控制仓位,等待南向重新转正后再考虑分批回补恒生科技方向。",
)
return self.send_snapshot_alert(
snapshot=snapshot,
rule_code="manual_test_alert",
subject=f"[南向资金监控] {snapshot.get('trade_date')} 港股操作建议",
summary=summary,
description="用于发送简版正式样式邮件。",
trigger_value=total_net_inflow,
body_text=body,
)
alert_service = AlertService()