Coverage for core/trading/engines/trading_session_engine.py: 30.64%

408 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

1""" 

2交易会话引擎 - 统一调度数据流 

3""" 

4 

5import asyncio 

6import threading 

7import time 

8from datetime import datetime, timedelta 

9from decimal import Decimal 

10from typing import Any, Dict, List, Optional 

11 

12from core.data_source.adapters.data_source_adapter import \ 

13 QuoteDataSourceAdapter 

14from core.data_source.adapters.quote_adapter import QuoteAdapter 

15from core.models.broker import FeeConfig 

16from core.models.trading import (AssetMode, MarketData, StrategyContext, 

17 TradingMode) 

18from core.trading.engines.simulation_engine import SimulationEngine 

19from core.trading.engines.strategy_engine import StrategyEngine 

20from core.trading.engines.trading_engine import TradingEngine 

21from core.trading.strategies import create_strategy 

22 

23from .time_series_controller import TimeSeriesController 

24 

25 

26class TradingSessionEngine: 

27 """量化交易系统 - 统一调度数据流,协调策略、交易模块、模拟交易模块""" 

28 

29 def __init__(self, user_id: str, session_id: str, session_config: Dict[str, Any]): 

30 self.user_id = user_id 

31 self.session_id = session_id 

32 self.session_config = session_config 

33 

34 # 交易模式 

35 self.trading_mode = TradingMode(session_config.get("trading_mode", "realtime")) 

36 self.asset_mode = AssetMode(session_config.get("asset_mode", "simulation")) 

37 

38 # 核心组件 

39 self.trading_engine: Optional[TradingEngine] = ( 

40 None # 交易模块:处理策略触发的交易信号 

41 ) 

42 self.strategy_engine: Optional[StrategyEngine] = ( 

43 None # 策略引擎:被动接收数据,产生交易信号 

44 ) 

45 self.simulation_engine: Optional[SimulationEngine] = ( 

46 None # 模拟交易模块:主动推送数据,模拟成交 

47 ) 

48 

49 # 数据源适配器 

50 self.quote_adapter = QuoteAdapter(user_id) 

51 self.quote_data_adapter = QuoteDataSourceAdapter(user_id) 

52 

53 # 状态管理 

54 self.is_running = False 

55 self.is_paused = False 

56 self.start_time: Optional[datetime] = None 

57 self.end_time: Optional[datetime] = None 

58 

59 # 数据订阅 

60 self.subscribed_symbols: List[str] = [] 

61 self.last_market_data_time = datetime.now() 

62 

63 # 回测相关 

64 self.current_time: Optional[datetime] = None 

65 self.backtest_start_time: Optional[datetime] = None 

66 self.backtest_end_time: Optional[datetime] = None 

67 self.time_series_controller: Optional[TimeSeriesController] = None 

68 

69 # 线程管理 

70 self.data_thread: Optional[threading.Thread] = None 

71 self.stop_event = threading.Event() 

72 

73 def initialize(self) -> bool: 

74 """初始化交易会话引擎""" 

75 try: 

76 # 1. 创建交易引擎 

77 from core.trading.engines import create_trading_engine 

78 

79 # 获取初始资金 - 支持多币种 

80 initial_capital = Decimal("10000") # 默认值 

81 currency = "USD" # 默认币种 

82 

83 if "initial_capital_by_currency" in self.session_config: 

84 # 优先使用USD资金 

85 usd_capital = self.session_config["initial_capital_by_currency"].get( 

86 "USD", {} 

87 ) 

88 if ( 

89 "cash_assets" in usd_capital 

90 and float(usd_capital["cash_assets"]) > 0 

91 ): 

92 initial_capital = Decimal(str(usd_capital["cash_assets"])) 

93 currency = "USD" 

94 else: 

95 # 如果没有USD资金,使用HKD资金 

96 hkd_capital = self.session_config[ 

97 "initial_capital_by_currency" 

98 ].get("HKD", {}) 

99 if ( 

100 "cash_assets" in hkd_capital 

101 and float(hkd_capital["cash_assets"]) > 0 

102 ): 

103 initial_capital = Decimal(str(hkd_capital["cash_assets"])) 

104 currency = "HKD" 

105 elif "initial_capital" in self.session_config: 

106 initial_capital = Decimal(str(self.session_config["initial_capital"])) 

107 

