Coverage for core/services/trading_service.py: 51.90%

420 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

1""" 

2交易服务层 

3""" 

4 

5import logging 

6from datetime import datetime 

7from decimal import Decimal 

8from typing import Any, Dict, List, Optional 

9 

10from core.data_source.adapters.trade_adapter import TradeDataSourceAdapter 

11from core.models.trading import (OrderSide, OrderStatus, OrderType, 

12 PerformanceMetrics, PositionHistoryCreate, 

13 PositionHistoryResponse, RiskConfig, 

14 SessionStatus, TradingOrderCreate, 

15 TradingOrderResponse, TradingOrderUpdate, 

16 TradingSessionCreate, TradingSessionResponse, 

17 TradingSessionUpdate) 

18from core.repositories.trading_repository import TradingRepository 

19from core.services.risk_service import RiskService 

20from core.trading.engines import (StrategyEngine, TradingSessionEngine, 

21 create_trading_engine) 

22from core.trading.strategies import (create_strategy, 

23 get_strategy_recommended_risk_config) 

24 

25logger = logging.getLogger(__name__) 

26 

27 

28class TradingService: 

29 """交易服务""" 

30 

31 def __init__( 

32 self, 

33 user_id: str, 

34 trading_repository: Optional[TradingRepository] = None, 

35 risk_service: Optional[RiskService] = None, 

36 ): 

37 self.user_id = user_id 

38 self.trading_repo = trading_repository or TradingRepository() 

39 self.trade_adapter = TradeDataSourceAdapter(user_id) 

40 

41 # 交易会话引擎 

42 self.session_engines: Dict[str, TradingSessionEngine] = {} 

43 

44 # 风险管理服务 

45 self.risk_service = risk_service or RiskService(user_id) 

46 

47 # ===== 交易会话管理 ===== 

48 

49 def create_trading_session( 

50 self, session_data: TradingSessionCreate 

51 ) -> TradingSessionResponse: 

52 """创建交易会话""" 

53 # 如果风险配置为空,使用策略推荐的风险配置 

54 if not session_data.risk_config.model_dump() or all( 

55 v is None or v == [] for v in session_data.risk_config.model_dump().values() 

56 ): 

57 recommended_risk_config = get_strategy_recommended_risk_config( 

58 session_data.strategy_name 

59 ) 

60 if recommended_risk_config: 

61 # 更新风险配置 

62 session_data.risk_config = RiskConfig(**recommended_risk_config) 

63 print(f"✅ 使用策略 {session_data.strategy_name} 推荐的风险配置") 

64 

65 # 如果策略参数为空,使用策略默认参数 

66 if not session_data.strategy_params: 

67 from core.trading.strategies import get_strategy_parameters 

68 

69 strategy_params = get_strategy_parameters(session_data.strategy_name) 

70 if strategy_params: 

71 # 构建默认参数字典 

72 default_params = {} 

73 for param in strategy_params: 

74 if param.get("name") != "symbols": # 过滤掉symbols参数 

75 default_params[param["name"]] = param["default_value"] 

76 session_data.strategy_params = default_params 

77 print( 

78 f"✅ 使用策略 {session_data.strategy_name} 默认参数: {default_params}" 

79 ) 

80 

81 return self.trading_repo.create_trading_session(session_data, self.user_id) 

82 

83 def get_trading_session(self, session_id: str) -> Optional[TradingSessionResponse]: 

84 """获取交易会话""" 

85 session = self.trading_repo.get_trading_session(session_id) 

86 if session and session.user_id != self.user_id: 

87 return None # 只能访问自己的会话 

88 return session 

89 

90 def update_trading_session( 

91 self, session_id: str, update_data: TradingSessionUpdate 

92 ) -> Optional[TradingSessionResponse]: 

93 """更新交易会话""" 

94 # 检查会话是否属于当前用户 

95 session = self.get_trading_session(session_id) 

96 if not session: 

97 return None 

98 

99 return self.trading_repo.update_trading_session(session_id, update_data) 

100 

101 def delete_trading_session(self, session_id: str) -> bool: 

102 """删除交易会话及其所有关联数据""" 

103 # 检查会话是否属于当前用户 

104 session = self.get_trading_session(session_id) 

105 if not session: 

106 return False 

