# --- page-export residue (not part of the module source) ---
# Files
# zjjk/backend/app/services/ashare_flow_service.py
# 671 lines, 27 KiB, Python
# Raw Permalink Normal View History
# Exported 2026-03-20 21:47:30 +08:00
from __future__ import annotations
from collections import defaultdict
from datetime import datetime
from time import sleep
from zoneinfo import ZoneInfo
from app.clients.eastmoney_client import EastmoneyClient
from app.core.config import HISTORY_START_DATE
from app.repositories.monitoring_repository import MonitoringRepository
# Eastmoney board categories. `label` is the human-readable display name; `fs`
# is the filter clause passed to the board-list API (m:90 with t:2/t:3/t:1
# selecting industry/concept/region boards respectively, per this mapping).
SECTOR_TYPE_CONFIG = {
    "industry": {
        "label": "行业板块",
        "fs": "m:90+t:2",
    },
    "concept": {
        "label": "概念板块",
        "fs": "m:90+t:3",
    },
    "region": {
        "label": "地域板块",
        "fs": "m:90+t:1",
    },
}
# Indices tracked for fund-flow snapshots. `secid` is the Eastmoney security id
# ("<market>.<code>"; 1 appears to mean SH and 0 SZ — confirm against the client).
TRACKED_INDICES = [
    {"code": "000001", "name": "上证指数", "secid": "1.000001"},
    {"code": "399001", "name": "深证成指", "secid": "0.399001"},
    {"code": "399006", "name": "创业板指", "secid": "0.399006"},
    {"code": "000300", "name": "沪深300", "secid": "1.000300"},
    {"code": "000905", "name": "中证500", "secid": "1.000905"},
    {"code": "000852", "name": "中证1000", "secid": "1.000852"},
    {"code": "932000", "name": "中证2000", "secid": "2.932000"},
    {"code": "000688", "name": "科创50", "secid": "1.000688"},
]
# Rolling net-inflow window lengths, in trading days.
ROLLING_WINDOWS = [5, 10, 30, 60, 90]
class AShareFlowService:
    """Collects A-share fund-flow data (indices and sector boards) from Eastmoney
    and persists realtime snapshots, daily documents and backfill history via
    the monitoring repository."""

    def __init__(self) -> None:
        # HTTP client for Eastmoney endpoints; repository for document storage.
        self.client = EastmoneyClient()
        self.repository = MonitoringRepository()
        # All trade dates / timestamps are computed in Shanghai local time.
        self.tz = ZoneInfo("Asia/Shanghai")
@staticmethod
def _safe_float(value: str | float | int | None) -> float | None:
if value in (None, "", "-"):
return None
return float(value)
@staticmethod
def _safe_int(value: str | float | int | None) -> int | None:
if value in (None, "", "-"):
return None
return int(float(value))
@staticmethod
def _normalize_quote_price(value: int | float | None) -> float | None:
if value is None:
return None
return round(float(value) / 100, 2)
@staticmethod
def _normalize_quote_change_amount(value: int | float | None) -> float | None:
if value is None:
return None
return round(float(value) / 100, 2)
@staticmethod
def _normalize_quote_change_percent(value: int | float | None) -> float | None:
if value is None:
return None
return round(float(value) / 100, 2)
@staticmethod
def _normalize_flow_amount(value: str | float | int | None) -> float | None:
parsed = AShareFlowService._safe_float(value)
if parsed is None:
return None
return round(parsed / 100000000, 4)
@staticmethod
def _build_index_detail_url(code: str) -> str:
return f"https://quote.eastmoney.com/zs{code}.html"
@staticmethod
def _build_sector_detail_url(code: str) -> str:
return f"https://quote.eastmoney.com/bk/90.{code}.html"
def _normalize_stored_flow_amount(self, value: str | float | int | None) -> float:
parsed = self._safe_float(value) or 0.0
if abs(parsed) >= 100000:
return round(parsed / 100000000, 4)
return round(parsed, 4)
    def _now(self) -> datetime:
        """Current timezone-aware time in the service's Shanghai timezone."""
        return datetime.now(self.tz)
    def _today(self) -> str:
        """Today's date (Shanghai time) as ISO YYYY-MM-DD; used as the trade_date key."""
        return self._now().date().isoformat()
