Coverage for core/trading/engines/trading_engine.py: 25.22%

341 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

1""" 

2交易引擎 

3""" 

4 

5from abc import ABC, abstractmethod 

6from datetime import datetime 

7from decimal import Decimal 

8from typing import Any, Dict, List, Optional 

9 

10from core.data_source.adapters.trade_adapter import TradeDataSourceAdapter 

11from core.models.broker import FeeConfig, TradingFee 

12from core.models.trading import (AccountBalance, AssetMode, MarketData, Order, 

13 OrderResult, OrderSide, OrderStatus, 

14 OrderType, Position, Trade, TradingMode) 

15from core.trading.engines.simulation_engine import SimulationEngine 

16from core.trading.utils.fee_calculator import FeeCalculator 

17 

18 

19class TradingEngine(ABC): 

20 """交易引擎抽象基类""" 

21 

22 def __init__( 

23 self, 

24 user_id: str, 

25 session_id: str, 

26 trading_mode: TradingMode, 

27 asset_mode: AssetMode, 

28 ): 

29 self.user_id = user_id 

30 self.session_id = session_id 

31 self.trading_mode = trading_mode 

32 self.asset_mode = asset_mode 

33 self.is_running = False 

34 self.start_time: Optional[datetime] = None 

35 self.end_time: Optional[datetime] = None 

36 

37 @abstractmethod 

38 def initialize(self) -> bool: 

39 """初始化交易引擎""" 

40 pass 

41 

42 @abstractmethod 

43 def start(self) -> bool: 

44 """启动交易引擎""" 

45 pass 

46 

47 @abstractmethod 

48 def stop(self) -> bool: 

49 """停止交易引擎""" 

50 pass 

51 

52 @abstractmethod 

53 def pause(self) -> bool: 

54 """暂停交易引擎""" 

55 pass 

56 

57 @abstractmethod 

58 def resume(self) -> bool: 

59 """恢复交易引擎""" 

60 pass 

61 

62 @abstractmethod 

63 def submit_order(self, order: Order) -> OrderResult: 

64 """提交订单""" 

65 pass 

66 

67 @abstractmethod 

68 def cancel_order(self, order_id: str) -> bool: 

69 """取消订单""" 

70 pass 

71 

72 @abstractmethod 

73 def get_positions(self) -> List[Position]: 

74 """获取持仓""" 

75 pass 

76 

77 @abstractmethod 

78 def get_account_balance(self) -> AccountBalance: 

79 """获取账户余额""" 

80 pass 

81 

82 @abstractmethod 

83 def process_market_data(self, market_data: MarketData): 

84 """处理市场数据""" 

85 pass 

86 

87 def set_strategy_engine(self, strategy_engine): 

88 """设置策略引擎引用,用于统一日志管理""" 

89 pass 

90 

91 

92class RealTradingEngine(TradingEngine): 

93 """真实交易引擎""" 

94 

95 def __init__( 

96 self, 

97 user_id: str, 

98 session_id: str, 

99 trading_mode: TradingMode, 

100 asset_mode: AssetMode, 

101 fee_config: Optional[FeeConfig] = None, 

102 ): 

103 super().__init__(user_id, session_id, trading_mode, asset_mode) 

104 self.trade_adapter = TradeDataSourceAdapter(user_id) 

105 self.orders: Dict[str, Order] = {} 

106 self.positions: List[Position] = [] 

107 self.strategy_engine = None # 策略引擎引用 

108 

109 # 费用计算器(用于模拟计算费用,不实际扣除) 

110 if fee_config is None: 

111 fee_config = FeeConfig() # 使用默认的长桥费率 

112 self.fee_calculator = FeeCalculator(fee_config) 

113 

114 def initialize(self) -> bool: 

115 """初始化真实交易引擎""" 

116 try: 

117 # 检查交易适配器是否可用 

118 if not self.trade_adapter.is_available(): 

119 if self.strategy_engine: 

120 self.strategy_engine.log_message( 

121 "❌ 交易适配器不可用", "error", "交易引擎" 

122 ) 

123 else: 

124 print("❌ 交易适配器不可用") 

125 return False 

126 

127 # 加载现有持仓 

128 self._load_positions() 

129 

130 return True 

131 except Exception as e: 