108 self._send_websocket_log( 

109 f"【量化交易系统】💰 设置初始资金: {initial_capital} {currency}" 

110 ) 

111 

112 # 获取费用配置(从会话配置或使用默认配置) 

113 fee_config = None 

114 if "fee_config" in self.session_config: 

115 fee_config_dict = self.session_config["fee_config"] 

116 try: 

117 fee_config = FeeConfig(**fee_config_dict) 

118 self._send_websocket_log(f"【量化交易系统】💵 使用自定义费用配置") 

119 except Exception as e: 

120 self._send_websocket_log( 

121 f"【量化交易系统】⚠️ 费用配置解析失败,使用默认配置: {e}" 

122 ) 

123 fee_config = None 

124 

125 if not fee_config: 

126 fee_config = FeeConfig() # 使用默认的长桥费率 

127 self._send_websocket_log( 

128 f"【量化交易系统】💵 使用默认费用配置(长桥费率)" 

129 ) 

130 

131 self.trading_engine = create_trading_engine( 

132 user_id=self.user_id, 

133 session_id=self.session_id, 

134 trading_mode=self.trading_mode, 

135 asset_mode=self.asset_mode, 

136 initial_capital=initial_capital, 

137 currency=currency, 

138 fee_config=fee_config, 

139 ) 

140 

141 if not self.trading_engine.initialize(): 

142 print(f"❌ 交易引擎初始化失败") 

143 return False 

144 

145 # 2. 创建策略引擎 

146 self.strategy_engine = StrategyEngine( 

147 user_id=self.user_id, 

148 session_id=self.session_id, 

149 trading_engine=self.trading_engine, 

150 session_config=self.session_config, 

151 trading_session_engine=self, # 传递TradingSessionEngine引用,用于统一日志管理 

152 ) 

153 

154 # 3. 设置TradingEngine的策略引擎引用,用于统一日志管理 

155 self.trading_engine.set_strategy_engine(self.strategy_engine) 

156 

157 # 4. 添加策略 

158 strategy_name = self.session_config.get("strategy_name") 

159 strategy_params = self.session_config.get("strategy_params", {}) 

160 

161 if strategy_name: 

162 strategy = create_strategy(strategy_name, strategy_params) 

163 if strategy: 

164 self.strategy_engine.add_strategy(strategy) 

165 else: 

166 print(f"❌ 策略创建失败: {strategy_name}") 

167 return False 

168 

169 # 5. 初始化回测参数(如果是回测模式) 

170 if self.trading_mode == TradingMode.BACKTEST: 

171 self._initialize_backtest_params() 

172 self._initialize_time_series_controller() 

173 

174 # 6. 获取可交易股票列表 

175 self.subscribed_symbols = self._get_tradable_symbols() 

176 

177 self._send_websocket_log(f"【量化交易系统】✅ 量化交易系统初始化成功") 

178 self._send_websocket_log( 

179 f"【量化交易系统】 交易模式: {self.trading_mode.value}" 

180 ) 

181 self._send_websocket_log( 

182 f"【量化交易系统】 资产模式: {self.asset_mode.value}" 

183 ) 

184 self._send_websocket_log(f"【量化交易系统】 策略: {strategy_name}") 

185 self._send_websocket_log( 

186 f"【量化交易系统】 可交易股票: {self.subscribed_symbols}" 

187 ) 

188 

189 return True 

190 

191 except Exception as e: 

192 self._send_websocket_log(f"【量化交易系统】❌ 量化交易系统初始化失败: {e}") 

193 return False 

194 

195 def start(self) -> bool: 

196 """启动交易会话引擎""" 

197 if self.is_running: 

198 return True 

199 

200 try: 

201 # 设置交易模式环境变量,用于控制长桥客户端初始化 

202 import os 

203 

204 os.environ["TRADING_MODE"] = self.trading_mode.value 

205 

206 # 标记为启动中状态 

207 self.is_running = True 

208 self.start_time = datetime.now() 

209 self.stop_event.clear() 

210 

211 self._send_websocket_log( 

212 f"【量化交易系统】🚀 量化交易系统启动中: {self.session_id}" 

213 ) 

214 

215 # 异步进行完整初始化 

216 def async_initialization(): 

217 try: 

218 # 启动交易引擎 

219 if not self.trading_engine.start(): 