107 

108 print(f"🗑️ 开始删除交易会话: {session_id}") 

109 

110 # 1. 停止并清理交易会话引擎 

111 self._cleanup_session_engines(session_id) 

112 

113 # 3. 清理风险管理引擎 

114 self._cleanup_risk_engines(session_id) 

115 

116 # 4. 删除数据库中的所有相关数据 

117 success = self.trading_repo.delete_trading_session(session_id) 

118 

119 if success: 

120 print(f"🎉 交易会话删除完成: {session_id}") 

121 else: 

122 print(f"❌ 交易会话删除失败: {session_id}") 

123 

124 return success 

125 

126 def get_user_trading_sessions( 

127 self, status: Optional[SessionStatus] = None, limit: int = 10, offset: int = 0 

128 ) -> List[TradingSessionResponse]: 

129 """获取用户交易会话列表""" 

130 return self.trading_repo.get_user_trading_sessions( 

131 self.user_id, status, limit, offset 

132 ) 

133 

134 def start_trading_session( 

135 self, session_id: str 

136 ) -> Optional[TradingSessionResponse]: 

137 """启动交易会话""" 

138 session = self.get_trading_session(session_id) 

139 if not session: 

140 return None 

141 

142 if session.status not in [SessionStatus.CREATED, SessionStatus.STOPPED]: 

143 return None # 只能启动已创建或已停止的会话 

144 

145 update_data = TradingSessionUpdate(status=SessionStatus.RUNNING) 

146 

147 # 更新开始时间 

148 updated_session = self.trading_repo.update_trading_session( 

149 session_id, update_data 

150 ) 

151 if updated_session: 

152 # 立即发送状态更新 

153 self._send_session_status_update(session_id, "running") 

154 

155 # 异步启动交易引擎和风险管理引擎 

156 import threading 

157 

158 def async_start(): 

159 try: 

160 self._send_websocket_log( 

161 f"🚀 开始异步初始化交易会话: {session_id}", "info" 

162 ) 

163 

164 # 启动交易引擎 

165 self._start_trading_engine(session_id, updated_session) 

166 

167 # 创建风险管理引擎 

168 self._create_risk_engine(session_id, updated_session) 

169 

170 self._send_websocket_log( 

171 f"✅ 交易会话异步初始化完成: {session_id}", "info" 

172 ) 

173 except Exception as e: 

174 self._send_websocket_log( 

175 f"❌ 交易会话异步初始化失败: {session_id} - {e}", "error" 

176 ) 

177 # 如果初始化失败,更新状态为停止 

178 self._send_session_status_update(session_id, "stopped") 

179 

180 threading.Thread(target=async_start, daemon=True).start() 

181 

182 return updated_session 

183 

184 def stop_trading_session(self, session_id: str) -> Optional[TradingSessionResponse]: 

185 """停止交易会话""" 

186 session = self.get_trading_session(session_id) 

187 if not session: 

188 return None 

189 

190 if session.status not in [SessionStatus.RUNNING, SessionStatus.PAUSED]: 

191 return None # 只能停止运行中或暂停的会话 

192 

193 update_data = TradingSessionUpdate(status=SessionStatus.STOPPED) 

194 

195 # 更新结束时间 

196 updated_session = self.trading_repo.update_trading_session( 

197 session_id, update_data 

198 ) 

199 if updated_session: 

200 # 停止交易引擎 

201 self._stop_trading_engine(session_id) 

202 

203 # 移除风险管理引擎 

204 self._remove_risk_engine(session_id) 

205 

206 return updated_session 

207 

208 def pause_trading_session( 

209 self, session_id: str 

210 ) -> Optional[TradingSessionResponse]: 

211 """暂停交易会话""" 

212 session = self.get_trading_session(session_id) 

213 if not session: 

214 return None 

215 

216 if session.status != SessionStatus.RUNNING: 

217 return None # 只能暂停运行中的会话 

218 

219 update_data = TradingSessionUpdate(status=SessionStatus.PAUSED) 

220 

221 updated_session = self.trading_repo.update_trading_session( 

222 session_id, update_data 

223 ) 

224 if updated_session: 

225 # 暂停交易引擎 

226 self._pause_trading_engine(session_id) 

227 

228 return updated_session 

229 

