feat: add reporting workflow and optimize dashboard loading

This commit is contained in:
wanghep
2026-04-18 14:46:24 +08:00
parent d661b801df
commit 8c9117ca4c
18 changed files with 834 additions and 83 deletions

View File

@ -35,6 +35,15 @@ monitoring:
- 5
turnover_warning_threshold: 0.30
mail:
sender_email: "your_email@example.com"
smtp_username: "your_email@example.com"
smtp_password: "your_smtp_password"
smtp_host: "smtp.example.com"
smtp_port: 465
recipients:
- "recipient@example.com"
traders:
- name: "章盟主"
alias: "章建平"

View File

@ -11,6 +11,7 @@ dependencies = [
"fastapi>=0.116.1",
"uvicorn>=0.35.0",
"jinja2>=3.1.6",
"reportlab>=4.4.0",
]
[tool.setuptools]

View File

@ -0,0 +1,56 @@
from __future__ import annotations
import argparse
from pathlib import Path
from _bootstrap import add_src_to_path
add_src_to_path()
from lhbfx.config import load_config
from lhbfx.mailer import send_email
from lhbfx.pdf_export import generate_daily_report_pdf
from lhbfx.reporting import (
build_daily_report,
build_email_body,
default_report_output_path,
get_latest_trade_date,
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Generate and optionally send lhbfx daily report")
parser.add_argument("--trade-date", help="Trade date in YYYY-MM-DD format")
parser.add_argument("--send", action="store_true", help="Send email after generating report")
return parser.parse_args()
def main() -> None:
args = parse_args()
config = load_config()
trade_date = args.trade_date or get_latest_trade_date(config)
if not trade_date:
raise RuntimeError("No trade date available in database")
report = build_daily_report(config=config, trade_date=trade_date)
pdf_path = default_report_output_path(trade_date)
generate_daily_report_pdf(report, pdf_path)
body_text = build_email_body(report)
print(f"Generated PDF: {pdf_path}")
if args.send:
if config.mail is None:
raise RuntimeError("Mail config is missing")
subject = f"lhbfx 盘后日报 - {trade_date}"
send_email(
mail_config=config.mail,
subject=subject,
body_text=body_text,
attachments=[pdf_path],
)
print(f"Email sent to: {', '.join(config.mail.recipients)}")
if __name__ == "__main__":
main()

View File

@ -59,6 +59,15 @@ def apply_incremental_alters(config: AppConfig) -> None:
cursor.execute(
"ALTER TABLE lhb_detail_seats ADD UNIQUE KEY uniq_lhb_detail_record (trade_date, stock_code, rid, table_title, seat_name)"
)
if not _index_exists(cursor, schema_name, "lhb_detail_seats", "idx_lhb_detail_trader_stock_date"):
cursor.execute(
"ALTER TABLE lhb_detail_seats ADD KEY idx_lhb_detail_trader_stock_date (matched_trader_name, stock_code, trade_date)"
)
if not _index_exists(cursor, schema_name, "warning_events", "idx_warning_events_trader_type_date_code"):
cursor.execute(
"ALTER TABLE warning_events ADD KEY idx_warning_events_trader_type_date_code (trader_name, warning_type, trade_date, stock_code)"
)
def main() -> None:

View File

@ -23,6 +23,16 @@ class DatabaseConfig:
connect_timeout_seconds: int = 10
@dataclass(slots=True)
class MailConfig:
sender_email: str
smtp_username: str
smtp_password: str
smtp_host: str
smtp_port: int
recipients: list[str]
class AppConfig:
def __init__(self, raw: dict[str, Any], path: Path) -> None:
self.raw = raw
@ -55,9 +65,22 @@ class AppConfig:
def data_sources(self) -> dict[str, Any]:
return self.raw.get("data_sources", {})
@property
def mail(self) -> MailConfig | None:
mail = self.raw.get("mail")
if not mail:
return None
return MailConfig(
sender_email=mail["sender_email"],
smtp_username=mail["smtp_username"],
smtp_password=mail["smtp_password"],
smtp_host=mail["smtp_host"],
smtp_port=int(mail["smtp_port"]),
recipients=list(mail.get("recipients", [])),
)
def load_config(path: str | Path | None = None) -> AppConfig:
config_path = Path(path) if path else DEFAULT_CONFIG_PATH
raw = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
return AppConfig(raw=raw, path=config_path)

View File

@ -0,0 +1,37 @@
from __future__ import annotations
import mimetypes
import smtplib
from email.message import EmailMessage
from pathlib import Path
from .config import MailConfig
def send_email(
*,
mail_config: MailConfig,
subject: str,
body_text: str,
attachments: list[Path] | None = None,
) -> None:
message = EmailMessage()
message["Subject"] = subject
message["From"] = mail_config.sender_email
message["To"] = ", ".join(mail_config.recipients)
message.set_content(body_text)
for attachment in attachments or []:
mime_type, _ = mimetypes.guess_type(str(attachment))
maintype, subtype = (mime_type or "application/octet-stream").split("/", 1)
with attachment.open("rb") as file_obj:
message.add_attachment(
file_obj.read(),
maintype=maintype,
subtype=subtype,
filename=attachment.name,
)
with smtplib.SMTP_SSL(mail_config.smtp_host, mail_config.smtp_port) as smtp:
smtp.login(mail_config.smtp_username, mail_config.smtp_password)
smtp.send_message(message)

View File

@ -0,0 +1,182 @@
from __future__ import annotations
from pathlib import Path
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
from reportlab.lib.units import mm
from reportlab.pdfbase.cidfonts import UnicodeCIDFont
from reportlab.pdfbase.pdfmetrics import registerFont
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle
from .reporting import DailyReport, _major_board_label, _sector_label
WINDOWS_FONT_CANDIDATES = [
(r"C:\Windows\Fonts\simhei.ttf", "SimHei"),
(r"C:\Windows\Fonts\simfang.ttf", "SimFang"),
]
def _register_font() -> str:
for font_path, font_name in WINDOWS_FONT_CANDIDATES:
if Path(font_path).exists():
registerFont(TTFont(font_name, font_path))
return font_name
registerFont(UnicodeCIDFont("STSong-Light"))
return "STSong-Light"
FONT_NAME = _register_font()
def _styles():
base = getSampleStyleSheet()
return {
"title": ParagraphStyle(
"ReportTitle",
parent=base["Title"],
fontName=FONT_NAME,
fontSize=18,
leading=24,
),
"heading": ParagraphStyle(
"ReportHeading",
parent=base["Heading2"],
fontName=FONT_NAME,
fontSize=13,
leading=18,
spaceAfter=6,
),
"body": ParagraphStyle(
"ReportBody",
parent=base["BodyText"],
fontName=FONT_NAME,
fontSize=10.5,
leading=15,
),
}
def _build_table(rows: list[list[str]], col_widths: list[float] | None = None) -> Table:
table = Table(rows, repeatRows=1, colWidths=col_widths)
table.setStyle(
TableStyle(
[
("FONTNAME", (0, 0), (-1, -1), FONT_NAME),
("FONTSIZE", (0, 0), (-1, -1), 9),
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#d6a85f")),
("TEXTCOLOR", (0, 0), (-1, 0), colors.HexColor("#101721")),
("GRID", (0, 0), (-1, -1), 0.5, colors.HexColor("#303846")),
("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.HexColor("#f7f7f7"), colors.white]),
("VALIGN", (0, 0), (-1, -1), "TOP"),
("LEFTPADDING", (0, 0), (-1, -1), 6),
("RIGHTPADDING", (0, 0), (-1, -1), 6),
("TOPPADDING", (0, 0), (-1, -1), 5),
("BOTTOMPADDING", (0, 0), (-1, -1), 5),
]
)
)
return table
def generate_daily_report_pdf(report: DailyReport, output_path: Path) -> Path:
styles = _styles()
output_path.parent.mkdir(parents=True, exist_ok=True)
doc = SimpleDocTemplate(
str(output_path),
pagesize=A4,
leftMargin=15 * mm,
rightMargin=15 * mm,
topMargin=15 * mm,
bottomMargin=15 * mm,
)
body_style = ParagraphStyle(
"WrappedBody",
parent=styles["body"],
fontName=FONT_NAME,
fontSize=9,
leading=12,
wordWrap="CJK",
)
story = [
Paragraph(f"lhbfx 盘后日报 - {report.trade_date}", styles["title"]),
Spacer(1, 8),
Paragraph(
f"关注池股票数:{len(report.watchlist_items)} 关注池流水数:{len(report.watch_actions)} 待加入关注候选数:{len(report.candidate_actions)} 预警数:{len(report.warning_items)}",
styles["body"],
),
Spacer(1, 10),
Paragraph("关注池情况", styles["heading"]),
]
watch_rows = [["股票", "游资", "行业名称", "上市板块", "买入(万)", "卖出(万)", "净额(万)", "席位"]]
if report.watch_actions:
for action in report.watch_actions[:20]:
watch_rows.append(
[
f"{action['stock_name']} {action['stock_code']}",
action["trader_name"],
_sector_label(action),
_major_board_label(action),
str(action.get("buy_amount_wan", "-")),
str(action.get("sell_amount_wan", "-")),
str(action.get("net_amount_wan", "-")),
Paragraph(str(action.get("seat_name", "-")), body_style),
]
)
else:
watch_rows.append(["", "-", "-", "-", "-", "-", "-", "-"])
watch_col_widths = [
28 * mm,
18 * mm,
33 * mm,
18 * mm,
18 * mm,
18 * mm,
18 * mm,
52 * mm,
]
story.append(_build_table(watch_rows, watch_col_widths))
story.extend([Spacer(1, 10), Paragraph("今日待加入关注", styles["heading"])])
candidate_rows = [["股票", "游资", "行业名称", "上市板块", "买入(万)", "卖出(万)", "净额(万)", "股价", "涨跌"]]
if report.candidate_actions:
for action in report.candidate_actions[:20]:
candidate_rows.append(
[
f"{action['stock_name']} {action['stock_code']}",
action["trader_name"],
_sector_label(action),
_major_board_label(action),
str(action.get("buy_amount_wan", "-")),
str(action.get("sell_amount_wan", "-")),
str(action.get("net_amount_wan", "-")),
str(action.get("current_price", "-")),
str(action.get("pct_chg", "-")),
]
)
else:
candidate_rows.append(["", "-", "-", "-", "-", "-", "-", "-", "-"])
story.append(_build_table(candidate_rows))
if report.warning_items:
story.extend([Spacer(1, 10), Paragraph("风险与预警", styles["heading"])])
warning_rows = [["股票", "游资", "类型", "等级", "原因"]]
for item in report.warning_items[:20]:
warning_rows.append(
[
f"{item['stock_name']} {item['stock_code']}",
str(item.get("trader_name", "-")),
str(item.get("warning_type", "-")),
str(item.get("warning_level", "-")),
str(item.get("trigger_reason", "-")),
]
)
story.append(_build_table(warning_rows))
doc.build(story)
return output_path

View File

@ -36,22 +36,22 @@ def _parse_json_list(value: Any) -> list[Any]:
def _infer_market_label(stock_code: str) -> str:
if stock_code.startswith(("6", "9", "5", "688")):
return "A"
return "A"
return "\u6caaA"
return "\u6df1A"
def _infer_board_label(stock_code: str) -> str:
if stock_code.startswith(("688", "689")):
return "绉戝垱鏉?"
return "\u79d1\u521b\u677f"
if stock_code.startswith(("300", "301")):
return "鍒涗笟鏉?"
return "\u521b\u4e1a\u677f"
if stock_code.startswith(("8", "4", "920")):
return "鍖椾氦鎵€"
return "\u5317\u4ea4\u6240"
if stock_code.startswith(("60", "601", "603", "605", "900")):
return "娌富鏉?"
return "\u6caa\u4e3b\u677f"
if stock_code.startswith(("000", "001", "002", "003", "200")):
return "娣变富鏉?"
return "A鑲?"
return "\u6df1\u4e3b\u677f"
return "A\u80a1"
def fetch_summary() -> dict[str, Any]:
@ -121,15 +121,29 @@ def fetch_traders() -> list[dict[str, Any]]:
t.alias_name,
t.warning_weight,
t.style_tags,
COUNT(DISTINCT d.stock_code) AS stock_count,
COUNT(DISTINCT CASE WHEN w.warning_type = 'sell_alert' THEN CONCAT(w.trade_date, ':', w.stock_code) END) AS sell_alert_count,
COUNT(DISTINCT CASE WHEN w.warning_type = 'slow_exit_watch' THEN CONCAT(w.trade_date, ':', w.stock_code) END) AS slow_exit_count
COALESCE(ds.stock_count, 0) AS stock_count,
COALESCE(ws.sell_alert_count, 0) AS sell_alert_count,
COALESCE(ws.slow_exit_count, 0) AS slow_exit_count
FROM traders t
LEFT JOIN lhb_detail_seats d
ON d.matched_trader_name = t.name
LEFT JOIN warning_events w
ON w.trader_name = t.name
GROUP BY t.id, t.name, t.alias_name, t.warning_weight, t.style_tags
LEFT JOIN (
SELECT
matched_trader_name,
COUNT(DISTINCT stock_code) AS stock_count
FROM lhb_detail_seats
WHERE matched_trader_name IS NOT NULL
GROUP BY matched_trader_name
) ds
ON ds.matched_trader_name = t.name
LEFT JOIN (
SELECT
trader_name,
COUNT(DISTINCT CASE WHEN warning_type = 'sell_alert' THEN CONCAT(trade_date, ':', stock_code) END) AS sell_alert_count,
COUNT(DISTINCT CASE WHEN warning_type = 'slow_exit_watch' THEN CONCAT(trade_date, ':', stock_code) END) AS slow_exit_count
FROM warning_events
WHERE trader_name IS NOT NULL
GROUP BY trader_name
) ws
ON ws.trader_name = t.name
ORDER BY stock_count DESC, t.name
"""
)
@ -274,6 +288,7 @@ def fetch_trader_actions(
o.price AS current_price,
o.pct_chg,
s.industry,
s.concept_tags,
s.market,
s.total_market_value,
s.circulating_market_value,
@ -313,6 +328,7 @@ def fetch_trader_actions(
actions = [_normalize_row(row) for row in cursor.fetchall()]
for action in actions:
action["concept_tags"] = _parse_json_list(action.get("concept_tags"))
action["market"] = action.get("market") or _infer_market_label(action["stock_code"])
action["board_label"] = _infer_board_label(action["stock_code"])
@ -419,65 +435,6 @@ def fetch_trader_detail(trader_id: int) -> dict[str, Any]:
for stock in stocks:
stock["is_net_amount_increasing"] = increasing_by_stock.get(stock["stock_code"], False)
missing_codes = [
row["stock_code"]
for row in stocks[:30]
if not row.get("industry") or not row.get("market") or row.get("total_market_value") is None
]
if missing_codes:
eastmoney = EastMoneyClient()
seen_codes: set[str] = set()
for stock_code in missing_codes:
if stock_code in seen_codes:
continue
seen_codes.add(stock_code)
profile: dict[str, Any] = {}
snapshot: dict[str, Any] = {}
try:
profile = eastmoney.fetch_company_profile(stock_code)
except Exception:
profile = {}
try:
snapshot = eastmoney.fetch_quote_snapshot(stock_code)
except Exception:
snapshot = {}
cursor.execute(
"""
INSERT INTO stocks (
stock_code,
stock_name,
market,
industry,
concept_tags,
total_market_value,
circulating_market_value
)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
stock_name = COALESCE(NULLIF(VALUES(stock_name), ''), stock_name),
market = COALESCE(NULLIF(VALUES(market), ''), market),
industry = COALESCE(NULLIF(VALUES(industry), ''), industry),
concept_tags = COALESCE(VALUES(concept_tags), concept_tags),
total_market_value = COALESCE(VALUES(total_market_value), total_market_value),
circulating_market_value = COALESCE(VALUES(circulating_market_value), circulating_market_value)
""",
(
stock_code,
profile.get("stock_name") or (snapshot.get("stock_name") if snapshot else None) or stock_code,
profile.get("market"),
profile.get("industry") or snapshot.get("industry"),
json.dumps(profile.get("concept_tags") or [], ensure_ascii=False) if profile.get("concept_tags") else None,
snapshot.get("total_market_value"),
snapshot.get("circulating_market_value"),
),
)
cursor.execute(stock_query, (trader_name,))
stocks = [_normalize_row(row) for row in cursor.fetchall()]
for stock in stocks:
stock["is_net_amount_increasing"] = increasing_by_stock.get(stock["stock_code"], False)
cursor.execute(
"""
SELECT trade_date, stock_code, stock_name, warning_type, warning_level, trigger_reason

View File

@ -0,0 +1,254 @@
from __future__ import annotations
import json
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from .config import AppConfig
from .db import db_cursor
from .queries import fetch_trader_actions, fetch_watchlist, fetch_warnings
from .sources.eastmoney import EastMoneyClient
@dataclass(slots=True)
class DailyReport:
trade_date: str
watchlist_items: list[dict[str, Any]]
watch_actions: list[dict[str, Any]]
candidate_actions: list[dict[str, Any]]
warning_items: list[dict[str, Any]]
def get_latest_trade_date(config: AppConfig) -> str | None:
with db_cursor(config=config) as (_, cursor):
cursor.execute("SELECT MAX(trade_date) AS latest_trade_date FROM lhb_overview")
row = cursor.fetchone()
latest_trade_date = row["latest_trade_date"] if row else None
return latest_trade_date.isoformat() if latest_trade_date else None
def _to_float(value: Any) -> float:
if value in (None, "", "-"):
return 0.0
try:
return float(value)
except (TypeError, ValueError):
return 0.0
def _normalize_seat_name(value: str) -> str:
return "".join(value.split()).strip()
def _merged_action_side(buy_amount: float, sell_amount: float, net_amount: float) -> str:
if buy_amount > 0 and sell_amount <= 0:
return "buy"
if sell_amount > 0 and buy_amount <= 0:
return "sell"
if net_amount >= 0:
return "net_buy"
return "net_sell"
def aggregate_watch_actions(actions: list[dict[str, Any]]) -> list[dict[str, Any]]:
groups: dict[str, dict[str, Any]] = {}
for action in actions:
key = "::".join(
[
action["stock_code"],
action["trade_date"],
action["trader_name"],
_normalize_seat_name(action["seat_name"]),
]
)
existing = groups.get(key)
if existing is None:
groups[key] = dict(action)
continue
next_buy = _to_float(existing.get("buy_amount_wan")) + _to_float(action.get("buy_amount_wan"))
next_sell = _to_float(existing.get("sell_amount_wan")) + _to_float(action.get("sell_amount_wan"))
next_net = next_buy - next_sell
table_titles = [existing.get("table_title", ""), action.get("table_title", "")]
merged_titles = " / ".join(dict.fromkeys(title for title in table_titles if title))
existing.update(
{
"table_title": merged_titles,
"buy_amount_wan": f"{next_buy:.2f}",
"sell_amount_wan": f"{next_sell:.2f}",
"net_amount_wan": f"{next_net:.2f}",
"action_side": _merged_action_side(next_buy, next_sell, next_net),
}
)
return list(groups.values())
def unique_candidate_actions(actions: list[dict[str, Any]], watched_codes: set[str]) -> list[dict[str, Any]]:
unique: dict[str, dict[str, Any]] = {}
for action in actions:
if action["stock_code"] in watched_codes:
continue
unique.setdefault(action["stock_code"], action)
return list(unique.values())
def _sector_label(action: dict[str, Any]) -> str:
concept_tags = action.get("concept_tags") or []
if isinstance(concept_tags, str):
try:
concept_tags = json.loads(concept_tags)
except json.JSONDecodeError:
concept_tags = []
if concept_tags:
return " / ".join(concept_tags[:2])
return str(action.get("industry") or action.get("board_label") or "-")
def _major_board_label(action: dict[str, Any]) -> str:
return str(action.get("board_label") or action.get("market") or "-")
def enrich_stock_metadata(*, config: AppConfig, stock_codes: set[str]) -> None:
if not stock_codes:
return
placeholders = ", ".join(["%s"] * len(stock_codes))
with db_cursor(config=config) as (_, cursor):
cursor.execute(
f"""
SELECT stock_code, stock_name, industry, concept_tags, market
FROM stocks
WHERE stock_code IN ({placeholders})
""",
tuple(sorted(stock_codes)),
)
existing_rows = {row["stock_code"]: row for row in cursor.fetchall()}
client = EastMoneyClient()
for stock_code in sorted(stock_codes):
row = existing_rows.get(stock_code)
has_industry = bool(row and row.get("industry"))
has_concepts = bool(row and row.get("concept_tags"))
if has_industry and has_concepts:
continue
profile = client.fetch_company_profile(stock_code)
cursor.execute(
"""
INSERT INTO stocks (stock_code, stock_name, market, industry, concept_tags)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
stock_name = COALESCE(NULLIF(VALUES(stock_name), ''), stock_name),
market = COALESCE(NULLIF(VALUES(market), ''), market),
industry = COALESCE(NULLIF(VALUES(industry), ''), industry),
concept_tags = COALESCE(VALUES(concept_tags), concept_tags)
""",
(
stock_code,
profile.get("stock_name") or (row.get("stock_name") if row else stock_code),
profile.get("market"),
profile.get("industry"),
json.dumps(profile.get("concept_tags") or [], ensure_ascii=False),
),
)
def build_daily_report(*, config: AppConfig, trade_date: str) -> DailyReport:
actions_response = fetch_trader_actions(
date_from=trade_date,
date_to=trade_date,
limit=500,
)
initial_actions = actions_response["actions"]
enrich_stock_metadata(
config=config,
stock_codes={action["stock_code"] for action in initial_actions},
)
all_actions = fetch_trader_actions(
date_from=trade_date,
date_to=trade_date,
limit=500,
)["actions"]
watchlist_items = fetch_watchlist()
watch_codes = {item["stock_code"] for item in watchlist_items}
watch_actions_raw = [action for action in all_actions if action["stock_code"] in watch_codes]
watch_actions = aggregate_watch_actions(watch_actions_raw)
candidate_actions = unique_candidate_actions(all_actions, watch_codes)
warning_items = [
warning
for warning in fetch_warnings(limit=100)
if warning.get("stock_code") in watch_codes
]
return DailyReport(
trade_date=trade_date,
watchlist_items=watchlist_items,
watch_actions=watch_actions,
candidate_actions=candidate_actions,
warning_items=warning_items,
)
def build_email_body(report: DailyReport) -> str:
lines = [
f"lhbfx 盘后日报 - {report.trade_date}",
"",
f"关注池股票数:{len(report.watchlist_items)}",
f"关注池当日流水数:{len(report.watch_actions)}",
f"待加入关注候选数:{len(report.candidate_actions)}",
f"风险预警数:{len(report.warning_items)}",
"",
"关注池情况:",
]
if report.watch_actions:
for action in report.watch_actions[:10]:
lines.append(
f"- {action['stock_name']} {action['stock_code']} | {action['trader_name']} | "
f"行业名称 {_sector_label(action)} | "
f"上市板块 {_major_board_label(action)} | "
f"买入 {action.get('buy_amount_wan', '-')}万 | "
f"卖出 {action.get('sell_amount_wan', '-')}万 | "
f"净额 {action.get('net_amount_wan', '-')}"
)
else:
lines.append("- 今日关注池暂无新增流水")
lines.extend(["", "今日待加入关注:"])
if report.candidate_actions:
for action in report.candidate_actions[:10]:
lines.append(
f"- {action['stock_name']} {action['stock_code']} | {action['trader_name']} | "
f"行业名称 {_sector_label(action)} | "
f"上市板块 {_major_board_label(action)} | "
f"买入 {action.get('buy_amount_wan', '-')}万 | "
f"卖出 {action.get('sell_amount_wan', '-')}万 | "
f"净额 {action.get('net_amount_wan', '-')}万 | "
f"股价 {action.get('current_price', '-')} | "
f"涨跌 {action.get('pct_chg', '-')}"
)
else:
lines.append("- 今日暂无候选股票")
if report.warning_items:
lines.extend(["", "关注池风险提示:"])
for warning in report.warning_items[:10]:
lines.append(
f"- {warning['stock_name']} {warning['stock_code']} | {warning['warning_type']} | {warning['trigger_reason']}"
)
lines.extend(["", "详见附件 PDF 日报。"])
return "\n".join(lines)
def default_report_output_path(trade_date: str) -> Path:
root = Path(__file__).resolve().parents[2]
output_dir = root.parent / "output" / "reports"
output_dir.mkdir(parents=True, exist_ok=True)
return output_dir / f"lhbfx-daily-report-{trade_date}.pdf"

View File

@ -75,6 +75,7 @@ CREATE TABLE IF NOT EXISTS lhb_detail_seats (
KEY idx_lhb_detail_code (stock_code),
KEY idx_lhb_detail_trade_date (trade_date),
KEY idx_lhb_detail_trader (matched_trader_name),
KEY idx_lhb_detail_trader_stock_date (matched_trader_name, stock_code, trade_date),
UNIQUE KEY uniq_lhb_detail_record (trade_date, stock_code, rid, table_title, seat_name)
);
@ -94,7 +95,8 @@ CREATE TABLE IF NOT EXISTS warning_events (
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
KEY idx_warning_events_code (stock_code),
KEY idx_warning_events_trade_date (trade_date),
KEY idx_warning_events_trader (trader_name)
KEY idx_warning_events_trader (trader_name),
KEY idx_warning_events_trader_type_date_code (trader_name, warning_type, trade_date, stock_code)
);
CREATE TABLE IF NOT EXISTS watchlist_entries (

View File

@ -21,7 +21,53 @@ def infer_secid(stock_code: str) -> str:
return f"0.{stock_code}"
def infer_symbol(stock_code: str) -> str:
if stock_code.startswith(("6", "9", "5", "688")):
return f"SH{stock_code}"
if stock_code.startswith(("8", "4")):
return f"BJ{stock_code}"
return f"SZ{stock_code}"
class EastMoneyClient:
def fetch_company_profile(self, stock_code: str) -> dict[str, Any]:
symbol = infer_symbol(stock_code)
survey_url = "https://emweb.securities.eastmoney.com/PC_HSF10/CompanySurvey/CompanySurveyAjax"
concept_url = "https://emweb.securities.eastmoney.com/PC_HSF10/CoreConception/PageAjax"
survey_response = requests.get(
survey_url,
params={"code": symbol},
headers=DEFAULT_HEADERS,
timeout=20,
)
survey_response.raise_for_status()
survey_payload = survey_response.json() or {}
basic = survey_payload.get("jbzl") or {}
concept_response = requests.get(
concept_url,
params={"code": symbol},
headers=DEFAULT_HEADERS,
timeout=20,
)
concept_response.raise_for_status()
concept_payload = concept_response.json() or {}
concept_rows = concept_payload.get("ssbk") or []
concept_tags = []
for row in concept_rows:
board_name = row.get("BOARD_NAME")
if board_name and board_name not in concept_tags:
concept_tags.append(board_name)
return {
"stock_code": stock_code,
"stock_name": basic.get("agjc"),
"market": basic.get("zqlb"),
"industry": basic.get("sshy") or basic.get("sszjhhy"),
"concept_tags": concept_tags,
}
def fetch_quote_snapshot(self, stock_code: str) -> dict[str, Any]:
secid = infer_secid(stock_code)
url = "https://push2.eastmoney.com/api/qt/stock/get"