Files
lhbfx/backend/scripts/after_market_update.py
2026-05-02 18:27:36 +08:00

46 lines
1.4 KiB
Python

from __future__ import annotations
import argparse
from datetime import datetime
from _bootstrap import add_src_to_path
add_src_to_path()
from lhbfx.after_market import default_trade_date, run_after_market_update
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the after-market update script.

    Args:
        argv: Argument strings to parse. When ``None`` (the default),
            argparse falls back to ``sys.argv[1:]``, so existing
            zero-argument callers behave exactly as before. Passing an
            explicit list lets the parser be driven programmatically.

    Returns:
        A namespace with ``trade_date`` (``str`` or ``None`` when the option
        is omitted), ``send_email`` (``bool``) and ``force`` (``bool``).
    """
    parser = argparse.ArgumentParser(description="Run lhbfx after-market update workflow")
    parser.add_argument(
        "--trade-date",
        help="Trade date in YYYY-MM-DD format, defaults to Asia/Shanghai today",
    )
    parser.add_argument(
        "--send-email",
        action="store_true",
        help="Send completion email after report generation",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Run even if the target date is not a weekday",
    )
    return parser.parse_args(argv)
def is_weekday(trade_date: str) -> bool:
    """Report whether ``trade_date`` falls on Monday through Friday.

    Args:
        trade_date: Date string in a form accepted by
            ``datetime.fromisoformat`` (e.g. ``"2026-05-01"``).

    Returns:
        ``True`` for Monday-Friday, ``False`` for Saturday or Sunday.

    Raises:
        ValueError: If ``trade_date`` is not a valid ISO-format string.
    """
    parsed = datetime.fromisoformat(trade_date)
    # isoweekday() numbers Monday as 1 and Sunday as 7, so 1..5 are weekdays.
    return parsed.isoweekday() <= 5
def main() -> None:
    """Run the after-market update for the requested trade date.

    The trade date comes from ``--trade-date`` or, when that is omitted,
    from ``default_trade_date()``. Unless ``--force`` is given, a date that
    is not a weekday is skipped before the workflow is invoked. The outcome
    (skip reason or the full result) is printed to stdout.
    """
    cli = parse_args()
    trade_date = cli.trade_date if cli.trade_date else default_trade_date()

    # --force bypasses the weekday check entirely (the check is not evaluated).
    if not (cli.force or is_weekday(trade_date)):
        print(f"Skip {trade_date}: not a weekday, assumed non-trading day.")
        return

    outcome = run_after_market_update(
        trade_date=trade_date,
        send_report_email=cli.send_email,
        force=cli.force,
    )

    # The workflow itself may also decide to skip and report why.
    if outcome["status"] == "skipped":
        print(f"Skip {trade_date}: {outcome['reason']}")
    else:
        print("After-market update finished:", outcome)
# Run the workflow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()