import asyncio
import base64
import json
import re
import urllib.error
import urllib.request
from datetime import datetime
from pathlib import Path
from uuid import uuid4

from fastapi import HTTPException, UploadFile

from app.core.config import BASE_DIR, CAPITAL_IMAGE_DB_FILE, CAPITAL_IMAGE_UPLOADS_DIR
from app.repositories.monitoring_repository import MonitoringRepository
from app.repositories.capital_image_repository import CapitalImageRepository

# File extensions kept verbatim when an upload is stored. Any other client-supplied
# suffix (e.g. ".html") is replaced by ".jpg" so the uploads directory never serves
# a file back under an arbitrary, attacker-chosen type.
_ALLOWED_IMAGE_SUFFIXES = frozenset({".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"})

# Leading-byte signatures used to choose the MIME type of the data URL sent to the
# vision model. WebP is handled separately because its marker is at offset 8.
_IMAGE_SIGNATURES = (
    (b"\x89PNG\r\n\x1a\n", "image/png"),
    (b"\xff\xd8\xff", "image/jpeg"),
    (b"GIF87a", "image/gif"),
    (b"GIF89a", "image/gif"),
)


def _extract_json_block(content: str) -> dict:
    """Parse the JSON object embedded in a model reply.

    A ```json fenced block is preferred; otherwise the span from the first "{" to
    the last "}" is parsed.

    Raises:
        ValueError: if no brace-delimited object is present (json.JSONDecodeError,
            a ValueError subclass, is raised if the located text is not valid JSON).
    """
    fenced_match = re.search(r"```json\s*(\{.*?\})\s*```", content, flags=re.DOTALL)
    if fenced_match:
        return json.loads(fenced_match.group(1))
    object_match = re.search(r"(\{.*\})", content, flags=re.DOTALL)
    if object_match:
        return json.loads(object_match.group(1))
    raise ValueError("No JSON object found in model output")


def _detect_image_mime_type(image_bytes: bytes) -> str:
    """Return the image MIME type implied by the file's magic bytes.

    Falls back to "image/jpeg" (the type previously hard-coded) when the format is
    not recognised.
    """
    if image_bytes[:4] == b"RIFF" and image_bytes[8:12] == b"WEBP":
        return "image/webp"
    for signature, mime_type in _IMAGE_SIGNATURES:
        if image_bytes.startswith(signature):
            return mime_type
    return "image/jpeg"


