Coverage for core/trading/engines/strategy_engine.py: 41.71%

187 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

1""" 

2策略引擎 

3""" 

4 

5import asyncio 

6import json 

7import threading 

8import time 

9import uuid 

10from abc import ABC, abstractmethod 

11from datetime import datetime, timedelta 

12from decimal import Decimal 

13from typing import Any, Callable, Dict, List, Optional 

14 

15from core.data_source.adapters.data_source_adapter import \ 

16 QuoteDataSourceAdapter 

17from core.data_source.adapters.quote_adapter import QuoteAdapter 

18from core.models.trading import (MarketData, Order, OrderSide, OrderType, 

19 Portfolio, Position, RiskLimits, 

20 StrategyContext) 

21from core.trading.engines.trading_engine import TradingEngine 

22from core.trading.strategies import BaseStrategy, create_strategy 

23 

24 

25class StrategyEngine: 

26 """策略引擎""" 

27 

28 def __init__( 

29 self, 

30 user_id: str, 

31 session_id: str, 

32 trading_engine: TradingEngine, 

33 session_config: Optional[Dict[str, Any]] = None, 

34 trading_session_engine=None, 

35 ): 

36 self.user_id = user_id 

37 self.session_id = session_id 

38 self.trading_engine = trading_engine 

39 self.session_config = session_config or {} # 缓存会话配置 

40 self.strategies: Dict[str, BaseStrategy] = {} 

41 self.is_running = False 

42 self.strategy_thread: Optional[threading.Thread] = None 

43 self.stop_event = threading.Event() 

44 

45 # 引用TradingSessionEngine,用于统一日志管理 

46 self.trading_session_engine = trading_session_engine 

47 

48 # 策略引擎不需要知道交易模式,统一处理 

49 

50 # 市场数据订阅 

51 self.subscribed_symbols: List[str] = [] 

52 self._cached_tradable_symbols: Optional[List[str]] = None # 缓存可交易股票列表 

53 self.quote_adapter = QuoteAdapter(user_id) 

54 self.quote_data_adapter = QuoteDataSourceAdapter(user_id) 

55 

56 # 统一日志序号计数器 

57 self.log_sequence = 0 

58 

59 # 策略模块初始化状态管理 

60 self.is_initialized = False 

61 self.initialization_lock = threading.Lock() 

62 self.initialization_event = threading.Event() 

63 self.initialization_error = None 

64 

65 # 策略执行状态管理 

66 self.strategy_execution_lock = threading.Lock() 

67 self.strategy_execution_event = threading.Event() 

68 self.strategy_execution_event.set() # 初始状态为可执行 

69 

70 def add_strategy(self, strategy: BaseStrategy) -> bool: 

71 """添加策略""" 

72 try: 

73 # 创建策略上下文 

74 context = StrategyContext( 

75 user_id=self.user_id, 

76 trading_session_id=self.session_id, 

77 start_time=datetime.now(), 

78 current_time=datetime.now(), 

79 market_data_cache={}, 

80 portfolio_cache=None, 

81 config=self.session_config, # 传递会话配置,包含可交易股票列表 

82 ) 

83 context.trading_engine = self.trading_engine 

84 context.quote_adapter = self.quote_adapter 

85 

86 # 如果是回测模式,添加回测开始时间到配置中 

87 if "backtest_start_time" in self.session_config: 

88 context.config["backtest_start_time"] = self.session_config[ 

89 "backtest_start_time" 

90 ] 

91 

92 # 设置策略的StrategyEngine引用,用于统一日志管理 

93 strategy.strategy_engine = self 

94 

95 # 初始化策略 

96 strategy.initialize(context) 

97 

98 self.strategies[strategy.name] = strategy 

99 self._send_websocket_log(f"【策略模块】✅ 策略已添加: {strategy.name}") 

100 return True 

101 

102 except Exception as e: 

103 print(f"❌ 添加策略失败: {e}") 

104 return False 

105 

106 def remove_strategy(self, strategy_name: str) -> bool: 

107 """移除策略""" 

108 if strategy_name in self.strategies: 

109 del self.strategies[strategy_name] 

110 print(f"✅ 策略已移除: {strategy_name}") 

111 return True 

112 return False 

113 

114 def start(self) -> bool: 

115 """启动策略引擎""" 

116 if self.is_running: 

117 return True 

118 

119 try: 

120 self.is_running = True 

121 self.stop_event.clear() 

122 

123 self._send_websocket_log( 

124 f"【策略模块】🚀 策略引擎启动中,会话: {self.session_id}" 

125 ) 

126 

127 # 启动初始化流程 

128 self._start_initialization() 

129 

130 return True 

131 

132 except Exception as e: 

