Coverage for core/trading/engines/trading_session_engine.py: 30.64%
408 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2交易会话引擎 - 统一调度数据流
3"""
5import asyncio
6import threading
7import time
8from datetime import datetime, timedelta
9from decimal import Decimal
10from typing import Any, Dict, List, Optional
12from core.data_source.adapters.data_source_adapter import \
13 QuoteDataSourceAdapter
14from core.data_source.adapters.quote_adapter import QuoteAdapter
15from core.models.broker import FeeConfig
16from core.models.trading import (AssetMode, MarketData, StrategyContext,
17 TradingMode)
18from core.trading.engines.simulation_engine import SimulationEngine
19from core.trading.engines.strategy_engine import StrategyEngine
20from core.trading.engines.trading_engine import TradingEngine
21from core.trading.strategies import create_strategy
23from .time_series_controller import TimeSeriesController
26class TradingSessionEngine:
27 """量化交易系统 - 统一调度数据流,协调策略、交易模块、模拟交易模块"""
29 def __init__(self, user_id: str, session_id: str, session_config: Dict[str, Any]):
30 self.user_id = user_id
31 self.session_id = session_id
32 self.session_config = session_config
34 # 交易模式
35 self.trading_mode = TradingMode(session_config.get("trading_mode", "realtime"))
36 self.asset_mode = AssetMode(session_config.get("asset_mode", "simulation"))
38 # 核心组件
39 self.trading_engine: Optional[TradingEngine] = (
40 None # 交易模块:处理策略触发的交易信号
41 )
42 self.strategy_engine: Optional[StrategyEngine] = (
43 None # 策略引擎:被动接收数据,产生交易信号
44 )
45 self.simulation_engine: Optional[SimulationEngine] = (
46 None # 模拟交易模块:主动推送数据,模拟成交
47 )
49 # 数据源适配器
50 self.quote_adapter = QuoteAdapter(user_id)
51 self.quote_data_adapter = QuoteDataSourceAdapter(user_id)
53 # 状态管理
54 self.is_running = False
55 self.is_paused = False
56 self.start_time: Optional[datetime] = None
57 self.end_time: Optional[datetime] = None
59 # 数据订阅
60 self.subscribed_symbols: List[str] = []
61 self.last_market_data_time = datetime.now()
63 # 回测相关
64 self.current_time: Optional[datetime] = None
65 self.backtest_start_time: Optional[datetime] = None
66 self.backtest_end_time: Optional[datetime] = None
67 self.time_series_controller: Optional[TimeSeriesController] = None
69 # 线程管理
70 self.data_thread: Optional[threading.Thread] = None
71 self.stop_event = threading.Event()
73 def initialize(self) -> bool:
74 """初始化交易会话引擎"""
75 try:
76 # 1. 创建交易引擎
77 from core.trading.engines import create_trading_engine
79 # 获取初始资金 - 支持多币种
80 initial_capital = Decimal("10000") # 默认值
81 currency = "USD" # 默认币种
83 if "initial_capital_by_currency" in self.session_config:
84 # 优先使用USD资金
85 usd_capital = self.session_config["initial_capital_by_currency"].get(
86 "USD", {}
87 )
88 if (
89 "cash_assets" in usd_capital
90 and float(usd_capital["cash_assets"]) > 0
91 ):
92 initial_capital = Decimal(str(usd_capital["cash_assets"]))
93 currency = "USD"
94 else:
95 # 如果没有USD资金,使用HKD资金
96 hkd_capital = self.session_config[
97 "initial_capital_by_currency"
98 ].get("HKD", {})
99 if (
100 "cash_assets" in hkd_capital
101 and float(hkd_capital["cash_assets"]) > 0
102 ):
103 initial_capital = Decimal(str(hkd_capital["cash_assets"]))
104 currency = "HKD"
105 elif "initial_capital" in self.session_config:
106 initial_capital = Decimal(str(self.session_config["initial_capital"]))
108 self._send_websocket_log(
109 f"【量化交易系统】💰 设置初始资金: {initial_capital} {currency}"
110 )
112 # 获取费用配置(从会话配置或使用默认配置)
113 fee_config = None
114 if "fee_config" in self.session_config:
115 fee_config_dict = self.session_config["fee_config"]
116 try:
117 fee_config = FeeConfig(**fee_config_dict)
118 self._send_websocket_log(f"【量化交易系统】💵 使用自定义费用配置")
119 except Exception as e:
120 self._send_websocket_log(
121 f"【量化交易系统】⚠️ 费用配置解析失败,使用默认配置: {e}"
122 )
123 fee_config = None
125 if not fee_config:
126 fee_config = FeeConfig() # 使用默认的长桥费率
127 self._send_websocket_log(
128 f"【量化交易系统】💵 使用默认费用配置(长桥费率)"
129 )
131 self.trading_engine = create_trading_engine(
132 user_id=self.user_id,
133 session_id=self.session_id,
134 trading_mode=self.trading_mode,
135 asset_mode=self.asset_mode,
136 initial_capital=initial_capital,
137 currency=currency,
138 fee_config=fee_config,
139 )
141 if not self.trading_engine.initialize():
142 print(f"❌ 交易引擎初始化失败")
143 return False
145 # 2. 创建策略引擎
146 self.strategy_engine = StrategyEngine(
147 user_id=self.user_id,
148 session_id=self.session_id,
149 trading_engine=self.trading_engine,
150 session_config=self.session_config,
151 trading_session_engine=self, # 传递TradingSessionEngine引用,用于统一日志管理
152 )
154 # 3. 设置TradingEngine的策略引擎引用,用于统一日志管理
155 self.trading_engine.set_strategy_engine(self.strategy_engine)
157 # 4. 添加策略
158 strategy_name = self.session_config.get("strategy_name")
159 strategy_params = self.session_config.get("strategy_params", {})
161 if strategy_name:
162 strategy = create_strategy(strategy_name, strategy_params)
163 if strategy:
164 self.strategy_engine.add_strategy(strategy)
165 else:
166 print(f"❌ 策略创建失败: {strategy_name}")
167 return False
169 # 5. 初始化回测参数(如果是回测模式)
170 if self.trading_mode == TradingMode.BACKTEST:
171 self._initialize_backtest_params()
172 self._initialize_time_series_controller()
174 # 6. 获取可交易股票列表
175 self.subscribed_symbols = self._get_tradable_symbols()
177 self._send_websocket_log(f"【量化交易系统】✅ 量化交易系统初始化成功")
178 self._send_websocket_log(
179 f"【量化交易系统】 交易模式: {self.trading_mode.value}"
180 )
181 self._send_websocket_log(
182 f"【量化交易系统】 资产模式: {self.asset_mode.value}"
183 )
184 self._send_websocket_log(f"【量化交易系统】 策略: {strategy_name}")
185 self._send_websocket_log(
186 f"【量化交易系统】 可交易股票: {self.subscribed_symbols}"
187 )
189 return True
191 except Exception as e:
192 self._send_websocket_log(f"【量化交易系统】❌ 量化交易系统初始化失败: {e}")
193 return False
195 def start(self) -> bool:
196 """启动交易会话引擎"""
197 if self.is_running:
198 return True
200 try:
201 # 设置交易模式环境变量,用于控制长桥客户端初始化
202 import os
204 os.environ["TRADING_MODE"] = self.trading_mode.value
206 # 标记为启动中状态
207 self.is_running = True
208 self.start_time = datetime.now()
209 self.stop_event.clear()
211 self._send_websocket_log(
212 f"【量化交易系统】🚀 量化交易系统启动中: {self.session_id}"
213 )
215 # 异步进行完整初始化
216 def async_initialization():
217 try:
218 # 启动交易引擎
219 if not self.trading_engine.start():
220 self._send_websocket_log(f"【量化交易系统】❌ 交易引擎启动失败")
221 self.is_running = False
222 return
224 # 启动策略引擎
225 if not self.strategy_engine.start():
226 self._send_websocket_log(f"【量化交易系统】❌ 策略引擎启动失败")
227 self.is_running = False
228 return
230 # 等待策略模块初始化完成
231 self._send_websocket_log(
232 f"【量化交易系统】⏳ 等待策略模块初始化完成..."
233 )
234 if not self.strategy_engine.wait_for_initialization(timeout=60.0):
235 self._send_websocket_log(
236 f"【量化交易系统】❌ 策略模块初始化失败或超时"
237 )
238 self.is_running = False
239 return
241 # 启动数据流处理线程
242 self.data_thread = threading.Thread(
243 target=self._data_flow_loop, daemon=True
244 )
245 self.data_thread.start()
247 self._send_websocket_log(
248 f"【量化交易系统】✅ 量化交易系统已启动: {self.session_id}"
249 )
250 self._send_websocket_log(
251 f"【量化交易系统】📊 交易模式: {self.trading_mode.value}"
252 )
253 self._send_websocket_log(
254 f"【量化交易系统】💰 资产模式: {self.asset_mode.value}"
255 )
256 self._send_websocket_log(
257 f"【量化交易系统】📈 可交易股票: {self.subscribed_symbols}"
258 )
260 except Exception as e:
261 self.is_running = False
262 self._send_websocket_log(f"【量化交易系统】❌ 异步初始化失败: {e}")
264 # 启动异步初始化线程
265 threading.Thread(target=async_initialization, daemon=True).start()
267 # 立即返回成功,让前端状态快速更新
268 return True
270 except Exception as e:
271 self.is_running = False
272 self._send_websocket_log(f"【量化交易系统】❌ 启动量化交易系统失败: {e}")
273 return False
275 def stop(self) -> bool:
276 """停止交易会话引擎"""
277 if not self.is_running:
278 return True
280 try:
281 self.is_running = False
282 self.end_time = datetime.now()
283 self.stop_event.set()
285 # 停止策略引擎
286 if self.strategy_engine:
287 self.strategy_engine.stop()
289 # 停止交易引擎
290 if self.trading_engine:
291 self.trading_engine.stop()
293 # 等待数据线程结束
294 import threading
296 if (
297 self.data_thread
298 and self.data_thread.is_alive()
299 and self.data_thread != threading.current_thread()
300 ):
301 self.data_thread.join(timeout=5)
303 self._send_websocket_log(
304 f"【量化交易系统】✅ 量化交易系统已停止: {self.session_id}"
305 )
306 return True
308 except Exception as e:
309 self._send_websocket_log(f"【量化交易系统】❌ 停止量化交易系统失败: {e}")
310 return False
312 def pause(self) -> bool:
313 """暂停交易会话引擎"""
314 if not self.is_running or self.is_paused:
315 return True
317 try:
318 self.is_paused = True
319 if self.strategy_engine:
320 self.strategy_engine.pause()
321 self._send_websocket_log(
322 f"【量化交易系统】✅ 量化交易系统已暂停: {self.session_id}"
323 )
324 return True
325 except Exception as e:
326 self._send_websocket_log(f"【量化交易系统】❌ 暂停量化交易系统失败: {e}")
327 return False
329 def resume(self) -> bool:
330 """恢复交易会话引擎"""
331 if not self.is_running or not self.is_paused:
332 return True
334 try:
335 self.is_paused = False
336 if self.strategy_engine:
337 self.strategy_engine.resume()
338 self._send_websocket_log(
339 f"【量化交易系统】✅ 量化交易系统已恢复: {self.session_id}"
340 )
341 return True
342 except Exception as e:
343 self._send_websocket_log(f"【量化交易系统】❌ 恢复量化交易系统失败: {e}")
344 return False
346 def _data_flow_loop(self):
347 """量化交易系统数据流处理循环"""
348 while self.is_running and not self.stop_event.is_set():
349 try:
350 if self.is_paused:
351 time.sleep(1)
352 continue
354 if self.trading_mode == TradingMode.REALTIME:
355 # 实时交易:只在数据缺失时主动获取
356 self._check_and_fetch_realtime_data()
357 time.sleep(2) # 实时交易需要间隔,避免频繁检查
358 elif self.trading_mode == TradingMode.BACKTEST:
359 # 回测模式:连续处理历史数据,追求速度
360 self._process_backtest_data()
361 # 回测模式不需要sleep,连续处理
363 except Exception as e:
364 self._send_websocket_log(f"【量化交易系统】❌ 数据流处理失败: {e}")
365 time.sleep(1) # 出错时短暂等待
367 def _check_and_fetch_realtime_data(self):
368 """检查并获取实时数据 - 只在数据缺失时主动获取"""
369 try:
370 # 检查是否有新的市场数据
371 now = datetime.now()
372 time_since_last_data = (now - self.last_market_data_time).total_seconds()
374 # 如果超过30秒没有数据,才主动获取
375 if time_since_last_data > 30:
376 self._send_websocket_log(
377 f"【量化交易系统】⚠️ 超过30秒未收到数据,主动获取实时数据"
378 )
379 self._process_realtime_data()
380 else:
381 # 正常情况,等待数据推送
382 if time_since_last_data > 10: # 超过10秒记录一次状态
383 self._send_websocket_log(
384 f"【量化交易系统】📊 等待数据推送中... ({time_since_last_data:.1f}秒)"
385 )
387 except Exception as e:
388 self._send_websocket_log(f"【量化交易系统】❌ 检查实时数据失败: {e}")
390 def _process_realtime_data(self):
391 """处理实时数据"""
392 try:
393 if self.asset_mode == AssetMode.SIMULATION:
394 # 实时交易 + 模拟资产:通过抽象接口订阅获取实时数据
395 self._process_realtime_simulation_data()
396 elif self.asset_mode == AssetMode.REAL:
397 # 实时交易 + 真实资产:通过抽象接口订阅获取实时数据
398 self._process_realtime_real_data()
400 except Exception as e:
401 self._send_websocket_log(f"【量化交易系统】❌ 处理实时数据失败: {e}")
403 def _process_realtime_simulation_data(self):
404 """处理实时交易+模拟资产数据"""
405 try:
406 # 从数据源获取实时行情(通过抽象接口)
407 quotes = self.quote_data_adapter.get_quote(self.subscribed_symbols)
409 if quotes:
410 for quote in quotes:
411 if hasattr(quote, "symbol") and hasattr(quote, "last_done"):
412 market_data = MarketData(
413 symbol=quote.symbol,
414 timestamp=datetime.now(),
415 open=(
416 Decimal(str(quote.open))
417 if hasattr(quote, "open") and quote.open
418 else Decimal("0")
419 ),
420 high=(
421 Decimal(str(quote.high))
422 if hasattr(quote, "high") and quote.high
423 else Decimal("0")
424 ),
425 low=(
426 Decimal(str(quote.low))
427 if hasattr(quote, "low") and quote.low
428 else Decimal("0")
429 ),
430 close=Decimal(str(quote.last_done)),
431 volume=(
432 int(quote.volume)
433 if hasattr(quote, "volume") and quote.volume
434 else 0
435 ),
436 )
438 # 推送给策略引擎
439 self.strategy_engine.process_market_data(market_data)
440 # 推送给交易引擎(模拟交易引擎需要市场数据来处理订单成交)
441 self.trading_engine.process_market_data(market_data)
442 else:
443 print("【量化交易系统】⚠️ 未获取到实时行情数据,等待数据源推送")
445 except Exception as e:
446 self._send_websocket_log(f"【量化交易系统】❌ 处理实时模拟数据失败: {e}")
448 def _process_realtime_real_data(self):
449 """处理实时交易+真实资产数据"""
450 try:
451 # 从数据源获取实时行情(通过抽象接口)
452 quotes = self.quote_data_adapter.get_quote(self.subscribed_symbols)
454 if quotes:
455 for quote in quotes:
456 if hasattr(quote, "symbol") and hasattr(quote, "last_done"):
457 market_data = MarketData(
458 symbol=quote.symbol,
459 timestamp=datetime.now(),
460 open=(
461 Decimal(str(quote.open))
462 if hasattr(quote, "open") and quote.open
463 else Decimal("0")
464 ),
465 high=(
466 Decimal(str(quote.high))
467 if hasattr(quote, "high") and quote.high
468 else Decimal("0")
469 ),
470 low=(
471 Decimal(str(quote.low))
472 if hasattr(quote, "low") and quote.low
473 else Decimal("0")
474 ),
475 close=Decimal(str(quote.last_done)),
476 volume=(
477 int(quote.volume)
478 if hasattr(quote, "volume") and quote.volume
479 else 0
480 ),
481 )
483 # 推送给策略引擎
484 self.strategy_engine.process_market_data(market_data)
485 # 推送给交易引擎(真实交易引擎也需要市场数据)
486 self.trading_engine.process_market_data(market_data)
487 else:
488 print("【量化交易系统】⚠️ 未获取到实时行情数据,等待数据源推送")
490 except Exception as e:
491 self._send_websocket_log(f"【量化交易系统】❌ 处理实时真实数据失败: {e}")
493 def _process_backtest_data(self):
494 """处理回测数据 - 回测模式+模拟资产,连续处理追求速度"""
495 try:
496 if not self.current_time or not self.backtest_end_time:
497 self._send_websocket_log(f"【量化交易系统】❌ 回测时间范围未设置")
498 return
500 if self.current_time > self.backtest_end_time:
501 self._send_websocket_log(
502 f"【量化交易系统】✅ 回测完成: {self.session_id}"
503 )
505 # 更新数据库中的会话状态为已完成
506 try:
507 from core.services.trading_service import TradingService
509 trading_service = TradingService(self.user_id)
510 updated_session = trading_service.stop_trading_session(
511 self.session_id
512 )
513 if updated_session:
514 self._send_websocket_log(
515 f"【量化交易系统】✅ 会话状态已更新为已完成"
516 )
517 else:
518 self._send_websocket_log(f"【量化交易系统】⚠️ 会话状态更新失败")
519 except Exception as e:
520 self._send_websocket_log(
521 f"【量化交易系统】❌ 更新会话状态失败: {e}"
522 )
524 self.stop()
525 return
527 # 连续处理多个时间点,提高回测速度
528 processed_count = 0
529 max_batch_size = 100 # 每次最多处理100个时间点
531 while (
532 self.current_time <= self.backtest_end_time
533 and processed_count < max_batch_size
534 and self.is_running
535 and not self.stop_event.is_set()
536 ):
538 # 获取当前时间点的历史数据
539 market_data_list = self._get_backtest_historical_data(self.current_time)
541 if market_data_list:
542 # 记录处理前的交易数量
543 initial_trade_count = self._get_current_trade_count()
545 # 有数据就处理
546 for market_data in market_data_list:
547 # 推送给策略引擎
548 self.strategy_engine.process_market_data(market_data)
549 # 推送给交易引擎(模拟交易引擎需要市场数据来处理订单成交)
550 self.trading_engine.process_market_data(market_data)
552 # 等待策略执行完成
553 if not self.strategy_engine.wait_for_strategy_execution(
554 timeout=10.0
555 ):
556 self._send_websocket_log(
557 f"【量化交易系统】⚠️ 策略执行超时,继续下一个时间点"
558 )
560 # 检查是否发生了交易
561 current_trade_count = self._get_current_trade_count()
562 if current_trade_count > initial_trade_count:
563 trade_count = current_trade_count - initial_trade_count
564 self._send_websocket_log(
565 f"【量化交易系统】💼 检测到 {trade_count} 笔交易发生"
566 )
567 # 订单处理是同步的,不需要等待
569 processed_count += 1
571 # 每处理10个时间点输出一次进度
572 if processed_count % 10 == 0:
573 self._send_websocket_log(
574 f"【量化交易系统】📊 回测进度: {self.current_time}, 已处理: {processed_count}个时间点"
575 )
577 # 推进到下一个时间点,保持时区信息
578 self.current_time += timedelta(minutes=1)
580 # 确保时区信息保持一致
581 if self.backtest_start_time.tzinfo and not self.current_time.tzinfo:
582 self.current_time = self.current_time.replace(
583 tzinfo=self.backtest_start_time.tzinfo
584 )
586 # 输出批量处理结果
587 if processed_count > 0:
588 self._send_websocket_log(
589 f"【量化交易系统】⚡ 批量处理完成: {processed_count}个时间点"
590 )
592 except Exception as e:
593 self._send_websocket_log(f"【量化交易系统】❌ 处理回测数据失败: {e}")
594 self._send_websocket_log(f"【量化交易系统】❌ 处理回测数据失败: {e}")
596 def _send_websocket_log(self, message: str, log_type: str = "log"):
597 """发送WebSocket日志 - 通过StrategyEngine统一处理"""
598 if self.strategy_engine:
599 self.strategy_engine.log_message(message, log_type, "量化交易系统")
600 else:
601 # 如果StrategyEngine还没有初始化,使用备用方法
602 try:
603 from core.services.websocket_service_factory import \
604 send_websocket_log
606 send_websocket_log(self.session_id, message, log_type)
607 except Exception as e:
608 print(f"❌ 发送WebSocket日志失败: {e}")
609 # 移除print语句,避免日志打到控制台
611 def _get_current_trade_count(self) -> int:
612 """获取当前交易数量,用于检测是否发生了交易"""
613 try:
614 if self.trading_engine and hasattr(
615 self.trading_engine, "simulation_engine"
616 ):
617 simulation_engine = self.trading_engine.simulation_engine
618 if simulation_engine and hasattr(simulation_engine, "trades"):
619 return len(simulation_engine.trades)
620 return 0
621 except Exception as e:
622 self._send_websocket_log(f"【量化交易系统】❌ 获取交易数量失败: {e}")
623 return 0
625 def _get_backtest_historical_data(self, timestamp: datetime) -> List[MarketData]:
626 """获取回测历史数据 - 模拟交易模块读取历史数据推送"""
627 market_data_list = []
628 missing_data_symbols = []
630 for symbol in self.subscribed_symbols:
631 try:
632 # 1. 优先从数据库/缓存读取历史数据
633 historical_data = self._get_historical_data(symbol, timestamp)
635 if historical_data:
636 market_data = MarketData(
637 symbol=symbol,
638 timestamp=timestamp,
639 open=Decimal(str(historical_data.get("open", 150))),
640 high=Decimal(str(historical_data.get("high", 155))),
641 low=Decimal(str(historical_data.get("low", 148))),
642 close=Decimal(str(historical_data.get("close", 152))),
643 volume=int(historical_data.get("volume", 1000)),
644 is_market_open=True, # 有数据表示市场开盘
645 )
646 self._send_websocket_log(
647 f"【量化交易系统】📊 从数据库获取历史数据: {symbol} at {timestamp} = ${market_data.close:.2f}"
648 )
649 market_data_list.append(market_data)
650 else:
651 # 2. 如果没有历史数据,记录缺失的股票
652 missing_data_symbols.append(symbol)
653 self._send_websocket_log(
654 f"【量化交易系统】⏰ 无历史数据: {symbol} at {timestamp}"
655 )
657 except Exception as e:
658 self._send_websocket_log(
659 f"【量化交易系统】❌ 获取回测数据失败: {symbol} - {e}"
660 )
661 missing_data_symbols.append(symbol)
662 continue
664 # 如果任何股票缺少数据,记录日志但不创建无数据推送
665 if missing_data_symbols:
666 self._send_websocket_log(
667 f"【量化交易系统】⏰ 无数据时刻,跳过推送: {missing_data_symbols} at {timestamp}"
668 )
670 # 只返回有真实数据的市场数据,不推送无数据时刻
671 return market_data_list
673 def _get_historical_data(
674 self, symbol: str, timestamp: datetime
675 ) -> Optional[Dict[str, Any]]:
676 """从数据库获取历史数据"""
677 try:
678 # 使用StockRepository的抽象接口获取历史数据
679 import asyncio
681 from core.repositories.stock_repository import StockRepository
682 from infrastructure.database.redis_client import get_redis
684 stock_repo = StockRepository(get_redis().client)
686 # 将datetime转换为时间戳,确保时区处理正确
687 from datetime import timezone
689 if timestamp.tzinfo is None:
690 # 如果没有时区信息,假设是UTC时间
691 timestamp_int = int(timestamp.replace(tzinfo=timezone.utc).timestamp())
692 else:
693 # 如果有时区信息,直接转换
694 timestamp_int = int(timestamp.timestamp())
696 # 使用StockRepository的get_stock_data方法获取数据
697 # 注意:这里需要同步调用异步方法
698 try:
699 # 尝试获取当前事件循环
700 try:
701 loop = asyncio.get_running_loop()
702 # 如果已经有运行中的循环,使用线程池执行
703 import concurrent.futures
705 with concurrent.futures.ThreadPoolExecutor() as executor:
706 future = executor.submit(
707 asyncio.run,
708 stock_repo.get_stock_data(symbol, timestamp_int),
709 )
710 stock_data = future.result()
711 except RuntimeError:
712 # 没有运行中的循环,直接使用asyncio.run
713 stock_data = asyncio.run(
714 stock_repo.get_stock_data(symbol, timestamp_int)
715 )
716 except Exception as e:
717 self._send_websocket_log(
718 f"【量化交易系统】❌ 异步调用失败: {symbol} - {e}"
719 )
720 return None
722 if stock_data:
723 # 转换为字典格式
724 result = {
725 "open": float(stock_data.open),
726 "high": float(stock_data.high),
727 "low": float(stock_data.low),
728 "close": float(stock_data.close),
729 "volume": int(stock_data.volume),
730 "turnover": float(stock_data.turnover),
731 "trade_session": stock_data.trade_session,
732 }
733 return result
735 # 没有历史数据是正常的,市场可能没有开盘
736 self._send_websocket_log(
737 f"【量化交易系统】📊 无历史数据: {symbol} at {timestamp} (市场可能未开盘)"
738 )
739 return None
741 except Exception as e:
742 self._send_websocket_log(
743 f"【量化交易系统】❌ 获取历史数据失败: {symbol} at {timestamp} - {e}"
744 )
745 return None
747 def _initialize_backtest_params(self):
748 """初始化回测参数"""
749 try:
750 # session_config 本身就是配置对象,不需要再访问 'config' 字段
751 config = self.session_config
752 self._send_websocket_log(
753 f"【量化交易系统】🔧 初始化回测参数,配置: {config}"
754 )
756 # 解析开始和结束时间
757 start_date_str = config.get("start_date")
758 end_date_str = config.get("end_date")
759 timezone_str = config.get("timezone", "UTC")
761 self._send_websocket_log(
762 f"【量化交易系统】📅 原始时间字符串: start={start_date_str}, end={end_date_str}"
763 )
765 if start_date_str:
766 # 处理时区信息
767 if start_date_str.endswith("Z"):
768 start_date_str = start_date_str.replace("Z", "+00:00")
769 self.backtest_start_time = datetime.fromisoformat(start_date_str)
770 self.current_time = self.backtest_start_time
771 self._send_websocket_log(
772 f"【量化交易系统】✅ 设置开始时间: {self.backtest_start_time} (时区: {self.backtest_start_time.tzinfo})"
773 )
774 else:
775 self._send_websocket_log(f"【量化交易系统】❌ 未找到开始时间")
777 if end_date_str:
778 # 处理时区信息
779 if end_date_str.endswith("Z"):
780 end_date_str = end_date_str.replace("Z", "+00:00")
781 self.backtest_end_time = datetime.fromisoformat(end_date_str)
782 self._send_websocket_log(
783 f"【量化交易系统】✅ 设置结束时间: {self.backtest_end_time} (时区: {self.backtest_end_time.tzinfo})"
784 )
785 else:
786 self._send_websocket_log(f"【量化交易系统】❌ 未找到结束时间")
788 self._send_websocket_log(
789 f"【量化交易系统】📅 回测时间范围: {self.backtest_start_time} - {self.backtest_end_time}"
790 )
792 except Exception as e:
793 self._send_websocket_log(f"【量化交易系统】❌ 初始化回测参数失败: {e}")
794 import traceback
796 traceback.print_exc()
798 def _initialize_time_series_controller(self):
799 """初始化时序控制器"""
800 try:
801 if self.backtest_start_time and self.backtest_end_time:
802 self.time_series_controller = TimeSeriesController(
803 start_time=self.backtest_start_time,
804 end_time=self.backtest_end_time,
805 timezone=self.session_config.get("config", {}).get(
806 "timezone", "UTC"
807 ),
808 strategy_engine=self.strategy_engine, # 传递策略引擎引用
809 )
811 # 设置数据源适配器
812 self.time_series_controller.set_quote_adapter(self.quote_data_adapter)
814 # 设置回调函数
815 self.time_series_controller.set_callbacks(
816 on_progress=self._on_backtest_progress,
817 on_complete=self._on_backtest_complete,
818 on_error=self._on_backtest_error,
819 )
821 self._send_websocket_log(f"【量化交易系统】✅ 时序控制器初始化成功")
823 except Exception as e:
824 self._send_websocket_log(f"【量化交易系统】❌ 初始化时序控制器失败: {e}")
826 def _on_backtest_progress(self, progress: float, current_time: datetime):
827 """回测进度回调"""
828 self._send_websocket_log(
829 f"【量化交易系统】📈 回测进度: {progress:.1f}% - {current_time}"
830 )
832 def _on_backtest_complete(self):
833 """回测完成回调"""
834 self._send_websocket_log(f"【量化交易系统】✅ 回测完成")
835 self.stop()
837 def _on_backtest_error(self, error: Exception):
838 """回测错误回调"""
839 self._send_websocket_log(f"【量化交易系统】❌ 回测错误: {error}")
840 self.stop()
842 def _get_tradable_symbols(self) -> List[str]:
843 """获取可交易股票列表"""
844 try:
845 # 从会话配置中获取
846 risk_config = self.session_config.get("risk_config", {})
848 # 优先从per_stock_config获取
849 per_stock_config = risk_config.get("per_stock_config", {})
850 if per_stock_config:
851 symbols = list(per_stock_config.keys())
852 if symbols:
853 self._send_websocket_log(
854 f"✅ 从per_stock_config获取可交易股票: {symbols}", "info"
855 )
856 return symbols
858 # 从allowed_symbols获取
859 allowed_symbols = risk_config.get("allowed_symbols", [])
860 if allowed_symbols:
861 self._send_websocket_log(
862 f"✅ 从allowed_symbols获取可交易股票: {allowed_symbols}", "info"
863 )
864 return allowed_symbols
866 # 默认股票列表
867 default_symbols = ["YINN.US", "YANG.US"]
868 self._send_websocket_log(f"⚠️ 使用默认可交易股票: {default_symbols}", "warn")
869 return default_symbols
871 except Exception as e:
872 self._send_websocket_log(f"❌ 获取可交易股票失败: {e}", "error")
873 return ["YINN.US", "YANG.US"]