132 if self.strategy_engine: 

133 self.strategy_engine.log_message( 

134 f"❌ 真实交易引擎初始化失败: {e}", "error", "交易引擎" 

135 ) 

136 else: 

137 print(f"❌ 真实交易引擎初始化失败: {e}") 

138 return False 

139 

140 def start(self) -> bool: 

141 """启动真实交易引擎""" 

142 if self.is_running: 

143 return True 

144 

145 try: 

146 self.is_running = True 

147 self.start_time = datetime.now() 

148 if self.strategy_engine: 

149 self.strategy_engine.log_message( 

150 f"✅ 真实交易引擎已启动,会话: {self.session_id}", 

151 "info", 

152 "交易引擎", 

153 ) 

154 else: 

155 print(f"✅ 真实交易引擎已启动,会话: {self.session_id}") 

156 return True 

157 except Exception as e: 

158 if self.strategy_engine: 

159 self.strategy_engine.log_message( 

160 f"❌ 启动真实交易引擎失败: {e}", "error", "交易引擎" 

161 ) 

162 else: 

163 print(f"❌ 启动真实交易引擎失败: {e}") 

164 return False 

165 

166 def stop(self) -> bool: 

167 """停止真实交易引擎""" 

168 if not self.is_running: 

169 return True 

170 

171 try: 

172 self.is_running = False 

173 self.end_time = datetime.now() 

174 if self.strategy_engine: 

175 self.strategy_engine.log_message( 

176 f"✅ 真实交易引擎已停止,会话: {self.session_id}", 

177 "info", 

178 "交易引擎", 

179 ) 

180 else: 

181 print(f"✅ 真实交易引擎已停止,会话: {self.session_id}") 

182 return True 

183 except Exception as e: 

184 if self.strategy_engine: 

185 self.strategy_engine.log_message( 

186 f"❌ 停止真实交易引擎失败: {e}", "error", "交易引擎" 

187 ) 

188 else: 

189 print(f"❌ 停止真实交易引擎失败: {e}") 

190 return False 

191 

192 def pause(self) -> bool: 

193 """暂停真实交易引擎""" 

194 if not self.is_running: 

195 return False 

196 

197 try: 

198 self.is_running = False 

199 if self.strategy_engine: 

200 self.strategy_engine.log_message( 

201 f"⏸️ 真实交易引擎已暂停,会话: {self.session_id}", "info", "交易引擎" 

202 ) 

203 else: 

204 print(f"⏸️ 真实交易引擎已暂停,会话: {self.session_id}") 

205 return True 

206 except Exception as e: 

207 if self.strategy_engine: 

208 self.strategy_engine.log_message( 

209 f"❌ 暂停真实交易引擎失败: {e}", "error", "交易引擎" 

210 ) 

211 else: 

212 print(f"❌ 暂停真实交易引擎失败: {e}") 

213 return False 

214 

215 def resume(self) -> bool: 

216 """恢复真实交易引擎""" 

217 if self.is_running: 

218 return True 

219 

220 try: 

221 self.is_running = True 

222 if self.strategy_engine: 

223 self.strategy_engine.log_message( 

224 f"▶️ 真实交易引擎已恢复,会话: {self.session_id}", "info", "交易引擎" 

225 ) 

226 else: 

227 print(f"▶️ 真实交易引擎已恢复,会话: {self.session_id}") 

228 return True 

229 except Exception as e: 

230 if self.strategy_engine: 

231 self.strategy_engine.log_message( 

232 f"❌ 恢复真实交易引擎失败: {e}", "error", "交易引擎" 

233 ) 

234 else: 

235 print(f"❌ 恢复真实交易引擎失败: {e}") 

236 return False 

237 

238 def submit_order(self, order: Order) -> OrderResult: 

239 """提交真实订单""" 

240 if not self.is_running: 

241 return OrderResult( 

242 order_id="", status=OrderStatus.REJECTED, message="交易引擎未运行" 

243 ) 

244 

245 try: 

246 # 使用交易适配器提交订单 

247 result = self.trade_adapter.submit_order( 

248 symbol=order.symbol, 

249 order_type=order.order_type.value, 

250 side=order.side.value, 

251 submitted_quantity=order.quantity, 

252 time_in_force="Day", # 默认当日有效 

253 submitted_price=order.price, 

254 ) 

