chore: initialize lhbfx project and documentation

This commit is contained in:
wanghep
2026-04-17 21:20:26 +08:00
commit 5a5dd3c9fd
54 changed files with 11185 additions and 0 deletions

View File

@ -0,0 +1,92 @@
app:
name: "lhbfx"
timezone: "Asia/Shanghai"
environment: "local"
database:
driver: "mysql"
host: "127.0.0.1"
port: 3306
username: "your_username"
password: "your_password"
database: "lhbfx"
charset: "utf8mb4"
pool_size: 10
connect_timeout_seconds: 10
data_sources:
lhb_priority:
- "tonghuashun"
quote_priority:
- "tonghuashun"
fundamentals_priority:
- "tonghuashun"
- "akshare"
monitoring:
target_year: 2026
warning_days_without_action: 4
max_position_ratio: 0.10
fib_levels:
- 0.382
- 0.5
- 0.618
moving_average_days:
- 5
turnover_warning_threshold: 0.30
traders:
- name: "章盟主"
alias: "章建平"
style_tags:
- "板块中军"
- "权重大票"
- "趋势强化"
- "大资金体量"
warning_weight: "high"
match_keywords:
- "上海长宁区江苏路"
- "上海浦东新区海阳西路"
- "宁波彩虹北路"
seats:
core:
- "国泰海通证券上海江苏路"
active:
- "国泰海通证券上海海阳西路"
- "国泰海通证券宁波彩虹北路"
history: []
- name: "炒股养家"
alias: ""
style_tags:
- "情绪周期"
- "题材确认"
- "高辨识度龙头"
warning_weight: "medium"
match_keywords:
- "上海宛平南路"
- "上海茅台路"
seats:
core:
- "华鑫证券上海宛平南路"
active:
- "华鑫证券上海茅台路"
history: []
- name: "欢乐海岸"
alias: ""
style_tags:
- "高位强势股"
- "妖股"
- "封板后锁仓"
- "高溢价品牌效应"
warning_weight: "high"
match_keywords:
- "中信证券深圳总部"
- "中信证券深圳后海"
seats:
core:
- "中信证券深圳总部"
- "中信证券深圳后海"
active: []
history: []

17
backend/pyproject.toml Normal file
View File

@ -0,0 +1,17 @@
[project]
name = "lhbfx"
version = "0.1.0"
description = "龙虎榜游资监控系统"
requires-python = ">=3.11"
dependencies = [
"PyMySQL>=1.1.1",
"PyYAML>=6.0.2",
"requests>=2.32.3",
"beautifulsoup4>=4.12.3",
"fastapi>=0.116.1",
"uvicorn>=0.35.0",
"jinja2>=3.1.6",
]
[tool.setuptools]
package-dir = {"" = "src"}

View File

@ -0,0 +1,11 @@
from __future__ import annotations
import sys
from pathlib import Path
def add_src_to_path() -> None:
backend_dir = Path(__file__).resolve().parents[1]
src_dir = backend_dir / "src"
if str(src_dir) not in sys.path:
sys.path.insert(0, str(src_dir))

View File