220 self._send_websocket_log(f"【量化交易系统】❌ 交易引擎启动失败") 

221 self.is_running = False 

222 return 

223 

224 # 启动策略引擎 

225 if not self.strategy_engine.start(): 

226 self._send_websocket_log(f"【量化交易系统】❌ 策略引擎启动失败") 

227 self.is_running = False 

228 return 

229 

230 # 等待策略模块初始化完成 

231 self._send_websocket_log( 

232 f"【量化交易系统】⏳ 等待策略模块初始化完成..." 

233 ) 

234 if not self.strategy_engine.wait_for_initialization(timeout=60.0): 

235 self._send_websocket_log( 

236 f"【量化交易系统】❌ 策略模块初始化失败或超时" 

237 ) 

238 self.is_running = False 

239 return 

240 

241 # 启动数据流处理线程 

242 self.data_thread = threading.Thread( 

243 target=self._data_flow_loop, daemon=True 

244 ) 

245 self.data_thread.start() 

246 

247 self._send_websocket_log( 

248 f"【量化交易系统】✅ 量化交易系统已启动: {self.session_id}" 

249 ) 

250 self._send_websocket_log( 

251 f"【量化交易系统】📊 交易模式: {self.trading_mode.value}" 

252 ) 

253 self._send_websocket_log( 

254 f"【量化交易系统】💰 资产模式: {self.asset_mode.value}" 

255 ) 

256 self._send_websocket_log( 

257 f"【量化交易系统】📈 可交易股票: {self.subscribed_symbols}" 

258 ) 

259 

260 except Exception as e: 

261 self.is_running = False 

262 self._send_websocket_log(f"【量化交易系统】❌ 异步初始化失败: {e}") 

263 

264 # 启动异步初始化线程 

265 threading.Thread(target=async_initialization, daemon=True).start() 

266 

267 # 立即返回成功,让前端状态快速更新 

268 return True 

269 

270 except Exception as e: 

271 self.is_running = False 

272 self._send_websocket_log(f"【量化交易系统】❌ 启动量化交易系统失败: {e}") 

273 return False 

274 

275 def stop(self) -> bool: 

276 """停止交易会话引擎""" 

277 if not self.is_running: 

278 return True 

279 

280 try: 

281 self.is_running = False 

282 self.end_time = datetime.now() 

283 self.stop_event.set() 

284 

285 # 停止策略引擎 

286 if self.strategy_engine: 

287 self.strategy_engine.stop() 

288 

289 # 停止交易引擎 

290 if self.trading_engine: 

291 self.trading_engine.stop() 

292 

293 # 等待数据线程结束 

294 import threading 

295 

296 if ( 

297 self.data_thread 

298 and self.data_thread.is_alive() 

299 and self.data_thread != threading.current_thread() 

300 ): 

301 self.data_thread.join(timeout=5) 

302 

303 self._send_websocket_log( 

304 f"【量化交易系统】✅ 量化交易系统已停止: {self.session_id}" 

305 ) 

306 return True 

307 

308 except Exception as e: 

309 self._send_websocket_log(f"【量化交易系统】❌ 停止量化交易系统失败: {e}") 

310 return False 

311 

312 def pause(self) -> bool: 

313 """暂停交易会话引擎""" 

314 if not self.is_running or self.is_paused: 

315 return True 

316 

317 try: 

318 self.is_paused = True 

319 if self.strategy_engine: 

320 self.strategy_engine.pause() 

321 self._send_websocket_log( 

322 f"【量化交易系统】✅ 量化交易系统已暂停: {self.session_id}" 

323 ) 

324 return True 

325 except Exception as e: 

326 self._send_websocket_log(f"【量化交易系统】❌ 暂停量化交易系统失败: {e}") 

327 return False 

328 

329 def resume(self) -> bool: 

330 """恢复交易会话引擎""" 

331 if not self.is_running or not self.is_paused: 

332 return True 

333 

334 try: 

335 self.is_paused = False 

336 if self.strategy_engine: 

337 self.strategy_engine.resume() 

338 self._send_websocket_log( 

339 f"【量化交易系统】✅ 量化交易系统已恢复: {self.session_id}" 

340 ) 

341 return True 

342 except Exception as e: 

343 self._send_websocket_log(f"【量化交易系统】❌ 恢复量化交易系统失败: {e}") 

344 return False 

