2026-03-20 21:47:30 +08:00
|
|
|
from __future__ import annotations

import logging
import threading
from datetime import datetime, time, timedelta
from zoneinfo import ZoneInfo

from app.repositories.monitoring_repository import MonitoringRepository
from app.services.ashare_flow_service import ashare_flow_service
from app.services.eastmoney_sync_service import eastmoney_sync_service
from app.services.etf_monitor_service import etf_monitor_service
from app.services.market_clock import get_market_state
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SyncScheduler:
    """Run the realtime market and ETF sync jobs on a background daemon thread.

    While ``get_market_state`` reports one of the active trading states, each
    cycle runs the Eastmoney and A-share flow syncs and, when the
    ``etf_enabled`` system-config flag is truthy, the ETF group syncs.
    Outside those states the worker sleeps until the next expected session
    start.  Failures are logged and answered with a capped, growing delay
    instead of stopping the worker.
    """

    # Market states during which realtime collection runs.
    _ACTIVE_STATES = frozenset({"trading_am", "trading_pm", "finalizing"})
    # Lower bound, in seconds, applied to every configured polling interval.
    _MIN_INTERVAL_SECONDS = 15
    # Upper bound, in seconds, for the failure back-off delay.
    _MAX_BACKOFF_SECONDS = 180
    # Wait used when a cycle fails before a proper wait could be computed.
    _FALLBACK_WAIT_SECONDS = 60
    # Wall-clock times (in ``self.tz``) the worker sleeps towards.
    _MORNING_OPEN = time(9, 30)
    _AFTERNOON_OPEN = time(13, 0)

    _log = logging.getLogger(__name__)

    def __init__(self) -> None:
        """Create the repository handle and the (not yet started) worker state."""
        self.repository = MonitoringRepository()
        self.tz = ZoneInfo("Asia/Shanghai")
        self._thread: threading.Thread | None = None
        self._stop_event = threading.Event()
        # Consecutive-failure counters that drive the back-off delays.
        self._market_failure_count = 0
        self._etf_failure_count = 0
        # Set once the ETF history backfill has succeeded for the current run.
        self._history_ready = False

    def start(self) -> None:
        """Start the worker thread; do nothing if it is already running."""
        if self._thread and self._thread.is_alive():
            return
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, name="southbound-sync", daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Signal the worker to stop and wait up to two seconds for it to exit."""
        self._stop_event.set()
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=2)

    def _run(self) -> None:
        """Worker loop: run one cycle, then sleep until the next one or a stop."""
        # The backfill is attempted again on every fresh run of the worker.
        self._history_ready = False
        while not self._stop_event.is_set():
            try:
                wait_seconds = self._run_cycle()
            except Exception:
                # Anything escaping a cycle (market-state lookup, config read,
                # bad config value) would otherwise end the daemon thread
                # without a trace and stop all syncing.
                self._log.exception(
                    "sync cycle failed unexpectedly; retrying in %d seconds",
                    self._FALLBACK_WAIT_SECONDS,
                )
                wait_seconds = self._FALLBACK_WAIT_SECONDS
            # Event.wait doubles as an interruptible sleep: stop() wakes it.
            self._stop_event.wait(wait_seconds)

    def _run_cycle(self) -> int:
        """Run one scheduling cycle and return the seconds to wait afterwards."""
        now = datetime.now(self.tz)
        state = get_market_state(now)
        wait_seconds = self._get_wait_seconds(now, state)

        if state not in self._ACTIVE_STATES:
            # Outside trading the failure streaks start over.
            self._market_failure_count = 0
            self._etf_failure_count = 0
            return wait_seconds

        self._ensure_history_backfilled()
        wait_seconds = max(wait_seconds, self._sync_market())
        if self._is_etf_enabled():
            wait_seconds = max(wait_seconds, self._sync_etf())
        return wait_seconds

    def _ensure_history_backfilled(self) -> None:
        """Run the ETF history backfill until it has succeeded once."""
        if self._history_ready:
            return
        try:
            etf_monitor_service.ensure_history_backfilled()
        except Exception:
            self._etf_failure_count += 1
            self._log.exception("ETF history backfill failed; will retry next cycle")
        else:
            self._history_ready = True

    def _sync_market(self) -> int:
        """Run the market syncs; return the back-off delay in seconds (0 on success)."""
        try:
            eastmoney_sync_service.sync()
            ashare_flow_service.sync_index_realtime()
            ashare_flow_service.sync_sector_realtime()
        except Exception:
            self._market_failure_count += 1
            self._log.exception(
                "market sync failed (%d consecutive failures)",
                self._market_failure_count,
            )
            return min(self._MAX_BACKOFF_SECONDS, 30 * self._market_failure_count)
        self._market_failure_count = 0
        return 0

    def _sync_etf(self) -> int:
        """Run the ETF group syncs; return the back-off delay in seconds (0 on success)."""
        try:
            etf_monitor_service.sync_group_realtime("broad")
            etf_monitor_service.sync_group_realtime("sector")
        except Exception:
            self._etf_failure_count += 1
            self._log.exception(
                "ETF sync failed (%d consecutive failures)",
                self._etf_failure_count,
            )
            return min(self._MAX_BACKOFF_SECONDS, 15 * self._etf_failure_count)
        self._etf_failure_count = 0
        return 0

    def _get_wait_seconds(self, now: datetime, state: str) -> int:
        """Return how long to sleep, in seconds, for the given market state.

        Args:
            now: Current timezone-aware time.
            state: Market state string as returned by ``get_market_state``.

        Returns:
            The configured polling interval (never below 15 seconds) while in
            an active trading state; otherwise the seconds until 13:00 for
            ``midday_break``, until 09:30 for ``pre_open``, and until the next
            09:30 for any other state.
        """
        if state == "midday_break":
            return self._seconds_until(now, self._AFTERNOON_OPEN)
        if state == "pre_open":
            return self._seconds_until(now, self._MORNING_OPEN)
        if state not in self._ACTIVE_STATES:
            return self._seconds_until_next_day_open(now)

        # Configuration is only needed (and only read) while actively polling.
        config = self.repository.get_system_config()
        realtime_interval = max(
            int(config.get("realtime_collection_interval_seconds", 60)),
            self._MIN_INTERVAL_SECONDS,
        )
        if not self._is_etf_enabled():
            return realtime_interval
        # The ETF interval falls back to the realtime interval when unset.
        etf_interval = max(
            int(config.get("etf_realtime_interval_seconds", realtime_interval)),
            self._MIN_INTERVAL_SECONDS,
        )
        # One loop drives both jobs, so poll at the faster of the two rates.
        return min(realtime_interval, etf_interval)

    def _is_etf_enabled(self) -> bool:
        """Return the ``etf_enabled`` system-config flag (defaults to True)."""
        config = self.repository.get_system_config()
        return bool(config.get("etf_enabled", True))

    def _seconds_until(self, now: datetime, target_time: time) -> int:
        """Return the seconds from ``now`` until ``target_time`` today, at least 15."""
        target = datetime.combine(now.date(), target_time, tzinfo=self.tz)
        delta = (target - now).total_seconds()
        # The floor also covers a target that has already passed.
        return max(int(delta), 15)

    def _seconds_until_next_day_open(self, now: datetime) -> int:
        """Return the seconds from ``now`` until the next 09:30, at least 300.

        The target is today's 09:30 when that is still ahead of ``now`` and
        tomorrow's 09:30 otherwise, so a call made early in the morning does
        not sleep through the session that is about to start.
        """
        next_open = datetime.combine(now.date(), self._MORNING_OPEN, tzinfo=self.tz)
        if now >= next_open:
            next_open += timedelta(days=1)
        delta = (next_open - now).total_seconds()
        return max(int(delta), 300)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level SyncScheduler instance, constructed when this module is
# imported. Constructing it does not start the worker thread; call
# sync_scheduler.start() to launch it.
sync_scheduler = SyncScheduler()
|