Coverage for core/trading/engines/strategy_engine.py: 41.71%
187 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2策略引擎
3"""
5import asyncio
6import json
7import threading
8import time
9import uuid
10from abc import ABC, abstractmethod
11from datetime import datetime, timedelta
12from decimal import Decimal
13from typing import Any, Callable, Dict, List, Optional
15from core.data_source.adapters.data_source_adapter import \
16 QuoteDataSourceAdapter
17from core.data_source.adapters.quote_adapter import QuoteAdapter
18from core.models.trading import (MarketData, Order, OrderSide, OrderType,
19 Portfolio, Position, RiskLimits,
20 StrategyContext)
21from core.trading.engines.trading_engine import TradingEngine
22from core.trading.strategies import BaseStrategy, create_strategy
25class StrategyEngine:
26 """策略引擎"""
28 def __init__(
29 self,
30 user_id: str,
31 session_id: str,
32 trading_engine: TradingEngine,
33 session_config: Optional[Dict[str, Any]] = None,
34 trading_session_engine=None,
35 ):
36 self.user_id = user_id
37 self.session_id = session_id
38 self.trading_engine = trading_engine
39 self.session_config = session_config or {} # 缓存会话配置
40 self.strategies: Dict[str, BaseStrategy] = {}
41 self.is_running = False
42 self.strategy_thread: Optional[threading.Thread] = None
43 self.stop_event = threading.Event()
45 # 引用TradingSessionEngine,用于统一日志管理
46 self.trading_session_engine = trading_session_engine
48 # 策略引擎不需要知道交易模式,统一处理
50 # 市场数据订阅
51 self.subscribed_symbols: List[str] = []
52 self._cached_tradable_symbols: Optional[List[str]] = None # 缓存可交易股票列表
53 self.quote_adapter = QuoteAdapter(user_id)
54 self.quote_data_adapter = QuoteDataSourceAdapter(user_id)
56 # 统一日志序号计数器
57 self.log_sequence = 0
59 # 策略模块初始化状态管理
60 self.is_initialized = False
61 self.initialization_lock = threading.Lock()
62 self.initialization_event = threading.Event()
63 self.initialization_error = None
65 # 策略执行状态管理
66 self.strategy_execution_lock = threading.Lock()
67 self.strategy_execution_event = threading.Event()
68 self.strategy_execution_event.set() # 初始状态为可执行
70 def add_strategy(self, strategy: BaseStrategy) -> bool:
71 """添加策略"""
72 try:
73 # 创建策略上下文
74 context = StrategyContext(
75 user_id=self.user_id,
76 trading_session_id=self.session_id,
77 start_time=datetime.now(),
78 current_time=datetime.now(),
79 market_data_cache={},
80 portfolio_cache=None,
81 config=self.session_config, # 传递会话配置,包含可交易股票列表
82 )
83 context.trading_engine = self.trading_engine
84 context.quote_adapter = self.quote_adapter
86 # 如果是回测模式,添加回测开始时间到配置中
87 if "backtest_start_time" in self.session_config:
88 context.config["backtest_start_time"] = self.session_config[
89 "backtest_start_time"
90 ]
92 # 设置策略的StrategyEngine引用,用于统一日志管理
93 strategy.strategy_engine = self
95 # 初始化策略
96 strategy.initialize(context)
98 self.strategies[strategy.name] = strategy
99 self._send_websocket_log(f"【策略模块】✅ 策略已添加: {strategy.name}")
100 return True
102 except Exception as e:
103 print(f"❌ 添加策略失败: {e}")
104 return False
106 def remove_strategy(self, strategy_name: str) -> bool:
107 """移除策略"""
108 if strategy_name in self.strategies:
109 del self.strategies[strategy_name]
110 print(f"✅ 策略已移除: {strategy_name}")
111 return True
112 return False
114 def start(self) -> bool:
115 """启动策略引擎"""
116 if self.is_running:
117 return True
119 try:
120 self.is_running = True
121 self.stop_event.clear()
123 self._send_websocket_log(
124 f"【策略模块】🚀 策略引擎启动中,会话: {self.session_id}"
125 )
127 # 启动初始化流程
128 self._start_initialization()
130 return True
132 except Exception as e:
133 self._send_websocket_log(f"【策略模块】❌ 启动策略引擎失败: {e}")
134 return False
136 def _start_initialization(self):
137 """启动策略模块初始化流程"""
139 def initialization_thread():
140 try:
141 with self.initialization_lock:
142 if self.is_initialized:
143 return
145 self._send_websocket_log(f"【策略模块】📊 开始策略模块初始化...")
147 # 1. 加载策略历史数据
148 self._load_strategies_historical_data()
150 # 2. 等待所有策略完成初始化
151 self._wait_for_strategies_initialization()
153 # 3. 标记初始化完成
154 self.is_initialized = True
155 self.initialization_event.set()
157 self._send_websocket_log(
158 f"【策略模块】✅ 策略模块初始化完成,会话: {self.session_id}"
159 )
161 except Exception as e:
162 self.initialization_error = e
163 self.initialization_event.set()
164 self._send_websocket_log(f"【策略模块】❌ 策略模块初始化失败: {e}")
166 threading.Thread(target=initialization_thread, daemon=True).start()
168 def _wait_for_strategies_initialization(self):
169 """等待所有策略完成初始化"""
170 for strategy in self.strategies.values():
171 if hasattr(strategy, "is_initialized") and not strategy.is_initialized:
172 self._send_websocket_log(
173 f"【策略模块】⏳ 等待策略 {strategy.name} 初始化完成..."
174 )
175 # 这里可以添加策略初始化的等待逻辑
176 # 暂时使用简单的等待
177 import time
179 time.sleep(0.1)
181 def wait_for_initialization(self, timeout: float = 30.0) -> bool:
182 """等待策略模块初始化完成"""
183 if self.is_initialized:
184 return True
186 self._send_websocket_log(f"【策略模块】⏳ 等待策略模块初始化完成...")
188 # 等待初始化完成
189 success = self.initialization_event.wait(timeout)
191 if success and not self.initialization_error:
192 self._send_websocket_log(f"【策略模块】✅ 策略模块初始化完成")
193 return True
194 elif self.initialization_error:
195 self._send_websocket_log(
196 f"【策略模块】❌ 策略模块初始化失败: {self.initialization_error}"
197 )
198 return False
199 else:
200 self._send_websocket_log(f"【策略模块】⏰ 策略模块初始化超时")
201 return False
203 def stop(self) -> bool:
204 """停止策略引擎"""
205 if not self.is_running:
206 return True
208 try:
209 self.is_running = False
210 self.stop_event.set()
212 # 等待策略线程结束
213 if self.strategy_thread and self.strategy_thread.is_alive():
214 self.strategy_thread.join(timeout=5)
216 self._send_websocket_log(
217 f"【策略模块】🛑 策略引擎已停止,会话: {self.session_id}"
218 )
219 return True
221 except Exception as e:
222 print(f"❌ 停止策略引擎失败: {e}")
223 return False
225 def pause(self) -> bool:
226 """暂停策略引擎"""
227 # 策略引擎暂停逻辑
228 self._send_websocket_log(
229 f"【策略模块】⏸️ 策略引擎已暂停,会话: {self.session_id}"
230 )
231 return True
233 def resume(self) -> bool:
234 """恢复策略引擎"""
235 # 策略引擎恢复逻辑
236 self._send_websocket_log(
237 f"【策略模块】▶️ 策略引擎已恢复,会话: {self.session_id}"
238 )
239 return True
241 def process_market_data(self, market_data: MarketData) -> None:
242 """处理市场数据 - 分发给所有策略"""
243 if not self.is_running:
244 return
246 # 等待策略执行完成
247 self.strategy_execution_event.wait()
249 # 设置策略执行状态为进行中
250 self.strategy_execution_event.clear()
252 try:
253 self._send_websocket_log(
254 f"【策略模块】📈 处理市场数据: {market_data.symbol} at {market_data.timestamp} = ${market_data.close}"
255 )
257 # 分发给所有策略
258 for strategy in self.strategies.values():
259 try:
260 strategy.on_market_data(market_data)
261 self._send_websocket_log(
262 f"【策略模块】⚡ 策略 {strategy.name} 执行完成,数据时间: {market_data.timestamp}"
263 )
264 except Exception as e:
265 self._send_websocket_log(
266 f"【策略模块】❌ 策略 {strategy.name} 执行失败: {e}"
267 )
268 finally:
269 # 设置策略执行状态为完成
270 self.strategy_execution_event.set()
272 def wait_for_strategy_execution(self, timeout: float = 5.0) -> bool:
273 """等待策略执行完成"""
274 return self.strategy_execution_event.wait(timeout)
276 # 更新最后市场数据时间
277 self.last_market_data_time = market_data.timestamp
279 def _strategy_loop(self):
280 """策略循环 - 被动等待数据推送"""
281 while self.is_running and not self.stop_event.is_set():
282 try:
283 # 策略引擎完全被动化,不主动获取数据
284 # 只等待数据推送
285 time.sleep(1) # 短暂休眠,避免CPU占用过高
287 except Exception as e:
288 self._send_websocket_log(f"【策略模块】❌ 策略循环错误: {e}")
289 time.sleep(5) # 出错时等待更长时间
291 def _load_strategies_historical_data(self):
292 """加载所有策略的历史数据"""
293 try:
294 for strategy in self.strategies.values():
295 if hasattr(strategy, "_load_historical_data"):
296 self._send_websocket_log(
297 f"【策略模块】📊 开始加载策略 {strategy.name} 的历史数据"
298 )
299 strategy._load_historical_data()
300 except Exception as e:
301 self._send_websocket_log(f"【策略模块】❌ 加载策略历史数据失败: {e}")
303 def get_historical_data(
304 self, symbol: str, start_time: datetime, end_time: datetime
305 ) -> List[MarketData]:
306 """获取历史数据 - 供策略使用"""
307 try:
308 # 定义日志回调函数
309 def log_callback(message: str, log_type: str = "info"):
310 self._send_websocket_log(f"【数据适配器】{message}", log_type)
312 return self.quote_data_adapter.get_historical_data(
313 symbol, start_time, end_time, log_callback
314 )
315 except Exception as e:
316 self._send_websocket_log(f"【策略模块】❌ 获取历史数据失败: {symbol} - {e}")
317 return []
319 def log_message(
320 self, message: str, log_type: str = "log", component: str = "strategy_engine"
321 ):
322 """公共日志接口 - 供其他模块调用"""
323 self._send_websocket_log(message, log_type, component)
325 def _send_websocket_log(
326 self, message: str, log_type: str = "log", component: str = "strategy_engine"
327 ):
328 """发送WebSocket日志"""
329 # 增加日志序号
330 self.log_sequence += 1
332 # 保存到Redis
333 self._save_log_to_redis(message, log_type, component)
335 # 推送到WebSocket - 使用工厂模式
336 try:
337 from core.services.websocket_service_factory import \
338 send_websocket_log
340 send_websocket_log(self.session_id, message, log_type)
341 except Exception as e:
342 print(f"❌ 发送WebSocket日志失败: {e}")
343 print(f"📝 [{component}] {message}")
345 def _save_log_to_redis(self, message: str, log_type: str, component: str):
346 """保存日志到Redis - 通过TradingRepository"""
347 try:
348 from core.repositories.trading_repository import TradingRepository
350 trading_repo = TradingRepository()
352 log_entry = {
353 "id": str(uuid.uuid4()),
354 "message": message,
355 "log_type": log_type,
356 "component": component,
357 "timestamp": datetime.now().isoformat(timespec="milliseconds"),
358 "sequence": self.log_sequence,
359 }
361 # 通过TradingRepository保存日志
362 success = trading_repo.add_session_log(self.session_id, log_entry)
363 if not success:
364 print(f"❌ 保存日志到Redis失败")
366 except Exception as e:
367 print(f"❌ 保存日志到Redis失败: {e}")
369 def get_strategies_status(self) -> Dict[str, Any]:
370 """获取策略状态"""
371 return {
372 "is_running": self.is_running,
373 "strategies_count": len(self.strategies),
374 "strategies": [strategy.name for strategy in self.strategies.values()],
375 "subscribed_symbols": self.subscribed_symbols,
376 "last_market_data_time": (
377 self.last_market_data_time.isoformat()
378 if hasattr(self, "last_market_data_time")
379 else None
380 ),
381 }