class CapitalImageService:
    """Store capital-flow screenshots and extract structured data from them.

    Extraction is attempted in this order:
    1. A configured OpenAI-compatible vision model.
    2. A sidecar JSON file next to the image (only when no API key is set).
    3. A placeholder payload marked "pending_review".
    """

    def __init__(self) -> None:
        self.repository = CapitalImageRepository(CAPITAL_IMAGE_DB_FILE)
        self.monitoring_repository = MonitoringRepository()

    def list_records(self, trade_date: str | None = None, subject: str | None = None) -> dict:
        """Return serialized records plus their count.

        The optional filters are passed straight through to the repository.
        """
        items = [
            self._serialize_record(record)
            for record in self.repository.list_records(trade_date=trade_date, subject=subject)
        ]
        return {"items": items, "total": len(items)}

    def get_record(self, record_id: str) -> dict:
        """Return one serialized record.

        Raises:
            HTTPException: 404 if the record does not exist.
        """
        record = self.repository.get_record(record_id)
        if record is None:
            raise HTTPException(status_code=404, detail="Record not found")
        return self._serialize_record(record)

    async def create_record(
        self,
        upload_file: UploadFile,
        trade_date: str | None = None,
        subject: str | None = None,
    ) -> dict:
        """Save an uploaded screenshot, extract its data, and persist a record.

        Args:
            upload_file: the uploaded image.
            trade_date: fallback trade date used when extraction yields none.
            subject: fallback subject used when extraction yields none.

        Returns:
            {"item": <serialized record>}.
        """
        suffix = Path(upload_file.filename or "upload.jpg").suffix or ".jpg"
        stored_suffix = suffix.lower()
        if stored_suffix not in _ALLOWED_IMAGE_SUFFIXES:
            # Never keep an arbitrary client-chosen extension on a publicly served file.
            stored_suffix = ".jpg"
        record_id = uuid4().hex
        image_name = upload_file.filename or f"{record_id}{suffix}"
        stored_path = CAPITAL_IMAGE_UPLOADS_DIR / f"{record_id}{stored_suffix}"

        binary = await upload_file.read()
        stored_path.parent.mkdir(parents=True, exist_ok=True)
        stored_path.write_bytes(binary)

        # Read the config on the event-loop thread so the monitoring repository is
        # never accessed from the worker thread used below.
        llm_config = self._get_llm_config()
        # Extraction may block on an HTTP request for up to 180 s; run it in a worker
        # thread so this coroutine does not stall the event loop.
        extraction = await asyncio.to_thread(
            self._extract_from_image,
            image_bytes=binary,
            original_filename=image_name,
            stored_path=stored_path,
            trade_date=trade_date,
            subject=subject,
            llm_config=llm_config,
        )

        now = datetime.now().isoformat(timespec="seconds")
        payload = {
            "id": record_id,
            "trade_date": extraction.get("trade_date") or trade_date,
            "subject": extraction.get("subject") or subject,
            "snapshot_time": extraction.get("snapshot_time"),
            "main_force_amount_yi": extraction.get("main_force_amount_yi"),
            "institution_amount_yi": extraction.get("institution_amount_yi"),
            "large_household_amount_yi": extraction.get("large_household_amount_yi"),
            "retail_amount_yi": extraction.get("retail_amount_yi"),
            "overall_trend": extraction.get("overall_trend"),
            "intraday_summary": extraction.get("intraday_summary"),
            "review_status": extraction.get("review_status", "pending_review"),
            "extraction_method": extraction.get("extraction_method", "fallback"),
            "image_name": image_name,
            "image_path": str(stored_path),
            "raw_extraction": extraction,
            "created_at": now,
            "updated_at": now,
        }
        record = self.repository.insert_record(payload)
        return {"item": self._serialize_record(record)}

    def _extract_from_image(
        self,
        image_bytes: bytes,
        original_filename: str,
        stored_path: Path,
        trade_date: str | None,
        subject: str | None,
        llm_config: dict | None = None,
    ) -> dict:
        """Produce an extraction payload for one image.

        Args:
            image_bytes: raw image content.
            original_filename: client filename; its stem is used for the sidecar
                lookup and in fallback messages.
            stored_path: where the image was saved (currently unused here).
            trade_date: fallback trade date.
            subject: fallback subject.
            llm_config: pre-fetched output of `_get_llm_config()`. When None, it is
                fetched here.

        Returns:
            The model result when an API key is configured. If the model call fails,
            a placeholder payload tagged "fallback_after_model_error". Without an API
            key, the sidecar payload if one exists, otherwise a placeholder payload.
        """
        if llm_config is None:
            llm_config = self._get_llm_config()
        if llm_config["api_key"]:
            try:
                return self._extract_via_model(
                    image_bytes=image_bytes,
                    trade_date=trade_date,
                    subject=subject,
                    llm_config=llm_config,
                )
            except Exception as exc:  # pragma: no cover
                # Deliberate best-effort boundary: any model/network/parse failure
                # degrades to a placeholder record instead of failing the upload.
                return {
                    **self._build_fallback_payload(original_filename, trade_date, subject),
                    "review_status": "pending_review",
                    "extraction_method": "fallback_after_model_error",
                    # The base payload says "no model configured"; here one is
                    # configured but failed, so state that accurately.
                    "fallback_reason": f"Vision model request failed for {original_filename}",
                    "model_error": str(exc),
                }

        sidecar_payload = self._load_sidecar_payload(original_filename)
        if sidecar_payload is not None:
            return sidecar_payload
        return self._build_fallback_payload(original_filename, trade_date, subject)

    def _extract_via_model(
        self,
        image_bytes: bytes,
        trade_date: str | None,
        subject: str | None,
        llm_config: dict,
    ) -> dict:
        """Send the image to an OpenAI-compatible /chat/completions endpoint.

        The model's reply is parsed as JSON. Missing subject/trade_date values are
        filled from the arguments.

        Raises:
            RuntimeError: the endpoint returned an HTTP error status.
            ValueError: the reply contains no parsable JSON object.
            urllib.error.URLError: network failure or timeout (propagated).
        """
        api_key = llm_config["api_key"]
        base_url = llm_config["base_url"].rstrip("/")
        model = llm_config["model"]
        encoded_image = base64.b64encode(image_bytes).decode("utf-8")
        # Declare the real format rather than always claiming JPEG.
        mime_type = _detect_image_mime_type(image_bytes)
        prompt = """
        You are extracting structured data from a Chinese stock capital flow screenshot.
        Return only JSON with these keys:
        trade_date, subject, snapshot_time, main_force_amount_yi, institution_amount_yi,
        large_household_amount_yi, retail_amount_yi, overall_trend, intraday_summary,
        review_status, extraction_method.

        Rules:
        1. intraday_summary must describe only the intraday capital-flow trend, not repeat raw numbers.
        2. overall_trend should be a short phrase like "震荡上行", "冲高回落", "弱势下探", "午后修复".
        3. If a number is not clearly visible, set it to null.
        4. review_status should be "extracted".
        5. extraction_method should be "vision_model".
        6. If trade_date is absent in the image, keep null.
        """
        payload = {
            "model": model,
            "messages": [
                {
                    "role": "system",
                    "content": "You extract structured JSON from Chinese capital-flow screenshots.",
                },
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:{mime_type};base64,{encoded_image}",
                            },
                        },
                    ],
                },
            ],
        }
        request = urllib.request.Request(
            url=f"{base_url}/chat/completions",
            data=json.dumps(payload).encode("utf-8"),
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            method="POST",
        )
        try:
            with urllib.request.urlopen(request, timeout=180) as response:
                response_payload = json.loads(response.read().decode("utf-8"))
        except urllib.error.HTTPError as exc:  # pragma: no cover
            error_text = exc.read().decode("utf-8", errors="ignore")
            raise RuntimeError(f"Model request failed: {error_text}") from exc

        choices = response_payload.get("choices") or []
        content = ""
        if choices:
            # Some providers return null content (e.g. refusals); treat it as empty so
            # the JSON parser reports a clear "No JSON object" error.
            content = (choices[0].get("message") or {}).get("content") or ""
        if isinstance(content, list):
            # Some OpenAI-compatible providers return content as a list of parts.
            content = "".join(
                part.get("text") or "" for part in content if isinstance(part, dict)
            )
        parsed = _extract_json_block(content)
        if subject and not parsed.get("subject"):
            parsed["subject"] = subject
        if trade_date and not parsed.get("trade_date"):
            parsed["trade_date"] = trade_date
        return parsed

    def _get_llm_config(self) -> dict:
        """Read the vision-model settings from the system config.

        A missing or empty value falls back to its default, so e.g. a null base URL
        cannot crash `.rstrip()`.
        """
        config = self.monitoring_repository.get_system_config()
        return {
            "provider": config.get("llm_provider") or "openai_compatible",
            "api_key": config.get("llm_api_key") or "",
            "base_url": config.get("llm_base_url") or "https://api.openai.com/v1",
            "model": config.get("llm_vision_model") or "gpt-4.1-mini",
        }

    def _load_sidecar_payload(self, original_filename: str) -> dict | None:
        """Look for a pre-extracted "<stem>.json" sidecar and map it to our schema.

        Returns:
            The mapped payload from the first existing candidate whose top level is a
            JSON object, or None if no such sidecar exists.
        """
        stem = Path(original_filename).stem
        candidate_paths = [
            BASE_DIR.parent / "zijin" / f"{stem}.json",
            BASE_DIR / "data" / "capital_images" / f"{stem}.json",
        ]
        for candidate in candidate_paths:
            if not candidate.exists():
                continue
            payload = json.loads(candidate.read_text(encoding="utf-8"))
            if not isinstance(payload, dict):
                continue
            capital_flow = payload.get("capital_flow_amounts") or {}
            if not isinstance(capital_flow, dict):
                capital_flow = {}
            overall_trend = payload.get("overall_trend") or {}
            if isinstance(overall_trend, str):
                # Tolerate sidecars that store the trend as a bare direction string.
                overall_trend = {"direction": overall_trend}
            elif not isinstance(overall_trend, dict):
                overall_trend = {}
            intraday_summary = overall_trend.get("summary") or payload.get("llm_summary")
            return {
                "trade_date": payload.get("date"),
                "subject": payload.get("subject"),
                "snapshot_time": payload.get("snapshot_time"),
                "main_force_amount_yi": capital_flow.get("main_force_yi"),
                "institution_amount_yi": capital_flow.get("institution_yi"),
                "large_household_amount_yi": capital_flow.get("large_household_yi"),
                "retail_amount_yi": capital_flow.get("retail_yi"),
                "overall_trend": overall_trend.get("direction"),
                "intraday_summary": intraday_summary,
                "review_status": "sidecar_loaded",
                "extraction_method": "sidecar_json",
                "sidecar_path": str(candidate),
            }
        return None

    def _build_fallback_payload(
        self,
        original_filename: str,
        trade_date: str | None,
        subject: str | None,
    ) -> dict:
        """Return a placeholder payload with all amounts unset, marked pending_review."""
        return {
            "trade_date": trade_date,
            "subject": subject,
            "snapshot_time": None,
            "main_force_amount_yi": None,
            "institution_amount_yi": None,
            "large_household_amount_yi": None,
            "retail_amount_yi": None,
            "overall_trend": "待识别",
            "intraday_summary": "当前未配置视觉模型,图片已保存,待接入大模型后补充日内资金走势总结。",
            "review_status": "pending_review",
            "extraction_method": "storage_only",
            "fallback_reason": f"No vision model configured for {original_filename}",
        }

    def _serialize_record(self, record: dict) -> dict:
        """Return the record with an added public "image_url" for its stored file."""
        return {
            **record,
            "image_url": f"/capital-images/uploads/{Path(record['image_path']).name}",
        }


capital_image_service = CapitalImageService()