230 def resume_trading_session( 

231 self, session_id: str 

232 ) -> Optional[TradingSessionResponse]: 

233 """恢复交易会话""" 

234 session = self.get_trading_session(session_id) 

235 if not session: 

236 return None 

237 

238 if session.status != SessionStatus.PAUSED: 

239 return None # 只能恢复暂停的会话 

240 

241 update_data = TradingSessionUpdate(status=SessionStatus.RUNNING) 

242 

243 updated_session = self.trading_repo.update_trading_session( 

244 session_id, update_data 

245 ) 

246 if updated_session: 

247 # 恢复交易引擎 

248 self._resume_trading_engine(session_id) 

249 

250 return updated_session 

251 

252 # ===== 交易订单管理 ===== 

253 

254 def create_trading_order( 

255 self, order_data: TradingOrderCreate 

256 ) -> Optional[TradingOrderResponse]: 

257 """创建交易订单""" 

258 # 检查会话是否属于当前用户 

259 session = self.get_trading_session(order_data.session_id) 

260 if not session: 

261 return None 

262 

263 if session.status != SessionStatus.RUNNING: 

264 return None # 只能在运行中的会话中创建订单 

265 

266 return self.trading_repo.create_trading_order(order_data) 

267 

268 def get_trading_order(self, order_id: str) -> Optional[TradingOrderResponse]: 

269 """获取交易订单""" 

270 order = self.trading_repo.get_trading_order(order_id) 

271 if not order: 

272 return None 

273 

274 # 检查订单所属会话是否属于当前用户 

275 session = self.get_trading_session(order.session_id) 

276 if not session: 

277 return None 

278 

279 return order 

280 

281 def update_trading_order( 

282 self, order_id: str, update_data: TradingOrderUpdate 

283 ) -> Optional[TradingOrderResponse]: 

284 """更新交易订单""" 

285 # 检查订单是否属于当前用户 

286 order = self.get_trading_order(order_id) 

287 if not order: 

288 return None 

289 

290 return self.trading_repo.update_trading_order(order_id, update_data) 

291 

292 def get_session_orders( 

293 self, 

294 session_id: str, 

295 status: Optional[OrderStatus] = None, 

296 limit: int = 100, 

297 offset: int = 0, 

298 ) -> List[TradingOrderResponse]: 

299 """获取会话订单列表""" 

300 # 检查会话是否属于当前用户 

301 session = self.get_trading_session(session_id) 

302 if not session: 

303 return [] 

304 

305 return self.trading_repo.get_session_orders(session_id, status, limit, offset) 

306 

307 def cancel_order(self, order_id: str) -> bool: 

308 """取消订单""" 

309 order = self.get_trading_order(order_id) 

310 if not order: 

311 return False 

312 

313 if order.status not in [OrderStatus.PENDING]: 

314 return False # 只能取消待处理的订单 

315 

316 update_data = TradingOrderUpdate(status=OrderStatus.CANCELLED) 

317 

318 updated_order = self.trading_repo.update_trading_order(order_id, update_data) 

319 return updated_order is not None 

320 

321 # ===== 持仓记录管理 ===== 

322 

323 def create_position_history( 

324 self, position_data: PositionHistoryCreate 

325 ) -> Optional[PositionHistoryResponse]: 

326 """创建持仓记录""" 

327 # 检查会话是否属于当前用户 

328 session = self.get_trading_session(position_data.session_id) 

329 if not session: 

330 return None 

331 

332 return self.trading_repo.create_position_history(position_data) 

333 

334 def get_session_positions( 

335 self, session_id: str, limit: int = 100, offset: int = 0 

336 ) -> List[PositionHistoryResponse]: 

337 """获取会话持仓记录列表""" 

338 # 检查会话是否属于当前用户 

339 session = self.get_trading_session(session_id) 

340 if not session: 

341 return [] 

342 

343 return self.trading_repo.get_session_positions(session_id, limit, offset) 

344 

345 def get_latest_positions( 

346 self, session_id: str 

347 ) -> Dict[str, PositionHistoryResponse]: 

348 """获取最新持仓快照""" 

349 # 检查会话是否属于当前用户 

350 session = self.get_trading_session(session_id) 

351 if not session: 

352 return {} 

353 

