from __future__ import annotations

import json
import shutil
import subprocess
import time
from typing import Any, Callable
from urllib.parse import urlencode
from urllib.request import Request, urlopen


class EastmoneyClient:
    """Thin HTTP client for a fixed set of Eastmoney JSON/JSONP endpoints.

    Every ``fetch_*`` method builds a query string for one endpoint and
    returns the decoded JSON payload. Requests are retried with a linearly
    growing pause; the retry counts, timeouts and pauses are class
    attributes so they can be tuned per instance or per subclass.
    """

    HISTORY_ENDPOINT = "https://datacenter-web.eastmoney.com/api/data/v1/get"
    KAMT_ENDPOINT = "https://push2.eastmoney.com/api/qt/kamt/get"
    KAMT_BSMIN_ENDPOINT = "https://push2.eastmoney.com/api/qt/kamtbs.rtmin/get"
    CLIST_ENDPOINT = "https://push2.eastmoney.com/api/qt/clist/get"
    QUOTE_ENDPOINT = "https://push2.eastmoney.com/api/qt/stock/get"
    STOCK_TRENDS_ENDPOINT = "https://push2his.eastmoney.com/api/qt/stock/trends2/get"
    STOCK_KLINE_ENDPOINT = "https://push2his.eastmoney.com/api/qt/stock/kline/get"
    FFLOW_DAYKLINE_ENDPOINT = "https://push2his.eastmoney.com/api/qt/stock/fflow/daykline/get"
    FFLOW_MINUTE_KLINE_ENDPOINT = "https://push2.eastmoney.com/api/qt/stock/fflow/kline/get"

    # Tunables (defaults reproduce the previously hard-coded values).
    HTTP_TIMEOUT_SECONDS = 20  # urlopen timeout and curl --connect-timeout
    CURL_MAX_TIME_SECONDS = 30  # curl --max-time
    HTTP_ATTEMPTS = 5  # tries per urllib request
    CURL_ATTEMPTS = 3  # tries per curl fallback request
    RETRY_BACKOFF_SECONDS = 1  # pause after the k-th failure is k * this value
    PAGE_DELAY_SECONDS = 0.15  # pause between consecutive sector pages

    def __init__(self) -> None:
        """Set the headers sent with every request unless overridden."""
        self.default_headers = {
            "User-Agent": "Mozilla/5.0",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "Referer": "https://data.eastmoney.com/hsgt/hsgtV2.html",
        }

    @staticmethod
    def _parse_json_payload(content: str) -> dict[str, Any]:
        """Decode a plain-JSON or JSONP response body.

        Args:
            content: Raw response text.

        Returns:
            The decoded JSON value.

        Raises:
            ValueError: If the text neither starts with ``{`` nor contains a
                parenthesised section (``json.JSONDecodeError`` from malformed
                JSON is a ``ValueError`` subclass as well).
        """
        payload = content.strip()
        if payload.startswith("{"):
            return json.loads(payload)
        # JSONP: take everything between the first "(" and the last ")".
        left = payload.find("(")
        right = payload.rfind(")")
        if left != -1 and right != -1 and right > left:
            return json.loads(payload[left + 1 : right])
        raise ValueError("unexpected eastmoney payload")

    def _call_with_retries(
        self,
        operation: Callable[[], dict[str, Any]],
        *,
        attempts: int,
        failure_message: str,
    ) -> dict[str, Any]:
        """Run ``operation`` until it succeeds or ``attempts`` tries are used.

        Any ``Exception`` counts as a failure. After the k-th failure
        (1-based) the method sleeps ``k * RETRY_BACKOFF_SECONDS``, except
        after the final try.

        Raises:
            Exception: The error from the last failed try.
            RuntimeError: With ``failure_message`` if ``attempts`` is not
                positive, so ``operation`` never ran.
        """
        last_error: Exception | None = None
        for attempt in range(attempts):
            try:
                return operation()
            except Exception as exc:  # deliberately broad: every failure is retried
                last_error = exc
                if attempt < attempts - 1:
                    time.sleep((attempt + 1) * self.RETRY_BACKOFF_SECONDS)
        if last_error is not None:
            raise last_error
        raise RuntimeError(failure_message)

    def _get_json(
        self,
        url: str,
        params: dict[str, Any],
        *,
        headers: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """GET ``url`` with ``params`` via urllib and decode the JSON body.

        Args:
            url: Endpoint without a query string.
            params: Query parameters, URL-encoded onto ``url``.
            headers: Extra headers; they override ``default_headers`` on
                key collisions.

        Returns:
            The decoded payload.

        Raises:
            Exception: The last network/decoding error once all
                ``HTTP_ATTEMPTS`` tries have failed.
        """
        full_url = f"{url}?{urlencode(params)}"
        request_headers = {**self.default_headers, **(headers or {})}

        def fetch() -> dict[str, Any]:
            request = Request(full_url, headers=request_headers)
            with urlopen(request, timeout=self.HTTP_TIMEOUT_SECONDS) as response:
                # Undecodable bytes are dropped rather than failing the call.
                content = response.read().decode("utf-8", "ignore")
            return self._parse_json_payload(content)

        return self._call_with_retries(
            fetch,
            attempts=self.HTTP_ATTEMPTS,
            failure_message="东方财富接口请求失败",
        )

    def fetch_realtime_overview(self) -> dict[str, Any]:
        """Return the payload of ``KAMT_ENDPOINT`` for a fixed field selection."""
        return self._get_json(
            self.KAMT_ENDPOINT,
            {
                "fields1": "f1,f2,f3,f4",
                "fields2": "f51,f52,f53,f54,f56,f60,f61,f62,f63,f65,f66",
                "ut": "fa5fd1943c7b386f172d6893dbfba10b",
            },
        )

    def fetch_intraday_timeline(self) -> dict[str, Any]:
        """Return the payload of ``KAMT_BSMIN_ENDPOINT`` for a fixed field selection."""
        return self._get_json(
            self.KAMT_BSMIN_ENDPOINT,
            {
                "fields1": "f1,f2,f3,f4",
                "fields2": "f51,f54,f52,f58,f53,f62,f56,f57,f60,f61",
                "ut": "b2884a393a59ad64002292a3e90d46a5",
            },
        )

    def fetch_history(self, start_date: str, page_size: int = 500) -> list[dict[str, Any]]:
        """Collect every row of the ``RPT_MUTUAL_DEAL_HISTORY`` report.

        Pages are requested in order; the page count is re-read from each
        response (``result.pages``), and a missing count ends the loop.

        Args:
            start_date: Lower bound interpolated into the ``TRADE_DATE>=``
                filter (format is whatever the endpoint expects; not
                validated here).
            page_size: Rows requested per page.

        Returns:
            The concatenated ``result.data`` rows of all pages.
        """
        page_number = 1
        pages = 1
        rows: list[dict[str, Any]] = []
        while page_number <= pages:
            response = self._get_json(
                self.HISTORY_ENDPOINT,
                {
                    "reportName": "RPT_MUTUAL_DEAL_HISTORY",
                    "columns": "ALL",
                    "pageNumber": page_number,
                    "pageSize": page_size,
                    "sortColumns": "TRADE_DATE",
                    "sortTypes": "-1",
                    "source": "WEB",
                    "client": "WEB",
                    "filter": f'(MUTUAL_TYPE in ("002","004"))(TRADE_DATE>=\'{start_date}\')',
                },
            )
            result = response.get("result") or {}
            pages = result.get("pages") or 0
            rows.extend(result.get("data") or [])
            page_number += 1
        return rows

    def fetch_sector_realtime_page(
        self,
        *,
        sector_type_fs: str,
        page_number: int = 1,
        page_size: int = 100,
    ) -> dict[str, Any]:
        """Fetch one page from ``CLIST_ENDPOINT`` as a JSONP request.

        The urllib transport is tried first; if it fails after all its
        retries, the same request is repeated through the curl fallback.

        Args:
            sector_type_fs: Value sent as the ``fs`` query parameter.
            page_number: Value sent as ``pn``.
            page_size: Value sent as ``pz``.

        Returns:
            The decoded payload of the page.
        """
        timestamp = int(time.time() * 1000)
        params = {
            "cb": f"jQuery11230{timestamp}",
            "pn": page_number,
            "pz": page_size,
            "po": 1,
            "np": 1,
            "fltt": 2,
            "invt": 2,
            "fid": "f62",
            "ut": "b2884a393a59ad64002292a3e90d46a5",
            "fs": sector_type_fs,
            "fields": "f12,f14,f2,f3,f62,f184,f66,f69,f72,f75,f78,f81,f84,f87,f124,f204,f205,f206",
            "_": str(timestamp),
        }
        headers = {"Referer": "https://data.eastmoney.com/bkzj/hy.html"}
        try:
            return self._get_json(self.CLIST_ENDPOINT, params, headers=headers)
        except Exception:
            # Best-effort second transport; the urllib error stays attached
            # as the implicit exception context if curl fails too.
            return self._get_json_via_curl(self.CLIST_ENDPOINT, params, headers=headers)

    @staticmethod
    def _curl_executable() -> str:
        """Return the curl binary to run.

        Prefers ``curl.exe`` and then ``curl`` as found on ``PATH``; when
        neither is found the literal ``"curl.exe"`` is returned so the
        operating system's own lookup still gets a chance.
        """
        return shutil.which("curl.exe") or shutil.which("curl") or "curl.exe"

    def _get_json_via_curl(
        self,
        url: str,
        params: dict[str, Any],
        *,
        headers: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """GET ``url`` with ``params`` by shelling out to curl.

        Args:
            url: Endpoint without a query string.
            params: Query parameters, URL-encoded onto ``url``.
            headers: Extra headers; they override ``default_headers`` on
                key collisions.

        Returns:
            The decoded payload.

        Raises:
            Exception: The last error (for example
                ``subprocess.CalledProcessError`` or ``ValueError``) once all
                ``CURL_ATTEMPTS`` tries have failed.
        """
        request_headers = {**self.default_headers, **(headers or {})}
        full_url = f"{url}?{urlencode(params)}"
        command = [
            self._curl_executable(),
            "--silent",
            "--show-error",
            "--connect-timeout",
            str(self.HTTP_TIMEOUT_SECONDS),
            "--max-time",
            str(self.CURL_MAX_TIME_SECONDS),
            full_url,
        ]
        for key, value in request_headers.items():
            command.extend(["-H", f"{key}: {value}"])

        def fetch() -> dict[str, Any]:
            # Decode explicitly as UTF-8 (dropping bad bytes) to match the
            # urllib path; the locale default can be a different code page.
            completed = subprocess.run(
                command,
                capture_output=True,
                check=True,
                encoding="utf-8",
                errors="ignore",
            )
            return self._parse_json_payload(completed.stdout)

        return self._call_with_retries(
            fetch,
            attempts=self.CURL_ATTEMPTS,
            failure_message="curl fallback failed",
        )

    def fetch_all_sector_realtime(
        self, *, sector_type_fs: str, page_size: int = 100
    ) -> list[dict[str, Any]]:
        """Collect the ``data.diff`` rows of every sector page.

        The page count is derived from ``data.total`` of each response.
        Paging stops at the last page or at the first page without rows,
        with a ``PAGE_DELAY_SECONDS`` pause between consecutive requests.

        Args:
            sector_type_fs: Forwarded to ``fetch_sector_realtime_page``.
            page_size: Rows requested per page; must be positive.

        Returns:
            The concatenated rows of all fetched pages.

        Raises:
            ValueError: If ``page_size`` is not positive.
        """
        if page_size <= 0:
            raise ValueError("page_size must be a positive integer")

        rows: list[dict[str, Any]] = []
        page_number = 1
        while True:
            response = self.fetch_sector_realtime_page(
                sector_type_fs=sector_type_fs,
                page_number=page_number,
                page_size=page_size,
            )
            data = response.get("data") or {}
            page_rows = data.get("diff") or []
            if not page_rows:
                break
            rows.extend(page_rows)

            total = int(data.get("total") or 0)
            pages = max((total + page_size - 1) // page_size, 1)  # ceil division
            if page_number >= pages:
                break  # last page reached: no need to pause
            time.sleep(self.PAGE_DELAY_SECONDS)
            page_number += 1
        return rows

    def fetch_quote(self, secid: str) -> dict[str, Any]:
        """Return the ``QUOTE_ENDPOINT`` payload for ``secid``."""
        return self._get_json(
            self.QUOTE_ENDPOINT,
            {
                "secid": secid,
                "fields": "f57,f58,f43,f169,f170,f62,f184,f66,f69,f72,f75,f78,f81,f84,f87",
            },
        )

    def fetch_stock_trends(self, secid: str, *, ndays: int = 1) -> dict[str, Any]:
        """Return the ``STOCK_TRENDS_ENDPOINT`` payload for ``secid``.

        Args:
            secid: Value sent as the ``secid`` query parameter.
            ndays: Value sent as the ``ndays`` query parameter.
        """
        return self._get_json(
            self.STOCK_TRENDS_ENDPOINT,
            {
                "secid": secid,
                "fields1": "f1,f2,f3,f4,f5,f6,f7,f8",
                "fields2": "f51,f52,f53,f54,f55,f56,f57,f58",
                "iscr": 0,
                "ndays": ndays,
            },
        )

    def fetch_stock_kline(self, secid: str, *, limit: int = 120, klt: int = 101) -> dict[str, Any]:
        """Return the ``STOCK_KLINE_ENDPOINT`` payload for ``secid``.

        Args:
            secid: Value sent as the ``secid`` query parameter.
            limit: Value sent as ``lmt``.
            klt: Value sent as ``klt``.
        """
        return self._get_json(
            self.STOCK_KLINE_ENDPOINT,
            {
                "secid": secid,
                "fields1": "f1,f2,f3,f4,f5,f6",
                "fields2": "f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61",
                "klt": klt,
                "fqt": 0,
                "lmt": limit,
                "end": "20500101",
            },
        )

    def fetch_fund_flow_daykline(self, secid: str, *, limit: int = 600) -> dict[str, Any]:
        """Return the ``FFLOW_DAYKLINE_ENDPOINT`` payload for ``secid``.

        Args:
            secid: Value sent as the ``secid`` query parameter.
            limit: Value sent as ``lmt``.
        """
        return self._get_json(
            self.FFLOW_DAYKLINE_ENDPOINT,
            {
                "lmt": limit,
                "klt": 101,
                "secid": secid,
                "fields1": "f1,f2,f3,f7",
                "fields2": "f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61,f62,f63",
            },
        )

    def fetch_fund_flow_minute_kline(self, secid: str, *, limit: int = 1) -> dict[str, Any]:
        """Return the ``FFLOW_MINUTE_KLINE_ENDPOINT`` payload for ``secid``.

        Args:
            secid: Value sent as the ``secid`` query parameter.
            limit: Value sent as ``lmt``.
        """
        return self._get_json(
            self.FFLOW_MINUTE_KLINE_ENDPOINT,
            {
                "lmt": limit,
                "klt": 1,
                "secid": secid,
                "fields1": "f1,f2,f3,f7",
                "fields2": "f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61,f62,f63",
            },
        )