@ -0,0 +1,21 @@
from __future__ import annotations
from _bootstrap import add_src_to_path
add_src_to_path()
from lhbfx.pipeline import generate_warnings
def main() -> None:
result = generate_warnings()
print(
"预警生成完成:"
f"sell_alert={result['sell_alert']}, "
f"slow_exit_watch={result['slow_exit_watch']}, "
f"total={result['total']}"
)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,25 @@
from __future__ import annotations
import argparse
from datetime import datetime
from _bootstrap import add_src_to_path
add_src_to_path()
from lhbfx.pipeline import import_daily
def main() -> None:
parser = argparse.ArgumentParser(description="导入同花顺某日全量龙虎榜数据")
parser.add_argument("--date", default=datetime.now().strftime("%Y-%m-%d"), help="交易日期,格式 YYYY-MM-DD")
args = parser.parse_args()
result = import_daily(args.date)
print(f"导入日期: {args.date}")
print(f"概览记录数: {result.overview_count}")
print(f"营业部明细数: {result.detail_count}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,56 @@
from __future__ import annotations
import argparse
from datetime import datetime
from pathlib import Path
from _bootstrap import add_src_to_path
add_src_to_path()
from lhbfx.pipeline import import_range
def log_line(log_file: Path, message: str) -> None:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
line = f"[{timestamp}] {message}"
print(line, flush=True)
log_file.parent.mkdir(parents=True, exist_ok=True)
with log_file.open("a", encoding="utf-8") as f:
f.write(line + "\n")
def main() -> None:
parser = argparse.ArgumentParser(description="按工作日批量导入某年同花顺龙虎榜数据")
parser.add_argument("--year", type=int, default=2026, help="年份,默认 2026")
parser.add_argument("--start", default=None, help="起始日期 YYYY-MM-DD")
parser.add_argument("--end", default=None, help="结束日期 YYYY-MM-DD")
parser.add_argument("--limit", type=int, default=None, help="限制导入的交易日数量")
parser.add_argument("--log-file", default=None, help="日志文件路径")
args = parser.parse_args()
default_log = Path(__file__).resolve().parents[1] / "output" / "logs" / f"import_{args.year}.log"
log_file = Path(args.log_file) if args.log_file else default_log
log_line(
log_file,
f"开始导入 year={args.year}, start={args.start}, end={args.end}, limit={args.limit}",
)
result = import_range(year=args.year, start=args.start, end=args.end, limit=args.limit)
for item in result["success"]:
log_line(
log_file,
f"{item['trade_date']} -> 成功, 概览 {item['overview_count']} 条, 明细 {item['detail_count']}",
)
for item in result["failed"]:
log_line(log_file, f"{item['trade_date']} -> 失败, error={item['error']}")
log_line(
log_file,
f"导入完成 success={result['success_count']}, failed={result['failed_count']}, total={result['requested_days']}",
)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,75 @@
from pathlib import Path
from _bootstrap import add_src_to_path
add_src_to_path()
from lhbfx.config import AppConfig, load_config
from lhbfx.db import db_cursor, execute_schema
from lhbfx.seed import seed_traders
def _column_exists(cursor, schema_name: str, table_name: str, column_name: str) -> bool:
cursor.execute(
"""
SELECT 1
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s AND COLUMN_NAME = %s
LIMIT 1
""",
(schema_name, table_name, column_name),
)
return cursor.fetchone() is not None
def _index_exists(cursor, schema_name: str, table_name: str, index_name: str) -> bool:
cursor.execute(
"""
SELECT 1
FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s AND INDEX_NAME = %s
LIMIT 1
""",
(schema_name, table_name, index_name),
)
return cursor.fetchone() is not None
def apply_incremental_alters(config: AppConfig) -> None:
schema_name = config.database.database
with db_cursor(config=config) as (_, cursor):
if not _column_exists(cursor, schema_name, "lhb_overview", "rid"):
cursor.execute("ALTER TABLE lhb_overview ADD COLUMN rid VARCHAR(32) NULL AFTER flag")
if not _column_exists(cursor, schema_name, "lhb_overview", "detail_url"):
cursor.execute("ALTER TABLE lhb_overview ADD COLUMN detail_url VARCHAR(255) NULL AFTER net_buy")
if not _index_exists(cursor, schema_name, "lhb_overview", "uniq_lhb_overview_record"):
cursor.execute("ALTER TABLE lhb_overview ADD UNIQUE KEY uniq_lhb_overview_record (trade_date, stock_code, rid)")
if not _column_exists(cursor, schema_name, "lhb_detail_seats", "rid"):
cursor.execute("ALTER TABLE lhb_detail_seats ADD COLUMN rid VARCHAR(32) NULL AFTER stock_name")
if not _column_exists(cursor, schema_name, "lhb_detail_seats", "section_title"):
cursor.execute("ALTER TABLE lhb_detail_seats ADD COLUMN section_title VARCHAR(255) NULL AFTER rid")
if not _column_exists(cursor, schema_name, "lhb_detail_seats", "buy_ratio"):
cursor.execute("ALTER TABLE lhb_detail_seats ADD COLUMN buy_ratio VARCHAR(32) NULL AFTER buy_amount_wan")
if not _column_exists(cursor, schema_name, "lhb_detail_seats", "sell_ratio"):
cursor.execute("ALTER TABLE lhb_detail_seats ADD COLUMN sell_ratio VARCHAR(32) NULL AFTER sell_amount_wan")
if not _column_exists(cursor, schema_name, "lhb_detail_seats", "detail_url"):
cursor.execute("ALTER TABLE lhb_detail_seats ADD COLUMN detail_url VARCHAR(255) NULL AFTER matched_seat")
if not _index_exists(cursor, schema_name, "lhb_detail_seats", "uniq_lhb_detail_record"):
cursor.execute(
"ALTER TABLE lhb_detail_seats ADD UNIQUE KEY uniq_lhb_detail_record (trade_date, stock_code, rid, table_title, seat_name)"
)
def main() -> None:
root = Path(__file__).resolve().parents[1]
schema_path = root / "src" / "lhbfx" / "schema.sql"
config = load_config()
execute_schema(schema_path=schema_path, config=config)
apply_incremental_alters(config)
seed_traders(config=config)
print("数据库建表完成,游资配置已写入数据库。")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,16 @@
from __future__ import annotations
from _bootstrap import add_src_to_path
add_src_to_path()
from lhbfx.pipeline import rematch_traders
def main() -> None:
result = rematch_traders()
print(f"重新匹配完成,处理记录数: {result['updated']}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,14 @@
from __future__ import annotations
import uvicorn
from _bootstrap import add_src_to_path
def main() -> None:
add_src_to_path()
uvicorn.run("lhbfx.app:app", host="127.0.0.1", port=8000, reload=False)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,2 @@
"""lhbfx package."""

170
backend/src/lhbfx/app.py Normal file
View File

@ -0,0 +1,170 @@
from __future__ import annotations
from pathlib import Path
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from .pipeline import (
generate_warnings,
get_pipeline_status,
import_daily,
import_range,
refresh_trade_date,
rematch_traders,
)
from .queries import (
fetch_stock_detail,
fetch_summary,
fetch_trader_actions,
fetch_trader_detail,
fetch_traders,
fetch_watchlist,
fetch_warnings,
delete_watchlist_item,
upsert_watchlist_item,
)
BASE_DIR = Path(__file__).resolve().parents[2]
PROJECT_DIR = BASE_DIR.parent
FRONTEND_DIST_DIR = PROJECT_DIR / "frontend" / "dist"
FRONTEND_ASSETS_DIR = FRONTEND_DIST_DIR / "assets"
app = FastAPI(title="lhbfx API", version="0.1.0")
class WatchlistPayload(BaseModel):
stock_code: str
stock_name: str
source_trade_date: str | None = None
source_trader_name: str | None = None
if FRONTEND_ASSETS_DIR.exists():
app.mount("/assets", StaticFiles(directory=str(FRONTEND_ASSETS_DIR)), name="assets")
@app.get("/api/summary")
def api_summary():
return fetch_summary()
@app.get("/api/warnings")
def api_warnings(limit: int = 50):
return fetch_warnings(limit=limit)
@app.get("/api/traders")
def api_traders():
return fetch_traders()
@app.get("/api/watchlist")
def api_watchlist(include_archived: bool = False):
return fetch_watchlist(include_archived=include_archived)
@app.post("/api/watchlist")
def api_watchlist_upsert(payload: WatchlistPayload):
return upsert_watchlist_item(
stock_code=payload.stock_code,
stock_name=payload.stock_name,
source_trade_date=payload.source_trade_date,
source_trader_name=payload.source_trader_name,
)
@app.delete("/api/watchlist/{stock_code}")
def api_watchlist_delete(stock_code: str):
deleted = delete_watchlist_item(stock_code)
if not deleted:
raise HTTPException(status_code=404, detail="watchlist item not found")
return {"ok": True, "stock_code": stock_code}
@app.get("/api/actions")
def api_trader_actions(
trade_date: str | None = None,
date_from: str | None = None,
date_to: str | None = None,
trader_name: str | None = None,
limit: int = 200,
):
return fetch_trader_actions(
trade_date=trade_date,
date_from=date_from,
date_to=date_to,
trader_name=trader_name,
limit=limit,
)
@app.get("/api/traders/{trader_id}")
def api_trader_detail(trader_id: int):
data = fetch_trader_detail(trader_id)
if not data:
raise HTTPException(status_code=404, detail="trader not found")
return data
@app.get("/api/stocks/{stock_code}")
def api_stock_detail(stock_code: str):
data = fetch_stock_detail(stock_code)
if not data:
raise HTTPException(status_code=404, detail="stock not found")
return data
@app.get("/api/pipeline/status")
def api_pipeline_status():
return get_pipeline_status()
@app.post("/api/pipeline/import/daily")
def api_import_daily(trade_date: str):
result = import_daily(trade_date)
return {
"trade_date": result.trade_date,
"overview_count": result.overview_count,
"detail_count": result.detail_count,
}
@app.post("/api/pipeline/import/range")
def api_import_range(
year: int | None = None,
start: str | None = None,
end: str | None = None,
limit: int | None = None,
):
return import_range(year=year, start=start, end=end, limit=limit)
@app.post("/api/pipeline/rematch-traders")
def api_rematch_traders():
return rematch_traders()
@app.post("/api/pipeline/generate-warnings")
def api_generate_warnings(clear_existing: bool = True):
return generate_warnings(clear_existing=clear_existing)
@app.post("/api/pipeline/refresh")
def api_refresh_trade_date(trade_date: str, regenerate_warnings: bool = True):
return refresh_trade_date(trade_date, regenerate_warnings=regenerate_warnings)
@app.get("/")
def index():
html_path = FRONTEND_DIST_DIR / "index.html"
if not html_path.exists():
return {
"name": "lhbfx API",
"status": "frontend_pending",
"message": "Frontend dist has not been built yet. Use the Vue dev server during development or /api/* endpoints.",
}
return FileResponse(str(html_path))

View File

@ -0,0 +1,63 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import yaml
DEFAULT_CONFIG_PATH = Path(__file__).resolve().parents[2] / "config.yaml"
@dataclass(slots=True)
class DatabaseConfig:
driver: str
host: str
port: int
username: str
password: str
database: str
charset: str = "utf8mb4"
pool_size: int = 10
connect_timeout_seconds: int = 10
class AppConfig:
def __init__(self, raw: dict[str, Any], path: Path) -> None:
self.raw = raw
self.path = path
@property
def database(self) -> DatabaseConfig:
db = self.raw["database"]
return DatabaseConfig(
driver=db["driver"],
host=db["host"],
port=int(db["port"]),
username=db["username"],
password=db["password"],
database=db["database"],
charset=db.get("charset", "utf8mb4"),
pool_size=int(db.get("pool_size", 10)),
connect_timeout_seconds=int(db.get("connect_timeout_seconds", 10)),
)
@property
def traders(self) -> list[dict[str, Any]]:
return self.raw.get("traders", [])
@property
def monitoring(self) -> dict[str, Any]:
return self.raw.get("monitoring", {})
@property
def data_sources(self) -> dict[str, Any]:
return self.raw.get("data_sources", {})
def load_config(path: str | Path | None = None) -> AppConfig:
config_path = Path(path) if path else DEFAULT_CONFIG_PATH
raw = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
return AppConfig(raw=raw, path=config_path)

53
backend/src/lhbfx/db.py Normal file
View File

@ -0,0 +1,53 @@
from __future__ import annotations
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator
import pymysql
from pymysql.connections import Connection
from pymysql.cursors import DictCursor
from .config import AppConfig, load_config
def get_connection(config: AppConfig | None = None, autocommit: bool = False) -> Connection:
cfg = config or load_config()
db = cfg.database
timeout = max(db.connect_timeout_seconds, 120)
return pymysql.connect(
host=db.host,
port=db.port,
user=db.username,
password=db.password,
database=db.database,
charset=db.charset,
cursorclass=DictCursor,
autocommit=autocommit,
connect_timeout=db.connect_timeout_seconds,
read_timeout=timeout,
write_timeout=timeout,
)
@contextmanager
def db_cursor(config: AppConfig | None = None, autocommit: bool = False) -> Iterator:
conn = get_connection(config=config, autocommit=autocommit)
try:
with conn.cursor() as cursor:
yield conn, cursor
if not autocommit:
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def execute_schema(schema_path: str | Path, config: AppConfig | None = None) -> None:
sql = Path(schema_path).read_text(encoding="utf-8")
statements = [stmt.strip() for stmt in sql.split(";") if stmt.strip()]
with db_cursor(config=config, autocommit=False) as (_, cursor):
for statement in statements:
cursor.execute(statement)

View File

@ -0,0 +1,511 @@
from __future__ import annotations
import json
import time
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Any
from .config import AppConfig, load_config
from .db import db_cursor
from .sources.tonghuashun import TongHuaShunClient
BUY_TABLE_TITLE = "买入金额最大的前5名营业部"
SELL_TABLE_TITLE = "卖出金额最大的前5名营业部"
@dataclass(slots=True)
class ImportResult:
trade_date: str
overview_count: int
detail_count: int
def build_seat_matchers(config: AppConfig) -> list[tuple[str, str, str]]:
matchers: list[tuple[str, str, str]] = []
for trader in config.traders:
trader_name = trader["name"]
for _, seats in trader.get("seats", {}).items():
for seat in seats or []:
matchers.append((trader_name, seat, "seat"))
for keyword in trader.get("match_keywords", []) or []:
matchers.append((trader_name, keyword, "keyword"))
return matchers
def normalize_seat_name(seat_name: str) -> str:
text = seat_name.replace("股份有限公司", "")
text = text.replace("有限责任公司", "")
text = text.replace("有限公司", "")
text = text.replace("证券营业部", "")
text = text.replace("证券", "")
text = text.replace("分公司", "")
text = text.replace("股份", "")
text = text.replace("责任公司", "")
return text.strip()
def match_trader(seat_name: str, matchers: list[tuple[str, str, str]]) -> tuple[str | None, str | None]:
normalized = normalize_seat_name(seat_name)
for trader_name, seat, _match_type in matchers:
if not seat:
continue
if seat in seat_name or seat in normalized:
return trader_name, seat
return None, None
def upsert_stock(cursor, row: dict[str, Any]) -> None:
cursor.execute(
"""
INSERT INTO stocks (stock_code, stock_name)
VALUES (%s, %s)
ON DUPLICATE KEY UPDATE
stock_name = VALUES(stock_name)
""",
(row["stock_code"], row["stock_name"]),
)
def upsert_overview(cursor, row: dict[str, Any]) -> None:
cursor.execute(
"""
INSERT INTO lhb_overview (
trade_date, stock_code, stock_name, flag, rid, price, pct_chg,
amount, net_buy, detail_url, source, raw_payload
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'tonghuashun', %s)
ON DUPLICATE KEY UPDATE
stock_name = VALUES(stock_name),
flag = VALUES(flag),
price = VALUES(price),
pct_chg = VALUES(pct_chg),
amount = VALUES(amount),
net_buy = VALUES(net_buy),
detail_url = VALUES(detail_url),
raw_payload = VALUES(raw_payload)
""",
(
row["trade_date"],
row["stock_code"],
row["stock_name"],
row["flag"],
row["rid"],
row["price"],
row["pct_chg"],
row["amount"],
row["net_buy"],
row["detail_url"],
json.dumps(row, ensure_ascii=False),
),
)
def resolve_table_title(section_title: str) -> str:
if "买入金额最大的前5名" in section_title:
return BUY_TABLE_TITLE
return SELL_TABLE_TITLE
def upsert_detail_rows(cursor, detail: dict[str, Any], matchers: list[tuple[str, str, str]]) -> int:
count = 0
for section in detail["sections"]:
section_title = section.get("section_title", "")
table_title = resolve_table_title(section_title)
for row in section.get("rows", []):
trader_name, matched_seat = match_trader(row["seat_name"], matchers)
cursor.execute(
"""
INSERT INTO lhb_detail_seats (
trade_date, stock_code, stock_name, rid, section_title, table_title, seat_name,
buy_amount_wan, buy_ratio, sell_amount_wan, sell_ratio, net_amount_wan,
matched_trader_name, matched_seat, detail_url, source, raw_payload
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'tonghuashun', %s)
ON DUPLICATE KEY UPDATE
buy_amount_wan = VALUES(buy_amount_wan),
buy_ratio = VALUES(buy_ratio),
sell_amount_wan = VALUES(sell_amount_wan),
sell_ratio = VALUES(sell_ratio),
net_amount_wan = VALUES(net_amount_wan),
matched_trader_name = VALUES(matched_trader_name),
matched_seat = VALUES(matched_seat),
detail_url = VALUES(detail_url),
raw_payload = VALUES(raw_payload)
""",
(
detail["trade_date"],
detail["stock_code"],
detail["title"].replace("龙虎榜数据", "").strip(),
detail["rid"],
section_title,
table_title,
row["seat_name"],
row["buy_amount_wan"],
row["buy_ratio"],
row["sell_amount_wan"],
row["sell_ratio"],
row["net_amount_wan"],
trader_name,
matched_seat,
detail["detail_url"],
json.dumps({"detail": detail, "row": row}, ensure_ascii=False),
),
)
count += 1
return count
def import_daily(
trade_date: str,
*,
config: AppConfig | None = None,
max_retries: int = 3,
retry_delay_seconds: float = 2.0,
) -> ImportResult:
cfg = config or load_config()
client = TongHuaShunClient(config=cfg)
matchers = build_seat_matchers(cfg)
last_error: Exception | None = None
for attempt in range(1, max_retries + 1):
try:
overview_rows = client.fetch_daily_overview(trade_date)
overview_count = 0
detail_count = 0
with db_cursor(config=cfg) as (_, cursor):
for row in overview_rows:
upsert_stock(cursor, row)
upsert_overview(cursor, row)
overview_count += 1
rid = row.get("rid") or ""
if not rid:
continue
detail = client.fetch_record_detail(
stock_code=row["stock_code"],
trade_date=trade_date,
rid=rid,
)
detail_count += upsert_detail_rows(cursor, detail, matchers)
return ImportResult(
trade_date=trade_date,
overview_count=overview_count,
detail_count=detail_count,
)
except Exception as exc:
last_error = exc
if attempt >= max_retries:
break
time.sleep(retry_delay_seconds)
if last_error is not None:
raise last_error
return ImportResult(trade_date=trade_date, overview_count=0, detail_count=0)
def iter_weekdays(year: int):
current = date(year, 1, 1)
end = date(year, 12, 31)
while current <= end:
if current.weekday() < 5:
yield current.isoformat()
current += timedelta(days=1)
def build_schedule(year: int, start: str | None = None, end: str | None = None) -> list[str]:
dates: list[str] = []
for trade_date in iter_weekdays(year):
if start and trade_date < start:
continue
if end and trade_date > end:
continue
dates.append(trade_date)
return dates
def import_range(
*,
year: int | None = None,
start: str | None = None,
end: str | None = None,
limit: int | None = None,
config: AppConfig | None = None,
) -> dict[str, Any]:
schedule = build_schedule(year or date.today().year, start=start, end=end)
if limit is not None:
schedule = schedule[:limit]
success: list[dict[str, Any]] = []
failed: list[dict[str, Any]] = []
for trade_date in schedule:
try:
result = import_daily(trade_date, config=config)
success.append(
{
"trade_date": result.trade_date,
"overview_count": result.overview_count,
"detail_count": result.detail_count,
}
)
except Exception as exc:
failed.append({"trade_date": trade_date, "error": repr(exc)})
return {
"requested_days": len(schedule),
"success_count": len(success),
"failed_count": len(failed),
"success": success,
"failed": failed,
}
def rematch_traders(*, config: AppConfig | None = None) -> dict[str, int]:
cfg = config or load_config()
matchers = build_seat_matchers(cfg)
updated = 0
with db_cursor(config=cfg) as (_, cursor):
cursor.execute("SELECT id, seat_name FROM lhb_detail_seats")
rows = cursor.fetchall()
matched_rows = []
for row in rows:
trader_name, matched_seat = match_trader(row["seat_name"], matchers)
if trader_name:
matched_rows.append((trader_name, matched_seat, row["id"]))
if matched_rows:
cursor.executemany(
"""
UPDATE lhb_detail_seats
SET matched_trader_name = %s,
matched_seat = %s
WHERE id = %s
""",
matched_rows,
)
updated = len(matched_rows)
return {"updated": updated}
def insert_warning(cursor, payload: dict[str, Any]) -> None:
cursor.execute(
"""
INSERT INTO warning_events (
trade_date, stock_code, stock_name, trader_name, warning_type,
warning_level, trigger_reason, current_price, pct_chg, status, suggestion
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, 'new', %s)
""",
(
payload["trade_date"],
payload["stock_code"],
payload["stock_name"],
payload["trader_name"],
payload["warning_type"],
payload["warning_level"],
payload["trigger_reason"],
payload.get("current_price"),
payload.get("pct_chg"),
payload["suggestion"],
),
)
def generate_sell_warnings(cursor) -> int:
cursor.execute(
"""
SELECT DISTINCT
d.trade_date,
d.stock_code,
COALESCE(o.stock_name, d.stock_name) AS stock_name,
d.matched_trader_name,
o.price,
o.pct_chg
FROM lhb_detail_seats d
LEFT JOIN lhb_overview o
ON d.trade_date = o.trade_date AND d.stock_code = o.stock_code
WHERE d.matched_trader_name IS NOT NULL
AND d.table_title = %s
AND CAST(COALESCE(NULLIF(d.sell_amount_wan, ''), '0') AS DECIMAL(18,2)) > 0
""",
(SELL_TABLE_TITLE,),
)
rows = cursor.fetchall()
count = 0
for row in rows:
insert_warning(
cursor,
{
"trade_date": row["trade_date"],
"stock_code": row["stock_code"],
"stock_name": row.get("stock_name"),
"trader_name": row["matched_trader_name"],
"warning_type": "sell_alert",
"warning_level": "high",
"trigger_reason": "目标游资出现卖出记录,触发卖出即时预警规则。",
"current_price": row["price"],
"pct_chg": row["pct_chg"],
"suggestion": "优先审查该股是否进入撤退阶段。",
},
)
count += 1
return count
def generate_slow_exit_warnings(cursor, warning_days_without_action: int) -> int:
cursor.execute("SELECT DISTINCT trade_date FROM lhb_overview WHERE trade_date IS NOT NULL ORDER BY trade_date")
trading_days = [row["trade_date"] for row in cursor.fetchall()]
trade_index = {trade_day: idx for idx, trade_day in enumerate(trading_days)}
if not trading_days:
return 0
latest_trade_day = trading_days[-1]
cursor.execute(
"""
SELECT
matched_trader_name,
stock_code,
COALESCE(MAX(stock_name), stock_code) AS stock_name,
MIN(trade_date) AS first_trade_date,
MAX(trade_date) AS last_trade_date
FROM lhb_detail_seats
WHERE matched_trader_name IS NOT NULL
GROUP BY matched_trader_name, stock_code
"""
)
rows = cursor.fetchall()
count = 0
for row in rows:
first_trade_date = row["first_trade_date"]
last_trade_date = row["last_trade_date"]
if first_trade_date is None or last_trade_date is None:
continue
if last_trade_date not in trade_index or latest_trade_day not in trade_index:
continue
distance = trade_index[latest_trade_day] - trade_index[last_trade_date]
if distance < warning_days_without_action:
continue
cursor.execute(
"""
SELECT price, pct_chg, trade_date
FROM lhb_overview
WHERE stock_code = %s
ORDER BY trade_date DESC
LIMIT 1
""",
(row["stock_code"],),
)
latest = cursor.fetchone()
current_price = latest["price"] if latest else None
pct_chg = latest["pct_chg"] if latest else None
insert_warning(
cursor,
{
"trade_date": latest_trade_day,
"stock_code": row["stock_code"],
"stock_name": row["stock_name"],
"trader_name": row["matched_trader_name"],
"warning_type": "slow_exit_watch",
"warning_level": "medium",
"trigger_reason": f"上榜后连续 {warning_days_without_action} 个交易日未出现新动作,触发慢流出观察。",
"current_price": current_price,
"pct_chg": pct_chg,
"suggestion": "继续观察是否转为明确卖出或破位。",
},
)
count += 1
return count
def generate_warnings(*, config: AppConfig | None = None, clear_existing: bool = True) -> dict[str, int]:
cfg = config or load_config()
warning_days = int(cfg.monitoring.get("warning_days_without_action", 4))
with db_cursor(config=cfg) as (_, cursor):
if clear_existing:
cursor.execute("DELETE FROM warning_events")
sell_count = generate_sell_warnings(cursor)
slow_exit_count = generate_slow_exit_warnings(cursor, warning_days)
return {
"sell_alert": sell_count,
"slow_exit_watch": slow_exit_count,
"total": sell_count + slow_exit_count,
}
def refresh_trade_date(
trade_date: str,
*,
config: AppConfig | None = None,
regenerate_warnings: bool = True,
) -> dict[str, Any]:
import_result = import_daily(trade_date, config=config)
rematch_result = rematch_traders(config=config)
warning_result = generate_warnings(config=config) if regenerate_warnings else None
return {
"trade_date": trade_date,
"import_result": {
"overview_count": import_result.overview_count,
"detail_count": import_result.detail_count,
},
"rematch_result": rematch_result,
"warning_result": warning_result,
}
def get_pipeline_status(*, config: AppConfig | None = None) -> dict[str, Any]:
cfg = config or load_config()
with db_cursor(config=cfg) as (_, cursor):
cursor.execute("SELECT COUNT(*) AS c FROM lhb_overview")
overview_total = cursor.fetchone()["c"]
cursor.execute("SELECT COUNT(*) AS c FROM lhb_detail_seats")
detail_total = cursor.fetchone()["c"]
cursor.execute("SELECT COUNT(*) AS c FROM warning_events")
warning_total = cursor.fetchone()["c"]
cursor.execute("SELECT COUNT(*) AS c FROM traders")
trader_total = cursor.fetchone()["c"]
cursor.execute("SELECT MAX(trade_date) AS latest_trade_date FROM lhb_overview")
latest_trade_date = cursor.fetchone()["latest_trade_date"]
cursor.execute(
"""
SELECT trade_date, COUNT(*) AS c
FROM lhb_overview
WHERE trade_date IS NOT NULL
GROUP BY trade_date
ORDER BY trade_date DESC
LIMIT 10
"""
)
recent_trade_days = cursor.fetchall()
return {
"overview_total": overview_total,
"detail_total": detail_total,
"warning_total": warning_total,
"trader_total": trader_total,
"latest_trade_date": latest_trade_date.isoformat() if latest_trade_date else None,
"recent_trade_days": [
{
"trade_date": row["trade_date"].isoformat() if row["trade_date"] else None,
"overview_count": row["c"],
}
for row in recent_trade_days
],
}

View File

@ -0,0 +1,534 @@
from __future__ import annotations
import json
from datetime import date, datetime
from decimal import Decimal
from typing import Any
from .db import db_cursor
from .sources.eastmoney import EastMoneyClient
from .sources.sina import SinaClient
def _normalize_value(value: Any) -> Any:
if isinstance(value, (datetime, date)):
return value.isoformat()
if isinstance(value, Decimal):
return float(value)
return value
def _normalize_row(row: dict[str, Any]) -> dict[str, Any]:
return {key: _normalize_value(value) for key, value in row.items()}
def _parse_json_list(value: Any) -> list[Any]:
if isinstance(value, list):
return value
if isinstance(value, str) and value:
try:
parsed = json.loads(value)
return parsed if isinstance(parsed, list) else []
except json.JSONDecodeError:
return []
return []
def _infer_market_label(stock_code: str) -> str:
if stock_code.startswith(("6", "9", "5", "688")):
return "沪A"
return "深A"
def _infer_board_label(stock_code: str) -> str:
if stock_code.startswith(("688", "689")):
return "绉戝垱鏉?"
if stock_code.startswith(("300", "301")):
return "鍒涗笟鏉?"
if stock_code.startswith(("8", "4", "920")):
return "鍖椾氦鎵€"
if stock_code.startswith(("60", "601", "603", "605", "900")):
return "娌富鏉?"
if stock_code.startswith(("000", "001", "002", "003", "200")):
return "娣变富鏉?"
return "A鑲?"
def fetch_summary() -> dict[str, Any]:
with db_cursor() as (_, cursor):
cursor.execute("SELECT COUNT(*) AS c FROM warning_events")
warning_total = cursor.fetchone()["c"]
cursor.execute(
"""
SELECT warning_level, COUNT(*) AS c
FROM warning_events
GROUP BY warning_level
"""
)
warning_by_level = {row["warning_level"]: row["c"] for row in cursor.fetchall()}
cursor.execute("SELECT COUNT(*) AS c FROM traders")
trader_total = cursor.fetchone()["c"]
cursor.execute("SELECT COUNT(DISTINCT stock_code) AS c FROM lhb_overview")
stock_total = cursor.fetchone()["c"]
cursor.execute("SELECT COUNT(DISTINCT trade_date) AS c FROM lhb_overview WHERE trade_date IS NOT NULL")
imported_days = cursor.fetchone()["c"]
return {
"warning_total": warning_total,
"warning_by_level": warning_by_level,
"trader_total": trader_total,
"stock_total": stock_total,
"imported_days": imported_days,
}
def fetch_warnings(limit: int = 50) -> list[dict[str, Any]]:
with db_cursor() as (_, cursor):
cursor.execute(
"""
SELECT
trade_date,
stock_code,
stock_name,
trader_name,
warning_type,
warning_level,
trigger_reason,
current_price,
pct_chg,
suggestion,
created_at
FROM warning_events
ORDER BY trade_date DESC, created_at DESC
LIMIT %s
""",
(limit,),
)
return [_normalize_row(row) for row in cursor.fetchall()]
def fetch_traders() -> list[dict[str, Any]]:
with db_cursor() as (_, cursor):
cursor.execute(
"""
SELECT
t.id,
t.name,
t.alias_name,
t.warning_weight,
t.style_tags,
COUNT(DISTINCT d.stock_code) AS stock_count,
COUNT(DISTINCT CASE WHEN w.warning_type = 'sell_alert' THEN CONCAT(w.trade_date, ':', w.stock_code) END) AS sell_alert_count,
COUNT(DISTINCT CASE WHEN w.warning_type = 'slow_exit_watch' THEN CONCAT(w.trade_date, ':', w.stock_code) END) AS slow_exit_count
FROM traders t
LEFT JOIN lhb_detail_seats d
ON d.matched_trader_name = t.name
LEFT JOIN warning_events w
ON w.trader_name = t.name
GROUP BY t.id, t.name, t.alias_name, t.warning_weight, t.style_tags
ORDER BY stock_count DESC, t.name
"""
)
rows = [_normalize_row(row) for row in cursor.fetchall()]
for row in rows:
row["style_tags"] = _parse_json_list(row.get("style_tags"))
return rows
def fetch_watchlist(include_archived: bool = False) -> list[dict[str, Any]]:
with db_cursor() as (_, cursor):
cursor.execute(
"""
SELECT
stock_code,
stock_name,
source_trade_date,
source_trader_name,
status,
added_at,
archived_at,
updated_at
FROM watchlist_entries
WHERE (%s = 1 OR status = 'active')
ORDER BY added_at DESC, updated_at DESC, stock_code
""",
(1 if include_archived else 0,),
)
return [_normalize_row(row) for row in cursor.fetchall()]
def upsert_watchlist_item(
stock_code: str,
stock_name: str,
source_trade_date: str | None,
source_trader_name: str | None,
) -> dict[str, Any]:
with db_cursor() as (_, cursor):
cursor.execute(
"""
INSERT INTO watchlist_entries (
stock_code,
stock_name,
source_trade_date,
source_trader_name,
status,
archived_at
)
VALUES (%s, %s, %s, %s, 'active', NULL)
ON DUPLICATE KEY UPDATE
stock_name = VALUES(stock_name),
source_trade_date = VALUES(source_trade_date),
source_trader_name = VALUES(source_trader_name),
status = 'active',
archived_at = NULL,
added_at = CURRENT_TIMESTAMP
""",
(stock_code, stock_name, source_trade_date, source_trader_name),
)
cursor.execute(
"""
SELECT
stock_code,
stock_name,
source_trade_date,
source_trader_name,
status,
added_at,
archived_at,
updated_at
FROM watchlist_entries
WHERE stock_code = %s
LIMIT 1
""",
(stock_code,),
)
row = cursor.fetchone()
return _normalize_row(row) if row else {}
def delete_watchlist_item(stock_code: str) -> bool:
with db_cursor() as (_, cursor):
cursor.execute(
"""
DELETE FROM watchlist_entries
WHERE stock_code = %s
""",
(stock_code,),
)
return cursor.rowcount > 0
def fetch_trader_actions(
trade_date: str | None = None,
date_from: str | None = None,
date_to: str | None = None,
trader_name: str | None = None,
limit: int = 200,
) -> dict[str, Any]:
with db_cursor() as (_, cursor):
selected_trade_date = trade_date
resolved_date_from = date_from
resolved_date_to = date_to
if not selected_trade_date and not (resolved_date_from and resolved_date_to):
cursor.execute(
"""
SELECT MAX(trade_date) AS latest_trade_date
FROM lhb_detail_seats
WHERE matched_trader_name IS NOT NULL
"""
)
latest = cursor.fetchone()
selected_trade_date = latest["latest_trade_date"] if latest else None
if selected_trade_date:
resolved_date_from = selected_trade_date
resolved_date_to = selected_trade_date
if not resolved_date_from or not resolved_date_to:
return {"trade_date": None, "date_from": None, "date_to": None, "actions": []}
trader_filter = ""
params: list[Any] = [resolved_date_from, resolved_date_to]
if trader_name:
trader_filter = "AND d.matched_trader_name = %s"
params.append(trader_name)
params.append(limit)
cursor.execute(
f"""
SELECT
d.trade_date,
d.stock_code,
COALESCE(o.stock_name, d.stock_name) AS stock_name,
d.matched_trader_name AS trader_name,
d.table_title,
d.seat_name,
d.buy_amount_wan,
d.sell_amount_wan,
d.net_amount_wan,
o.price AS current_price,
o.pct_chg,
s.industry,
s.market,
s.total_market_value,
s.circulating_market_value,
CASE
WHEN CAST(COALESCE(NULLIF(d.buy_amount_wan, ''), '0') AS DECIMAL(18,2)) > 0
AND CAST(COALESCE(NULLIF(d.sell_amount_wan, ''), '0') AS DECIMAL(18,2)) = 0
THEN 'buy'
WHEN CAST(COALESCE(NULLIF(d.sell_amount_wan, ''), '0') AS DECIMAL(18,2)) > 0
AND CAST(COALESCE(NULLIF(d.buy_amount_wan, ''), '0') AS DECIMAL(18,2)) = 0
THEN 'sell'
WHEN CAST(COALESCE(NULLIF(d.net_amount_wan, ''), '0') AS DECIMAL(18,2)) >= 0
THEN 'net_buy'
ELSE 'net_sell'
END AS action_side
FROM lhb_detail_seats d
LEFT JOIN (
SELECT
trade_date,
stock_code,
MAX(stock_name) AS stock_name,
MAX(price) AS price,
MAX(pct_chg) AS pct_chg
FROM lhb_overview
GROUP BY trade_date, stock_code
) o
ON o.trade_date = d.trade_date AND o.stock_code = d.stock_code
LEFT JOIN stocks s
ON s.stock_code = d.stock_code
WHERE d.trade_date BETWEEN %s AND %s
AND d.matched_trader_name IS NOT NULL
{trader_filter}
ORDER BY d.trade_date DESC, d.matched_trader_name, d.stock_code, d.table_title, d.id
LIMIT %s
""",
params,
)
actions = [_normalize_row(row) for row in cursor.fetchall()]
for action in actions:
action["market"] = action.get("market") or _infer_market_label(action["stock_code"])
action["board_label"] = _infer_board_label(action["stock_code"])
return {
"trade_date": _normalize_value(selected_trade_date or resolved_date_to),
"date_from": _normalize_value(resolved_date_from),
"date_to": _normalize_value(resolved_date_to),
"actions": actions,
}
def fetch_trader_detail(trader_id: int) -> dict[str, Any]:
with db_cursor() as (_, cursor):
cursor.execute(
"""
SELECT id, name, alias_name, warning_weight, style_tags
FROM traders
WHERE id = %s
""",
(trader_id,),
)
trader = cursor.fetchone()
if not trader:
return {}
trader_name = trader["name"]
trader = _normalize_row(trader)
trader["style_tags"] = _parse_json_list(trader.get("style_tags"))
cursor.execute(
"""
SELECT seat_name, seat_level
FROM trader_seats ts
JOIN traders t ON ts.trader_id = t.id
WHERE t.id = %s
ORDER BY seat_level, seat_name
""",
(trader_id,),
)
seats = [_normalize_row(row) for row in cursor.fetchall()]
cursor.execute(
"""
SELECT
d.stock_code,
MAX(COALESCE(o.stock_name, d.stock_name)) AS stock_name,
MAX(o.price) AS latest_price,
MAX(o.pct_chg) AS pct_chg,
COUNT(*) AS action_count,
MAX(d.trade_date) AS last_trade_date,
SUM(CASE WHEN CAST(COALESCE(NULLIF(d.buy_amount_wan, ''), '0') AS DECIMAL(18,2)) > 0 THEN 1 ELSE 0 END) AS buy_action_count,
SUM(CASE WHEN CAST(COALESCE(NULLIF(d.sell_amount_wan, ''), '0') AS DECIMAL(18,2)) > 0 THEN 1 ELSE 0 END) AS sell_action_count,
MAX(CASE WHEN w.warning_type = 'sell_alert' THEN 1 ELSE 0 END) AS has_sell_alert,
MAX(CASE WHEN w.warning_type = 'slow_exit_watch' THEN 1 ELSE 0 END) AS has_slow_exit
FROM lhb_detail_seats d
LEFT JOIN lhb_overview o
ON o.stock_code = d.stock_code AND o.trade_date = d.trade_date
LEFT JOIN warning_events w
ON w.stock_code = d.stock_code AND w.trader_name = d.matched_trader_name
WHERE d.matched_trader_name = %s
GROUP BY d.stock_code
ORDER BY last_trade_date DESC, action_count DESC
LIMIT 100
""",
(trader_name,),
)
stocks = [_normalize_row(row) for row in cursor.fetchall()]
cursor.execute(
"""
SELECT trade_date, stock_code, stock_name, warning_type, warning_level, trigger_reason
FROM warning_events
WHERE trader_name = %s
ORDER BY trade_date DESC, created_at DESC
LIMIT 20
""",
(trader_name,),
)
warnings = [_normalize_row(row) for row in cursor.fetchall()]
return {
"trader": trader,
"seats": seats,
"stocks": stocks,
"warnings": warnings,
}
def fetch_stock_detail(stock_code: str) -> dict[str, Any]:
market_daily: list[dict[str, Any]] = []
quote_snapshot: dict[str, Any] = {}
eastmoney = EastMoneyClient()
try:
market_daily = eastmoney.fetch_daily_kline(stock_code)
except Exception:
market_daily = []
try:
quote_snapshot = eastmoney.fetch_quote_snapshot(stock_code)
except Exception:
quote_snapshot = {}
if not market_daily:
try:
market_daily = SinaClient().fetch_daily_kline(stock_code)
except Exception:
market_daily = []
with db_cursor() as (_, cursor):
cursor.execute(
"""
SELECT stock_code, stock_name, market, industry, total_market_value, circulating_market_value
FROM stocks
WHERE stock_code = %s
""",
(stock_code,),
)
stock = cursor.fetchone()
if not stock:
return {}
stock = _normalize_row(stock)
if quote_snapshot:
if not stock.get("stock_name"):
stock["stock_name"] = quote_snapshot.get("stock_name")
if not stock.get("market"):
stock["market"] = _infer_market_label(stock_code)
if not stock.get("industry"):
stock["industry"] = quote_snapshot.get("industry")
if not stock.get("total_market_value"):
stock["total_market_value"] = quote_snapshot.get("total_market_value")
if not stock.get("circulating_market_value"):
stock["circulating_market_value"] = quote_snapshot.get("circulating_market_value")
if not stock.get("market"):
stock["market"] = _infer_market_label(stock_code)
cursor.execute(
"""
SELECT trade_date, price, pct_chg, amount, net_buy, flag
FROM lhb_overview
WHERE stock_code = %s
ORDER BY trade_date DESC
""",
(stock_code,),
)
overview = [_normalize_row(row) for row in cursor.fetchall()]
cursor.execute(
"""
SELECT
trade_date,
matched_trader_name,
table_title,
seat_name,
buy_amount_wan,
sell_amount_wan,
net_amount_wan
FROM lhb_detail_seats
WHERE stock_code = %s
AND matched_trader_name IS NOT NULL
ORDER BY trade_date DESC, id DESC
LIMIT 120
""",
(stock_code,),
)
trader_actions = [_normalize_row(row) for row in cursor.fetchall()]
cursor.execute(
"""
SELECT
matched_trader_name,
COUNT(*) AS action_count,
SUM(CASE WHEN CAST(COALESCE(NULLIF(buy_amount_wan, ''), '0') AS DECIMAL(18,2)) > 0 THEN 1 ELSE 0 END) AS buy_count,
SUM(CASE WHEN CAST(COALESCE(NULLIF(sell_amount_wan, ''), '0') AS DECIMAL(18,2)) > 0 THEN 1 ELSE 0 END) AS sell_count,
MAX(trade_date) AS last_trade_date,
SUM(CAST(COALESCE(NULLIF(buy_amount_wan, ''), '0') AS DECIMAL(18,2))) AS total_buy_amount_wan,
SUM(CAST(COALESCE(NULLIF(sell_amount_wan, ''), '0') AS DECIMAL(18,2))) AS total_sell_amount_wan,
SUM(CAST(COALESCE(NULLIF(net_amount_wan, ''), '0') AS DECIMAL(18,2))) AS total_net_amount_wan
FROM lhb_detail_seats
WHERE stock_code = %s
AND matched_trader_name IS NOT NULL
GROUP BY matched_trader_name
ORDER BY action_count DESC, last_trade_date DESC
""",
(stock_code,),
)
trader_summary = [_normalize_row(row) for row in cursor.fetchall()]
cursor.execute(
"""
SELECT trade_date, trader_name, warning_type, warning_level, trigger_reason, suggestion
FROM warning_events
WHERE stock_code = %s
ORDER BY trade_date DESC, created_at DESC
LIMIT 30
""",
(stock_code,),
)
warnings = [_normalize_row(row) for row in cursor.fetchall()]
circulating_shares = quote_snapshot.get("circulating_shares")
for row in market_daily:
close_price = float(row.get("close") or 0)
volume = float(row.get("volume") or 0)
if row.get("amount") in (None, "", "-") and close_price and volume:
row["amount"] = f"{close_price * volume / 100000000:.2f}亿"
if row.get("turnover") in (None, "", "-") and circulating_shares:
try:
turnover_pct = volume / float(circulating_shares) * 100
row["turnover"] = f"{turnover_pct:.2f}%"
except Exception:
pass
return {
"stock": stock,
"market_snapshot": {key: _normalize_value(value) for key, value in quote_snapshot.items()},
"overview": overview,
"market_daily": market_daily,
"trader_actions": trader_actions,
"trader_summary": trader_summary,
"warnings": warnings,
}

View File

@ -0,0 +1,112 @@
CREATE TABLE IF NOT EXISTS traders (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(64) NOT NULL UNIQUE,
alias_name VARCHAR(128) NULL,
warning_weight VARCHAR(16) NOT NULL DEFAULT 'medium',
style_tags JSON NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS trader_seats (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
trader_id BIGINT NOT NULL,
seat_name VARCHAR(255) NOT NULL,
seat_level VARCHAR(32) NOT NULL,
is_active TINYINT(1) NOT NULL DEFAULT 1,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uniq_trader_seat (trader_id, seat_name),
CONSTRAINT fk_trader_seats_trader FOREIGN KEY (trader_id) REFERENCES traders(id)
);
CREATE TABLE IF NOT EXISTS stocks (
stock_code VARCHAR(16) PRIMARY KEY,
stock_name VARCHAR(64) NOT NULL,
market VARCHAR(32) NULL,
industry VARCHAR(128) NULL,
concept_tags JSON NULL,
total_market_value DECIMAL(20, 2) NULL,
circulating_market_value DECIMAL(20, 2) NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS lhb_overview (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
trade_date DATE NULL,
stock_code VARCHAR(16) NOT NULL,
stock_name VARCHAR(64) NOT NULL,
flag VARCHAR(32) NULL,
rid VARCHAR(32) NULL,
price VARCHAR(32) NULL,
pct_chg VARCHAR(32) NULL,
amount VARCHAR(32) NULL,
net_buy VARCHAR(32) NULL,
detail_url VARCHAR(255) NULL,
source VARCHAR(32) NOT NULL DEFAULT 'tonghuashun',
raw_payload JSON NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
KEY idx_lhb_overview_code (stock_code),
KEY idx_lhb_overview_date (trade_date),
UNIQUE KEY uniq_lhb_overview_record (trade_date, stock_code, rid)
);
CREATE TABLE IF NOT EXISTS lhb_detail_seats (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
trade_date DATE NULL,
stock_code VARCHAR(16) NOT NULL,
stock_name VARCHAR(64) NULL,
rid VARCHAR(32) NULL,
section_title VARCHAR(255) NULL,
table_title VARCHAR(128) NOT NULL,
seat_name VARCHAR(255) NOT NULL,
buy_amount_wan VARCHAR(32) NULL,
buy_ratio VARCHAR(32) NULL,
sell_amount_wan VARCHAR(32) NULL,
sell_ratio VARCHAR(32) NULL,
net_amount_wan VARCHAR(32) NULL,
matched_trader_name VARCHAR(64) NULL,
matched_seat VARCHAR(255) NULL,
detail_url VARCHAR(255) NULL,
source VARCHAR(32) NOT NULL DEFAULT 'tonghuashun',
raw_payload JSON NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
KEY idx_lhb_detail_code (stock_code),
KEY idx_lhb_detail_trade_date (trade_date),
KEY idx_lhb_detail_trader (matched_trader_name),
UNIQUE KEY uniq_lhb_detail_record (trade_date, stock_code, rid, table_title, seat_name)
);
CREATE TABLE IF NOT EXISTS warning_events (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
trade_date DATE NULL,
stock_code VARCHAR(16) NOT NULL,
stock_name VARCHAR(64) NULL,
trader_name VARCHAR(64) NULL,
warning_type VARCHAR(64) NOT NULL,
warning_level VARCHAR(32) NOT NULL,
trigger_reason TEXT NOT NULL,
current_price VARCHAR(32) NULL,
pct_chg VARCHAR(32) NULL,
status VARCHAR(32) NOT NULL DEFAULT 'new',
suggestion VARCHAR(255) NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
KEY idx_warning_events_code (stock_code),
KEY idx_warning_events_trade_date (trade_date),
KEY idx_warning_events_trader (trader_name)
);
CREATE TABLE IF NOT EXISTS watchlist_entries (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
stock_code VARCHAR(16) NOT NULL,
stock_name VARCHAR(64) NOT NULL,
source_trade_date DATE NULL,
source_trader_name VARCHAR(64) NULL,
status VARCHAR(32) NOT NULL DEFAULT 'active',
added_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
archived_at TIMESTAMP NULL DEFAULT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uniq_watchlist_stock (stock_code),
KEY idx_watchlist_status_added (status, added_at)
);

46
backend/src/lhbfx/seed.py Normal file
View File

@ -0,0 +1,46 @@
from __future__ import annotations
import json
from .config import AppConfig, load_config
from .db import db_cursor
def seed_traders(config: AppConfig | None = None) -> None:
cfg = config or load_config()
with db_cursor(config=cfg) as (_, cursor):
for trader in cfg.traders:
cursor.execute(
"""
INSERT INTO traders (name, alias_name, warning_weight, style_tags)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
alias_name = VALUES(alias_name),
warning_weight = VALUES(warning_weight),
style_tags = VALUES(style_tags)
""",
(
trader["name"],
trader.get("alias") or None,
trader.get("warning_weight", "medium"),
json.dumps(trader.get("style_tags", []), ensure_ascii=False),
),
)
cursor.execute("SELECT id FROM traders WHERE name = %s", (trader["name"],))
trader_id = cursor.fetchone()["id"]
seat_groups = trader.get("seats", {})
for seat_level, seats in seat_groups.items():
for seat in seats or []:
cursor.execute(
"""
INSERT INTO trader_seats (trader_id, seat_name, seat_level)
VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE
seat_level = VALUES(seat_level),
is_active = 1
""",
(trader_id, seat, seat_level),
)

View File

@ -0,0 +1,2 @@
"""Data sources."""

View File

@ -0,0 +1,94 @@
from __future__ import annotations
from typing import Any
import requests
DEFAULT_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/135.0.0.0 Safari/537.36"
),
"Referer": "https://quote.eastmoney.com/",
}
def infer_secid(stock_code: str) -> str:
if stock_code.startswith(("6", "9", "5", "688")):
return f"1.{stock_code}"
return f"0.{stock_code}"
class EastMoneyClient:
def fetch_quote_snapshot(self, stock_code: str) -> dict[str, Any]:
secid = infer_secid(stock_code)
url = "https://push2.eastmoney.com/api/qt/stock/get"
params = {
"secid": secid,
"fields": "f43,f44,f45,f46,f47,f48,f57,f58,f60,f84,f85,f116,f117,f127,f168,f169,f170,f171",
}
response = requests.get(url, params=params, headers=DEFAULT_HEADERS, timeout=20)
response.raise_for_status()
payload = response.json()
data = payload.get("data") or {}
return {
"stock_code": data.get("f57") or stock_code,
"stock_name": data.get("f58"),
"industry": data.get("f127"),
"circulating_shares": data.get("f84"),
"circulating_market_value": data.get("f117"),
"total_market_value": data.get("f116"),
"latest_price": data.get("f43"),
"high_price": data.get("f44"),
"low_price": data.get("f45"),
"open_price": data.get("f46"),
"volume": data.get("f47"),
"amount": data.get("f48"),
"previous_close": data.get("f60"),
"turnover": data.get("f168"),
"price_chg": data.get("f169"),
"pct_chg": data.get("f170"),
"amplitude": data.get("f171"),
}
def fetch_daily_kline(self, stock_code: str, limit: int = 240) -> list[dict[str, Any]]:
secid = infer_secid(stock_code)
url = "https://push2his.eastmoney.com/api/qt/stock/kline/get"
params = {
"secid": secid,
"klt": "101",
"fqt": "1",
"lmt": str(limit),
"end": "20500000",
"fields1": "f1,f2,f3,f4,f5,f6",
"fields2": "f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61",
}
response = requests.get(url, params=params, headers=DEFAULT_HEADERS, timeout=20)
response.raise_for_status()
payload = response.json()
data = payload.get("data") or {}
klines = data.get("klines") or []
rows: list[dict[str, Any]] = []
for item in klines:
parts = item.split(",")
if len(parts) < 11:
continue
rows.append(
{
"trade_date": parts[0],
"open": parts[1],
"close": parts[2],
"high": parts[3],
"low": parts[4],
"volume": parts[5],
"amount": parts[6],
"amplitude": parts[7],
"pct_chg": parts[8],
"price_chg": parts[9],
"turnover": parts[10],
}
)
return rows

View File

@ -0,0 +1,73 @@
from __future__ import annotations
from typing import Any
import requests
DEFAULT_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/135.0.0.0 Safari/537.36"
),
"Referer": "https://finance.sina.com.cn/",
}
def infer_symbol(stock_code: str) -> str:
if stock_code.startswith(("6", "9", "5", "688")):
return f"sh{stock_code}"
return f"sz{stock_code}"
class SinaClient:
def fetch_daily_kline(self, stock_code: str, limit: int = 240) -> list[dict[str, Any]]:
symbol = infer_symbol(stock_code)
url = "https://money.finance.sina.com.cn/quotes_service/api/json_v2.php/CN_MarketData.getKLineData"
params = {
"symbol": symbol,
"scale": "240",
"ma": "no",
"datalen": str(limit),
}
response = requests.get(url, params=params, headers=DEFAULT_HEADERS, timeout=20)
response.raise_for_status()
payload = response.json()
rows: list[dict[str, Any]] = []
previous_close: float | None = None
for item in payload or []:
open_price = float(item.get("open", 0) or 0)
close_price = float(item.get("close", 0) or 0)
high_price = float(item.get("high", 0) or 0)
low_price = float(item.get("low", 0) or 0)
amplitude = "-"
pct_chg = "-"
price_chg = "-"
if previous_close and previous_close != 0:
price_diff = close_price - previous_close
pct_diff = price_diff / previous_close * 100
amp = (high_price - low_price) / previous_close * 100
price_chg = f"{price_diff:.3f}"
pct_chg = f"{pct_diff:.2f}%"
amplitude = f"{amp:.2f}%"
rows.append(
{
"trade_date": item.get("day", ""),
"open": item.get("open", ""),
"close": item.get("close", ""),
"high": item.get("high", ""),
"low": item.get("low", ""),
"volume": item.get("volume", ""),
"amount": "-",
"amplitude": amplitude,
"pct_chg": pct_chg,
"price_chg": price_chg,
"turnover": "-",
}
)
previous_close = close_price or previous_close
return rows