@staticmethod
def _rolling_field(window: int) -> str:
return f"rolling_net_inflow_{window}d"
    def _build_history_value_map(self, category: str, *, current_trade_date: str, is_sector: bool) -> dict[str, list[float]]:
        """Collect historical main_net_inflow values per instrument from stored daily docs.

        Returns a map keyed by index code (or "sector_type:code" when is_sector)
        to the list of normalized (亿元) values, excluding current_trade_date.
        List order follows repository ordering — presumably most-recent-first,
        which the rolling-window math relies on; TODO confirm against
        MonitoringRepository.list_documents.
        """
        # Fetch enough documents to cover the largest rolling window, with slack.
        payloads = [
            item
            for item in self.repository.list_documents(category, limit=max(ROLLING_WINDOWS) * 2)
            if item.get("trade_date") and item.get("trade_date") != current_trade_date
        ]
        history_map: dict[str, list[float]] = defaultdict(list)
        for payload in payloads:
            if is_sector:
                sector_groups = payload.get("sector_types", {})
                for sector_type, group in sector_groups.items():
                    # Groups may be stored as {"records": [...]} or as a bare list.
                    records = group.get("records", []) if isinstance(group, dict) else group
                    for record in records:
                        key = f"{sector_type}:{record['code']}"
                        history_map[key].append(self._normalize_stored_flow_amount(record.get("main_net_inflow")))
            else:
                for record in payload.get("records", []):
                    history_map[record["code"]].append(self._normalize_stored_flow_amount(record.get("main_net_inflow")))
        return history_map
    def _attach_rolling_metrics(self, records: list[dict], *, category: str, current_trade_date: str, is_sector: bool) -> list[dict]:
        """Load daily history for `category` and attach rolling net-inflow fields
        to `records` in place (returns the same list)."""
        history_map = self._build_history_value_map(category, current_trade_date=current_trade_date, is_sector=is_sector)
        return self._apply_rolling_metrics(records, history_map=history_map, is_sector=is_sector)
    def _apply_rolling_metrics(
        self,
        records: list[dict],
        *,
        history_map: dict[str, list[float]],
        is_sector: bool,
    ) -> list[dict]:
        """Attach rolling_net_inflow_{N}d fields to each record (mutates records).

        Each N-day value is today's main_net_inflow plus the first N-1 entries of
        the key's history list; None when fewer than N-1 historical values exist.
        Assumes history lists are ordered most-recent-first — TODO confirm the
        ordering produced by _build_history_value_map / the repository.
        """
        if not records:
            return records
        for record in records:
            # Sector records are keyed "sector_type:code" to avoid code collisions.
            key = f"{record.get('sector_type')}:{record['code']}" if is_sector else record["code"]
            previous_values = history_map.get(key, [])
            current_value = float(record.get("main_net_inflow") or 0)
            for window in ROLLING_WINDOWS:
                field = self._rolling_field(window)
                # Need window-1 prior days to complete an N-day window with today.
                if len(previous_values) < window - 1:
                    record[field] = None
                    continue
                record[field] = round(current_value + sum(previous_values[: window - 1]), 4)
        return records
