Coverage for core/trading/engines/trading_engine.py: 25.22%
341 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2交易引擎
3"""
5from abc import ABC, abstractmethod
6from datetime import datetime
7from decimal import Decimal
8from typing import Any, Dict, List, Optional
10from core.data_source.adapters.trade_adapter import TradeDataSourceAdapter
11from core.models.broker import FeeConfig, TradingFee
12from core.models.trading import (AccountBalance, AssetMode, MarketData, Order,
13 OrderResult, OrderSide, OrderStatus,
14 OrderType, Position, Trade, TradingMode)
15from core.trading.engines.simulation_engine import SimulationEngine
16from core.trading.utils.fee_calculator import FeeCalculator
19class TradingEngine(ABC):
20 """交易引擎抽象基类"""
22 def __init__(
23 self,
24 user_id: str,
25 session_id: str,
26 trading_mode: TradingMode,
27 asset_mode: AssetMode,
28 ):
29 self.user_id = user_id
30 self.session_id = session_id
31 self.trading_mode = trading_mode
32 self.asset_mode = asset_mode
33 self.is_running = False
34 self.start_time: Optional[datetime] = None
35 self.end_time: Optional[datetime] = None
37 @abstractmethod
38 def initialize(self) -> bool:
39 """初始化交易引擎"""
40 pass
42 @abstractmethod
43 def start(self) -> bool:
44 """启动交易引擎"""
45 pass
47 @abstractmethod
48 def stop(self) -> bool:
49 """停止交易引擎"""
50 pass
52 @abstractmethod
53 def pause(self) -> bool:
54 """暂停交易引擎"""
55 pass
57 @abstractmethod
58 def resume(self) -> bool:
59 """恢复交易引擎"""
60 pass
62 @abstractmethod
63 def submit_order(self, order: Order) -> OrderResult:
64 """提交订单"""
65 pass
67 @abstractmethod
68 def cancel_order(self, order_id: str) -> bool:
69 """取消订单"""
70 pass
72 @abstractmethod
73 def get_positions(self) -> List[Position]:
74 """获取持仓"""
75 pass
77 @abstractmethod
78 def get_account_balance(self) -> AccountBalance:
79 """获取账户余额"""
80 pass
82 @abstractmethod
83 def process_market_data(self, market_data: MarketData):
84 """处理市场数据"""
85 pass
87 def set_strategy_engine(self, strategy_engine):
88 """设置策略引擎引用,用于统一日志管理"""
89 pass
92class RealTradingEngine(TradingEngine):
93 """真实交易引擎"""
95 def __init__(
96 self,
97 user_id: str,
98 session_id: str,
99 trading_mode: TradingMode,
100 asset_mode: AssetMode,
101 fee_config: Optional[FeeConfig] = None,
102 ):
103 super().__init__(user_id, session_id, trading_mode, asset_mode)
104 self.trade_adapter = TradeDataSourceAdapter(user_id)
105 self.orders: Dict[str, Order] = {}
106 self.positions: List[Position] = []
107 self.strategy_engine = None # 策略引擎引用
109 # 费用计算器(用于模拟计算费用,不实际扣除)
110 if fee_config is None:
111 fee_config = FeeConfig() # 使用默认的长桥费率
112 self.fee_calculator = FeeCalculator(fee_config)
114 def initialize(self) -> bool:
115 """初始化真实交易引擎"""
116 try:
117 # 检查交易适配器是否可用
118 if not self.trade_adapter.is_available():
119 if self.strategy_engine:
120 self.strategy_engine.log_message(
121 "❌ 交易适配器不可用", "error", "交易引擎"
122 )
123 else:
124 print("❌ 交易适配器不可用")
125 return False
127 # 加载现有持仓
128 self._load_positions()
130 return True
131 except Exception as e:
132 if self.strategy_engine:
133 self.strategy_engine.log_message(
134 f"❌ 真实交易引擎初始化失败: {e}", "error", "交易引擎"
135 )
136 else:
137 print(f"❌ 真实交易引擎初始化失败: {e}")
138 return False
140 def start(self) -> bool:
141 """启动真实交易引擎"""
142 if self.is_running:
143 return True
145 try:
146 self.is_running = True
147 self.start_time = datetime.now()
148 if self.strategy_engine:
149 self.strategy_engine.log_message(
150 f"✅ 真实交易引擎已启动,会话: {self.session_id}",
151 "info",
152 "交易引擎",
153 )
154 else:
155 print(f"✅ 真实交易引擎已启动,会话: {self.session_id}")
156 return True
157 except Exception as e:
158 if self.strategy_engine:
159 self.strategy_engine.log_message(
160 f"❌ 启动真实交易引擎失败: {e}", "error", "交易引擎"
161 )
162 else:
163 print(f"❌ 启动真实交易引擎失败: {e}")
164 return False
166 def stop(self) -> bool:
167 """停止真实交易引擎"""
168 if not self.is_running:
169 return True
171 try:
172 self.is_running = False
173 self.end_time = datetime.now()
174 if self.strategy_engine:
175 self.strategy_engine.log_message(
176 f"✅ 真实交易引擎已停止,会话: {self.session_id}",
177 "info",
178 "交易引擎",
179 )
180 else:
181 print(f"✅ 真实交易引擎已停止,会话: {self.session_id}")
182 return True
183 except Exception as e:
184 if self.strategy_engine:
185 self.strategy_engine.log_message(
186 f"❌ 停止真实交易引擎失败: {e}", "error", "交易引擎"
187 )
188 else:
189 print(f"❌ 停止真实交易引擎失败: {e}")
190 return False
192 def pause(self) -> bool:
193 """暂停真实交易引擎"""
194 if not self.is_running:
195 return False
197 try:
198 self.is_running = False
199 if self.strategy_engine:
200 self.strategy_engine.log_message(
201 f"⏸️ 真实交易引擎已暂停,会话: {self.session_id}", "info", "交易引擎"
202 )
203 else:
204 print(f"⏸️ 真实交易引擎已暂停,会话: {self.session_id}")
205 return True
206 except Exception as e:
207 if self.strategy_engine:
208 self.strategy_engine.log_message(
209 f"❌ 暂停真实交易引擎失败: {e}", "error", "交易引擎"
210 )
211 else:
212 print(f"❌ 暂停真实交易引擎失败: {e}")
213 return False
215 def resume(self) -> bool:
216 """恢复真实交易引擎"""
217 if self.is_running:
218 return True
220 try:
221 self.is_running = True
222 if self.strategy_engine:
223 self.strategy_engine.log_message(
224 f"▶️ 真实交易引擎已恢复,会话: {self.session_id}", "info", "交易引擎"
225 )
226 else:
227 print(f"▶️ 真实交易引擎已恢复,会话: {self.session_id}")
228 return True
229 except Exception as e:
230 if self.strategy_engine:
231 self.strategy_engine.log_message(
232 f"❌ 恢复真实交易引擎失败: {e}", "error", "交易引擎"
233 )
234 else:
235 print(f"❌ 恢复真实交易引擎失败: {e}")
236 return False
238 def submit_order(self, order: Order) -> OrderResult:
239 """提交真实订单"""
240 if not self.is_running:
241 return OrderResult(
242 order_id="", status=OrderStatus.REJECTED, message="交易引擎未运行"
243 )
245 try:
246 # 使用交易适配器提交订单
247 result = self.trade_adapter.submit_order(
248 symbol=order.symbol,
249 order_type=order.order_type.value,
250 side=order.side.value,
251 submitted_quantity=order.quantity,
252 time_in_force="Day", # 默认当日有效
253 submitted_price=order.price,
254 )
256 if result and result.get("success", False):
257 # 订单提交成功
258 order_id = result.get("order_id", "")
259 self.orders[order_id] = order
261 return OrderResult(
262 order_id=order_id, status=OrderStatus.PENDING, message="订单已提交"
263 )
264 else:
265 # 订单提交失败
266 error_msg = (
267 result.get("message", "订单提交失败") if result else "订单提交失败"
268 )
269 return OrderResult(
270 order_id="", status=OrderStatus.REJECTED, message=error_msg
271 )
273 except Exception as e:
274 if self.strategy_engine:
275 self.strategy_engine.log_message(
276 f"❌ 提交真实订单失败: {e}", "error", "交易引擎"
277 )
278 else:
279 print(f"❌ 提交真实订单失败: {e}")
280 return OrderResult(
281 order_id="",
282 status=OrderStatus.REJECTED,
283 message=f"提交订单失败: {str(e)}",
284 )
286 def cancel_order(self, order_id: str) -> bool:
287 """取消真实订单"""
288 if not self.is_running:
289 return False
291 try:
292 success = self.trade_adapter.cancel_order(order_id)
293 if success and order_id in self.orders:
294 self.orders[order_id].status = OrderStatus.CANCELLED
295 return success
296 except Exception as e:
297 if self.strategy_engine:
298 self.strategy_engine.log_message(
299 f"❌ 取消真实订单失败: {e}", "error", "交易引擎"
300 )
301 else:
302 print(f"❌ 取消真实订单失败: {e}")
303 return False
305 def get_positions(self) -> List[Position]:
306 """获取真实持仓"""
307 try:
308 # 从交易适配器获取持仓
309 positions_data = self.trade_adapter.get_positions()
311 positions = []
312 for pos_data in positions_data:
313 position = Position(
314 symbol=pos_data.get("symbol", ""),
315 quantity=Decimal(str(pos_data.get("quantity", 0))),
316 avg_price=Decimal(str(pos_data.get("cost_price", 0))),
317 current_price=Decimal(str(pos_data.get("current_price", 0))),
318 market_value=Decimal(str(pos_data.get("market_value", 0))),
319 unrealized_pnl=Decimal(str(pos_data.get("unrealized_pnl", 0))),
320 realized_pnl=Decimal("0"), # 需要从成交记录计算
321 )
322 positions.append(position)
324 self.positions = positions
325 return positions
327 except Exception as e:
328 if self.strategy_engine:
329 self.strategy_engine.log_message(
330 f"❌ 获取真实持仓失败: {e}", "error", "交易引擎"
331 )
332 else:
333 print(f"❌ 获取真实持仓失败: {e}")
334 return []
336 def get_account_balance(self) -> AccountBalance:
337 """获取真实账户余额"""
338 try:
339 # 从交易适配器获取账户余额
340 balance_data = self.trade_adapter.get_account_balance()
342 if balance_data and len(balance_data) > 0:
343 balance = balance_data[0]
344 return AccountBalance(
345 total_cash=Decimal(str(balance.get("total_cash", 0))),
346 available_cash=Decimal(str(balance.get("available_cash", 0))),
347 frozen_cash=Decimal(str(balance.get("frozen_cash", 0))),
348 currency=balance.get("currency", "USD"),
349 )
350 else:
351 return AccountBalance(
352 total_cash=Decimal("0"),
353 available_cash=Decimal("0"),
354 frozen_cash=Decimal("0"),
355 currency="USD",
356 )
358 except Exception as e:
359 if self.strategy_engine:
360 self.strategy_engine.log_message(
361 f"❌ 获取真实账户余额失败: {e}", "error", "交易引擎"
362 )
363 else:
364 print(f"❌ 获取真实账户余额失败: {e}")
365 return AccountBalance(
366 total_cash=Decimal("0"),
367 available_cash=Decimal("0"),
368 frozen_cash=Decimal("0"),
369 currency="USD",
370 )
372 def process_market_data(self, market_data: MarketData):
373 """处理市场数据"""
374 # 真实交易引擎不需要特殊处理市场数据
375 # 订单成交由券商系统处理
376 pass
378 def _load_positions(self):
379 """加载现有持仓"""
380 try:
381 self.positions = self.get_positions()
382 except Exception as e:
383 if self.strategy_engine:
384 self.strategy_engine.log_message(
385 f"❌ 加载现有持仓失败: {e}", "error", "交易引擎"
386 )
387 else:
388 print(f"❌ 加载现有持仓失败: {e}")
390 def set_strategy_engine(self, strategy_engine):
391 """设置策略引擎引用,用于统一日志管理"""
392 self.strategy_engine = strategy_engine
394 def _calculate_order_fee(
395 self, quantity: Decimal, price: Decimal, side: str, symbol: str = None
396 ) -> Optional[TradingFee]:
397 """计算订单费用(仅用于展示,不实际扣除)
399 Args:
400 quantity: 交易数量
401 price: 交易价格
402 side: 交易方向(buy/sell)
403 symbol: 股票代码(用于确定市场类型)
405 Returns:
406 TradingFee对象,包含详细的费用分解
407 """
408 try:
409 # 根据股票代码确定市场类型和货币类型
410 market, currency = self._get_market_and_currency_from_symbol(symbol)
412 # 根据市场类型选择对应的计算方法
413 if market == "US":
414 return self.fee_calculator.calculate_us_fees(quantity, price, side)
415 elif market == "HK":
416 return self.fee_calculator.calculate_hk_fees(quantity, price, side)
417 else:
418 # 默认使用美股费率
419 if self.strategy_engine:
420 self.strategy_engine.log_message(
421 f"⚠️ 未知市场类型 {market},使用美股费率", "warning", "交易引擎"
422 )
423 return self.fee_calculator.calculate_us_fees(quantity, price, side)
424 except Exception as e:
425 if self.strategy_engine:
426 self.strategy_engine.log_message(
427 f"❌ 计算交易费用失败: {e}", "error", "交易引擎"
428 )
429 return None
431 def _get_market_and_currency_from_symbol(self, symbol: str) -> tuple[str, str]:
432 """根据股票代码确定市场类型和货币类型
434 Args:
435 symbol: 股票代码(如 AAPL.US, 700.HK)
437 Returns:
438 tuple: (市场类型, 货币类型)
439 """
440 if not symbol or "." not in symbol:
441 # 如果没有股票代码或格式不正确,默认使用美股
442 return ("US", "USD")
444 # 从股票代码中提取市场后缀
445 market_suffix = symbol.split(".")[-1].upper()
447 # 市场类型到货币类型的映射
448 market_currency_mapping = {
449 "US": ("US", "USD"),
450 "HK": ("HK", "HKD"),
451 "SZ": ("CN", "CNY"), # 深圳
452 "SH": ("CN", "CNY"), # 上海
453 "SG": ("SG", "SGD"), # 新加坡
454 }
456 market, currency = market_currency_mapping.get(market_suffix, ("US", "USD"))
457 return market, currency
460class SimulationTradingEngine(TradingEngine):
461 """模拟交易引擎"""
463 def __init__(
464 self,
465 user_id: str,
466 session_id: str,
467 trading_mode: TradingMode,
468 asset_mode: AssetMode,
469 initial_capital: Decimal,
470 currency: str = "USD",
471 fee_config: Optional[FeeConfig] = None,
472 ):
473 super().__init__(user_id, session_id, trading_mode, asset_mode)
474 self.initial_capital = initial_capital
475 self.currency = currency
476 self.simulation_engine: Optional[SimulationEngine] = None
477 self.strategy_engine = None # 策略引擎引用
478 self.fee_config = fee_config if fee_config else FeeConfig() # 保存费用配置
480 def initialize(self) -> bool:
481 """初始化模拟交易引擎"""
482 try:
483 # 先创建SimulationEngine,但不传递strategy_engine(因为还没有创建)
484 self.simulation_engine = SimulationEngine(
485 user_id=self.user_id,
486 session_id=self.session_id,
487 initial_capital=self.initial_capital,
488 strategy_engine=None, # 稍后通过set_strategy_engine设置
489 currency=self.currency,
490 fee_config=self.fee_config, # 传递费用配置
491 )
493 return self.simulation_engine.initialize()
494 except Exception as e:
495 if self.strategy_engine:
496 self.strategy_engine.log_message(
497 f"❌ 模拟交易引擎初始化失败: {e}", "error", "交易引擎"
498 )
499 else:
500 print(f"❌ 模拟交易引擎初始化失败: {e}")
501 return False
503 def start(self) -> bool:
504 """启动模拟交易引擎"""
505 if not self.simulation_engine:
506 return False
508 if self.is_running:
509 return True
511 try:
512 self.is_running = True
513 self.start_time = datetime.now()
514 # 通过策略引擎统一日志管理
515 if self.strategy_engine:
516 self.strategy_engine.log_message(
517 f"✅ 模拟交易引擎已启动,会话: {self.session_id}",
518 "info",
519 "交易引擎",
520 )
521 else:
522 print(f"✅ 模拟交易引擎已启动,会话: {self.session_id}")
523 return True
524 except Exception as e:
525 if self.strategy_engine:
526 self.strategy_engine.log_message(
527 f"❌ 启动模拟交易引擎失败: {e}", "error", "交易引擎"
528 )
529 else:
530 print(f"❌ 启动模拟交易引擎失败: {e}")
531 return False
533 def stop(self) -> bool:
534 """停止模拟交易引擎"""
535 if not self.simulation_engine:
536 return False
538 if not self.is_running:
539 return True
541 try:
542 self.is_running = False
543 self.end_time = datetime.now()
544 # 通过策略引擎统一日志管理
545 if self.strategy_engine:
546 self.strategy_engine.log_message(
547 f"✅ 模拟交易引擎已停止,会话: {self.session_id}",
548 "info",
549 "交易引擎",
550 )
551 else:
552 print(f"✅ 模拟交易引擎已停止,会话: {self.session_id}")
553 return True
554 except Exception as e:
555 if self.strategy_engine:
556 self.strategy_engine.log_message(
557 f"❌ 停止模拟交易引擎失败: {e}", "error", "交易引擎"
558 )
559 else:
560 print(f"❌ 停止模拟交易引擎失败: {e}")
561 return False
563 def pause(self) -> bool:
564 """暂停模拟交易引擎"""
565 if not self.simulation_engine:
566 return False
568 if not self.is_running:
569 return False
571 try:
572 self.is_running = False
573 if self.strategy_engine:
574 self.strategy_engine.log_message(
575 f"⏸️ 模拟交易引擎已暂停,会话: {self.session_id}", "info", "交易引擎"
576 )
577 else:
578 print(f"⏸️ 模拟交易引擎已暂停,会话: {self.session_id}")
579 return True
580 except Exception as e:
581 if self.strategy_engine:
582 self.strategy_engine.log_message(
583 f"❌ 暂停模拟交易引擎失败: {e}", "error", "交易引擎"
584 )
585 else:
586 print(f"❌ 暂停模拟交易引擎失败: {e}")
587 return False
589 def resume(self) -> bool:
590 """恢复模拟交易引擎"""
591 if not self.simulation_engine:
592 return False
594 if self.is_running:
595 return True
597 try:
598 self.is_running = True
599 if self.strategy_engine:
600 self.strategy_engine.log_message(
601 f"▶️ 模拟交易引擎已恢复,会话: {self.session_id}", "info", "交易引擎"
602 )
603 else:
604 print(f"▶️ 模拟交易引擎已恢复,会话: {self.session_id}")
605 return True
606 except Exception as e:
607 if self.strategy_engine:
608 self.strategy_engine.log_message(
609 f"❌ 恢复模拟交易引擎失败: {e}", "error", "交易引擎"
610 )
611 else:
612 print(f"❌ 恢复模拟交易引擎失败: {e}")
613 return False
615 def submit_order(self, order: Order) -> OrderResult:
616 """提交模拟订单"""
617 if not self.simulation_engine or not self.is_running:
618 return OrderResult(
619 order_id="", status=OrderStatus.REJECTED, message="模拟交易引擎未运行"
620 )
622 return self.simulation_engine.submit_order(order)
624 def cancel_order(self, order_id: str) -> bool:
625 """取消模拟订单"""
626 if not self.simulation_engine or not self.is_running:
627 return False
629 return self.simulation_engine.cancel_order(order_id)
631 def get_positions(self) -> List[Position]:
632 """获取模拟持仓"""
633 if not self.simulation_engine:
634 return []
636 return self.simulation_engine.get_positions()
638 def get_account_balance(self) -> AccountBalance:
639 """获取模拟账户余额"""
640 if not self.simulation_engine:
641 return AccountBalance(
642 total_cash=Decimal("0"),
643 available_cash=Decimal("0"),
644 frozen_cash=Decimal("0"),
645 currency="USD",
646 )
648 return self.simulation_engine.get_account_balance()
650 def process_market_data(self, market_data: MarketData):
651 """处理市场数据"""
652 if self.simulation_engine and self.is_running:
653 self.simulation_engine.process_market_data(market_data)
655 def get_performance_metrics(self) -> Dict[str, Any]:
656 """获取性能指标"""
657 if not self.simulation_engine:
658 return {}
660 return self.simulation_engine.get_performance_metrics()
662 def get_trade_history(self) -> List[Trade]:
663 """获取成交历史"""
664 if not self.simulation_engine:
665 return []
667 return self.simulation_engine.get_trade_history()
669 def get_pending_orders(self) -> List[Order]:
670 """获取待处理订单"""
671 if not self.simulation_engine:
672 return []
674 return self.simulation_engine.get_pending_orders()
676 def set_strategy_engine(self, strategy_engine):
677 """设置策略引擎引用,用于统一日志管理"""
678 self.strategy_engine = strategy_engine
679 # 如果SimulationEngine已经创建,更新其引用
680 if self.simulation_engine:
681 self.simulation_engine.strategy_engine = strategy_engine
684def create_trading_engine(
685 user_id: str,
686 session_id: str,
687 trading_mode: TradingMode,
688 asset_mode: AssetMode,
689 initial_capital: Decimal = Decimal("100000"),
690 currency: str = "USD",
691 fee_config: Optional[FeeConfig] = None,
692) -> TradingEngine:
693 """创建交易引擎工厂函数
695 Args:
696 user_id: 用户ID
697 session_id: 会话ID
698 trading_mode: 交易模式(实时/回测)
699 asset_mode: 资产模式(真实/模拟)
700 initial_capital: 初始资金(模拟资产模式必需)
701 currency: 货币类型
702 fee_config: 费用配置(可选,默认使用长桥费率)
704 Returns:
705 TradingEngine实例
706 """
707 if asset_mode == AssetMode.SIMULATION:
708 return SimulationTradingEngine(
709 user_id,
710 session_id,
711 trading_mode,
712 asset_mode,
713 initial_capital,
714 currency,
715 fee_config,
716 )
717 else:
718 return RealTradingEngine(
719 user_id, session_id, trading_mode, asset_mode, fee_config
720 )