133 self._send_websocket_log(f"【策略模块】❌ 启动策略引擎失败: {e}") 

134 return False 

135 

136 def _start_initialization(self): 

137 """启动策略模块初始化流程""" 

138 

139 def initialization_thread(): 

140 try: 

141 with self.initialization_lock: 

142 if self.is_initialized: 

143 return 

144 

145 self._send_websocket_log(f"【策略模块】📊 开始策略模块初始化...") 

146 

147 # 1. 加载策略历史数据 

148 self._load_strategies_historical_data() 

149 

150 # 2. 等待所有策略完成初始化 

151 self._wait_for_strategies_initialization() 

152 

153 # 3. 标记初始化完成 

154 self.is_initialized = True 

155 self.initialization_event.set() 

156 

157 self._send_websocket_log( 

158 f"【策略模块】✅ 策略模块初始化完成,会话: {self.session_id}" 

159 ) 

160 

161 except Exception as e: 

162 self.initialization_error = e 

163 self.initialization_event.set() 

164 self._send_websocket_log(f"【策略模块】❌ 策略模块初始化失败: {e}") 

165 

166 threading.Thread(target=initialization_thread, daemon=True).start() 

167 

168 def _wait_for_strategies_initialization(self): 

169 """等待所有策略完成初始化""" 

170 for strategy in self.strategies.values(): 

171 if hasattr(strategy, "is_initialized") and not strategy.is_initialized: 

172 self._send_websocket_log( 

173 f"【策略模块】⏳ 等待策略 {strategy.name} 初始化完成..." 

174 ) 

175 # 这里可以添加策略初始化的等待逻辑 

176 # 暂时使用简单的等待 

177 import time 

178 

179 time.sleep(0.1) 

180 

181 def wait_for_initialization(self, timeout: float = 30.0) -> bool: 

182 """等待策略模块初始化完成""" 

183 if self.is_initialized: 

184 return True 

185 

186 self._send_websocket_log(f"【策略模块】⏳ 等待策略模块初始化完成...") 

187 

188 # 等待初始化完成 

189 success = self.initialization_event.wait(timeout) 

190 

191 if success and not self.initialization_error: 

192 self._send_websocket_log(f"【策略模块】✅ 策略模块初始化完成") 

193 return True 

194 elif self.initialization_error: 

195 self._send_websocket_log( 

196 f"【策略模块】❌ 策略模块初始化失败: {self.initialization_error}" 

197 ) 

198 return False 

199 else: 

200 self._send_websocket_log(f"【策略模块】⏰ 策略模块初始化超时") 

201 return False 

202 

203 def stop(self) -> bool: 

204 """停止策略引擎""" 

205 if not self.is_running: 

206 return True 

207 

208 try: 

209 self.is_running = False 

210 self.stop_event.set() 

211 

212 # 等待策略线程结束 

213 if self.strategy_thread and self.strategy_thread.is_alive(): 

214 self.strategy_thread.join(timeout=5) 

215 

216 self._send_websocket_log( 

217 f"【策略模块】🛑 策略引擎已停止,会话: {self.session_id}" 

218 ) 

219 return True 

220 

221 except Exception as e: 

222 print(f"❌ 停止策略引擎失败: {e}") 

223 return False 

224 

225 def pause(self) -> bool: 

226 """暂停策略引擎""" 

227 # 策略引擎暂停逻辑 

228 self._send_websocket_log( 

229 f"【策略模块】⏸️ 策略引擎已暂停,会话: {self.session_id}" 

230 ) 

231 return True 

232 

233 def resume(self) -> bool: 

234 """恢复策略引擎""" 

235 # 策略引擎恢复逻辑 

236 self._send_websocket_log( 

237 f"【策略模块】▶️ 策略引擎已恢复,会话: {self.session_id}" 

238 ) 

239 return True 

240 

241 def process_market_data(self, market_data: MarketData) -> None: 

242 """处理市场数据 - 分发给所有策略""" 

243 if not self.is_running: 

244 return 

245 

246 # 等待策略执行完成 

247 self.strategy_execution_event.wait() 

248 

249 # 设置策略执行状态为进行中 

250 self.strategy_execution_event.clear() 

251 

252 try: 

253 self._send_websocket_log( 

254 f"【策略模块】📈 处理市场数据: {market_data.symbol} at {market_data.timestamp} = ${market_data.close}" 

255 ) 

256 

257 # 分发给所有策略 

258 for strategy in self.strategies.values(): 

259 try: 

260 strategy.on_market_data(market_data) 

261 self._send_websocket_log( 

262 f"【策略模块】⚡ 策略 {strategy.name} 执行完成,数据时间: {market_data.timestamp}" 

263 ) 