345 

346 def _data_flow_loop(self): 

347 """量化交易系统数据流处理循环""" 

348 while self.is_running and not self.stop_event.is_set(): 

349 try: 

350 if self.is_paused: 

351 time.sleep(1) 

352 continue 

353 

354 if self.trading_mode == TradingMode.REALTIME: 

355 # 实时交易:只在数据缺失时主动获取 

356 self._check_and_fetch_realtime_data() 

357 time.sleep(2) # 实时交易需要间隔,避免频繁检查 

358 elif self.trading_mode == TradingMode.BACKTEST: 

359 # 回测模式:连续处理历史数据,追求速度 

360 self._process_backtest_data() 

361 # 回测模式不需要sleep,连续处理 

362 

363 except Exception as e: 

364 self._send_websocket_log(f"【量化交易系统】❌ 数据流处理失败: {e}") 

365 time.sleep(1) # 出错时短暂等待 

366 

367 def _check_and_fetch_realtime_data(self): 

368 """检查并获取实时数据 - 只在数据缺失时主动获取""" 

369 try: 

370 # 检查是否有新的市场数据 

371 now = datetime.now() 

372 time_since_last_data = (now - self.last_market_data_time).total_seconds() 

373 

374 # 如果超过30秒没有数据,才主动获取 

375 if time_since_last_data > 30: 

376 self._send_websocket_log( 

377 f"【量化交易系统】⚠️ 超过30秒未收到数据,主动获取实时数据" 

378 ) 

379 self._process_realtime_data() 

380 else: 

381 # 正常情况,等待数据推送 

382 if time_since_last_data > 10: # 超过10秒记录一次状态 

383 self._send_websocket_log( 

384 f"【量化交易系统】📊 等待数据推送中... ({time_since_last_data:.1f}秒)" 

385 ) 

386 

387 except Exception as e: 

388 self._send_websocket_log(f"【量化交易系统】❌ 检查实时数据失败: {e}") 

389 

390 def _process_realtime_data(self): 

391 """处理实时数据""" 

392 try: 

393 if self.asset_mode == AssetMode.SIMULATION: 

394 # 实时交易 + 模拟资产:通过抽象接口订阅获取实时数据 

395 self._process_realtime_simulation_data() 

396 elif self.asset_mode == AssetMode.REAL: 

397 # 实时交易 + 真实资产:通过抽象接口订阅获取实时数据 

398 self._process_realtime_real_data() 

399 

400 except Exception as e: 

401 self._send_websocket_log(f"【量化交易系统】❌ 处理实时数据失败: {e}") 

402 

403 def _process_realtime_simulation_data(self): 

404 """处理实时交易+模拟资产数据""" 

405 try: 

406 # 从数据源获取实时行情(通过抽象接口) 

407 quotes = self.quote_data_adapter.get_quote(self.subscribed_symbols) 

408 

409 if quotes: 

410 for quote in quotes: 

411 if hasattr(quote, "symbol") and hasattr(quote, "last_done"): 

412 market_data = MarketData( 

413 symbol=quote.symbol, 

414 timestamp=datetime.now(), 

415 open=( 

416 Decimal(str(quote.open)) 

417 if hasattr(quote, "open") and quote.open 

418 else Decimal("0") 

419 ), 

420 high=( 

421 Decimal(str(quote.high)) 

422 if hasattr(quote, "high") and quote.high 

423 else Decimal("0") 

424 ), 

425 low=( 

426 Decimal(str(quote.low)) 

427 if hasattr(quote, "low") and quote.low 

428 else Decimal("0") 

429 ), 

430 close=Decimal(str(quote.last_done)), 

431 volume=( 

432 int(quote.volume) 

433 if hasattr(quote, "volume") and quote.volume 

434 else 0 

435 ), 

436 ) 

437 

438 # 推送给策略引擎 

439 self.strategy_engine.process_market_data(market_data) 

440 # 推送给交易引擎(模拟交易引擎需要市场数据来处理订单成交) 

441 self.trading_engine.process_market_data(market_data) 

442 else: 

443 print("【量化交易系统】⚠️ 未获取到实时行情数据,等待数据源推送") 

444 

445 except Exception as e: 

446 self._send_websocket_log(f"【量化交易系统】❌ 处理实时模拟数据失败: {e}") 

447 

448 def _process_realtime_real_data(self): 