354 return self.trading_repo.get_latest_positions(session_id) 

355 

356 def get_session_logs( 

357 self, 

358 session_id: str, 

359 page: int = 1, 

360 page_size: int = 20, 

361 search: Optional[str] = None, 

362 start_date: Optional[str] = None, 

363 end_date: Optional[str] = None, 

364 component: Optional[str] = None, 

365 ) -> Dict[str, Any]: 

366 """获取会话策略日志""" 

367 # 检查会话是否属于当前用户 

368 session = self.get_trading_session(session_id) 

369 if not session: 

370 return {"logs": [], "total": 0} 

371 

372 return self.trading_repo.get_session_logs( 

373 session_id, page, page_size, search, start_date, end_date, component 

374 ) 

375 

376 # ===== 性能分析 ===== 

377 

378 def calculate_performance_metrics( 

379 self, session_id: str 

380 ) -> Optional[PerformanceMetrics]: 

381 """计算性能指标""" 

382 session = self.get_trading_session(session_id) 

383 if not session: 

384 return None 

385 

386 # 获取所有订单 

387 orders = self.get_session_orders(session_id) 

388 if not orders: 

389 # 没有订单时返回默认性能指标 

390 return PerformanceMetrics( 

391 total_return=Decimal("0"), 

392 annualized_return=Decimal("0"), 

393 max_drawdown=Decimal("0"), 

394 sharpe_ratio=Decimal("0"), 

395 win_rate=Decimal("0"), 

396 total_trades=0, 

397 winning_trades=0, 

398 losing_trades=0, 

399 total_fees=Decimal("0"), 

400 platform_fees=Decimal("0"), 

401 activity_fees=Decimal("0"), 

402 clearing_fees=Decimal("0"), 

403 audit_fees=Decimal("0"), 

404 ) 

405 

406 # 计算基本指标 

407 total_trades = len(orders) 

408 filled_orders = [ 

409 order for order in orders if order.status == OrderStatus.FILLED 

410 ] 

411 winning_trades = 0 

412 losing_trades = 0 

413 

414 # 初始化费用统计 

415 total_fees = Decimal("0") 

416 platform_fees = Decimal("0") 

417 activity_fees = Decimal("0") 

418 clearing_fees = Decimal("0") 

419 audit_fees = Decimal("0") 

420 

421 # 计算盈亏 - 使用FIFO方法配对买卖订单 

422 total_pnl = Decimal("0") 

423 buy_orders = [] # 买入订单队列(FIFO) 

424 

425 for order in filled_orders: 

426 # 累加费用统计 

427 if order.trading_fee and isinstance(order.trading_fee, dict): 

428 total_fees += Decimal(str(order.trading_fee.get("total_fee", "0"))) 

429 platform_fees += Decimal( 

430 str(order.trading_fee.get("platform_fee", "0")) 

431 ) 

432 activity_fees += Decimal( 

433 str(order.trading_fee.get("activity_fee", "0")) 

434 ) 

435 clearing_fees += Decimal( 

436 str(order.trading_fee.get("clearing_fee", "0")) 

437 ) 

438 audit_fees += Decimal(str(order.trading_fee.get("audit_fee", "0"))) 

439 

440 if order.filled_price and order.filled_quantity: 

441 if order.side == OrderSide.BUY: 

442 # 买入订单,加入队列 

443 # 计算买入费用 

444 buy_fee = Decimal("0") 

445 if order.trading_fee and isinstance(order.trading_fee, dict): 

446 buy_fee = Decimal(str(order.trading_fee.get("total_fee", "0"))) 

447 elif order.commission: 

448 buy_fee = order.commission 

449 

450 buy_orders.append( 

451 { 

452 "price": order.filled_price, 

453 "quantity": order.filled_quantity, 

454 "commission": buy_fee, 

455 } 

456 ) 

457 elif order.side == OrderSide.SELL: 

458 # 卖出订单,与买入订单配对计算盈亏 

459 sell_quantity = order.filled_quantity 

460 sell_price = order.filled_price 

461 # 计算卖出费用 

462 sell_fee = Decimal("0") 

463 if order.trading_fee and isinstance(order.trading_fee, dict): 

464 sell_fee = Decimal(str(order.trading_fee.get("total_fee", "0"))) 