255 

256 if result and result.get("success", False): 

257 # 订单提交成功 

258 order_id = result.get("order_id", "") 

259 self.orders[order_id] = order 

260 

261 return OrderResult( 

262 order_id=order_id, status=OrderStatus.PENDING, message="订单已提交" 

263 ) 

264 else: 

265 # 订单提交失败 

266 error_msg = ( 

267 result.get("message", "订单提交失败") if result else "订单提交失败" 

268 ) 

269 return OrderResult( 

270 order_id="", status=OrderStatus.REJECTED, message=error_msg 

271 ) 

272 

273 except Exception as e: 

274 if self.strategy_engine: 

275 self.strategy_engine.log_message( 

276 f"❌ 提交真实订单失败: {e}", "error", "交易引擎" 

277 ) 

278 else: 

279 print(f"❌ 提交真实订单失败: {e}") 

280 return OrderResult( 

281 order_id="", 

282 status=OrderStatus.REJECTED, 

283 message=f"提交订单失败: {str(e)}", 

284 ) 

285 

286 def cancel_order(self, order_id: str) -> bool: 

287 """取消真实订单""" 

288 if not self.is_running: 

289 return False 

290 

291 try: 

292 success = self.trade_adapter.cancel_order(order_id) 

293 if success and order_id in self.orders: 

294 self.orders[order_id].status = OrderStatus.CANCELLED 

295 return success 

296 except Exception as e: 

297 if self.strategy_engine: 

298 self.strategy_engine.log_message( 

299 f"❌ 取消真实订单失败: {e}", "error", "交易引擎" 

300 ) 

301 else: 

302 print(f"❌ 取消真实订单失败: {e}") 

303 return False 

304 

305 def get_positions(self) -> List[Position]: 

306 """获取真实持仓""" 

307 try: 

308 # 从交易适配器获取持仓 

309 positions_data = self.trade_adapter.get_positions() 

310 

311 positions = [] 

312 for pos_data in positions_data: 

313 position = Position( 

314 symbol=pos_data.get("symbol", ""), 

315 quantity=Decimal(str(pos_data.get("quantity", 0))), 

316 avg_price=Decimal(str(pos_data.get("cost_price", 0))), 

317 current_price=Decimal(str(pos_data.get("current_price", 0))), 

318 market_value=Decimal(str(pos_data.get("market_value", 0))), 

319 unrealized_pnl=Decimal(str(pos_data.get("unrealized_pnl", 0))), 

320 realized_pnl=Decimal("0"), # 需要从成交记录计算 

321 ) 

322 positions.append(position) 

323 

324 self.positions = positions 

325 return positions 

326 

327 except Exception as e: 

328 if self.strategy_engine: 

329 self.strategy_engine.log_message( 

330 f"❌ 获取真实持仓失败: {e}", "error", "交易引擎" 

331 ) 

332 else: 

333 print(f"❌ 获取真实持仓失败: {e}") 

334 return [] 

335 

336 def get_account_balance(self) -> AccountBalance: 

337 """获取真实账户余额""" 

338 try: 

339 # 从交易适配器获取账户余额 

340 balance_data = self.trade_adapter.get_account_balance() 

341 

342 if balance_data and len(balance_data) > 0: 

343 balance = balance_data[0] 

344 return AccountBalance( 

345 total_cash=Decimal(str(balance.get("total_cash", 0))), 

346 available_cash=Decimal(str(balance.get("available_cash", 0))), 

347 frozen_cash=Decimal(str(balance.get("frozen_cash", 0))), 

348 currency=balance.get("currency", "USD"), 

349 ) 

350 else: 

351 return AccountBalance( 

352 total_cash=Decimal("0"), 

353 available_cash=Decimal("0"), 

354 frozen_cash=Decimal("0"), 

355 currency="USD", 

356 ) 

357 

358 except Exception as e: 

359 if self.strategy_engine: 

360 self.strategy_engine.log_message( 

361 f"❌ 获取真实账户余额失败: {e}", "error", "交易引擎" 

362 ) 

363 else: 

364 print(f"❌ 获取真实账户余额失败: {e}") 