449 """处理实时交易+真实资产数据""" 

450 try: 

451 # 从数据源获取实时行情(通过抽象接口) 

452 quotes = self.quote_data_adapter.get_quote(self.subscribed_symbols) 

453 

454 if quotes: 

455 for quote in quotes: 

456 if hasattr(quote, "symbol") and hasattr(quote, "last_done"): 

457 market_data = MarketData( 

458 symbol=quote.symbol, 

459 timestamp=datetime.now(), 

460 open=( 

461 Decimal(str(quote.open)) 

462 if hasattr(quote, "open") and quote.open 

463 else Decimal("0") 

464 ), 

465 high=( 

466 Decimal(str(quote.high)) 

467 if hasattr(quote, "high") and quote.high 

468 else Decimal("0") 

469 ), 

470 low=( 

471 Decimal(str(quote.low)) 

472 if hasattr(quote, "low") and quote.low 

473 else Decimal("0") 

474 ), 

475 close=Decimal(str(quote.last_done)), 

476 volume=( 

477 int(quote.volume) 

478 if hasattr(quote, "volume") and quote.volume 

479 else 0 

480 ), 

481 ) 

482 

483 # 推送给策略引擎 

484 self.strategy_engine.process_market_data(market_data) 

485 # 推送给交易引擎(真实交易引擎也需要市场数据) 

486 self.trading_engine.process_market_data(market_data) 

487 else: 

488 print("【量化交易系统】⚠️ 未获取到实时行情数据,等待数据源推送") 

489 

490 except Exception as e: 

491 self._send_websocket_log(f"【量化交易系统】❌ 处理实时真实数据失败: {e}") 

492 

493 def _process_backtest_data(self): 

494 """处理回测数据 - 回测模式+模拟资产,连续处理追求速度""" 

495 try: 

496 if not self.current_time or not self.backtest_end_time: 

497 self._send_websocket_log(f"【量化交易系统】❌ 回测时间范围未设置") 

498 return 

499 

500 if self.current_time > self.backtest_end_time: 

501 self._send_websocket_log( 

502 f"【量化交易系统】✅ 回测完成: {self.session_id}" 

503 ) 

504 

505 # 更新数据库中的会话状态为已完成 

506 try: 

507 from core.services.trading_service import TradingService 

508 

509 trading_service = TradingService(self.user_id) 

510 updated_session = trading_service.stop_trading_session( 

511 self.session_id 

512 ) 

513 if updated_session: 

514 self._send_websocket_log( 

515 f"【量化交易系统】✅ 会话状态已更新为已完成" 

516 ) 

517 else: 

518 self._send_websocket_log(f"【量化交易系统】⚠️ 会话状态更新失败") 

519 except Exception as e: 

520 self._send_websocket_log( 

521 f"【量化交易系统】❌ 更新会话状态失败: {e}" 

522 ) 

523 

524 self.stop() 

525 return 

526 

527 # 连续处理多个时间点,提高回测速度 

528 processed_count = 0 

529 max_batch_size = 100 # 每次最多处理100个时间点 

530 

531 while ( 

532 self.current_time <= self.backtest_end_time 

533 and processed_count < max_batch_size 

534 and self.is_running 

535 and not self.stop_event.is_set() 

536 ): 

537 

538 # 获取当前时间点的历史数据 

539 market_data_list = self._get_backtest_historical_data(self.current_time) 

540 

541 if market_data_list: 

542 # 记录处理前的交易数量 

543 initial_trade_count = self._get_current_trade_count() 

544 

545 # 有数据就处理 

546 for market_data in market_data_list: 

547 # 推送给策略引擎 

548 self.strategy_engine.process_market_data(market_data) 

549 # 推送给交易引擎(模拟交易引擎需要市场数据来处理订单成交) 

550 self.trading_engine.process_market_data(market_data) 

551 

552 # 等待策略执行完成 

553 if not self.strategy_engine.wait_for_strategy_execution( 

554 timeout=10.0 

555 ): 

556 self._send_websocket_log( 

557 f"【量化交易系统】⚠️ 策略执行超时,继续下一个时间点" 

558 ) 

559 

560 # 检查是否发生了交易 

561 current_trade_count = self._get_current_trade_count() 

562 if current_trade_count > initial_trade_count: 

563 trade_count = current_trade_count - initial_trade_count 