465 elif order.commission: 

466 sell_fee = order.commission 

467 

468 while sell_quantity > 0 and buy_orders: 

469 buy_order = buy_orders[0] 

470 buy_quantity = buy_order["quantity"] 

471 buy_price = buy_order["price"] 

472 buy_commission = buy_order["commission"] 

473 

474 # 计算配对数量 

475 pair_quantity = min(sell_quantity, buy_quantity) 

476 

477 # 计算这笔交易的盈亏 

478 trade_pnl = ( 

479 (sell_price - buy_price) * pair_quantity 

480 - buy_order["commission"] 

481 - sell_fee 

482 ) 

483 total_pnl += trade_pnl 

484 

485 # 更新胜率统计 

486 if trade_pnl > 0: 

487 winning_trades += 1 

488 elif trade_pnl < 0: 

489 losing_trades += 1 

490 

491 # 更新数量 

492 sell_quantity -= pair_quantity 

493 buy_quantity -= pair_quantity 

494 

495 if buy_quantity > 0: 

496 # 买入订单还有剩余 

497 buy_orders[0]["quantity"] = buy_quantity 

498 else: 

499 # 买入订单完全配对,移除 

500 buy_orders.pop(0) 

501 

502 # 计算胜率 

503 if total_trades > 0: 

504 win_rate = Decimal(str(winning_trades)) / Decimal(str(total_trades)) 

505 else: 

506 win_rate = Decimal("0") 

507 

508 # 计算总收益率 - 使用配置中的正确初始资金 

509 initial_capital = Decimal("100000") # 默认值 

510 if hasattr(session, "config") and session.config: 

511 import json 

512 

513 config = ( 

514 json.loads(session.config) 

515 if isinstance(session.config, str) 

516 else session.config 

517 ) 

518 if ( 

519 config 

520 and "initial_capital_by_currency" in config 

521 and config["initial_capital_by_currency"] 

522 ): 

523 usd_capital = config["initial_capital_by_currency"].get("USD", {}) 

524 if "cash_assets" in usd_capital: 

525 initial_capital = Decimal(str(usd_capital["cash_assets"])) 

526 elif ( 

527 config and "initial_capital" in config and config["initial_capital"] > 0 

528 ): 

529 initial_capital = Decimal(str(config["initial_capital"])) 

530 elif session.initial_capital > 0: 

531 initial_capital = session.initial_capital 

532 

533 if initial_capital > 0: 

534 total_return = total_pnl / initial_capital 

535 else: 

536 total_return = Decimal("0") 

537 

538 # 简化计算,实际需要更复杂的指标计算 

539 return PerformanceMetrics( 

540 total_return=total_return, 

541 annualized_return=total_return, # 简化 

542 max_drawdown=Decimal("0"), # 需要历史数据计算 

543 sharpe_ratio=Decimal("0"), # 需要波动率数据 

544 win_rate=win_rate, 

545 total_trades=total_trades, 

546 winning_trades=winning_trades, 

547 losing_trades=losing_trades, 

548 total_fees=total_fees, 

549 platform_fees=platform_fees, 

550 activity_fees=activity_fees, 

551 clearing_fees=clearing_fees, 

552 audit_fees=audit_fees, 

553 ) 

554 

555 # ===== 风险控制 ===== 

556 

557 def validate_order_risk(self, order_data: TradingOrderCreate) -> Dict[str, Any]: 

558 """验证订单风险""" 

559 session = self.get_trading_session(order_data.session_id) 

560 if not session: 

561 return {"valid": False, "errors": ["会话不存在"]} 

562 

563 errors = [] 

564 warnings = [] 

565 

566 # 获取风险配置 

567 risk_config = session.config.get("risk_config", {}) 

568 

569 # 检查允许交易的股票代码 

570 allowed_symbols = risk_config.get("allowed_symbols", []) 

571 if allowed_symbols and order_data.symbol not in allowed_symbols: 

572 errors.append(f"股票 {order_data.symbol} 不在允许交易列表中") 

573 

574 # 检查单只股票最大持仓比例 

575 max_position_ratio = risk_config.get("max_position_ratio", 0.1) 

576 current_positions = self.get_latest_positions(order_data.session_id) 

577 

578 if order_data.symbol in current_positions: 