365 return AccountBalance( 

366 total_cash=Decimal("0"), 

367 available_cash=Decimal("0"), 

368 frozen_cash=Decimal("0"), 

369 currency="USD", 

370 ) 

371 

372 def process_market_data(self, market_data: MarketData): 

373 """处理市场数据""" 

374 # 真实交易引擎不需要特殊处理市场数据 

375 # 订单成交由券商系统处理 

376 pass 

377 

378 def _load_positions(self): 

379 """加载现有持仓""" 

380 try: 

381 self.positions = self.get_positions() 

382 except Exception as e: 

383 if self.strategy_engine: 

384 self.strategy_engine.log_message( 

385 f"❌ 加载现有持仓失败: {e}", "error", "交易引擎" 

386 ) 

387 else: 

388 print(f"❌ 加载现有持仓失败: {e}") 

389 

390 def set_strategy_engine(self, strategy_engine): 

391 """设置策略引擎引用,用于统一日志管理""" 

392 self.strategy_engine = strategy_engine 

393 

394 def _calculate_order_fee( 

395 self, quantity: Decimal, price: Decimal, side: str, symbol: str = None 

396 ) -> Optional[TradingFee]: 

397 """计算订单费用(仅用于展示,不实际扣除) 

398 

399 Args: 

400 quantity: 交易数量 

401 price: 交易价格 

402 side: 交易方向(buy/sell) 

403 symbol: 股票代码(用于确定市场类型) 

404 

405 Returns: 

406 TradingFee对象,包含详细的费用分解 

407 """ 

408 try: 

409 # 根据股票代码确定市场类型和货币类型 

410 market, currency = self._get_market_and_currency_from_symbol(symbol) 

411 

412 # 根据市场类型选择对应的计算方法 

413 if market == "US": 

414 return self.fee_calculator.calculate_us_fees(quantity, price, side) 

415 elif market == "HK": 

416 return self.fee_calculator.calculate_hk_fees(quantity, price, side) 

417 else: 

418 # 默认使用美股费率 

419 if self.strategy_engine: 

420 self.strategy_engine.log_message( 

421 f"⚠️ 未知市场类型 {market},使用美股费率", "warning", "交易引擎" 

422 ) 

423 return self.fee_calculator.calculate_us_fees(quantity, price, side) 

424 except Exception as e: 

425 if self.strategy_engine: 

426 self.strategy_engine.log_message( 

427 f"❌ 计算交易费用失败: {e}", "error", "交易引擎" 

428 ) 

429 return None 

430 

431 def _get_market_and_currency_from_symbol(self, symbol: str) -> tuple[str, str]: 

432 """根据股票代码确定市场类型和货币类型 

433 

434 Args: 

435 symbol: 股票代码(如 AAPL.US, 700.HK) 

436 

437 Returns: 

438 tuple: (市场类型, 货币类型) 

439 """ 

440 if not symbol or "." not in symbol: 

441 # 如果没有股票代码或格式不正确,默认使用美股 

442 return ("US", "USD") 

443 

444 # 从股票代码中提取市场后缀 

445 market_suffix = symbol.split(".")[-1].upper() 

446 

447 # 市场类型到货币类型的映射 

448 market_currency_mapping = { 

449 "US": ("US", "USD"), 

450 "HK": ("HK", "HKD"), 

451 "SZ": ("CN", "CNY"), # 深圳 

452 "SH": ("CN", "CNY"), # 上海 

453 "SG": ("SG", "SGD"), # 新加坡 

454 } 

455 

456 market, currency = market_currency_mapping.get(market_suffix, ("US", "USD")) 

457 return market, currency 

458 

459 

460class SimulationTradingEngine(TradingEngine): 

461 """模拟交易引擎""" 

462 

463 def __init__( 

464 self, 

465 user_id: str, 

466 session_id: str, 

467 trading_mode: TradingMode, 

468 asset_mode: AssetMode, 

469 initial_capital: Decimal, 

470 currency: str = "USD", 

471 fee_config: Optional[FeeConfig] = None, 

472 ): 

473 super().__init__(user_id, session_id, trading_mode, asset_mode) 

474 self.initial_capital = initial_capital 

475 self.currency = currency 

476 self.simulation_engine: Optional[SimulationEngine] = None 

477 self.strategy_engine = None # 策略引擎引用 