564 self._send_websocket_log( 

565 f"【量化交易系统】💼 检测到 {trade_count} 笔交易发生" 

566 ) 

567 # 订单处理是同步的,不需要等待 

568 

569 processed_count += 1 

570 

571 # 每处理10个时间点输出一次进度 

572 if processed_count % 10 == 0: 

573 self._send_websocket_log( 

574 f"【量化交易系统】📊 回测进度: {self.current_time}, 已处理: {processed_count}个时间点" 

575 ) 

576 

577 # 推进到下一个时间点,保持时区信息 

578 self.current_time += timedelta(minutes=1) 

579 

580 # 确保时区信息保持一致 

581 if self.backtest_start_time.tzinfo and not self.current_time.tzinfo: 

582 self.current_time = self.current_time.replace( 

583 tzinfo=self.backtest_start_time.tzinfo 

584 ) 

585 

586 # 输出批量处理结果 

587 if processed_count > 0: 

588 self._send_websocket_log( 

589 f"【量化交易系统】⚡ 批量处理完成: {processed_count}个时间点" 

590 ) 

591 

592 except Exception as e: 

593 self._send_websocket_log(f"【量化交易系统】❌ 处理回测数据失败: {e}") 

594 self._send_websocket_log(f"【量化交易系统】❌ 处理回测数据失败: {e}") 

595 

596 def _send_websocket_log(self, message: str, log_type: str = "log"): 

597 """发送WebSocket日志 - 通过StrategyEngine统一处理""" 

598 if self.strategy_engine: 

599 self.strategy_engine.log_message(message, log_type, "量化交易系统") 

600 else: 

601 # 如果StrategyEngine还没有初始化,使用备用方法 

602 try: 

603 from core.services.websocket_service_factory import \ 

604 send_websocket_log 

605 

606 send_websocket_log(self.session_id, message, log_type) 

607 except Exception as e: 

608 print(f"❌ 发送WebSocket日志失败: {e}") 

609 # 移除print语句,避免日志打到控制台 

610 

611 def _get_current_trade_count(self) -> int: 

612 """获取当前交易数量,用于检测是否发生了交易""" 

613 try: 

614 if self.trading_engine and hasattr( 

615 self.trading_engine, "simulation_engine" 

616 ): 

617 simulation_engine = self.trading_engine.simulation_engine 

618 if simulation_engine and hasattr(simulation_engine, "trades"): 

619 return len(simulation_engine.trades) 

620 return 0 

621 except Exception as e: 

622 self._send_websocket_log(f"【量化交易系统】❌ 获取交易数量失败: {e}") 

623 return 0 

624 

625 def _get_backtest_historical_data(self, timestamp: datetime) -> List[MarketData]: 

626 """获取回测历史数据 - 模拟交易模块读取历史数据推送""" 

627 market_data_list = [] 

628 missing_data_symbols = [] 

629 

630 for symbol in self.subscribed_symbols: 

631 try: 

632 # 1. 优先从数据库/缓存读取历史数据 

633 historical_data = self._get_historical_data(symbol, timestamp) 

634 

635 if historical_data: 

636 market_data = MarketData( 

637 symbol=symbol, 

638 timestamp=timestamp, 

639 open=Decimal(str(historical_data.get("open", 150))), 

640 high=Decimal(str(historical_data.get("high", 155))), 

641 low=Decimal(str(historical_data.get("low", 148))), 

642 close=Decimal(str(historical_data.get("close", 152))), 

643 volume=int(historical_data.get("volume", 1000)), 

644 is_market_open=True, # 有数据表示市场开盘 

645 ) 

646 self._send_websocket_log( 

647 f"【量化交易系统】📊 从数据库获取历史数据: {symbol} at {timestamp} = ${market_data.close:.2f}" 

648 ) 

649 market_data_list.append(market_data) 

650 else: 

651 # 2. 如果没有历史数据,记录缺失的股票 

652 missing_data_symbols.append(symbol) 

653 self._send_websocket_log( 

654 f"【量化交易系统】⏰ 无历史数据: {symbol} at {timestamp}" 

655 ) 

656 

657 except Exception as e: 

658 self._send_websocket_log( 

659 f"【量化交易系统】❌ 获取回测数据失败: {symbol} - {e}" 

660 ) 

661 missing_data_symbols.append(symbol) 