579 current_position = current_positions[order_data.symbol] 

580 current_ratio = current_position.market_value / session.initial_capital 

581 if current_ratio > max_position_ratio: 

582 errors.append( 

583 f"股票 {order_data.symbol} 当前持仓比例 {current_ratio:.2%} 超过最大限制 {max_position_ratio:.2%}" 

584 ) 

585 

586 # 检查资金充足性 

587 if order_data.side == OrderSide.BUY: 

588 required_amount = order_data.quantity * (order_data.price or Decimal("0")) 

589 # 这里需要获取当前可用资金,简化处理 

590 if required_amount > session.initial_capital: 

591 errors.append("资金不足") 

592 

593 return {"valid": len(errors) == 0, "errors": errors, "warnings": warnings} 

594 

595 # ===== 模拟交易支持 ===== 

596 

597 def submit_simulation_order( 

598 self, order_data: TradingOrderCreate 

599 ) -> Optional[TradingOrderResponse]: 

600 """提交模拟交易订单""" 

601 # 检查会话是否为模拟模式 

602 session = self.get_trading_session(order_data.session_id) 

603 if not session: 

604 return None 

605 

606 if session.asset_mode.value != "simulation": 

607 return None # 只能提交模拟订单 

608 

609 # 风险检查 

610 risk_check = self.validate_order_risk(order_data) 

611 if not risk_check["valid"]: 

612 return None 

613 

614 # 创建订单 

615 order = self.create_trading_order(order_data) 

616 if not order: 

617 return None 

618 

619 # 模拟成交逻辑(简化处理) 

620 if order.order_type == OrderType.MARKET: 

621 # 市价单立即成交 

622 update_data = TradingOrderUpdate( 

623 status=OrderStatus.FILLED, 

624 filled_price=Decimal("100"), # 模拟价格 

625 filled_quantity=order.quantity, 

626 commission=Decimal("1"), # 模拟手续费 

627 ) 

628 self.update_trading_order(order.id, update_data) 

629 

630 # 更新持仓记录 

631 self._update_simulation_position(order) 

632 

633 return self.get_trading_order(order.id) 

634 

635 def _update_simulation_position(self, order: TradingOrderResponse): 

636 """更新模拟持仓""" 

637 if not order.filled_price or not order.filled_quantity: 

638 return 

639 

640 # 获取当前持仓 

641 current_positions = self.get_latest_positions(order.session_id) 

642 current_position = current_positions.get(order.symbol) 

643 

644 if order.side == OrderSide.BUY: 

645 # 买入 

646 if current_position: 

647 # 更新现有持仓 

648 new_quantity = current_position.quantity + order.filled_quantity 

649 new_avg_price = ( 

650 current_position.avg_price * current_position.quantity 

651 + order.filled_price * order.filled_quantity 

652 ) / new_quantity 

653 new_market_value = new_quantity * order.filled_price 

654 new_unrealized_pnl = new_market_value - (new_avg_price * new_quantity) 

655 else: 

656 # 新建持仓 

657 new_quantity = order.filled_quantity 

658 new_avg_price = order.filled_price 

659 new_market_value = new_quantity * order.filled_price 

660 new_unrealized_pnl = Decimal("0") 

661 else: 

662 # 卖出 

663 if current_position: 

664 new_quantity = current_position.quantity - order.filled_quantity 

665 if new_quantity <= 0: 

666 # 完全卖出 

667 new_quantity = Decimal("0") 

668 new_avg_price = Decimal("0") 

669 new_market_value = Decimal("0") 

670 new_unrealized_pnl = Decimal("0") 

671 else: 

672 # 部分卖出 

673 new_avg_price = current_position.avg_price 

674 new_market_value = new_quantity * order.filled_price 

675 new_unrealized_pnl = new_market_value - ( 

676 new_avg_price * new_quantity 

677 ) 

678 else: 

679 # 没有持仓却要卖出,错误情况 

680 return 

681 

682 # 创建持仓记录 

683 position_data = PositionHistoryCreate( 

684 session_id=order.session_id, 

685 symbol=order.symbol, 

686 quantity=new_quantity, 

687 avg_price=new_avg_price, 

688 market_value=new_market_value, 

689 unrealized_pnl=new_unrealized_pnl, 

690 realized_pnl=Decimal("0"), 

691 ) 