@staticmethod
def _has_sector_records(payload: dict) -> bool:
sector_types = payload.get("sector_types", {})
for group in sector_types.values():
records = group.get("records", []) if isinstance(group, dict) else group
if records:
return True
return False
    def _parse_sector_realtime_record(self, row: dict, sector_type: str, updated_at: str) -> dict:
        """Map one raw Eastmoney board row (f-coded field dict) to a normalized record.

        Flow amounts are divided by 1e8 via _normalize_flow_amount (presumably
        yuan → 亿元); ratio fields are passed through as floats.
        """
        config = SECTOR_TYPE_CONFIG[sector_type]
        return {
            "trade_date": self._today(),
            "sector_type": sector_type,
            "sector_type_label": config["label"],
            "code": row.get("f12"),
            "name": row.get("f14"),
            "detail_url": self._build_sector_detail_url(row.get("f12")),
            "latest_price": self._safe_float(row.get("f2")),
            "change_percent": self._safe_float(row.get("f3")),
            # Net inflows by order size (main / super-large / large / medium / small)
            # with their matching ratio fields.
            "main_net_inflow": self._normalize_flow_amount(row.get("f62")),
            "main_net_inflow_ratio": self._safe_float(row.get("f184")),
            "super_large_net_inflow": self._normalize_flow_amount(row.get("f66")),
            "super_large_net_inflow_ratio": self._safe_float(row.get("f69")),
            "large_net_inflow": self._normalize_flow_amount(row.get("f72")),
            "large_net_inflow_ratio": self._safe_float(row.get("f75")),
            "medium_net_inflow": self._normalize_flow_amount(row.get("f78")),
            "medium_net_inflow_ratio": self._safe_float(row.get("f81")),
            "small_net_inflow": self._normalize_flow_amount(row.get("f84")),
            "small_net_inflow_ratio": self._safe_float(row.get("f87")),
            "updated_at": updated_at,
            "source_name": "东方财富",
            "source_url": "https://data.eastmoney.com/bkzj/",
            "precision": "realtime_exact",
        }
    def _parse_daily_kline_record(
        self,
        *,
        code: str,
        name: str,
        values: str,
        source_url: str,
        extra: dict | None = None,
    ) -> dict:
        """Parse one comma-separated daily fund-flow kline into a record dict.

        Field layout (by position): 0 trade date, 1-5 main/super-large/large/
        medium/small net inflow, 6-10 the matching ratios, 11 price, 12 change
        percent. `extra` entries (e.g. sector_type) are merged into the result.

        Raises:
            ValueError: when the line has fewer than 13 fields.
        """
        parts = values.split(",")
        if len(parts) < 13:
            raise ValueError(f"unexpected daykline payload: {values}")
        payload = {
            "trade_date": parts[0],
            "code": code,
            "name": name,
            # Rows fetched from the /bkzj/ source are sector boards; others are indices.
            "detail_url": self._build_sector_detail_url(code) if source_url.endswith("/bkzj/") else self._build_index_detail_url(code),
            "main_net_inflow": self._normalize_flow_amount(parts[1]),
            "super_large_net_inflow": self._normalize_flow_amount(parts[2]),
            "large_net_inflow": self._normalize_flow_amount(parts[3]),
            "medium_net_inflow": self._normalize_flow_amount(parts[4]),
            "small_net_inflow": self._normalize_flow_amount(parts[5]),
            "main_net_inflow_ratio": self._safe_float(parts[6]),
            "super_large_net_inflow_ratio": self._safe_float(parts[7]),
            "large_net_inflow_ratio": self._safe_float(parts[8]),
            "medium_net_inflow_ratio": self._safe_float(parts[9]),
            "small_net_inflow_ratio": self._safe_float(parts[10]),
            "latest_price": self._safe_float(parts[11]),
            "change_percent": self._safe_float(parts[12]),
            "updated_at": self._now().isoformat(timespec="seconds"),
            "source_name": "东方财富",
            "source_url": source_url,
            "precision": "historical_exact",
        }
        if extra:
            payload.update(extra)
        return payload
    def _parse_minute_kline_record(
        self,
        *,
        code: str,
        name: str,
        values: str,
        source_url: str,
        latest_price: float | None,
        change_amount: float | None,
        change_percent: float | None,
        extra: dict | None = None,
    ) -> dict:
        """Parse one comma-separated minute fund-flow kline into a realtime record.

        Field layout: 0 "YYYY-MM-DD HH:MM" timestamp, 1-5 main/super-large/large/
        medium/small net inflow. Price/change figures come from the caller (quote
        API), not from the kline; ratio fields are not provided by this endpoint
        and are stored as None.

        Raises:
            ValueError: when the line has fewer than 6 fields.
        """
        parts = values.split(",")
        if len(parts) < 6:
            raise ValueError(f"unexpected minute kline payload: {values}")
        payload = {
            "trade_date": parts[0].split(" ")[0],
            # Minute timestamp promoted to a full +08:00 ISO timestamp.
            "snapshot_time": f"{parts[0]}:00+08:00",
            "code": code,
            "name": name,
            "detail_url": self._build_index_detail_url(code),
            "latest_price": latest_price,
            "change_amount": change_amount,
            "change_percent": change_percent,
            "main_net_inflow": self._normalize_flow_amount(parts[1]),
            "super_large_net_inflow": self._normalize_flow_amount(parts[2]),
            "large_net_inflow": self._normalize_flow_amount(parts[3]),
            "medium_net_inflow": self._normalize_flow_amount(parts[4]),
            "small_net_inflow": self._normalize_flow_amount(parts[5]),
            "main_net_inflow_ratio": None,
            "super_large_net_inflow_ratio": None,
            "large_net_inflow_ratio": None,
            "medium_net_inflow_ratio": None,
            "small_net_inflow_ratio": None,
            "updated_at": self._now().isoformat(timespec="seconds"),
            "source_name": "东方财富",
            "source_url": source_url,
            "precision": "realtime_exact",
        }
        if extra:
            payload.update(extra)
        return payload
    def sync_sector_realtime(self) -> dict:
        """Fetch realtime fund-flow for all sector boards and persist the results.

        Saves the snapshot document, a slim code/name catalog (consumed by the
        sector history backfill), the raw upstream payloads, a last-known-good
        copy, and today's sector daily document. Returns the snapshot payload.
        """
        updated_at = self._now().isoformat(timespec="seconds")
        payload = {
            "trade_date": self._today(),
            "updated_at": updated_at,
            "source_name": "东方财富",
            "source_url": "https://data.eastmoney.com/bkzj/",
            "precision": "realtime_exact",
            "sector_types": {},
        }
        raw_payloads: dict[str, list[dict]] = {}
        for sector_type, config in SECTOR_TYPE_CONFIG.items():
            rows = self.client.fetch_all_sector_realtime(sector_type_fs=config["fs"])
            raw_payloads[sector_type] = rows
            payload["sector_types"][sector_type] = {
                "label": config["label"],
                "records": [self._parse_sector_realtime_record(row, sector_type, updated_at) for row in rows],
            }
        # NOTE(review): this document is saved again below after rolling metrics
        # are attached; the early save may be a deliberate checkpoint so a crash
        # during the rolling pass still leaves data — confirm before removing.
        self.repository.save_document(
            "ashare_sector_realtime",
            payload["trade_date"],
            payload,
            sort_value=payload["trade_date"],
        )
        self.repository.save_document(
            "ashare_sector_catalog",
            payload["trade_date"],
            {
                "trade_date": payload["trade_date"],
                "updated_at": updated_at,
                "sector_types": {
                    sector_type: [
                        {"code": item["code"], "name": item["name"], "sector_type": sector_type}
                        for item in data["records"]
                    ]
                    for sector_type, data in payload["sector_types"].items()
                },
            },
            sort_value=payload["trade_date"],
        )
        self.repository.save_raw_payload(f"ashare_sector_realtime_{payload['trade_date']}", raw_payloads)
        # Attach rolling N-day net-inflow metrics in place (mutates the records).
        for group in payload["sector_types"].values():
            self._attach_rolling_metrics(
                group["records"],
                category="ashare_sector_daily",
                current_trade_date=payload["trade_date"],
                is_sector=True,
            )
        # Second save: same document id, now enriched with rolling metrics.
        self.repository.save_document(
            "ashare_sector_realtime",
            payload["trade_date"],
            payload,
            sort_value=payload["trade_date"],
        )
        # Last-known-good copy used as a fallback by get_sector_realtime.
        self.repository.save_document(
            "ashare_sector_realtime_latest_success",
            "default",
            payload,
            sort_value=payload["trade_date"],
        )
        self._persist_today_sector_daily(payload)
        return payload
    def sync_index_realtime(self) -> dict:
        """Fetch the latest minute fund-flow bar and quote for each tracked index,
        attach rolling metrics, persist realtime + today's daily documents and
        raw payloads; returns the realtime payload."""
        updated_at = self._now().isoformat(timespec="seconds")
        records: list[dict] = []
        raw_payloads: dict[str, dict] = {}
        for definition in TRACKED_INDICES:
            quote_payload = self.client.fetch_quote(definition["secid"])
            minute_payload = self.client.fetch_fund_flow_minute_kline(definition["secid"], limit=1)
            raw_payloads[definition["code"]] = {
                "quote": quote_payload,
                "minute": minute_payload,
            }
            quote_data = quote_payload.get("data") or {}
            minute_data = minute_payload.get("data") or {}
            minute_rows = minute_data.get("klines") or []
            if not minute_rows:
                # No minute bar (e.g. empty upstream response): skip this index.
                continue
            records.append(
                self._parse_minute_kline_record(
                    code=definition["code"],
                    name=definition["name"],
                    values=minute_rows[-1],
                    source_url="https://data.eastmoney.com/zjlx/",
                    # Quote fields f43/f169/f170 appear to be price/change/percent
                    # scaled by 100 — the _normalize_quote_* helpers divide by 100.
                    latest_price=self._normalize_quote_price(self._safe_int(quote_data.get("f43"))),
                    change_amount=self._normalize_quote_change_amount(self._safe_int(quote_data.get("f169"))),
                    change_percent=self._normalize_quote_change_percent(self._safe_int(quote_data.get("f170"))),
                )
            )
        payload = {
            "trade_date": self._today(),
            "updated_at": updated_at,
            "source_name": "东方财富",
            "source_url": "https://data.eastmoney.com/zjlx/",
            "precision": "realtime_exact",
            "records": self._attach_rolling_metrics(
                records,
                category="ashare_index_daily",
                current_trade_date=self._today(),
                is_sector=False,
            ),
        }
        self.repository.save_document(
            "ashare_index_realtime",
            payload["trade_date"],
            payload,
            sort_value=payload["trade_date"],
        )
        # Mirror the snapshot into today's daily document for rolling-window history.
        self._persist_today_index_daily(payload)
        self.repository.save_raw_payload(f"ashare_index_realtime_{payload['trade_date']}", raw_payloads)
        return payload
    def backfill_index_daily_history(self, start_date: str | None = None) -> dict:
        """Backfill daily fund-flow history for all tracked indices.

        Fetches up to 800 daily klines per index, groups records by trade date
        (skipping anything before the configured start date), and saves one
        document per trading day plus the raw payloads and a meta document.
        Returns the meta document.
        """
        # start_date argument wins; otherwise the configured system default.
        history_start = start_date or self.repository.get_system_config().get("history_backfill_start_date", HISTORY_START_DATE)
        by_date: dict[str, list[dict]] = defaultdict(list)
        raw_payloads: dict[str, dict] = {}
        for definition in TRACKED_INDICES:
            response = self.client.fetch_fund_flow_daykline(definition["secid"], limit=800)
            raw_payloads[definition["code"]] = response
            data = response.get("data") or {}
            for line in data.get("klines") or []:
                record = self._parse_daily_kline_record(
                    code=definition["code"],
                    name=definition["name"],
                    values=line,
                    source_url="https://data.eastmoney.com/zjlx/",
                )
                # ISO dates compare correctly as strings.
                if record["trade_date"] < history_start:
                    continue
                by_date[record["trade_date"]].append(record)
        for trade_date, records in by_date.items():
            payload = {
                "trade_date": trade_date,
                "updated_at": self._now().isoformat(timespec="seconds"),
                "source_name": "东方财富",
                "source_url": "https://data.eastmoney.com/zjlx/",
                "precision": "historical_exact",
                "records": sorted(records, key=lambda item: item["code"]),
            }
            self.repository.save_document("ashare_index_daily", trade_date, payload, sort_value=trade_date)
        self.repository.save_raw_payload(f"ashare_index_daily_backfill_{self._today()}", raw_payloads)
        meta = {
            "last_backfill_at": self._now().isoformat(timespec="seconds"),
            "start_date": history_start,
            "trading_day_count": len(by_date),
            "index_count": len(TRACKED_INDICES),
        }
        self.repository.save_document("ashare_index_history_meta", "default", meta)
        return meta