662 continue 

663 

664 # 如果任何股票缺少数据,记录日志但不创建无数据推送 

665 if missing_data_symbols: 

666 self._send_websocket_log( 

667 f"【量化交易系统】⏰ 无数据时刻,跳过推送: {missing_data_symbols} at {timestamp}" 

668 ) 

669 

670 # 只返回有真实数据的市场数据,不推送无数据时刻 

671 return market_data_list 

672 

673 def _get_historical_data( 

674 self, symbol: str, timestamp: datetime 

675 ) -> Optional[Dict[str, Any]]: 

676 """从数据库获取历史数据""" 

677 try: 

678 # 使用StockRepository的抽象接口获取历史数据 

679 import asyncio 

680 

681 from core.repositories.stock_repository import StockRepository 

682 from infrastructure.database.redis_client import get_redis 

683 

684 stock_repo = StockRepository(get_redis().client) 

685 

686 # 将datetime转换为时间戳,确保时区处理正确 

687 from datetime import timezone 

688 

689 if timestamp.tzinfo is None: 

690 # 如果没有时区信息,假设是UTC时间 

691 timestamp_int = int(timestamp.replace(tzinfo=timezone.utc).timestamp()) 

692 else: 

693 # 如果有时区信息,直接转换 

694 timestamp_int = int(timestamp.timestamp()) 

695 

696 # 使用StockRepository的get_stock_data方法获取数据 

697 # 注意:这里需要同步调用异步方法 

698 try: 

699 # 尝试获取当前事件循环 

700 try: 

701 loop = asyncio.get_running_loop() 

702 # 如果已经有运行中的循环,使用线程池执行 

703 import concurrent.futures 

704 

705 with concurrent.futures.ThreadPoolExecutor() as executor: 

706 future = executor.submit( 

707 asyncio.run, 

708 stock_repo.get_stock_data(symbol, timestamp_int), 

709 ) 

710 stock_data = future.result() 

711 except RuntimeError: 

712 # 没有运行中的循环,直接使用asyncio.run 

713 stock_data = asyncio.run( 

714 stock_repo.get_stock_data(symbol, timestamp_int) 

715 ) 

716 except Exception as e: 

717 self._send_websocket_log( 

718 f"【量化交易系统】❌ 异步调用失败: {symbol} - {e}" 

719 ) 

720 return None 

721 

722 if stock_data: 

723 # 转换为字典格式 

724 result = { 

725 "open": float(stock_data.open), 

726 "high": float(stock_data.high), 

727 "low": float(stock_data.low), 

728 "close": float(stock_data.close), 

729 "volume": int(stock_data.volume), 

730 "turnover": float(stock_data.turnover), 

731 "trade_session": stock_data.trade_session, 

732 } 

733 return result 

734 

735 # 没有历史数据是正常的,市场可能没有开盘 

736 self._send_websocket_log( 

737 f"【量化交易系统】📊 无历史数据: {symbol} at {timestamp} (市场可能未开盘)" 

738 ) 

739 return None 

740 

741 except Exception as e: 

742 self._send_websocket_log( 

743 f"【量化交易系统】❌ 获取历史数据失败: {symbol} at {timestamp} - {e}" 

744 ) 

745 return None 

746 

747 def _initialize_backtest_params(self): 

748 """初始化回测参数""" 

749 try: 

750 # session_config 本身就是配置对象,不需要再访问 'config' 字段 

751 config = self.session_config 

752 self._send_websocket_log( 

753 f"【量化交易系统】🔧 初始化回测参数,配置: {config}" 

754 ) 

755 

756 # 解析开始和结束时间 

757 start_date_str = config.get("start_date") 

758 end_date_str = config.get("end_date") 

759 timezone_str = config.get("timezone", "UTC") 

760 

761 self._send_websocket_log( 

762 f"【量化交易系统】📅 原始时间字符串: start={start_date_str}, end={end_date_str}" 

763 ) 

764 

765 if start_date_str: 

766 # 处理时区信息 

767 if start_date_str.endswith("Z"): 

768 start_date_str = start_date_str.replace("Z", "+00:00") 

769 self.backtest_start_time = datetime.fromisoformat(start_date_str) 

770 self.current_time = self.backtest_start_time 

