Files
zjjk/backend/app/services/analysis_service.py

1531 lines
65 KiB
Python
Raw Normal View History

2026-04-08 20:04:40 +08:00
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from statistics import mean
from typing import Any
from app.api.analysis_schemas import (
AbcPoint,
AbcStructure,
AlertItem,
AnalysisChartResponse,
AnalysisReportResponse,
ChartCandle,
ChartPriceMarker,
ChartTimeMarker,
ChartToolLayer,
ConclusionSummary,
CycleSummary,
EvidenceItem,
FibonacciLevel,
FibonacciSpace,
FibonacciTime,
IndicatorSnapshot,
MaValues,
MacdValues,
MonitoringTask,
ResonanceItem,
RsiValues,
SnapshotInfo,
StrategyScenario,
SymbolInfo,
TimeSequenceBundle,
TimeSequenceTrack,
ToolPreset,
)
from app.clients.eastmoney_client import EastmoneyClient
@dataclass
class Candle:
timestamp: str
open: float
close: float
high: float
low: float
volume: float
amount: float
@dataclass
class SwingAnchor:
start_index: int
end_index: int
start_label: str
end_label: str
start_price: float
end_price: float
start_time: str
end_time: str
move_direction: str
class AnalysisService:
SPACE_RATIOS = [0.191, 0.382, 0.5, 0.618, 0.809, 1.0, 1.382, 1.618, 2.0, 2.618]
TIME_COUNTS = [13, 21, 34, 55, 89]
CYCLE_MAP: dict[str, int] = {"week": 102, "day": 101, "15m": 15, "30m": 30, "60m": 60, "90m": 90, "120m": 120}
NAME_MAP = {
"科创50": ("000688", "科创50", "sh", "index"),
"上证50": ("000016", "上证50", "sh", "index"),
"沪深300": ("000300", "沪深300", "sz", "index"),
}
def __init__(self) -> None:
self.client = EastmoneyClient()
def build_report(self, query: str) -> AnalysisReportResponse:
symbol, quote, candles_by_cycle = self._load_symbol_bundle(query.strip())
cycle_order = ["week", "day", "15m", "30m", "60m", "90m", "120m"]
cycles = [self._build_cycle_summary(cycle, candles_by_cycle[cycle]) for cycle in cycle_order]
abc_structures = [self._detect_abc(cycle, candles_by_cycle[cycle]) for cycle in cycle_order]
fibonacci_space = [
self._build_fibonacci_space(cycle, candles_by_cycle[cycle], abc)
for cycle, abc in zip(cycle_order, abc_structures)
]
fibonacci_time = [
self._build_fibonacci_time(cycle, candles_by_cycle[cycle], abc)
for cycle, abc in zip(cycle_order, abc_structures)
if cycle in {"15m", "30m", "60m", "90m", "120m"}
]
resonance = self._build_resonance(cycles, fibonacci_space, fibonacci_time, abc_structures)
conclusion_summary = self._build_conclusion_summary(cycles, resonance, fibonacci_space)
alerts = self._build_alerts(cycles, resonance, fibonacci_space, fibonacci_time)
evidence_chain = self._build_evidence_chain(cycles, abc_structures, fibonacci_space, fibonacci_time, resonance)
strategy_scenarios = self._build_strategy_scenarios(fibonacci_space, fibonacci_time, cycles)
monitoring_tasks = self._build_monitoring_tasks(fibonacci_space, fibonacci_time, resonance, cycles)
tool_presets = self._build_tool_presets(fibonacci_space, abc_structures)
return AnalysisReportResponse(
symbol=symbol,
snapshot=SnapshotInfo(
latest_price=self._safe_float(quote.get("f43"), divide=100),
change_percent=self._safe_float(quote.get("f170"), divide=100),
updated_at=datetime.now().isoformat(timespec="seconds"),
source_name="东方财富",
),
cycles=cycles,
abc_structures=abc_structures,
fibonacci_space=fibonacci_space,
fibonacci_time=fibonacci_time,
resonance=resonance,
conclusion_summary=conclusion_summary,
alerts=alerts,
evidence_chain=evidence_chain,
strategy_scenarios=strategy_scenarios,
monitoring_tasks=monitoring_tasks,
tool_presets=tool_presets,
tomorrow_strategy=self._build_tomorrow_strategy(fibonacci_space, fibonacci_time, resonance),
follow_strategy=self._build_follow_strategy(cycles, fibonacci_space, abc_structures),
signal_conclusion=self._build_signal_conclusion(cycles, resonance, fibonacci_space, fibonacci_time),
calculation_steps=self._build_calculation_steps(symbol, abc_structures, fibonacci_space, fibonacci_time),
)
def build_chart(self, query: str, cycle: str) -> AnalysisChartResponse:
normalized_cycle = cycle if cycle in self.CYCLE_MAP else "day"
symbol, _, candles_by_cycle = self._load_symbol_bundle(query.strip())
candles = candles_by_cycle[normalized_cycle]
abc = self._detect_abc(normalized_cycle, candles)
fib_space = self._build_fibonacci_space(normalized_cycle, candles, abc)
fib_time = (
self._build_fibonacci_time(normalized_cycle, candles, abc)
if normalized_cycle in {"15m", "30m", "60m", "90m", "120m"}
else None
)
time_sequences = (
self._build_time_sequence_bundle(normalized_cycle, candles, abc)
if normalized_cycle in {"15m", "30m", "60m", "90m", "120m"}
else None
)
return AnalysisChartResponse(
symbol=symbol,
cycle=normalized_cycle,
candles=self._build_chart_candles(candles),
abc_structure=abc,
fibonacci_space=fib_space,
fibonacci_time=fib_time,
time_sequences=time_sequences,
time_markers=self._build_time_markers(candles, time_sequences),
price_markers=self._build_price_markers(candles, fib_space),
tool_layers=self._build_chart_tool_layers(normalized_cycle, fib_space, abc),
signal_tags=self._build_chart_signal_tags(normalized_cycle, fib_space, fib_time, abc, candles),
)
def _load_symbol_bundle(self, query: str) -> tuple[SymbolInfo, dict[str, Any], dict[str, list[Candle]]]:
symbol = self._resolve_symbol(query.strip())
quote = self.client.fetch_quote(symbol.secid).get("data") or {}
candles_by_cycle = {
cycle: self._parse_klines(self.client.fetch_stock_kline(symbol.secid, limit=160, klt=klt))
for cycle, klt in self.CYCLE_MAP.items()
}
return symbol, quote, candles_by_cycle
def _resolve_symbol(self, query: str) -> SymbolInfo:
if query in self.NAME_MAP:
code, name, market, security_type = self.NAME_MAP[query]
else:
code = query
market = "sh" if code.startswith(("6", "5", "9")) or code == "000688" else "sz"
security_type = "etf" if code.startswith(("51", "15", "56")) else "stock"
name = query
if code == "000688":
market = "sh"
security_type = "index"
name = "科创50"
secid_market = "1" if market == "sh" else "0"
return SymbolInfo(query=query, code=code, name=name, market=market, security_type=security_type, secid=f"{secid_market}.{code}")
def _parse_klines(self, payload: dict[str, Any]) -> list[Candle]:
rows = ((payload.get("data") or {}).get("klines")) or []
candles: list[Candle] = []
for row in rows:
parts = row.split(",")
if len(parts) < 7:
continue
candles.append(
Candle(
timestamp=parts[0],
open=self._safe_float(parts[1]),
close=self._safe_float(parts[2]),
high=self._safe_float(parts[3]),
low=self._safe_float(parts[4]),
volume=self._safe_float(parts[5]),
amount=self._safe_float(parts[6]),
)
)
return candles
def _build_cycle_summary(self, cycle: str, candles: list[Candle]) -> CycleSummary:
if not candles:
return CycleSummary(
cycle=cycle,
close=None,
trend_label="暂无可用K线数据",
ma_status="暂无均线判断",
volume_status="暂无成交量判断",
ma_values=MaValues(),
indicator_snapshot=IndicatorSnapshot(
macd=MacdValues(),
rsi=RsiValues(),
signal_summary="暂无指标判断",
),
)
closes = [item.close for item in candles]
close = closes[-1]
macd_series = self._macd_series(closes)
rsi5 = self._rsi_series(closes, 5)
rsi13 = self._rsi_series(closes, 13)
rsi21 = self._rsi_series(closes, 21)
ma_values = MaValues(
ma5=self._ma(closes, 5),
ma13=self._ma(closes, 13),
ma21=self._ma(closes, 21),
ma34=self._ma(closes, 34),
ma55=self._ma(closes, 55),
ma89=self._ma(closes, 89),
)
return CycleSummary(
cycle=cycle,
close=close,
trend_label=self._trend_label(closes),
ma_status=self._ma_status(close, ma_values),
volume_status=self._volume_status(candles),
ma_values=ma_values,
indicator_snapshot=IndicatorSnapshot(
macd=MacdValues(
dif=self._round_or_none(macd_series[-1]["dif"]),
dea=self._round_or_none(macd_series[-1]["dea"]),
histogram=self._round_or_none(macd_series[-1]["histogram"]),
),
rsi=RsiValues(
rsi5=self._round_or_none(rsi5[-1] if rsi5 else None),
rsi13=self._round_or_none(rsi13[-1] if rsi13 else None),
rsi21=self._round_or_none(rsi21[-1] if rsi21 else None),
),
signal_summary=self._indicator_summary(macd_series, rsi5, rsi13, rsi21),
),
)
def _detect_abc(self, cycle: str, candles: list[Candle]) -> AbcStructure:
if len(candles) < 10:
return AbcStructure(
cycle=cycle,
direction="neutral",
status="data_insufficient",
reasoning=["K线数量不足无法识别ABC结构。"],
)
if cycle in {"week", "day"}:
return self._detect_large_cycle_abc(cycle, candles)
return self._detect_intraday_abc(cycle, candles)
def _detect_large_cycle_abc(self, cycle: str, candles: list[Candle]) -> AbcStructure:
anchor = self._select_dominant_anchor(candles, 60 if cycle == "week" else 120)
a_point = AbcPoint(label="A", timestamp=anchor.start_time, price=round(anchor.start_price, 2), k_index=anchor.start_index)
b_point = AbcPoint(label="B", timestamp=anchor.end_time, price=round(anchor.end_price, 2), k_index=anchor.end_index)
rebound_index = self._index_of_max_high(candles, anchor.end_index, len(candles) - 1)
if anchor.move_direction != "down" or rebound_index <= anchor.end_index:
return AbcStructure(
cycle=cycle,
direction="neutral",
status="candidate_only",
a_point=a_point,
b_point=b_point,
reasoning=[
f"{cycle} 主波段按 {anchor.start_price:.2f} -> {anchor.end_price:.2f} 固定识别。",
"当前只确认了大周期主下跌波段,低点后的反抽结构仍在形成中。",
],
)
c_point = AbcPoint(
label="C",
timestamp=candles[rebound_index].timestamp,
price=round(candles[rebound_index].high, 2),
k_index=rebound_index,
)
return AbcStructure(
cycle=cycle,
direction="bullish",
status="candidate_only",
a_point=a_point,
b_point=b_point,
c_point=c_point,
reasoning=[
f"{cycle} 主波段固定按 {anchor.start_price:.2f} -> {anchor.end_price:.2f} 识别,不再漂移到别的历史波段。",
f"低点后的反抽高点暂按 C 点处理:{c_point.price:.2f}",
"大周期更适合作为修复/反转框架是否完成反转仍以关键黄金位和后续K线确认。",
],
)
def _detect_intraday_abc(self, cycle: str, candles: list[Candle]) -> AbcStructure:
anchor = self._select_active_intraday_swing(candles, cycle)
if anchor is None:
return AbcStructure(
cycle=cycle,
direction="neutral",
status="candidate_only",
reasoning=["当前小周期未识别到稳定有效波段。"],
)
a_point = AbcPoint(label="A", timestamp=anchor.start_time, price=round(anchor.start_price, 2), k_index=anchor.start_index)
b_point = AbcPoint(label="B", timestamp=anchor.end_time, price=round(anchor.end_price, 2), k_index=anchor.end_index)
if anchor.move_direction == "up":
c_index = self._index_of_min_low(candles, anchor.end_index, len(candles) - 1)
c_point = None
status = "candidate_only"
if c_index > anchor.end_index:
c_point = AbcPoint(
label="C",
timestamp=candles[c_index].timestamp,
price=round(candles[c_index].low, 2),
k_index=c_index,
)
if candles[c_index].low > anchor.start_price:
status = "confirmed"
return AbcStructure(
cycle=cycle,
direction="bullish",
status=status,
a_point=a_point,
b_point=b_point,
c_point=c_point,
reasoning=[
f"{cycle} 当前有效上升波段按 {anchor.start_price:.2f} -> {anchor.end_price:.2f} 识别。",
"空间测算围绕当前有效上升段做回撤分析,更贴近明日支撑/承压位。",
],
)
c_index = self._index_of_max_high(candles, anchor.end_index, len(candles) - 1)
c_point = None
status = "candidate_only"
if c_index > anchor.end_index:
c_point = AbcPoint(
label="C",
timestamp=candles[c_index].timestamp,
price=round(candles[c_index].high, 2),
k_index=c_index,
)
if candles[c_index].high < anchor.start_price:
status = "confirmed"
return AbcStructure(
cycle=cycle,
direction="bearish",
status=status,
a_point=a_point,
b_point=b_point,
c_point=c_point,
reasoning=[
f"{cycle} 当前有效下跌波段按 {anchor.start_price:.2f} -> {anchor.end_price:.2f} 识别。",
"空间测算围绕当前有效下跌段做反弹分析,更贴近明日反抽位和反转观察位。",
],
)
def _build_fibonacci_space(self, cycle: str, candles: list[Candle], abc: AbcStructure) -> FibonacciSpace:
if not candles:
return FibonacciSpace(
cycle=cycle,
anchor_start_label="na",
anchor_start_time="",
anchor_start_price=0.0,
anchor_end_label="na",
anchor_end_time="",
anchor_end_price=0.0,
levels=[],
current_position_summary="当前周期暂无可用K线数据无法计算黄金位。",
)
if cycle in {"week", "day"}:
anchor = self._select_dominant_anchor(candles, 60 if cycle == "week" else 120)
else:
anchor = self._select_active_intraday_swing(candles, cycle) or self._select_recent_range_anchor(candles)
current_price = candles[-1].close
move = anchor.end_price - anchor.start_price
levels: list[FibonacciLevel] = []
for ratio in self.SPACE_RATIOS:
value = anchor.end_price - move * ratio if move >= 0 else anchor.end_price + abs(move) * ratio
value = round(value, 2)
levels.append(
FibonacciLevel(
ratio=ratio,
label=str(ratio),
value=value,
distance_to_price=round(current_price - value, 2),
)
)
nearest = min(levels, key=lambda item: abs(item.distance_to_price or 0))
comment = self._reversal_comment(levels, current_price, anchor.move_direction)
return FibonacciSpace(
cycle=cycle,
anchor_start_label=anchor.start_label,
anchor_start_time=anchor.start_time,
anchor_start_price=round(anchor.start_price, 2),
anchor_end_label=anchor.end_label,
anchor_end_time=anchor.end_time,
anchor_end_price=round(anchor.end_price, 2),
levels=levels,
current_position_summary=f"当前价格最接近 {nearest.label} 位,距离 {nearest.distance_to_price:.2f}{comment}",
)
def _build_fibonacci_time(self, cycle: str, candles: list[Candle], abc: AbcStructure) -> FibonacciTime:
if not candles:
return FibonacciTime(
cycle=cycle,
start_point_time="",
start_point_label="na",
current_count=0,
current_hit=[],
next_key_counts=self.TIME_COUNTS[:3],
next_window_summary="当前周期暂无可用K线数据无法计算时间斐波那契。",
)
start_index, start_label = self._select_time_start(cycle, candles, abc)
start_time = candles[start_index].timestamp
current_count = len(candles) - start_index
current_hit = [item for item in self.TIME_COUNTS if item == current_count]
next_counts = [item for item in self.TIME_COUNTS if item > current_count][:3]
summary = f"起点按 {start_label} 计数起点K线本身算第 1 根,当前位于第 {current_count} 根K线"
if next_counts:
summary += f",下一时间窗关注 {' / '.join(map(str, next_counts))}"
else:
summary += ",已超过当前预设时间窗范围"
return FibonacciTime(
cycle=cycle,
start_point_time=start_time,
start_point_label=start_label,
current_count=current_count,
current_hit=current_hit,
next_key_counts=next_counts,
next_window_summary=summary,
)
def _build_time_sequence_track(
self,
cycle: str,
candles: list[Candle],
start_index: int,
start_label: str,
track_type: str,
) -> TimeSequenceTrack:
if not candles:
return TimeSequenceTrack(
track_type=track_type,
cycle=cycle,
start_point_time="",
start_point_label="na",
current_count=0,
current_hit=[],
next_key_counts=self.TIME_COUNTS[:3],
next_window_summary="当前周期暂无可用K线数据无法计算时间序列。",
)
safe_start_index = min(max(start_index, 0), len(candles) - 1)
start_time = candles[safe_start_index].timestamp
current_count = len(candles) - safe_start_index
current_hit = [item for item in self.TIME_COUNTS if item == current_count]
next_counts = [item for item in self.TIME_COUNTS if item > current_count][:3]
summary = f"{track_type} 起点按 {start_label} 计数,当前运行到第 {current_count}"
if next_counts:
summary += f",下一窗口关注 {' / '.join(map(str, next_counts))}"
else:
summary += ",已超过当前预设时间窗口范围"
return TimeSequenceTrack(
track_type=track_type,
cycle=cycle,
start_point_time=start_time,
start_point_label=start_label,
current_count=current_count,
current_hit=current_hit,
next_key_counts=next_counts,
next_window_summary=summary,
)
def _build_time_sequence_bundle(
self,
cycle: str,
candles: list[Candle],
abc: AbcStructure,
) -> TimeSequenceBundle:
major_start_index, major_start_label = self._select_time_start(cycle, candles, abc)
major = self._build_time_sequence_track(cycle, candles, major_start_index, major_start_label, "major")
minor_anchor = self._select_minor_time_start(cycle, candles, abc)
minor = None
if minor_anchor is not None:
minor = self._build_time_sequence_track(cycle, candles, minor_anchor[0], minor_anchor[1], "minor")
return TimeSequenceBundle(major=major, minor=minor)
def _build_resonance(
self,
cycles: list[CycleSummary],
fib_spaces: list[FibonacciSpace],
fib_times: list[FibonacciTime],
abc_structures: list[AbcStructure],
) -> list[ResonanceItem]:
items: list[ResonanceItem] = []
time_hits = [item for item in fib_times if item.current_hit]
if len(time_hits) >= 2:
items.append(
ResonanceItem(
level="strong",
type="time",
cycles=[item.cycle for item in time_hits],
summary="多个分钟周期同时命中时间斐波那契窗口。",
bias="neutral",
)
)
space_ready = []
for item in fib_spaces:
level_0191 = next((entry for entry in item.levels if entry.label == "0.191"), None)
if level_0191 and level_0191.distance_to_price is not None:
if abs(level_0191.distance_to_price) <= max(8, level_0191.value * 0.005):
space_ready.append(item.cycle)
if len(space_ready) >= 2:
items.append(
ResonanceItem(
level="medium",
type="space",
cycles=space_ready,
summary="多个周期同时靠近 0.191 初步反转确认位。",
bias="bullish",
)
)
candidate_abc = [item.cycle for item in abc_structures if item.status in {"candidate_only", "confirmed"}]
if len(candidate_abc) >= 3:
items.append(
ResonanceItem(
level="medium",
type="abc",
cycles=candidate_abc,
summary="多个周期均给出了可复现的ABC候选结构。",
bias="neutral",
)
)
bullish_cycles = [item.cycle for item in cycles if "上升" in item.trend_label or "多头" in item.ma_status]
if len(bullish_cycles) >= 3:
items.append(
ResonanceItem(
level="medium",
type="trend",
cycles=bullish_cycles,
summary="多个周期趋势和均线状态同时转强。",
bias="bullish",
)
)
indicator_cycles = [
item.cycle
for item in cycles
if any(keyword in item.indicator_snapshot.signal_summary for keyword in ("金叉", "多头", "由弱转强", "超卖修复"))
]
if len(indicator_cycles) >= 3:
items.append(
ResonanceItem(
level="strong",
type="trend",
cycles=indicator_cycles,
summary="多个周期的 MACD / RSI 同步转强,指标共振增强。",
bias="bullish",
)
)
if not items:
items.append(
ResonanceItem(
level="normal",
type="trend",
cycles=["day"],
summary="当前未形成明显共振先观察关键黄金位与K线确认。",
bias="neutral",
)
)
return items
def _build_tomorrow_strategy(
self, fib_spaces: list[FibonacciSpace], fib_times: list[FibonacciTime], resonance: list[ResonanceItem]
) -> list[str]:
result = [f"{item.cycle}{item.next_window_summary}" for item in fib_times]
minute_space = next((item for item in fib_spaces if item.cycle == "120m"), None) or next(
(item for item in fib_spaces if item.cycle == "30m"),
None,
)
if minute_space and minute_space.levels:
focus = []
for label in ("0.382", "0.5", "0.618"):
level = next((entry for entry in minute_space.levels if entry.label == label), None)
if level:
focus.append(f"{label}{level.value:.2f}")
if focus:
result.append(f"120m明日优先观察 {' / '.join(focus)} 附近的承压或支撑反馈。")
if any(item.type == "time" for item in resonance):
result.append("分钟周期已出现时间共振明日更适合等关键时间窗内的K线确认后再决策。")
result.append("若价格靠近关键黄金位,同时进入 13 / 21 / 34 时间窗,优先观察放量突破还是冲高回落。")
return result[:6]
def _build_follow_strategy(
self, cycles: list[CycleSummary], fib_spaces: list[FibonacciSpace], abc_structures: list[AbcStructure]
) -> list[str]:
result: list[str] = []
for cycle_name in ("week", "day"):
cycle = next((item for item in cycles if item.cycle == cycle_name), None)
fib = next((item for item in fib_spaces if item.cycle == cycle_name), None)
abc = next((item for item in abc_structures if item.cycle == cycle_name), None)
if cycle:
result.append(
f"{cycle_name}{cycle.trend_label}{cycle.ma_status};指标状态:{cycle.indicator_snapshot.signal_summary}"
)
if fib and fib.levels:
for label, text in (
("0.191", "初步反转确认位"),
("0.382", "初步恢复位"),
("0.5", "中轴位"),
("0.618", "强弱分界位"),
("0.809", "深恢复/强反转确认位"),
):
level = next((item for item in fib.levels if item.label == label), None)
if level:
result.append(f"{cycle_name}{text}{level.value:.2f}")
if abc:
result.append(f"{cycle_name}ABC 状态为 {abc.status},请结合 calculation_steps 中的起点说明理解。")
return result[:8]
def _build_signal_conclusion(
self,
cycles: list[CycleSummary],
resonance: list[ResonanceItem],
fib_spaces: list[FibonacciSpace],
fib_times: list[FibonacciTime],
) -> list[str]:
result: list[str] = []
weekly = next((item for item in cycles if item.cycle == "week"), None)
daily = next((item for item in cycles if item.cycle == "day"), None)
short_cycle = next((item for item in cycles if item.cycle == "30m"), None) or next(
(item for item in cycles if item.cycle == "60m"),
None,
)
if weekly:
result.append(f"周线结论:{weekly.trend_label}{weekly.indicator_snapshot.signal_summary}")
if daily:
result.append(f"日线结论:{daily.trend_label}{daily.indicator_snapshot.signal_summary}")
if short_cycle:
result.append(f"短线节奏:{short_cycle.cycle} {short_cycle.indicator_snapshot.signal_summary}")
strong_resonance = [item.summary for item in resonance if item.level in {"strong", "very_strong"}]
if strong_resonance:
result.append(f"共振结论:{''.join(strong_resonance[:2])}")
key_space = next((item for item in fib_spaces if item.cycle == "day"), None)
if key_space:
result.append(f"空间结论:{key_space.current_position_summary}")
key_time = next((item for item in fib_times if item.cycle == "120m"), None) or next(
(item for item in fib_times if item.cycle == "30m"),
None,
)
if key_time:
result.append(f"时间结论:{key_time.next_window_summary}")
return result[:6]
def _build_conclusion_summary(
self,
cycles: list[CycleSummary],
resonance: list[ResonanceItem],
fib_spaces: list[FibonacciSpace],
) -> ConclusionSummary:
weekly = next((item for item in cycles if item.cycle == "week"), None)
daily = next((item for item in cycles if item.cycle == "day"), None)
bullish_score = 0
bearish_score = 0
for cycle in (weekly, daily):
if cycle is None:
continue
if "上升" in cycle.trend_label or "多头" in cycle.ma_status or "由弱转强" in cycle.indicator_snapshot.signal_summary:
bullish_score += 2
if "下降" in cycle.trend_label or "空头" in cycle.ma_status or "死叉" in cycle.indicator_snapshot.signal_summary:
bearish_score += 2
for item in resonance:
if item.bias == "bullish":
bullish_score += 1
elif item.bias == "bearish":
bearish_score += 1
bias = "neutral"
if bullish_score > bearish_score + 1:
bias = "bullish"
elif bearish_score > bullish_score + 1:
bias = "bearish"
confidence = min(92, 48 + (abs(bullish_score - bearish_score) * 8) + (len(resonance) * 4))
stage = "等待确认"
headline = "多周期仍在等待关键位确认"
summary = "当前更适合将空间位、时间窗和指标共振合并观察,不宜单凭一个信号直接下结论。"
tags = ["多周期", "时空分析"]
day_space = next((item for item in fib_spaces if item.cycle == "day"), None)
if bias == "bullish":
stage = "修复推进"
headline = "大周期修复结构延续,观察突破确认"
summary = "周线与日线偏向修复,若分钟级同步站稳关键位,短线更容易进入向上确认。"
tags.extend(["偏多", "观察突破"])
elif bias == "bearish":
stage = "下跌修复"
headline = "大周期仍偏弱,当前先按修复反弹处理"
summary = "周线与日线尚未完成强反转确认,若关键位受压或分钟级转弱,优先按反弹后的再整理看待。"
tags.extend(["偏弱", "先看修复"])
if day_space:
level_0191 = next((item for item in day_space.levels if item.label == "0.191"), None)
level_0809 = next((item for item in day_space.levels if item.label == "0.809"), None)
if level_0191 and abs(level_0191.distance_to_price or 0) <= max(8, level_0191.value * 0.005):
tags.append("临近0.191触发位")
if level_0809 and (level_0809.distance_to_price or 0) <= 0:
tags.append("进入0.809确认区")
return ConclusionSummary(
stage=stage,
bias=bias,
confidence=confidence,
headline=headline,
summary=summary,
tags=tags[:5],
)
def _build_alerts(
self,
cycles: list[CycleSummary],
resonance: list[ResonanceItem],
fib_spaces: list[FibonacciSpace],
fib_times: list[FibonacciTime],
) -> list[AlertItem]:
alerts: list[AlertItem] = []
for item in resonance[:4]:
alerts.append(
AlertItem(
level=item.level,
trigger_type=item.type,
title=f"{item.type}共振提醒",
summary=item.summary,
action="保持该周期联动观察,等待价格或时间窗确认。",
)
)
day_space = next((item for item in fib_spaces if item.cycle == "day"), None)
if day_space:
for label, title, action in (
("0.191", "初步反转触发位", "若收盘站稳,可将结论从修复上调到初步反转。"),
("0.5", "中轴博弈位", "若放量突破,中期重心上移;若冲高回落,维持震荡判断。"),
("0.618", "强弱分界位", "若跌破则减弱短线预期,若止跌则继续观察修复。"),
("0.809", "强反转确认位", "若突破并站稳,可上调为强反转跟踪。"),
):
level = next((entry for entry in day_space.levels if entry.label == label), None)
if level is None:
continue
alerts.append(
AlertItem(
level="medium" if label in {"0.191", "0.618"} else "strong",
trigger_type="space",
title=f"{title} {level.value:.2f}",
summary=f"当前价格距 {label}{level.distance_to_price:.2f},该位置属于结论触发器。",
action=action,
)
)
for item in fib_times[:2]:
window = item.next_key_counts[0] if item.next_key_counts else item.current_count
alerts.append(
AlertItem(
level="normal",
trigger_type="time",
title=f"{item.cycle} 时间窗预警",
summary=f"当前为第 {item.current_count} 根,下一关键窗口为 {window}",
action="到达关键根数后,观察是否出现确认阳线或确认阴线。",
)
)
return alerts[:8]
def _build_evidence_chain(
self,
cycles: list[CycleSummary],
abc_structures: list[AbcStructure],
fib_spaces: list[FibonacciSpace],
fib_times: list[FibonacciTime],
resonance: list[ResonanceItem],
) -> list[EvidenceItem]:
evidence: list[EvidenceItem] = []
for cycle in cycles[:4]:
evidence.append(
EvidenceItem(
title=f"{cycle.cycle} 结构与指标",
detail=f"{cycle.trend_label}{cycle.ma_status}{cycle.indicator_snapshot.signal_summary}",
cycles=[cycle.cycle],
score=72 if "上升" in cycle.trend_label or "下降" in cycle.trend_label else 58,
)
)
for abc in abc_structures[:4]:
if abc.a_point and abc.b_point:
detail = f"A={abc.a_point.price:.2f}B={abc.b_point.price:.2f}"
if abc.c_point:
detail += f"C={abc.c_point.price:.2f}"
evidence.append(
EvidenceItem(
title=f"{abc.cycle} ABC 识别依据",
detail=f"{abc.status}{detail}{' / '.join(abc.reasoning[:2])}",
cycles=[abc.cycle],
score=78 if abc.status == "confirmed" else 63,
)
)
for fib in fib_spaces[:3]:
level = min(fib.levels, key=lambda item: abs(item.distance_to_price or 0)) if fib.levels else None
if level:
evidence.append(
EvidenceItem(
title=f"{fib.cycle} 空间定位",
detail=f"当前最接近 {level.label}{level.value:.2f}{fib.current_position_summary}",
cycles=[fib.cycle],
score=68,
)
)
for item in resonance[:2]:
evidence.append(
EvidenceItem(
title="多周期共振",
detail=item.summary,
cycles=item.cycles,
score=82 if item.level in {"strong", "very_strong"} else 70,
)
)
if fib_times:
time_item = fib_times[0]
evidence.append(
EvidenceItem(
title=f"{time_item.cycle} 时间计数",
detail=time_item.next_window_summary,
cycles=[time_item.cycle],
score=66,
)
)
return evidence[:8]
def _build_strategy_scenarios(
self,
fib_spaces: list[FibonacciSpace],
fib_times: list[FibonacciTime],
cycles: list[CycleSummary],
) -> list[StrategyScenario]:
short_space = next((item for item in fib_spaces if item.cycle == "120m"), None) or next(
(item for item in fib_spaces if item.cycle == "30m"),
None,
)
day_space = next((item for item in fib_spaces if item.cycle == "day"), None)
short_time = next((item for item in fib_times if item.cycle == "30m"), None) or next(
(item for item in fib_times if item.cycle == "120m"),
None,
)
short_cycle = next((item for item in cycles if item.cycle == "30m"), None) or next(
(item for item in cycles if item.cycle == "60m"),
None,
)
level_0191 = self._find_level(day_space, "0.191")
level_05 = self._find_level(short_space, "0.5")
level_0618 = self._find_level(short_space, "0.618")
level_0809 = self._find_level(day_space, "0.809")
time_desc = short_time.next_window_summary if short_time else "关注下一关键时间窗"
short_signal = short_cycle.indicator_snapshot.signal_summary if short_cycle else "等待分钟级指标确认"
scenarios = [
StrategyScenario(
key="A",
title="强势路径",
trigger_condition=f"若价格站上 {level_0191.value:.2f} 并在时间窗内保持强势。" if level_0191 else "若价格突破关键恢复位。",
system_view=f"系统将结论上调为修复推进,重点看 {short_signal}",
user_action="回踩不破时以观察跟随为主,不追高,等待放量确认。",
next_watch=f"下一观察位 {level_0809.value:.2f}" if level_0809 else "下一观察位看更高一级压力位。",
),
StrategyScenario(
key="B",
title="震荡路径",
trigger_condition=f"若价格围绕 {level_05.value:.2f} 附近反复拉锯。" if level_05 else "若价格维持中轴震荡。",
system_view=f"系统维持震荡整理,{time_desc}",
user_action="等支撑和压力两端反馈,不在中间位置重仓表态。",
next_watch=f"支撑观察 {level_0618.value:.2f}" if level_0618 else "继续观察 0.618 一带反馈。",
),
StrategyScenario(
key="C",
title="失败路径",
trigger_condition=f"若跌回 {level_0618.value:.2f} 下方并伴随分钟级转弱。" if level_0618 else "若跌破关键支撑。",
system_view="系统将结论降级为修复失败或弱势延续。",
user_action="先以防守和等待为主,等待下一次时间窗与支撑共振。",
next_watch="观察是否重新出现超卖修复与确认阳线。",
),
]
return scenarios
def _build_monitoring_tasks(
self,
fib_spaces: list[FibonacciSpace],
fib_times: list[FibonacciTime],
resonance: list[ResonanceItem],
cycles: list[CycleSummary],
) -> list[MonitoringTask]:
tasks: list[MonitoringTask] = []
day_space = next((item for item in fib_spaces if item.cycle == "day"), None)
short_time = next((item for item in fib_times if item.cycle == "30m"), None) or next(
(item for item in fib_times if item.cycle == "120m"),
None,
)
short_cycle = next((item for item in cycles if item.cycle == "15m"), None)
if day_space:
for label in ("0.191", "0.618", "0.809"):
level = self._find_level(day_space, label)
if level:
tasks.append(
MonitoringTask(
title=f"日线 {label} 触发器监控",
cadence="收盘前 + 关键异动时",
focus=f"监控价格与 {level.value:.2f} 的相对位置变化。",
trigger_condition="触碰、突破、跌破后刷新结论与策略分支。",
)
)
if short_time:
tasks.append(
MonitoringTask(
title=f"{short_time.cycle} 时间窗监控",
cadence="盘中滚动",
focus=f"当前第 {short_time.current_count} 根,跟踪下一关键窗口。",
trigger_condition="到达 13/21/34 等关键根数后判断是否出现确认浪。",
)
)
if short_cycle:
tasks.append(
MonitoringTask(
title="短周期指标修正监控",
cadence="每个新K线更新",
focus=short_cycle.indicator_snapshot.signal_summary,
trigger_condition="MACD 状态翻转或 RSI 穿越 50 后,自动修正结论等级。",
)
)
for item in resonance[:2]:
tasks.append(
MonitoringTask(
title=f"{item.type} 共振跟踪",
cadence="触发条件满足时",
focus=item.summary,
trigger_condition="若新增周期加入共振,则上调提示等级。",
)
)
return tasks[:6]
def _build_tool_presets(
self,
fib_spaces: list[FibonacciSpace],
abc_structures: list[AbcStructure],
) -> list[ToolPreset]:
presets: list[ToolPreset] = []
mapping = [
("黄金分割尺", "fibonacci_ruler", "day"),
("黄金扩展尺", "extension_ruler", "week"),
("波浪尺", "wave_ruler", "120m"),
("趋势线", "trend_line", "30m"),
("水平线", "horizontal_line", "day"),
("区间测量", "range_measure", "15m"),
]
for name, tool_type, cycle in mapping:
fib = next((item for item in fib_spaces if item.cycle == cycle), None)
abc = next((item for item in abc_structures if item.cycle == cycle), None)
anchors = []
if fib:
anchors = [f"{fib.anchor_start_label}:{fib.anchor_start_price:.2f}", f"{fib.anchor_end_label}:{fib.anchor_end_price:.2f}"]
elif abc and abc.a_point and abc.b_point:
anchors = [f"A:{abc.a_point.price:.2f}", f"B:{abc.b_point.price:.2f}"]
presets.append(
ToolPreset(
name=name,
cycle=cycle,
tool_type=tool_type,
mode="manual_ready" if tool_type in {"trend_line", "range_measure", "wave_ruler"} else "auto",
anchors=anchors,
summary=f"{cycle} 周期已生成 {name} 的自动锚点,可继续手动修正。",
)
)
return presets
def _build_calculation_steps(
self, symbol: SymbolInfo, abc_structures: list[AbcStructure], fib_spaces: list[FibonacciSpace], fib_times: list[FibonacciTime]
) -> list[str]:
steps = [f"标的 {symbol.name}({symbol.code}) 使用 secid={symbol.secid}"]
for fib in fib_spaces:
if fib.levels:
steps.append(
f"{fib.cycle}:空间位按 {fib.anchor_start_label}({fib.anchor_start_time}, {fib.anchor_start_price:.2f}) -> "
f"{fib.anchor_end_label}({fib.anchor_end_time}, {fib.anchor_end_price:.2f}) 计算。"
)
for abc in abc_structures:
if abc.a_point and abc.b_point:
text = f"{abc.cycle}ABC 状态为 {abc.status}A={abc.a_point.price:.2f}B={abc.b_point.price:.2f}"
if abc.c_point:
text += f"C={abc.c_point.price:.2f}"
steps.append(text + "")
else:
steps.append(f"{abc.cycle}当前仅能输出候选结构暂未形成稳定ABC。")
for fib in fib_times:
steps.append(
f"{fib.cycle}:时间从 {fib.start_point_label}({fib.start_point_time}) 起算起点K线本身记作第 1 根,当前为第 {fib.current_count} 根。"
)
steps.append("空间位统一输出 0.191 / 0.382 / 0.5 / 0.618 / 0.809 / 1.0 / 1.382 / 1.618 / 2.0 / 2.618。")
steps.append("反转判断分两层0.191 为初步反转确认位0.809 为深恢复/强反转确认位。")
return steps[:12]
def _build_chart_candles(self, candles: list[Candle]) -> list[ChartCandle]:
closes = [item.close for item in candles]
macd_series = self._macd_series(closes)
rsi5 = self._rsi_series(closes, 5)
rsi13 = self._rsi_series(closes, 13)
rsi21 = self._rsi_series(closes, 21)
result: list[ChartCandle] = []
for index, candle in enumerate(candles):
current = closes[: index + 1]
result.append(
ChartCandle(
timestamp=candle.timestamp,
open=candle.open,
close=candle.close,
high=candle.high,
low=candle.low,
volume=candle.volume,
ma5=self._ma(current, 5),
ma13=self._ma(current, 13),
ma21=self._ma(current, 21),
ma34=self._ma(current, 34),
ma55=self._ma(current, 55),
ma89=self._ma(current, 89),
dif=self._round_or_none(macd_series[index]["dif"]),
dea=self._round_or_none(macd_series[index]["dea"]),
macd_histogram=self._round_or_none(macd_series[index]["histogram"]),
rsi5=self._round_or_none(rsi5[index] if index < len(rsi5) else None),
rsi13=self._round_or_none(rsi13[index] if index < len(rsi13) else None),
rsi21=self._round_or_none(rsi21[index] if index < len(rsi21) else None),
)
)
return result
def _build_time_markers(
self,
candles: list[Candle],
time_sequences: TimeSequenceBundle | None,
) -> list[ChartTimeMarker]:
if time_sequences is None:
return []
markers: list[ChartTimeMarker] = []
for track in [time_sequences.major, time_sequences.minor]:
if track is None:
continue
start_index = next((idx for idx, candle in enumerate(candles) if candle.timestamp == track.start_point_time), None)
for target in track.current_hit + track.next_key_counts:
candle_index = None
if start_index is not None:
target_index = start_index + target - 1
candle_index = target_index if 0 <= target_index < len(candles) else None
markers.append(
ChartTimeMarker(
track_type=track.track_type,
target_count=target,
current_count=track.current_count,
candle_index=candle_index,
reached=target <= track.current_count,
label=f"{'' if track.track_type == 'major' else ''}T{target}",
)
)
return markers
def _build_price_markers(self, candles: list[Candle], fib_space: FibonacciSpace) -> list[ChartPriceMarker]:
if not candles:
return []
markers = [
ChartPriceMarker(
label="当前价",
value=round(candles[-1].close, 2),
kind="current",
emphasis="strong",
)
]
for label, kind, emphasis in (
("0.191", "trigger", "medium"),
("0.382", "support", "normal"),
("0.5", "trigger", "medium"),
("0.618", "support", "strong"),
("0.809", "trigger", "strong"),
("1.382", "target", "medium"),
):
level = self._find_level(fib_space, label)
if level:
markers.append(
ChartPriceMarker(
label=f"{label}",
value=level.value,
kind=kind,
emphasis=emphasis,
)
)
return markers
def _build_chart_tool_layers(
self,
cycle: str,
fib_space: FibonacciSpace,
abc: AbcStructure,
) -> list[ChartToolLayer]:
anchors = f"{fib_space.anchor_start_label}->{fib_space.anchor_end_label}"
layers = [
ChartToolLayer(
name="黄金分割尺",
tool_type="fibonacci_ruler",
mode="auto",
summary=f"已按 {anchors} 自动落图。",
),
ChartToolLayer(
name="黄金扩展尺",
tool_type="extension_ruler",
mode="auto",
summary="按当前主波段自动给出扩展目标位。",
),
ChartToolLayer(
name="波浪尺",
tool_type="wave_ruler",
mode="manual_ready",
summary="已根据 ABC 结构准备波浪尺锚点,可继续手动修正。",
),
ChartToolLayer(
name="趋势线",
tool_type="trend_line",
mode="manual_ready",
summary=f"{cycle} 周期趋势线已根据高低点生成候选路径。",
),
ChartToolLayer(
name="水平线",
tool_type="horizontal_line",
mode="auto",
summary="关键黄金位已同步生成水平线。",
),
ChartToolLayer(
name="区间测量",
tool_type="range_measure",
mode="manual_ready",
summary="可用来量化 A-B 或 B-C 的价格与时间长度。",
),
]
if abc.status == "confirmed":
layers[2] = ChartToolLayer(
name="波浪尺",
tool_type="wave_ruler",
mode="auto",
summary="ABC 已确认,波浪尺已自动锁定 A/B/C 三点。",
)
return layers
def _build_chart_signal_tags(
self,
cycle: str,
fib_space: FibonacciSpace,
fib_time: FibonacciTime | None,
abc: AbcStructure,
candles: list[Candle],
) -> list[str]:
tags = [f"{cycle} 图层联动"]
nearest = min(fib_space.levels, key=lambda item: abs(item.distance_to_price or 0)) if fib_space.levels else None
if nearest:
tags.append(f"最接近 {nearest.label}")
if fib_time:
tags.append(f"当前第 {fib_time.current_count}")
if fib_time.current_hit:
tags.append(f"命中 T{'/'.join(map(str, fib_time.current_hit))}")
if abc.status == "confirmed":
tags.append("ABC 已确认")
elif abc.status == "candidate_only":
tags.append("ABC 候选")
if candles:
latest = candles[-1]
if latest.close >= latest.open:
tags.append("最新K线收阳")
else:
tags.append("最新K线收阴")
return tags[:6]
def _select_dominant_anchor(self, candles: list[Candle], lookback: int) -> SwingAnchor:
scoped_start = max(len(candles) - lookback, 0)
high_index = scoped_start
best_pair: tuple[int, int] | None = None
best_move = float("-inf")
for index in range(scoped_start + 1, len(candles)):
if candles[index - 1].high > candles[high_index].high:
high_index = index - 1
move = candles[high_index].high - candles[index].low
if high_index < index and move > best_move:
best_pair = (high_index, index)
best_move = move
if best_pair is None:
low_index = min(range(scoped_start, len(candles)), key=lambda idx: candles[idx].low)
high_after_low = max(range(low_index, len(candles)), key=lambda idx: candles[idx].high)
return SwingAnchor(
start_index=low_index,
end_index=high_after_low,
start_label="major_low",
end_label="major_high",
start_price=candles[low_index].low,
end_price=candles[high_after_low].high,
start_time=candles[low_index].timestamp,
end_time=candles[high_after_low].timestamp,
move_direction="up",
)
start_index, end_index = best_pair
return SwingAnchor(
start_index=start_index,
end_index=end_index,
start_label="major_high",
end_label="major_low",
start_price=candles[start_index].high,
end_price=candles[end_index].low,
start_time=candles[start_index].timestamp,
end_time=candles[end_index].timestamp,
move_direction="down",
)
def _select_active_intraday_swing(self, candles: list[Candle], cycle: str) -> SwingAnchor | None:
if len(candles) < 8:
return None
lookback = {"15m": 36, "30m": 30, "60m": 24, "90m": 22, "120m": 20}.get(cycle, 20)
start = max(len(candles) - lookback, 0)
low_index = min(range(start, len(candles)), key=lambda idx: candles[idx].low)
high_after_low = self._index_of_max_high(candles, low_index, len(candles) - 1)
if high_after_low > low_index and candles[high_after_low].high > candles[low_index].low:
return SwingAnchor(
start_index=low_index,
end_index=high_after_low,
start_label="swing_low",
end_label="swing_high",
start_price=candles[low_index].low,
end_price=candles[high_after_low].high,
start_time=candles[low_index].timestamp,
end_time=candles[high_after_low].timestamp,
move_direction="up",
)
high_index = max(range(start, len(candles)), key=lambda idx: candles[idx].high)
low_after_high = self._index_of_min_low(candles, high_index, len(candles) - 1)
if low_after_high > high_index and candles[high_index].high > candles[low_after_high].low:
return SwingAnchor(
start_index=high_index,
end_index=low_after_high,
start_label="swing_high",
end_label="swing_low",
start_price=candles[high_index].high,
end_price=candles[low_after_high].low,
start_time=candles[high_index].timestamp,
end_time=candles[low_after_high].timestamp,
move_direction="down",
)
return None
def _select_recent_range_anchor(self, candles: list[Candle]) -> SwingAnchor:
start = max(len(candles) - 16, 0)
high_index = max(range(start, len(candles)), key=lambda idx: candles[idx].high)
low_index = min(range(start, len(candles)), key=lambda idx: candles[idx].low)
if low_index < high_index:
return SwingAnchor(
start_index=low_index,
end_index=high_index,
start_label="range_low",
end_label="range_high",
start_price=candles[low_index].low,
end_price=candles[high_index].high,
start_time=candles[low_index].timestamp,
end_time=candles[high_index].timestamp,
move_direction="up",
)
return SwingAnchor(
start_index=high_index,
end_index=low_index,
start_label="range_high",
end_label="range_low",
start_price=candles[high_index].high,
end_price=candles[low_index].low,
start_time=candles[high_index].timestamp,
end_time=candles[low_index].timestamp,
move_direction="down",
)
def _select_time_start(self, cycle: str, candles: list[Candle], abc: AbcStructure) -> tuple[int, str]:
if cycle in {"15m", "30m", "60m", "90m", "120m"}:
anchor = self._select_active_intraday_swing(candles, cycle)
if anchor is not None:
return anchor.start_index, anchor.start_label
if abc.a_point:
return abc.a_point.k_index, abc.a_point.label
return max(len(candles) - 13, 0), "fallback"
def _select_minor_time_start(self, cycle: str, candles: list[Candle], abc: AbcStructure) -> tuple[int, str] | None:
if cycle not in {"15m", "30m", "60m", "90m", "120m"} or len(candles) < 6:
return None
if abc.direction == "bullish" and abc.b_point is not None:
for index in range(abc.b_point.k_index + 1, len(candles)):
previous = candles[index - 1]
current = candles[index]
if current.close > previous.high:
return index, "minor_turn_up"
if abc.direction == "bearish" and abc.b_point is not None:
for index in range(abc.b_point.k_index + 1, len(candles)):
previous = candles[index - 1]
current = candles[index]
if current.close < previous.low:
return index, "minor_turn_down"
if abc.c_point is not None:
fallback_index = min(abc.c_point.k_index + 1, len(candles) - 1)
return fallback_index, "minor_after_c"
return None
def _index_of_max_high(self, candles: list[Candle], start: int, end: int) -> int:
return max(range(start, end + 1), key=lambda idx: candles[idx].high)
def _index_of_min_low(self, candles: list[Candle], start: int, end: int) -> int:
return min(range(start, end + 1), key=lambda idx: candles[idx].low)
def _reversal_comment(self, levels: list[FibonacciLevel], current_price: float, move_direction: str) -> str:
level_0191 = next((item for item in levels if item.label == "0.191"), None)
level_0809 = next((item for item in levels if item.label == "0.809"), None)
if level_0191 is None:
return "当前缺少关键反转位,暂不做反转判断。"
if move_direction == "down":
if level_0809 and current_price >= level_0809.value:
return "价格已进入 0.809 深恢复区,可按强反转确认观察。"
if current_price >= level_0191.value:
return "价格已越过 0.191 初步反转确认位,可按初步反转观察。"
return "价格仍位于 0.191 初步反转确认位之下,当前更偏向修复或震荡。"
level_0382 = next((item for item in levels if item.label == "0.382"), None)
level_0618 = next((item for item in levels if item.label == "0.618"), None)
if level_0382 and current_price >= level_0382.value:
return "价格仍处于上升波段浅回撤区间内,趋势尚未明显转弱。"
if level_0618 and current_price >= level_0618.value:
return "价格正在测试上升波段黄金回撤区,关注是否止跌。"
return "价格已回到深回撤区,需观察是否继续下探或形成新的支撑。"
def _indicator_summary(
self,
macd_series: list[dict[str, float | None]],
rsi5: list[float | None],
rsi13: list[float | None],
rsi21: list[float | None],
) -> str:
latest_macd = macd_series[-1] if macd_series else {"dif": None, "dea": None, "histogram": None}
latest_rsi5 = rsi5[-1] if rsi5 else None
latest_rsi13 = rsi13[-1] if rsi13 else None
latest_rsi21 = rsi21[-1] if rsi21 else None
signals: list[str] = []
dif = latest_macd["dif"]
dea = latest_macd["dea"]
hist = latest_macd["histogram"]
if dif is not None and dea is not None:
signals.append("MACD金叉" if dif >= dea else "MACD死叉")
if hist is not None:
if hist > 0:
signals.append("红柱扩张" if len(macd_series) >= 2 and (macd_series[-2]["histogram"] or 0) < hist else "红柱运行")
elif hist < 0:
signals.append("绿柱收缩" if len(macd_series) >= 2 and (macd_series[-2]["histogram"] or 0) < hist else "绿柱运行")
if latest_rsi5 is not None:
if latest_rsi5 < 20:
signals.append("短线超卖")
elif latest_rsi5 > 80:
signals.append("短线超买")
if latest_rsi13 is not None and latest_rsi21 is not None:
if latest_rsi13 >= 50 and latest_rsi21 >= 50:
signals.append("由弱转强")
elif latest_rsi13 < 50 and latest_rsi21 < 50:
signals.append("仍处弱势")
if not signals:
return "指标信号不足"
return "".join(signals[:4])
def _ema_series(self, values: list[float], period: int) -> list[float]:
if not values:
return []
multiplier = 2 / (period + 1)
result = [values[0]]
for value in values[1:]:
result.append((value - result[-1]) * multiplier + result[-1])
return result
def _macd_series(self, closes: list[float]) -> list[dict[str, float | None]]:
if not closes:
return []
ema12 = self._ema_series(closes, 12)
ema26 = self._ema_series(closes, 26)
dif = [short - long for short, long in zip(ema12, ema26)]
dea = self._ema_series(dif, 9)
return [
{
"dif": dif_value,
"dea": dea_value,
"histogram": (dif_value - dea_value) * 2,
}
for dif_value, dea_value in zip(dif, dea)
]
def _rsi_series(self, closes: list[float], period: int) -> list[float | None]:
if not closes:
return []
if len(closes) == 1:
return [None]
gains = [0.0]
losses = [0.0]
for index in range(1, len(closes)):
delta = closes[index] - closes[index - 1]
gains.append(max(delta, 0.0))
losses.append(abs(min(delta, 0.0)))
result: list[float | None] = []
avg_gain = 0.0
avg_loss = 0.0
for index in range(len(closes)):
if index < period:
result.append(None)
continue
if index == period:
avg_gain = mean(gains[1 : period + 1])
avg_loss = mean(losses[1 : period + 1])
else:
avg_gain = ((avg_gain * (period - 1)) + gains[index]) / period
avg_loss = ((avg_loss * (period - 1)) + losses[index]) / period
if avg_loss == 0:
result.append(100.0)
continue
rs = avg_gain / avg_loss
result.append(100 - (100 / (1 + rs)))
return result
def _round_or_none(self, value: float | None, digits: int = 2) -> float | None:
if value is None:
return None
return round(value, digits)
def _find_level(self, fib_space: FibonacciSpace | None, label: str) -> FibonacciLevel | None:
if fib_space is None:
return None
return next((item for item in fib_space.levels if item.label == label), None)
def _ma(self, values: list[float], period: int) -> float | None:
if len(values) < period:
return None
return round(mean(values[-period:]), 2)
def _trend_label(self, closes: list[float]) -> str:
if len(closes) < 8:
return "数据不足"
recent = closes[-5:]
earlier = closes[-10:-5] if len(closes) >= 10 else closes[:-5]
if recent and earlier and mean(recent) > mean(earlier) * 1.01:
return "上升趋势"
if recent and earlier and mean(recent) < mean(earlier) * 0.99:
return "下降趋势"
return "震荡整理"
def _ma_status(self, close: float | None, ma: MaValues) -> str:
if close is None:
return "暂无均线判断"
values = [item for item in [ma.ma5, ma.ma13, ma.ma21, ma.ma34] if item is not None]
if values and close > max(values):
return "价格位于短中期均线上方,多头占优"
if values and close < min(values):
return "价格位于短中期均线下方,空头占优"
return "价格与均线缠绕,等待方向确认"
def _volume_status(self, candles: list[Candle]) -> str:
if len(candles) < 6:
return "成交量样本不足"
latest = candles[-1].volume
recent = mean([item.volume for item in candles[-5:-1]])
if latest > recent * 1.25:
return "最近一根K线明显放量"
if latest < recent * 0.8:
return "最近一根K线明显缩量"
return "成交量整体平稳"
def _safe_float(self, value: Any, *, divide: float = 1) -> float:
try:
return float(value) / divide
except Exception:
return 0.0
analysis_service = AnalysisService()