def _merge_sector_daily_payload(
self,
existing: dict,
trade_date: str,
batch_groups: dict[str, list[dict]],
) -> dict:
merged_sector_types = dict(existing.get("sector_types", {}))
for sector_type, records in batch_groups.items():
existing_group = merged_sector_types.get(sector_type, {})
existing_records = existing_group.get("records", []) if isinstance(existing_group, dict) else existing_group
code_map = {item["code"]: item for item in existing_records}
for record in records:
code_map[record["code"]] = record
merged_sector_types[sector_type] = {
"label": SECTOR_TYPE_CONFIG[sector_type]["label"],
"records": sorted(code_map.values(), key=lambda item: item["code"]),
}
return {
"trade_date": trade_date,
"updated_at": self._now().isoformat(timespec="seconds"),
"source_name": "东方财富",
"source_url": "https://data.eastmoney.com/bkzj/",
"precision": "historical_exact",
"sector_types": merged_sector_types,
}
    def _persist_today_index_daily(self, payload: dict) -> None:
        """Upsert today's index daily document from a realtime payload.

        No-op when the payload carries no records, so an empty fetch never
        clobbers an existing daily document.
        """
        if not payload.get("records"):
            return
        self.repository.save_document(
            "ashare_index_daily",
            payload["trade_date"],
            {
                "trade_date": payload["trade_date"],
                "updated_at": payload.get("updated_at"),
                "source_name": payload.get("source_name", "东方财富"),
                "source_url": payload.get("source_url"),
                # Intraday provisional data; the backfill writes historical_exact docs.
                "precision": "realtime_exact",
                "records": sorted(payload["records"], key=lambda item: item["code"]),
            },
            sort_value=payload["trade_date"],
        )
    def _persist_today_sector_daily(self, payload: dict) -> None:
        """Upsert today's sector daily document from a realtime payload.

        No-op when no sector type carries records, so an empty fetch never
        clobbers an existing daily document.
        """
        if not self._has_sector_records(payload):
            return
        self.repository.save_document(
            "ashare_sector_daily",
            payload["trade_date"],
            {
                "trade_date": payload["trade_date"],
                "updated_at": payload.get("updated_at"),
                "source_name": payload.get("source_name", "东方财富"),
                "source_url": payload.get("source_url"),
                # Intraday provisional data; the backfill writes historical_exact docs.
                "precision": "realtime_exact",
                "sector_types": {
                    sector_type: {
                        "label": group.get("label", SECTOR_TYPE_CONFIG[sector_type]["label"]),
                        "records": sorted(group.get("records", []), key=lambda item: item["code"]),
                    }
                    for sector_type, group in payload.get("sector_types", {}).items()
                },
            },
            sort_value=payload["trade_date"],
        )
    def backfill_sector_daily_history(self, start_date: str | None = None, *, batch_size: int = 120) -> dict:
        """Backfill per-sector daily fund-flow history in resumable batches.

        Walks the sector catalog `batch_size` sectors at a time, fetching up to
        800 daily klines per sector and merging them into per-date documents.
        The batch cursor (next_sector_index) is persisted in
        ashare_sector_history_meta so repeated calls continue where the previous
        batch stopped; fetch failures are recorded and skipped. Returns the
        updated meta document.
        """
        history_start = start_date or self.repository.get_system_config().get("history_backfill_start_date", HISTORY_START_DATE)
        # Sector catalog; when today's is missing, a fresh realtime sync creates one.
        catalog = self.repository.get_document("ashare_sector_catalog", self._today(), {})
        if not catalog:
            catalog = self.sync_sector_realtime()
        sector_items: list[tuple[str, dict]] = []
        sector_types = catalog.get("sector_types", {})
        for sector_type, items in sector_types.items():
            # Catalog groups may be {"records": [...]} (realtime payload) or bare lists.
            iterable = items["records"] if isinstance(items, dict) else items
            for item in iterable:
                sector_items.append((sector_type, item))
        # Deterministic order keeps batch indices stable across runs.
        sector_items.sort(key=lambda pair: (pair[0], pair[1]["code"]))
        meta_state = self.repository.get_document("ashare_sector_history_meta", "default", {})
        next_sector_index = 0
        # Resume only when the prior run used the same start date and is unfinished.
        if meta_state.get("start_date") == history_start and not meta_state.get("completed", False):
            next_sector_index = int(meta_state.get("next_sector_index", 0))
        end_sector_index = min(next_sector_index + batch_size, len(sector_items))
        selected_items = sector_items[next_sector_index:end_sector_index]
        by_date: dict[str, dict[str, list[dict]]] = defaultdict(lambda: defaultdict(list))
        raw_payload_index: dict[str, dict] = {}
        failures: list[dict] = []
        for position, (sector_type, item) in enumerate(selected_items, start=1):
            code = item["code"]
            name = item["name"]
            try:
                response = self.client.fetch_fund_flow_daykline(f"90.{code}", limit=800)
            except Exception as exc:
                # Best-effort: record the failure and keep processing the batch.
                failures.append(
                    {
                        "sector_type": sector_type,
                        "code": code,
                        "name": name,
                        "error": str(exc),
                    }
                )
                continue
            raw_payload_index[f"{sector_type}:{code}"] = {
                "code": code,
                "name": name,
                "sector_type": sector_type,
                "response_meta": {
                    "market": (response.get("data") or {}).get("market"),
                    "name": (response.get("data") or {}).get("name"),
                },
            }
            for line in (response.get("data") or {}).get("klines") or []:
                record = self._parse_daily_kline_record(
                    code=code,
                    name=name,
                    values=line,
                    source_url="https://data.eastmoney.com/bkzj/",
                    extra={
                        "sector_type": sector_type,
                        "sector_type_label": SECTOR_TYPE_CONFIG[sector_type]["label"],
                    },
                )
                if record["trade_date"] < history_start:
                    continue
                by_date[record["trade_date"]][sector_type].append(record)
            if position % 50 == 0:
                # Throttle every 50 sectors to stay friendly to the upstream API.
                sleep(0.5)
        for trade_date, sector_groups in by_date.items():
            # Merge into any existing document so sectors from other batches survive.
            existing = self.repository.get_document("ashare_sector_daily", trade_date, {})
            payload = self._merge_sector_daily_payload(existing, trade_date, sector_groups)
            self.repository.save_document("ashare_sector_daily", trade_date, payload, sort_value=trade_date)
        if raw_payload_index:
            self.repository.save_raw_payload(
                f"ashare_sector_daily_backfill_{self._today()}_{next_sector_index}_{end_sector_index}",
                raw_payload_index,
            )
        completed = end_sector_index >= len(sector_items)
        meta = {
            "last_backfill_at": self._now().isoformat(timespec="seconds"),
            "start_date": history_start,
            "trading_day_count": len(self.repository.list_documents("ashare_sector_daily")),
            "sector_count": len(sector_items),
            "batch_size": batch_size,
            "processed_in_batch": len(selected_items),
            # Cursor resets once the whole catalog has been processed.
            "next_sector_index": 0 if completed else end_sector_index,
            "completed": completed,
            "failed_sector_count": len(failures),
            # Cap the stored failure detail to keep the meta document small.
            "failures": failures[:50],
        }
        self.repository.save_document("ashare_sector_history_meta", "default", meta)
        return meta
    def get_index_realtime(self) -> dict:
        """Return today's realtime index document, recomputing rolling metrics on
        read; falls back to an 'unavailable' placeholder when nothing is stored."""
        payload = self.repository.get_document("ashare_index_realtime", self._today(), {})
        if payload:
            # Keep today's daily mirror in sync with what we serve.
            self._persist_today_index_daily(payload)
            # Refresh rolling windows against the latest stored daily history.
            history_map = self._build_history_value_map(
                "ashare_index_daily",
                current_trade_date=payload.get("trade_date", self._today()),
                is_sector=False,
            )
            self._apply_rolling_metrics(
                payload.get("records", []),
                history_map=history_map,
                is_sector=False,
            )
            return payload
        return {
            "trade_date": self._today(),
            "updated_at": None,
            "source_name": "东方财富",
            "source_url": "https://data.eastmoney.com/zjlx/",
            "precision": "unavailable",
            "records": [],
        }
    def get_sector_realtime(self) -> dict:
        """Return the freshest sector realtime document that actually has records.

        Fallback order: today's snapshot → last-known-good document → up to 10
        stored snapshots → 'unavailable' placeholder. Whichever document is
        served also refreshes the daily mirror for its own trade date.
        """
        payload = self.repository.get_document("ashare_sector_realtime", self._today(), {})
        if not payload or not self._has_sector_records(payload):
            payload = self.repository.get_document("ashare_sector_realtime_latest_success", "default", {})
        if payload and self._has_sector_records(payload):
            self._persist_today_sector_daily(payload)
            return payload
        for item in self.repository.list_documents("ashare_sector_realtime", limit=10):
            if not self._has_sector_records(item):
                continue
            self._persist_today_sector_daily(item)
            return item
        return {
            "trade_date": self._today(),
            "updated_at": None,
            "source_name": "东方财富",
            "source_url": "https://data.eastmoney.com/bkzj/",
            "precision": "unavailable",
            "sector_types": {},
        }
def get_index_daily(self, trade_date: str | None = None) -> dict:
target_date = trade_date or self._today()
payload = self.repository.get_document("ashare_index_daily", target_date, {})
if payload:
return payload
return {
"trade_date": target_date,
"updated_at": None,
"source_name": "东方财富",
"source_url": "https://data.eastmoney.com/zjlx/",
"precision": "unavailable",
"records": [],
}
def get_sector_daily(self, trade_date: str | None = None) -> dict:
target_date = trade_date or self._today()
payload = self.repository.get_document("ashare_sector_daily", target_date, {})
if payload:
return payload
return {
"trade_date": target_date,
"updated_at": None,
"source_name": "东方财富",
"source_url": "https://data.eastmoney.com/bkzj/",
"precision": "unavailable",
"sector_types": {},
}
# Module-level singleton instance of the service (presumably imported by API
# routes / schedulers — verify against callers).
ashare_flow_service = AShareFlowService()