692 

693 self.create_position_history(position_data) 

694 

695 # ===== 交易引擎管理 ===== 

696 

697 def _start_trading_engine(self, session_id: str, session: TradingSessionResponse): 

698 """启动交易会话引擎""" 

699 try: 

700 # 创建交易会话引擎 

701 session_engine = TradingSessionEngine( 

702 user_id=self.user_id, 

703 session_id=session_id, 

704 session_config=session.config, 

705 ) 

706 

707 # 初始化交易会话引擎 

708 if session_engine.initialize(): 

709 # 启动交易会话引擎 

710 if session_engine.start(): 

711 self.session_engines[session_id] = session_engine 

712 self._send_websocket_log( 

713 f"✅ 交易会话引擎已启动: {session_id}", "info" 

714 ) 

715 self._send_session_status_update(session_id, "running") 

716 else: 

717 self._send_websocket_log( 

718 f"❌ 交易会话引擎启动失败: {session_id}", "error" 

719 ) 

720 else: 

721 self._send_websocket_log( 

722 f"❌ 交易会话引擎初始化失败: {session_id}", "error" 

723 ) 

724 

725 except Exception as e: 

726 self._send_websocket_log(f"❌ 启动交易会话引擎失败: {e}", "error") 

727 

728 def _stop_trading_engine(self, session_id: str): 

729 """停止交易会话引擎""" 

730 try: 

731 # 停止交易会话引擎 

732 if session_id in self.session_engines: 

733 self.session_engines[session_id].stop() 

734 del self.session_engines[session_id] 

735 

736 self._send_websocket_log(f"✅ 交易会话引擎已停止: {session_id}", "info") 

737 self._send_session_status_update(session_id, "stopped") 

738 

739 except Exception as e: 

740 self._send_websocket_log(f"❌ 停止交易会话引擎失败: {e}", "error") 

741 

742 def _pause_trading_engine(self, session_id: str): 

743 """暂停交易会话引擎""" 

744 try: 

745 # 暂停交易会话引擎 

746 if session_id in self.session_engines: 

747 self.session_engines[session_id].pause() 

748 

749 self._send_websocket_log(f"⏸️ 交易会话引擎已暂停: {session_id}", "info") 

750 self._send_session_status_update(session_id, "paused") 

751 

752 except Exception as e: 

753 self._send_websocket_log(f"❌ 暂停交易会话引擎失败: {e}", "error") 

754 

755 def _resume_trading_engine(self, session_id: str): 

756 """恢复交易会话引擎""" 

757 try: 

758 # 恢复交易会话引擎 

759 if session_id in self.session_engines: 

760 self.session_engines[session_id].resume() 

761 

762 self._send_websocket_log(f"▶️ 交易会话引擎已恢复: {session_id}", "info") 

763 self._send_session_status_update(session_id, "running") 

764 

765 except Exception as e: 

766 self._send_websocket_log(f"❌ 恢复交易会话引擎失败: {e}", "error") 

767 

768 def get_trading_engine_status(self, session_id: str) -> Dict[str, Any]: 

769 """获取交易会话引擎状态""" 

770 status = {"session_engine": False, "is_running": False, "is_paused": False} 

771 

772 if session_id in self.session_engines: 

773 session_engine = self.session_engines[session_id] 

774 status["session_engine"] = True 

775 status["is_running"] = session_engine.is_running 

776 status["is_paused"] = session_engine.is_paused 

777 status["trading_mode"] = session_engine.trading_mode.value 

778 status["asset_mode"] = session_engine.asset_mode.value 

779 status["subscribed_symbols"] = session_engine.subscribed_symbols 

780 

781 # 获取性能指标(如果是模拟交易) 

782 if session_engine.trading_engine and hasattr( 

783 session_engine.trading_engine, "get_performance_metrics" 

784 ): 

785 status["performance_metrics"] = ( 

786 session_engine.trading_engine.get_performance_metrics() 

787 ) 

788 

789 # 获取策略信息 

790 if session_engine.strategy_engine: 

791 status["strategy_count"] = len( 

792 session_engine.strategy_engine.strategies 

793 ) 

794 

795 return status 

796 

797 # ===== 风险管理 ===== 

