from __future__ import annotations import threading from datetime import datetime, time, timedelta from zoneinfo import ZoneInfo from app.repositories.monitoring_repository import MonitoringRepository from app.services.ashare_flow_service import ashare_flow_service from app.services.eastmoney_sync_service import eastmoney_sync_service from app.services.etf_monitor_service import etf_monitor_service from app.services.market_clock import get_market_state class SyncScheduler: def __init__(self) -> None: self.repository = MonitoringRepository() self.tz = ZoneInfo("Asia/Shanghai") self._thread: threading.Thread | None = None self._stop_event = threading.Event() self._market_failure_count = 0 self._etf_failure_count = 0 def start(self) -> None: if self._thread and self._thread.is_alive(): return self._stop_event.clear() self._thread = threading.Thread(target=self._run, name="southbound-sync", daemon=True) self._thread.start() def stop(self) -> None: self._stop_event.set() if self._thread and self._thread.is_alive(): self._thread.join(timeout=2) def _run(self) -> None: history_ready = False while not self._stop_event.is_set(): now = datetime.now(self.tz) state = get_market_state(now) interval_seconds = self._get_wait_seconds(now, state) if state in {"trading_am", "trading_pm", "finalizing"}: if not history_ready: try: etf_monitor_service.ensure_history_backfilled() history_ready = True except Exception: self._etf_failure_count += 1 try: eastmoney_sync_service.sync() ashare_flow_service.sync_index_realtime() ashare_flow_service.sync_sector_realtime() self._market_failure_count = 0 except Exception: self._market_failure_count += 1 interval_seconds = max(interval_seconds, min(180, 30 * self._market_failure_count)) if self._is_etf_enabled(): try: etf_monitor_service.sync_group_realtime("broad") etf_monitor_service.sync_group_realtime("sector") self._etf_failure_count = 0 except Exception: self._etf_failure_count += 1 interval_seconds = max(interval_seconds, min(180, 15 * self._etf_failure_count)) else: self._market_failure_count = 0 self._etf_failure_count = 0 self._stop_event.wait(interval_seconds) def _get_wait_seconds(self, now: datetime, state: str) -> int: config = self.repository.get_system_config() realtime_interval = max(int(config.get("realtime_collection_interval_seconds", 60)), 15) etf_interval = max(int(config.get("etf_realtime_interval_seconds", realtime_interval)), 15) active_interval = min(realtime_interval, etf_interval) if self._is_etf_enabled() else realtime_interval if state in {"trading_am", "trading_pm", "finalizing"}: return active_interval if state == "midday_break": return self._seconds_until(now, time(13, 0)) if state == "pre_open": return self._seconds_until(now, time(9, 30)) return self._seconds_until_next_day_open(now) def _is_etf_enabled(self) -> bool: config = self.repository.get_system_config() return bool(config.get("etf_enabled", True)) def _seconds_until(self, now: datetime, target_time: time) -> int: target = datetime.combine(now.date(), target_time, tzinfo=self.tz) delta = (target - now).total_seconds() return max(int(delta), 15) def _seconds_until_next_day_open(self, now: datetime) -> int: next_open = datetime.combine(now.date() + timedelta(days=1), time(9, 30), tzinfo=self.tz) delta = (next_open - now).total_seconds() return max(int(delta), 300) sync_scheduler = SyncScheduler()