478 self.fee_config = fee_config if fee_config else FeeConfig() # 保存费用配置 

479 

480 def initialize(self) -> bool: 

481 """初始化模拟交易引擎""" 

482 try: 

483 # 先创建SimulationEngine,但不传递strategy_engine(因为还没有创建) 

484 self.simulation_engine = SimulationEngine( 

485 user_id=self.user_id, 

486 session_id=self.session_id, 

487 initial_capital=self.initial_capital, 

488 strategy_engine=None, # 稍后通过set_strategy_engine设置 

489 currency=self.currency, 

490 fee_config=self.fee_config, # 传递费用配置 

491 ) 

492 

493 return self.simulation_engine.initialize() 

494 except Exception as e: 

495 if self.strategy_engine: 

496 self.strategy_engine.log_message( 

497 f"❌ 模拟交易引擎初始化失败: {e}", "error", "交易引擎" 

498 ) 

499 else: 

500 print(f"❌ 模拟交易引擎初始化失败: {e}") 

501 return False 

502 

503 def start(self) -> bool: 

504 """启动模拟交易引擎""" 

505 if not self.simulation_engine: 

506 return False 

507 

508 if self.is_running: 

509 return True 

510 

511 try: 

512 self.is_running = True 

513 self.start_time = datetime.now() 

514 # 通过策略引擎统一日志管理 

515 if self.strategy_engine: 

516 self.strategy_engine.log_message( 

517 f"✅ 模拟交易引擎已启动,会话: {self.session_id}", 

518 "info", 

519 "交易引擎", 

520 ) 

521 else: 

522 print(f"✅ 模拟交易引擎已启动,会话: {self.session_id}") 

523 return True 

524 except Exception as e: 

525 if self.strategy_engine: 

526 self.strategy_engine.log_message( 

527 f"❌ 启动模拟交易引擎失败: {e}", "error", "交易引擎" 

528 ) 

529 else: 

530 print(f"❌ 启动模拟交易引擎失败: {e}") 

531 return False 

532 

533 def stop(self) -> bool: 

534 """停止模拟交易引擎""" 

535 if not self.simulation_engine: 

536 return False 

537 

538 if not self.is_running: 

539 return True 

540 

541 try: 

542 self.is_running = False 

543 self.end_time = datetime.now() 

544 # 通过策略引擎统一日志管理 

545 if self.strategy_engine: 

546 self.strategy_engine.log_message( 

547 f"✅ 模拟交易引擎已停止,会话: {self.session_id}", 

548 "info", 

549 "交易引擎", 

550 ) 

551 else: 

552 print(f"✅ 模拟交易引擎已停止,会话: {self.session_id}") 

553 return True 

554 except Exception as e: 

555 if self.strategy_engine: 

556 self.strategy_engine.log_message( 

557 f"❌ 停止模拟交易引擎失败: {e}", "error", "交易引擎" 

558 ) 

559 else: 

560 print(f"❌ 停止模拟交易引擎失败: {e}") 

561 return False 

562 

563 def pause(self) -> bool: 

564 """暂停模拟交易引擎""" 

565 if not self.simulation_engine: 

566 return False 

567 

568 if not self.is_running: 

569 return False 

570 

571 try: 

572 self.is_running = False 

573 if self.strategy_engine: 

574 self.strategy_engine.log_message( 

575 f"⏸️ 模拟交易引擎已暂停,会话: {self.session_id}", "info", "交易引擎" 

576 ) 

577 else: 

578 print(f"⏸️ 模拟交易引擎已暂停,会话: {self.session_id}") 

579 return True 

580 except Exception as e: 

581 if self.strategy_engine: 

582 self.strategy_engine.log_message( 

583 f"❌ 暂停模拟交易引擎失败: {e}", "error", "交易引擎" 

584 ) 

585 else: 

586 print(f"❌ 暂停模拟交易引擎失败: {e}") 

587 return False 

588 

589 def resume(self) -> bool: 

590 """恢复模拟交易引擎""" 

591 if not self.simulation_engine: 

592 return False 

593 

594 if self.is_running: 

595 return True 

596 

597 try: 

598 self.is_running = True 

599 if self.strategy_engine: 