View File

@ -0,0 +1,196 @@
from __future__ import annotations
from datetime import datetime
from typing import Any
import requests
from bs4 import BeautifulSoup
from ..config import AppConfig, load_config
DEFAULT_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/135.0.0.0 Safari/537.36"
),
"Referer": "http://data.10jqka.com.cn/",
}
class TongHuaShunClient:
def __init__(self, config: AppConfig | None = None) -> None:
self.config = config or load_config()
def fetch_html(self, url: str) -> str:
response = requests.get(url, headers=DEFAULT_HEADERS, timeout=20)
response.raise_for_status()
response.encoding = "gbk"
return response.text
def fetch_overview(self) -> list[dict[str, Any]]:
overview_url = "http://data.10jqka.com.cn/market/lhbcjmx/"
html = self.fetch_html(overview_url)
return self._parse_overview(html)
def fetch_daily_overview(self, trade_date: str) -> list[dict[str, Any]]:
table_url = (
"http://data.10jqka.com.cn/ifmarket/lhbtable/"
f"stock/all/report/{trade_date}/tab/all/field/STOCKCODE/sort/asc/"
)
html = self.fetch_html(table_url)
return self._parse_overview(html, trade_date=trade_date)
def fetch_stock_detail(self, stock_code: str) -> dict[str, Any]:
detail_url = f"http://data.10jqka.com.cn/market/longhu/code/{stock_code}/"
html = self.fetch_html(detail_url)
overview_rows = self._parse_overview(html)
detail_tables = self._parse_detail_tables(html)
return {
"fetched_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"stock_code": stock_code,
"detail_url": detail_url,
"overview_rows": overview_rows,
"detail_tables": detail_tables,
}
def fetch_record_detail(self, stock_code: str, trade_date: str, rid: str) -> dict[str, Any]:
rid = self._normalize_rid(rid)
detail_url = (
"http://data.10jqka.com.cn/ifmarket/getnewlh/"
f"code/{stock_code}/date/{trade_date}/rid/{rid}/"
)
html = self.fetch_html(detail_url)
return self._parse_record_detail(html, stock_code=stock_code, trade_date=trade_date, rid=rid, detail_url=detail_url)
def _parse_overview(self, html: str, limit: int = 50, trade_date: str | None = None) -> list[dict[str, Any]]:
soup = BeautifulSoup(html, "html.parser")
rows = []
target_table = None
for table in soup.select("table"):
headers = [th.get_text(" ", strip=True) for th in table.select("th")]
if {"代码", "名称", "现价", "涨跌幅", "成交金额", "净买入额"}.issubset(set(headers)):
next_table = table.find_next("table")
if next_table:
target_table = next_table
break
if target_table is None:
return rows
for tr in target_table.select("tbody tr")[:limit]:
tds = tr.select("td")
if len(tds) < 7:
continue
stock_link = tds[2].select_one("a.stock")
rid = stock_link.get("rid", "") if stock_link else ""
normalized_rid = self._normalize_rid(rid)
stock_code = tds[1].get_text(" ", strip=True)
detail_url = (
f"http://data.10jqka.com.cn/market/lhbgg/code/{stock_code}/date/{trade_date}/rid/{normalized_rid}/"
if trade_date and normalized_rid
else (stock_link.get("href", "") if stock_link else "")
)
rows.append(
{
"flag": tds[0].get_text(" ", strip=True),
"stock_code": stock_code,
"stock_name": tds[2].get_text(" ", strip=True),
"price": tds[3].get_text(" ", strip=True),
"pct_chg": tds[4].get_text(" ", strip=True),
"amount": tds[5].get_text(" ", strip=True),
"net_buy": tds[6].get_text(" ", strip=True),
"rid": normalized_rid,
"trade_date": trade_date,
"detail_url": detail_url,
}
)
return rows
def _parse_detail_tables(self, html: str) -> list[dict[str, Any]]:
soup = BeautifulSoup(html, "html.parser")
tables = []
for table in soup.select("table"):
headers = [th.get_text(" ", strip=True) for th in table.select("th")]
if not headers:
continue
if "买入金额最大的前5名营业部" not in headers and "卖出金额最大的前5名营业部" not in headers:
continue
rows = []
for tr in table.select("tr")[1:]:
tds = [td.get_text(" ", strip=True) for td in tr.select("td")]
if len(tds) < 4:
continue
rows.append(
{
"seat_name": tds[0],
"buy_amount_wan": tds[1],
"sell_amount_wan": tds[2],
"net_amount_wan": tds[3],
}
)
tables.append({"table_title": headers[0], "rows": rows})
return tables
def _parse_record_detail(
self,
html: str,
stock_code: str,
trade_date: str,
rid: str,
detail_url: str,
) -> dict[str, Any]:
soup = BeautifulSoup(html, "html.parser")
title = soup.select_one(".lhb-tipbox-hd-title")
date_node = soup.select_one(".lhb-tipbox-hd-date")
desc_node = soup.select_one(".lhb-tipbox-hd-desc")
table = soup.select_one("table")
headers = [th.get_text(" ", strip=True) for th in table.select("th")] if table else []
sections: list[dict[str, Any]] = []
current_section: dict[str, Any] | None = None
if table:
for tr in table.select("tr")[1:]:
classes = tr.get("class") or []
cells = [td.get_text(" ", strip=True) for td in tr.select("td")]
if "lhb_td_desc" in classes:
current_section = {
"section_title": cells[0] if cells else "",
"rows": [],
}
sections.append(current_section)
continue
if len(cells) >= 7:
row = {
"rank_no": cells[0],
"seat_name": cells[1],
"buy_amount_wan": cells[2],
"buy_ratio": cells[3],
"sell_amount_wan": cells[4],
"sell_ratio": cells[5],
"net_amount_wan": cells[6],
}
if current_section is None:
current_section = {"section_title": "unknown", "rows": []}
sections.append(current_section)
current_section["rows"].append(row)
return {
"stock_code": stock_code,
"trade_date": trade_date,
"rid": rid,
"detail_url": detail_url,
"title": title.get_text(" ", strip=True) if title else "",
"date_text": date_node.get_text(" ", strip=True) if date_node else trade_date,
"summary_text": desc_node.get_text(" ", strip=True) if desc_node else "",
"headers": headers,
"sections": sections,
}
@staticmethod
def _normalize_rid(rid: str) -> str:
if "_" in rid:
return rid.split("_")[-1]
return rid