798 

799 def _create_risk_engine(self, session_id: str, session: TradingSessionResponse): 

800 """创建风险管理引擎""" 

801 try: 

802 # 从会话配置中获取风险配置 

803 risk_config = session.config.get("risk_config", {}) 

804 

805 # 创建风险管理引擎 

806 risk_engine = self.risk_service.create_risk_engine(session_id, risk_config) 

807 

808 # 通过WebSocket上报日志 

809 self._send_websocket_log(f"✅ 风险管理引擎已创建: {session_id}", "info") 

810 

811 except Exception as e: 

812 self._send_websocket_log(f"❌ 创建风险管理引擎失败: {e}", "error") 

813 

814 def _remove_risk_engine(self, session_id: str): 

815 """移除风险管理引擎""" 

816 try: 

817 success = self.risk_service.remove_risk_engine(session_id) 

818 if success: 

819 self._send_websocket_log(f"✅ 风险管理引擎已移除: {session_id}", "info") 

820 

821 except Exception as e: 

822 self._send_websocket_log(f"❌ 移除风险管理引擎失败: {e}", "error") 

823 

824 def _cleanup_session_engines(self, session_id: str): 

825 """清理交易会话引擎""" 

826 try: 

827 if session_id in self.session_engines: 

828 session_engine = self.session_engines[session_id] 

829 

830 # 停止交易会话引擎 

831 if hasattr(session_engine, "stop"): 

832 session_engine.stop() 

833 

834 # 从内存中移除 

835 del self.session_engines[session_id] 

836 self._send_websocket_log(f"✅ 已清理交易会话引擎: {session_id}", "info") 

837 

838 except Exception as e: 

839 self._send_websocket_log(f"❌ 清理交易会话引擎失败: {e}", "error") 

840 

841 def _cleanup_risk_engines(self, session_id: str): 

842 """清理风险管理引擎""" 

843 try: 

844 # 通过风险管理服务清理 

845 self._remove_risk_engine(session_id) 

846 self._send_websocket_log(f"✅ 已清理风险管理引擎: {session_id}", "info") 

847 

848 except Exception as e: 

849 self._send_websocket_log(f"❌ 清理风险管理引擎失败: {e}", "error") 

850 

851 def get_risk_summary(self, session_id: str) -> Optional[Dict[str, Any]]: 

852 """获取风险摘要""" 

853 return self.risk_service.get_risk_summary(session_id) 

854 

855 def _send_websocket_log(self, message: str, log_type: str = "log"): 

856 """发送WebSocket日志""" 

857 try: 

858 from core.services.websocket_service_factory import \ 

859 send_websocket_log 

860 

861 # 使用会话ID作为task_id 

862 send_websocket_log("trading_service", message, log_type) 

863 except Exception as e: 

864 print(f"❌ WebSocket日志发送失败: {e}") 

865 print(f"📝 {message}") 

866 

867 def _send_session_status_update(self, session_id: str, status: str): 

868 """发送会话状态更新""" 

869 try: 

870 # 发送会话状态更新消息 

871 import json 

872 

873 from core.services.websocket_service_factory import \ 

874 send_websocket_log 

875 

876 message = { 

877 "type": "session_status_update", 

878 "data": {"session_id": session_id, "status": status}, 

879 } 

880 send_websocket_log(session_id, json.dumps(message), "status") 

881 except Exception as e: 

882 print(f"❌ 会话状态更新发送失败: {e}") 

883 

884 def get_risk_events( 

885 self, session_id: str, limit: int = 100 

886 ) -> List[Dict[str, Any]]: 

887 """获取风险事件""" 

888 return self.risk_service.get_risk_events(session_id, limit) 

889 

890 def update_risk_config(self, session_id: str, new_config: Dict[str, Any]) -> bool: 

891 """更新风险配置""" 

892 return self.risk_service.update_risk_config(session_id, new_config) 

893 

894 def get_risk_recommendations(self, session_id: str) -> List[str]: 

895 """获取风险建议""" 

896 return self.risk_service.get_risk_recommendations(session_id) 

897 

898 def reset_daily_risk_metrics(self, session_id: str) -> bool: 

899 """重置日度风险指标""" 

900 return self.risk_service.reset_daily_risk_metrics(session_id)