from __future__ import annotations

import json
import re
from typing import Any
from urllib.request import Request, urlopen

# Matches a JSON object wrapped in a trailing call's parentheses, e.g.
# ``callback({...})``. An optional ``;`` after the closing parenthesis is
# tolerated so that payloads terminated like a JavaScript statement still parse.
_WRAPPED_JSON_RE = re.compile(r"\((\{.*\})\)\s*;?\s*$", re.S)


class ThsEtfClient:
    """Small HTTP client for the 10jqka.com.cn quote and fund endpoints below.

    Each ``fetch_*`` method formats one endpoint template, downloads it with a
    ``Referer`` pointing at the fund's detail page, and decodes the response.
    """

    TODAY_ENDPOINT = "https://d.10jqka.com.cn/v6/line/{market}_{code}/01/today.js"
    TIME_ENDPOINT = "https://d.10jqka.com.cn/v6/time/{market}_{code}/last.js"
    HISTORY_ENDPOINT = "https://d.10jqka.com.cn/v6/line/{market}_{code}/01/last.js"
    PROFILE_ENDPOINT = "https://fund.10jqka.com.cn/interface/net/index/0_{code}"

    def __init__(self, *, timeout: float = 20) -> None:
        """Create a client.

        Args:
            timeout: Seconds passed to ``urlopen`` for every request.
        """
        self.timeout = timeout
        # Headers sent with every request; ``_get_text`` copies this dict
        # before adding a per-request ``Referer``.
        self.default_headers = {
            "User-Agent": "Mozilla/5.0",
            "Accept-Language": "zh-CN,zh;q=0.9",
        }

    def _get_text(self, url: str, *, referer: str | None = None) -> str:
        """Download *url* and return the response body as text.

        Args:
            url: Absolute URL to request.
            referer: Optional value for the ``Referer`` header.

        Returns:
            The body decoded as UTF-8; undecodable bytes are dropped.
        """
        # Copy so that the per-request Referer never leaks into the defaults.
        headers = dict(self.default_headers)
        if referer:
            headers["Referer"] = referer
        request = Request(url, headers=headers)
        with urlopen(request, timeout=self.timeout) as response:
            return response.read().decode("utf-8", "ignore")

    @staticmethod
    def _extract_wrapped_json(payload: str) -> dict[str, Any]:
        """Decode the JSON object wrapped in parentheses at the end of *payload*.

        Args:
            payload: Text of the form ``name({...})``, optionally followed by
                ``;`` and/or whitespace.

        Returns:
            The decoded JSON object.

        Raises:
            ValueError: If no parenthesised object ends the payload, or the
                captured text is not valid JSON (``json.JSONDecodeError`` is a
                ``ValueError`` subclass).
        """
        match = _WRAPPED_JSON_RE.search(payload.strip())
        if match is None:
            raise ValueError("unexpected ths payload")
        return json.loads(match.group(1))

    @staticmethod
    def _fund_detail_url(code: str) -> str:
        """Return the fund detail page URL for *code* (used as the Referer)."""
        return f"https://fund.10jqka.com.cn/{code}/"

    def _fetch_wrapped(self, endpoint: str, market: str, code: str) -> dict[str, Any]:
        """Download a ``{market}``/``{code}`` endpoint and unwrap its JSON object."""
        url = endpoint.format(market=market, code=code)
        text = self._get_text(url, referer=self._fund_detail_url(code))
        return self._extract_wrapped_json(text)

    def fetch_profile(self, code: str) -> dict[str, Any]:
        """Return the decoded JSON body of ``PROFILE_ENDPOINT`` for *code*.

        Unlike the other endpoints, this response is parsed as plain JSON
        rather than as a parenthesis-wrapped payload.
        """
        url = self.PROFILE_ENDPOINT.format(code=code)
        return json.loads(self._get_text(url, referer=self._fund_detail_url(code)))

    def fetch_today_quote(self, market: str, code: str) -> dict[str, Any]:
        """Return the ``"{market}_{code}"`` entry of the ``TODAY_ENDPOINT`` payload.

        An empty dict is returned when the payload has no such key.
        """
        payload = self._fetch_wrapped(self.TODAY_ENDPOINT, market, code)
        return payload.get(f"{market}_{code}", {})

    def fetch_intraday_time(self, market: str, code: str) -> dict[str, Any]:
        """Return the ``"{market}_{code}"`` entry of the ``TIME_ENDPOINT`` payload.

        An empty dict is returned when the payload has no such key.
        """
        payload = self._fetch_wrapped(self.TIME_ENDPOINT, market, code)
        return payload.get(f"{market}_{code}", {})

    def fetch_history(self, market: str, code: str) -> dict[str, Any]:
        """Return the whole decoded ``HISTORY_ENDPOINT`` payload (not unwrapped by key)."""
        return self._fetch_wrapped(self.HISTORY_ENDPOINT, market, code)