771 self._send_websocket_log( 

772 f"【量化交易系统】✅ 设置开始时间: {self.backtest_start_time} (时区: {self.backtest_start_time.tzinfo})" 

773 ) 

774 else: 

775 self._send_websocket_log(f"【量化交易系统】❌ 未找到开始时间") 

776 

777 if end_date_str: 

778 # 处理时区信息 

779 if end_date_str.endswith("Z"): 

780 end_date_str = end_date_str.replace("Z", "+00:00") 

781 self.backtest_end_time = datetime.fromisoformat(end_date_str) 

782 self._send_websocket_log( 

783 f"【量化交易系统】✅ 设置结束时间: {self.backtest_end_time} (时区: {self.backtest_end_time.tzinfo})" 

784 ) 

785 else: 

786 self._send_websocket_log(f"【量化交易系统】❌ 未找到结束时间") 

787 

788 self._send_websocket_log( 

789 f"【量化交易系统】📅 回测时间范围: {self.backtest_start_time} - {self.backtest_end_time}" 

790 ) 

791 

792 except Exception as e: 

793 self._send_websocket_log(f"【量化交易系统】❌ 初始化回测参数失败: {e}") 

794 import traceback 

795 

796 traceback.print_exc() 

797 

798 def _initialize_time_series_controller(self): 

799 """初始化时序控制器""" 

800 try: 

801 if self.backtest_start_time and self.backtest_end_time: 

802 self.time_series_controller = TimeSeriesController( 

803 start_time=self.backtest_start_time, 

804 end_time=self.backtest_end_time, 

805 timezone=self.session_config.get("config", {}).get( 

806 "timezone", "UTC" 

807 ), 

808 strategy_engine=self.strategy_engine, # 传递策略引擎引用 

809 ) 

810 

811 # 设置数据源适配器 

812 self.time_series_controller.set_quote_adapter(self.quote_data_adapter) 

813 

814 # 设置回调函数 

815 self.time_series_controller.set_callbacks( 

816 on_progress=self._on_backtest_progress, 

817 on_complete=self._on_backtest_complete, 

818 on_error=self._on_backtest_error, 

819 ) 

820 

821 self._send_websocket_log(f"【量化交易系统】✅ 时序控制器初始化成功") 

822 

823 except Exception as e: 

824 self._send_websocket_log(f"【量化交易系统】❌ 初始化时序控制器失败: {e}") 

825 

826 def _on_backtest_progress(self, progress: float, current_time: datetime): 

827 """回测进度回调""" 

828 self._send_websocket_log( 

829 f"【量化交易系统】📈 回测进度: {progress:.1f}% - {current_time}" 

830 ) 

831 

832 def _on_backtest_complete(self): 

833 """回测完成回调""" 

834 self._send_websocket_log(f"【量化交易系统】✅ 回测完成") 

835 self.stop() 

836 

837 def _on_backtest_error(self, error: Exception): 

838 """回测错误回调""" 

839 self._send_websocket_log(f"【量化交易系统】❌ 回测错误: {error}") 

840 self.stop() 

841 

842 def _get_tradable_symbols(self) -> List[str]: 

843 """获取可交易股票列表""" 

844 try: 

845 # 从会话配置中获取 

846 risk_config = self.session_config.get("risk_config", {}) 

847 

848 # 优先从per_stock_config获取 

849 per_stock_config = risk_config.get("per_stock_config", {}) 

850 if per_stock_config: 

851 symbols = list(per_stock_config.keys()) 

852 if symbols: 

853 self._send_websocket_log( 

854 f"✅ 从per_stock_config获取可交易股票: {symbols}", "info" 

855 ) 

856 return symbols 

857 

858 # 从allowed_symbols获取 

859 allowed_symbols = risk_config.get("allowed_symbols", []) 

860 if allowed_symbols: 

861 self._send_websocket_log( 

862 f"✅ 从allowed_symbols获取可交易股票: {allowed_symbols}", "info" 

863 ) 

864 return allowed_symbols 

865 

866 # 默认股票列表 

867 default_symbols = ["YINN.US", "YANG.US"] 

868 self._send_websocket_log(f"⚠️ 使用默认可交易股票: {default_symbols}", "warn") 

869 return default_symbols 

870 

871 except Exception as e: 

872 self._send_websocket_log(f"❌ 获取可交易股票失败: {e}", "error") 

873 return ["YINN.US", "YANG.US"]