600 self.strategy_engine.log_message( 

601 f"▶️ 模拟交易引擎已恢复,会话: {self.session_id}", "info", "交易引擎" 

602 ) 

603 else: 

604 print(f"▶️ 模拟交易引擎已恢复,会话: {self.session_id}") 

605 return True 

606 except Exception as e: 

607 if self.strategy_engine: 

608 self.strategy_engine.log_message( 

609 f"❌ 恢复模拟交易引擎失败: {e}", "error", "交易引擎" 

610 ) 

611 else: 

612 print(f"❌ 恢复模拟交易引擎失败: {e}") 

613 return False 

614 

615 def submit_order(self, order: Order) -> OrderResult: 

616 """提交模拟订单""" 

617 if not self.simulation_engine or not self.is_running: 

618 return OrderResult( 

619 order_id="", status=OrderStatus.REJECTED, message="模拟交易引擎未运行" 

620 ) 

621 

622 return self.simulation_engine.submit_order(order) 

623 

624 def cancel_order(self, order_id: str) -> bool: 

625 """取消模拟订单""" 

626 if not self.simulation_engine or not self.is_running: 

627 return False 

628 

629 return self.simulation_engine.cancel_order(order_id) 

630 

631 def get_positions(self) -> List[Position]: 

632 """获取模拟持仓""" 

633 if not self.simulation_engine: 

634 return [] 

635 

636 return self.simulation_engine.get_positions() 

637 

638 def get_account_balance(self) -> AccountBalance: 

639 """获取模拟账户余额""" 

640 if not self.simulation_engine: 

641 return AccountBalance( 

642 total_cash=Decimal("0"), 

643 available_cash=Decimal("0"), 

644 frozen_cash=Decimal("0"), 

645 currency="USD", 

646 ) 

647 

648 return self.simulation_engine.get_account_balance() 

649 

650 def process_market_data(self, market_data: MarketData): 

651 """处理市场数据""" 

652 if self.simulation_engine and self.is_running: 

653 self.simulation_engine.process_market_data(market_data) 

654 

655 def get_performance_metrics(self) -> Dict[str, Any]: 

656 """获取性能指标""" 

657 if not self.simulation_engine: 

658 return {} 

659 

660 return self.simulation_engine.get_performance_metrics() 

661 

662 def get_trade_history(self) -> List[Trade]: 

663 """获取成交历史""" 

664 if not self.simulation_engine: 

665 return [] 

666 

667 return self.simulation_engine.get_trade_history() 

668 

669 def get_pending_orders(self) -> List[Order]: 

670 """获取待处理订单""" 

671 if not self.simulation_engine: 

672 return [] 

673 

674 return self.simulation_engine.get_pending_orders() 

675 

676 def set_strategy_engine(self, strategy_engine): 

677 """设置策略引擎引用,用于统一日志管理""" 

678 self.strategy_engine = strategy_engine 

679 # 如果SimulationEngine已经创建,更新其引用 

680 if self.simulation_engine: 

681 self.simulation_engine.strategy_engine = strategy_engine 

682 

683 

684def create_trading_engine( 

685 user_id: str, 

686 session_id: str, 

687 trading_mode: TradingMode, 

688 asset_mode: AssetMode, 

689 initial_capital: Decimal = Decimal("100000"), 

690 currency: str = "USD", 

691 fee_config: Optional[FeeConfig] = None, 

692) -> TradingEngine: 

693 """创建交易引擎工厂函数 

694 

695 Args: 

696 user_id: 用户ID 

697 session_id: 会话ID 

698 trading_mode: 交易模式(实时/回测) 

699 asset_mode: 资产模式(真实/模拟) 

700 initial_capital: 初始资金(模拟资产模式必需) 

701 currency: 货币类型 

702 fee_config: 费用配置(可选,默认使用长桥费率) 

703 

704 Returns: 

705 TradingEngine实例 

706 """ 

707 if asset_mode == AssetMode.SIMULATION: 

708 return SimulationTradingEngine( 

709 user_id, 

710 session_id, 

711 trading_mode, 

712 asset_mode, 

713 initial_capital, 

714 currency, 

715 fee_config, 

716 ) 

717 else: 

718 return RealTradingEngine( 

719 user_id, session_id, trading_mode, asset_mode, fee_config 

720 )