264 except Exception as e: 

265 self._send_websocket_log( 

266 f"【策略模块】❌ 策略 {strategy.name} 执行失败: {e}" 

267 ) 

268 finally: 

269 # 设置策略执行状态为完成 

270 self.strategy_execution_event.set() 

271 

272 def wait_for_strategy_execution(self, timeout: float = 5.0) -> bool: 

273 """等待策略执行完成""" 

274 return self.strategy_execution_event.wait(timeout) 

275 

276 # 更新最后市场数据时间 

277 self.last_market_data_time = market_data.timestamp 

278 

279 def _strategy_loop(self): 

280 """策略循环 - 被动等待数据推送""" 

281 while self.is_running and not self.stop_event.is_set(): 

282 try: 

283 # 策略引擎完全被动化,不主动获取数据 

284 # 只等待数据推送 

285 time.sleep(1) # 短暂休眠,避免CPU占用过高 

286 

287 except Exception as e: 

288 self._send_websocket_log(f"【策略模块】❌ 策略循环错误: {e}") 

289 time.sleep(5) # 出错时等待更长时间 

290 

291 def _load_strategies_historical_data(self): 

292 """加载所有策略的历史数据""" 

293 try: 

294 for strategy in self.strategies.values(): 

295 if hasattr(strategy, "_load_historical_data"): 

296 self._send_websocket_log( 

297 f"【策略模块】📊 开始加载策略 {strategy.name} 的历史数据" 

298 ) 

299 strategy._load_historical_data() 

300 except Exception as e: 

301 self._send_websocket_log(f"【策略模块】❌ 加载策略历史数据失败: {e}") 

302 

303 def get_historical_data( 

304 self, symbol: str, start_time: datetime, end_time: datetime 

305 ) -> List[MarketData]: 

306 """获取历史数据 - 供策略使用""" 

307 try: 

308 # 定义日志回调函数 

309 def log_callback(message: str, log_type: str = "info"): 

310 self._send_websocket_log(f"【数据适配器】{message}", log_type) 

311 

312 return self.quote_data_adapter.get_historical_data( 

313 symbol, start_time, end_time, log_callback 

314 ) 

315 except Exception as e: 

316 self._send_websocket_log(f"【策略模块】❌ 获取历史数据失败: {symbol} - {e}") 

317 return [] 

318 

319 def log_message( 

320 self, message: str, log_type: str = "log", component: str = "strategy_engine" 

321 ): 

322 """公共日志接口 - 供其他模块调用""" 

323 self._send_websocket_log(message, log_type, component) 

324 

325 def _send_websocket_log( 

326 self, message: str, log_type: str = "log", component: str = "strategy_engine" 

327 ): 

328 """发送WebSocket日志""" 

329 # 增加日志序号 

330 self.log_sequence += 1 

331 

332 # 保存到Redis 

333 self._save_log_to_redis(message, log_type, component) 

334 

335 # 推送到WebSocket - 使用工厂模式 

336 try: 

337 from core.services.websocket_service_factory import \ 

338 send_websocket_log 

339 

340 send_websocket_log(self.session_id, message, log_type) 

341 except Exception as e: 

342 print(f"❌ 发送WebSocket日志失败: {e}") 

343 print(f"📝 [{component}] {message}") 

344 

345 def _save_log_to_redis(self, message: str, log_type: str, component: str): 

346 """保存日志到Redis - 通过TradingRepository""" 

347 try: 

348 from core.repositories.trading_repository import TradingRepository 

349 

350 trading_repo = TradingRepository() 

351 

352 log_entry = { 

353 "id": str(uuid.uuid4()), 

354 "message": message, 

355 "log_type": log_type, 

356 "component": component, 

357 "timestamp": datetime.now().isoformat(timespec="milliseconds"), 

358 "sequence": self.log_sequence, 

359 } 

360 

361 # 通过TradingRepository保存日志 

362 success = trading_repo.add_session_log(self.session_id, log_entry) 

363 if not success: 

364 print(f"❌ 保存日志到Redis失败") 

365 

366 except Exception as e: 

367 print(f"❌ 保存日志到Redis失败: {e}") 

368 

369 def get_strategies_status(self) -> Dict[str, Any]: 

370 """获取策略状态""" 

371 return { 

372 "is_running": self.is_running, 

373 "strategies_count": len(self.strategies), 

374 "strategies": [strategy.name for strategy in self.strategies.values()], 

375 "subscribed_symbols": self.subscribed_symbols, 

376 "last_market_data_time": ( 

377 self.last_market_data_time.isoformat() 

378 if hasattr(self, "last_market_data_time") 

379 else None 

380 ), 

381 }