Coverage for core/services/trading_service.py: 51.90%
420 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2交易服务层
3"""
5import logging
6from datetime import datetime
7from decimal import Decimal
8from typing import Any, Dict, List, Optional
10from core.data_source.adapters.trade_adapter import TradeDataSourceAdapter
11from core.models.trading import (OrderSide, OrderStatus, OrderType,
12 PerformanceMetrics, PositionHistoryCreate,
13 PositionHistoryResponse, RiskConfig,
14 SessionStatus, TradingOrderCreate,
15 TradingOrderResponse, TradingOrderUpdate,
16 TradingSessionCreate, TradingSessionResponse,
17 TradingSessionUpdate)
18from core.repositories.trading_repository import TradingRepository
19from core.services.risk_service import RiskService
20from core.trading.engines import (StrategyEngine, TradingSessionEngine,
21 create_trading_engine)
22from core.trading.strategies import (create_strategy,
23 get_strategy_recommended_risk_config)
25logger = logging.getLogger(__name__)
28class TradingService:
29 """交易服务"""
31 def __init__(
32 self,
33 user_id: str,
34 trading_repository: Optional[TradingRepository] = None,
35 risk_service: Optional[RiskService] = None,
36 ):
37 self.user_id = user_id
38 self.trading_repo = trading_repository or TradingRepository()
39 self.trade_adapter = TradeDataSourceAdapter(user_id)
41 # 交易会话引擎
42 self.session_engines: Dict[str, TradingSessionEngine] = {}
44 # 风险管理服务
45 self.risk_service = risk_service or RiskService(user_id)
47 # ===== 交易会话管理 =====
49 def create_trading_session(
50 self, session_data: TradingSessionCreate
51 ) -> TradingSessionResponse:
52 """创建交易会话"""
53 # 如果风险配置为空,使用策略推荐的风险配置
54 if not session_data.risk_config.model_dump() or all(
55 v is None or v == [] for v in session_data.risk_config.model_dump().values()
56 ):
57 recommended_risk_config = get_strategy_recommended_risk_config(
58 session_data.strategy_name
59 )
60 if recommended_risk_config:
61 # 更新风险配置
62 session_data.risk_config = RiskConfig(**recommended_risk_config)
63 print(f"✅ 使用策略 {session_data.strategy_name} 推荐的风险配置")
65 # 如果策略参数为空,使用策略默认参数
66 if not session_data.strategy_params:
67 from core.trading.strategies import get_strategy_parameters
69 strategy_params = get_strategy_parameters(session_data.strategy_name)
70 if strategy_params:
71 # 构建默认参数字典
72 default_params = {}
73 for param in strategy_params:
74 if param.get("name") != "symbols": # 过滤掉symbols参数
75 default_params[param["name"]] = param["default_value"]
76 session_data.strategy_params = default_params
77 print(
78 f"✅ 使用策略 {session_data.strategy_name} 默认参数: {default_params}"
79 )
81 return self.trading_repo.create_trading_session(session_data, self.user_id)
83 def get_trading_session(self, session_id: str) -> Optional[TradingSessionResponse]:
84 """获取交易会话"""
85 session = self.trading_repo.get_trading_session(session_id)
86 if session and session.user_id != self.user_id:
87 return None # 只能访问自己的会话
88 return session
90 def update_trading_session(
91 self, session_id: str, update_data: TradingSessionUpdate
92 ) -> Optional[TradingSessionResponse]:
93 """更新交易会话"""
94 # 检查会话是否属于当前用户
95 session = self.get_trading_session(session_id)
96 if not session:
97 return None
99 return self.trading_repo.update_trading_session(session_id, update_data)
101 def delete_trading_session(self, session_id: str) -> bool:
102 """删除交易会话及其所有关联数据"""
103 # 检查会话是否属于当前用户
104 session = self.get_trading_session(session_id)
105 if not session:
106 return False
108 print(f"🗑️ 开始删除交易会话: {session_id}")
110 # 1. 停止并清理交易会话引擎
111 self._cleanup_session_engines(session_id)
113 # 3. 清理风险管理引擎
114 self._cleanup_risk_engines(session_id)
116 # 4. 删除数据库中的所有相关数据
117 success = self.trading_repo.delete_trading_session(session_id)
119 if success:
120 print(f"🎉 交易会话删除完成: {session_id}")
121 else:
122 print(f"❌ 交易会话删除失败: {session_id}")
124 return success
126 def get_user_trading_sessions(
127 self, status: Optional[SessionStatus] = None, limit: int = 10, offset: int = 0
128 ) -> List[TradingSessionResponse]:
129 """获取用户交易会话列表"""
130 return self.trading_repo.get_user_trading_sessions(
131 self.user_id, status, limit, offset
132 )
134 def start_trading_session(
135 self, session_id: str
136 ) -> Optional[TradingSessionResponse]:
137 """启动交易会话"""
138 session = self.get_trading_session(session_id)
139 if not session:
140 return None
142 if session.status not in [SessionStatus.CREATED, SessionStatus.STOPPED]:
143 return None # 只能启动已创建或已停止的会话
145 update_data = TradingSessionUpdate(status=SessionStatus.RUNNING)
147 # 更新开始时间
148 updated_session = self.trading_repo.update_trading_session(
149 session_id, update_data
150 )
151 if updated_session:
152 # 立即发送状态更新
153 self._send_session_status_update(session_id, "running")
155 # 异步启动交易引擎和风险管理引擎
156 import threading
158 def async_start():
159 try:
160 self._send_websocket_log(
161 f"🚀 开始异步初始化交易会话: {session_id}", "info"
162 )
164 # 启动交易引擎
165 self._start_trading_engine(session_id, updated_session)
167 # 创建风险管理引擎
168 self._create_risk_engine(session_id, updated_session)
170 self._send_websocket_log(
171 f"✅ 交易会话异步初始化完成: {session_id}", "info"
172 )
173 except Exception as e:
174 self._send_websocket_log(
175 f"❌ 交易会话异步初始化失败: {session_id} - {e}", "error"
176 )
177 # 如果初始化失败,更新状态为停止
178 self._send_session_status_update(session_id, "stopped")
180 threading.Thread(target=async_start, daemon=True).start()
182 return updated_session
184 def stop_trading_session(self, session_id: str) -> Optional[TradingSessionResponse]:
185 """停止交易会话"""
186 session = self.get_trading_session(session_id)
187 if not session:
188 return None
190 if session.status not in [SessionStatus.RUNNING, SessionStatus.PAUSED]:
191 return None # 只能停止运行中或暂停的会话
193 update_data = TradingSessionUpdate(status=SessionStatus.STOPPED)
195 # 更新结束时间
196 updated_session = self.trading_repo.update_trading_session(
197 session_id, update_data
198 )
199 if updated_session:
200 # 停止交易引擎
201 self._stop_trading_engine(session_id)
203 # 移除风险管理引擎
204 self._remove_risk_engine(session_id)
206 return updated_session
208 def pause_trading_session(
209 self, session_id: str
210 ) -> Optional[TradingSessionResponse]:
211 """暂停交易会话"""
212 session = self.get_trading_session(session_id)
213 if not session:
214 return None
216 if session.status != SessionStatus.RUNNING:
217 return None # 只能暂停运行中的会话
219 update_data = TradingSessionUpdate(status=SessionStatus.PAUSED)
221 updated_session = self.trading_repo.update_trading_session(
222 session_id, update_data
223 )
224 if updated_session:
225 # 暂停交易引擎
226 self._pause_trading_engine(session_id)
228 return updated_session
230 def resume_trading_session(
231 self, session_id: str
232 ) -> Optional[TradingSessionResponse]:
233 """恢复交易会话"""
234 session = self.get_trading_session(session_id)
235 if not session:
236 return None
238 if session.status != SessionStatus.PAUSED:
239 return None # 只能恢复暂停的会话
241 update_data = TradingSessionUpdate(status=SessionStatus.RUNNING)
243 updated_session = self.trading_repo.update_trading_session(
244 session_id, update_data
245 )
246 if updated_session:
247 # 恢复交易引擎
248 self._resume_trading_engine(session_id)
250 return updated_session
252 # ===== 交易订单管理 =====
254 def create_trading_order(
255 self, order_data: TradingOrderCreate
256 ) -> Optional[TradingOrderResponse]:
257 """创建交易订单"""
258 # 检查会话是否属于当前用户
259 session = self.get_trading_session(order_data.session_id)
260 if not session:
261 return None
263 if session.status != SessionStatus.RUNNING:
264 return None # 只能在运行中的会话中创建订单
266 return self.trading_repo.create_trading_order(order_data)
268 def get_trading_order(self, order_id: str) -> Optional[TradingOrderResponse]:
269 """获取交易订单"""
270 order = self.trading_repo.get_trading_order(order_id)
271 if not order:
272 return None
274 # 检查订单所属会话是否属于当前用户
275 session = self.get_trading_session(order.session_id)
276 if not session:
277 return None
279 return order
281 def update_trading_order(
282 self, order_id: str, update_data: TradingOrderUpdate
283 ) -> Optional[TradingOrderResponse]:
284 """更新交易订单"""
285 # 检查订单是否属于当前用户
286 order = self.get_trading_order(order_id)
287 if not order:
288 return None
290 return self.trading_repo.update_trading_order(order_id, update_data)
292 def get_session_orders(
293 self,
294 session_id: str,
295 status: Optional[OrderStatus] = None,
296 limit: int = 100,
297 offset: int = 0,
298 ) -> List[TradingOrderResponse]:
299 """获取会话订单列表"""
300 # 检查会话是否属于当前用户
301 session = self.get_trading_session(session_id)
302 if not session:
303 return []
305 return self.trading_repo.get_session_orders(session_id, status, limit, offset)
307 def cancel_order(self, order_id: str) -> bool:
308 """取消订单"""
309 order = self.get_trading_order(order_id)
310 if not order:
311 return False
313 if order.status not in [OrderStatus.PENDING]:
314 return False # 只能取消待处理的订单
316 update_data = TradingOrderUpdate(status=OrderStatus.CANCELLED)
318 updated_order = self.trading_repo.update_trading_order(order_id, update_data)
319 return updated_order is not None
321 # ===== 持仓记录管理 =====
323 def create_position_history(
324 self, position_data: PositionHistoryCreate
325 ) -> Optional[PositionHistoryResponse]:
326 """创建持仓记录"""
327 # 检查会话是否属于当前用户
328 session = self.get_trading_session(position_data.session_id)
329 if not session:
330 return None
332 return self.trading_repo.create_position_history(position_data)
334 def get_session_positions(
335 self, session_id: str, limit: int = 100, offset: int = 0
336 ) -> List[PositionHistoryResponse]:
337 """获取会话持仓记录列表"""
338 # 检查会话是否属于当前用户
339 session = self.get_trading_session(session_id)
340 if not session:
341 return []
343 return self.trading_repo.get_session_positions(session_id, limit, offset)
345 def get_latest_positions(
346 self, session_id: str
347 ) -> Dict[str, PositionHistoryResponse]:
348 """获取最新持仓快照"""
349 # 检查会话是否属于当前用户
350 session = self.get_trading_session(session_id)
351 if not session:
352 return {}
354 return self.trading_repo.get_latest_positions(session_id)
356 def get_session_logs(
357 self,
358 session_id: str,
359 page: int = 1,
360 page_size: int = 20,
361 search: Optional[str] = None,
362 start_date: Optional[str] = None,
363 end_date: Optional[str] = None,
364 component: Optional[str] = None,
365 ) -> Dict[str, Any]:
366 """获取会话策略日志"""
367 # 检查会话是否属于当前用户
368 session = self.get_trading_session(session_id)
369 if not session:
370 return {"logs": [], "total": 0}
372 return self.trading_repo.get_session_logs(
373 session_id, page, page_size, search, start_date, end_date, component
374 )
376 # ===== 性能分析 =====
378 def calculate_performance_metrics(
379 self, session_id: str
380 ) -> Optional[PerformanceMetrics]:
381 """计算性能指标"""
382 session = self.get_trading_session(session_id)
383 if not session:
384 return None
386 # 获取所有订单
387 orders = self.get_session_orders(session_id)
388 if not orders:
389 # 没有订单时返回默认性能指标
390 return PerformanceMetrics(
391 total_return=Decimal("0"),
392 annualized_return=Decimal("0"),
393 max_drawdown=Decimal("0"),
394 sharpe_ratio=Decimal("0"),
395 win_rate=Decimal("0"),
396 total_trades=0,
397 winning_trades=0,
398 losing_trades=0,
399 total_fees=Decimal("0"),
400 platform_fees=Decimal("0"),
401 activity_fees=Decimal("0"),
402 clearing_fees=Decimal("0"),
403 audit_fees=Decimal("0"),
404 )
406 # 计算基本指标
407 total_trades = len(orders)
408 filled_orders = [
409 order for order in orders if order.status == OrderStatus.FILLED
410 ]
411 winning_trades = 0
412 losing_trades = 0
414 # 初始化费用统计
415 total_fees = Decimal("0")
416 platform_fees = Decimal("0")
417 activity_fees = Decimal("0")
418 clearing_fees = Decimal("0")
419 audit_fees = Decimal("0")
421 # 计算盈亏 - 使用FIFO方法配对买卖订单
422 total_pnl = Decimal("0")
423 buy_orders = [] # 买入订单队列(FIFO)
425 for order in filled_orders:
426 # 累加费用统计
427 if order.trading_fee and isinstance(order.trading_fee, dict):
428 total_fees += Decimal(str(order.trading_fee.get("total_fee", "0")))
429 platform_fees += Decimal(
430 str(order.trading_fee.get("platform_fee", "0"))
431 )
432 activity_fees += Decimal(
433 str(order.trading_fee.get("activity_fee", "0"))
434 )
435 clearing_fees += Decimal(
436 str(order.trading_fee.get("clearing_fee", "0"))
437 )
438 audit_fees += Decimal(str(order.trading_fee.get("audit_fee", "0")))
440 if order.filled_price and order.filled_quantity:
441 if order.side == OrderSide.BUY:
442 # 买入订单,加入队列
443 # 计算买入费用
444 buy_fee = Decimal("0")
445 if order.trading_fee and isinstance(order.trading_fee, dict):
446 buy_fee = Decimal(str(order.trading_fee.get("total_fee", "0")))
447 elif order.commission:
448 buy_fee = order.commission
450 buy_orders.append(
451 {
452 "price": order.filled_price,
453 "quantity": order.filled_quantity,
454 "commission": buy_fee,
455 }
456 )
457 elif order.side == OrderSide.SELL:
458 # 卖出订单,与买入订单配对计算盈亏
459 sell_quantity = order.filled_quantity
460 sell_price = order.filled_price
461 # 计算卖出费用
462 sell_fee = Decimal("0")
463 if order.trading_fee and isinstance(order.trading_fee, dict):
464 sell_fee = Decimal(str(order.trading_fee.get("total_fee", "0")))
465 elif order.commission:
466 sell_fee = order.commission
468 while sell_quantity > 0 and buy_orders:
469 buy_order = buy_orders[0]
470 buy_quantity = buy_order["quantity"]
471 buy_price = buy_order["price"]
472 buy_commission = buy_order["commission"]
474 # 计算配对数量
475 pair_quantity = min(sell_quantity, buy_quantity)
477 # 计算这笔交易的盈亏
478 trade_pnl = (
479 (sell_price - buy_price) * pair_quantity
480 - buy_order["commission"]
481 - sell_fee
482 )
483 total_pnl += trade_pnl
485 # 更新胜率统计
486 if trade_pnl > 0:
487 winning_trades += 1
488 elif trade_pnl < 0:
489 losing_trades += 1
491 # 更新数量
492 sell_quantity -= pair_quantity
493 buy_quantity -= pair_quantity
495 if buy_quantity > 0:
496 # 买入订单还有剩余
497 buy_orders[0]["quantity"] = buy_quantity
498 else:
499 # 买入订单完全配对,移除
500 buy_orders.pop(0)
502 # 计算胜率
503 if total_trades > 0:
504 win_rate = Decimal(str(winning_trades)) / Decimal(str(total_trades))
505 else:
506 win_rate = Decimal("0")
508 # 计算总收益率 - 使用配置中的正确初始资金
509 initial_capital = Decimal("100000") # 默认值
510 if hasattr(session, "config") and session.config:
511 import json
513 config = (
514 json.loads(session.config)
515 if isinstance(session.config, str)
516 else session.config
517 )
518 if (
519 config
520 and "initial_capital_by_currency" in config
521 and config["initial_capital_by_currency"]
522 ):
523 usd_capital = config["initial_capital_by_currency"].get("USD", {})
524 if "cash_assets" in usd_capital:
525 initial_capital = Decimal(str(usd_capital["cash_assets"]))
526 elif (
527 config and "initial_capital" in config and config["initial_capital"] > 0
528 ):
529 initial_capital = Decimal(str(config["initial_capital"]))
530 elif session.initial_capital > 0:
531 initial_capital = session.initial_capital
533 if initial_capital > 0:
534 total_return = total_pnl / initial_capital
535 else:
536 total_return = Decimal("0")
538 # 简化计算,实际需要更复杂的指标计算
539 return PerformanceMetrics(
540 total_return=total_return,
541 annualized_return=total_return, # 简化
542 max_drawdown=Decimal("0"), # 需要历史数据计算
543 sharpe_ratio=Decimal("0"), # 需要波动率数据
544 win_rate=win_rate,
545 total_trades=total_trades,
546 winning_trades=winning_trades,
547 losing_trades=losing_trades,
548 total_fees=total_fees,
549 platform_fees=platform_fees,
550 activity_fees=activity_fees,
551 clearing_fees=clearing_fees,
552 audit_fees=audit_fees,
553 )
555 # ===== 风险控制 =====
557 def validate_order_risk(self, order_data: TradingOrderCreate) -> Dict[str, Any]:
558 """验证订单风险"""
559 session = self.get_trading_session(order_data.session_id)
560 if not session:
561 return {"valid": False, "errors": ["会话不存在"]}
563 errors = []
564 warnings = []
566 # 获取风险配置
567 risk_config = session.config.get("risk_config", {})
569 # 检查允许交易的股票代码
570 allowed_symbols = risk_config.get("allowed_symbols", [])
571 if allowed_symbols and order_data.symbol not in allowed_symbols:
572 errors.append(f"股票 {order_data.symbol} 不在允许交易列表中")
574 # 检查单只股票最大持仓比例
575 max_position_ratio = risk_config.get("max_position_ratio", 0.1)
576 current_positions = self.get_latest_positions(order_data.session_id)
578 if order_data.symbol in current_positions:
579 current_position = current_positions[order_data.symbol]
580 current_ratio = current_position.market_value / session.initial_capital
581 if current_ratio > max_position_ratio:
582 errors.append(
583 f"股票 {order_data.symbol} 当前持仓比例 {current_ratio:.2%} 超过最大限制 {max_position_ratio:.2%}"
584 )
586 # 检查资金充足性
587 if order_data.side == OrderSide.BUY:
588 required_amount = order_data.quantity * (order_data.price or Decimal("0"))
589 # 这里需要获取当前可用资金,简化处理
590 if required_amount > session.initial_capital:
591 errors.append("资金不足")
593 return {"valid": len(errors) == 0, "errors": errors, "warnings": warnings}
595 # ===== 模拟交易支持 =====
597 def submit_simulation_order(
598 self, order_data: TradingOrderCreate
599 ) -> Optional[TradingOrderResponse]:
600 """提交模拟交易订单"""
601 # 检查会话是否为模拟模式
602 session = self.get_trading_session(order_data.session_id)
603 if not session:
604 return None
606 if session.asset_mode.value != "simulation":
607 return None # 只能提交模拟订单
609 # 风险检查
610 risk_check = self.validate_order_risk(order_data)
611 if not risk_check["valid"]:
612 return None
614 # 创建订单
615 order = self.create_trading_order(order_data)
616 if not order:
617 return None
619 # 模拟成交逻辑(简化处理)
620 if order.order_type == OrderType.MARKET:
621 # 市价单立即成交
622 update_data = TradingOrderUpdate(
623 status=OrderStatus.FILLED,
624 filled_price=Decimal("100"), # 模拟价格
625 filled_quantity=order.quantity,
626 commission=Decimal("1"), # 模拟手续费
627 )
628 self.update_trading_order(order.id, update_data)
630 # 更新持仓记录
631 self._update_simulation_position(order)
633 return self.get_trading_order(order.id)
635 def _update_simulation_position(self, order: TradingOrderResponse):
636 """更新模拟持仓"""
637 if not order.filled_price or not order.filled_quantity:
638 return
640 # 获取当前持仓
641 current_positions = self.get_latest_positions(order.session_id)
642 current_position = current_positions.get(order.symbol)
644 if order.side == OrderSide.BUY:
645 # 买入
646 if current_position:
647 # 更新现有持仓
648 new_quantity = current_position.quantity + order.filled_quantity
649 new_avg_price = (
650 current_position.avg_price * current_position.quantity
651 + order.filled_price * order.filled_quantity
652 ) / new_quantity
653 new_market_value = new_quantity * order.filled_price
654 new_unrealized_pnl = new_market_value - (new_avg_price * new_quantity)
655 else:
656 # 新建持仓
657 new_quantity = order.filled_quantity
658 new_avg_price = order.filled_price
659 new_market_value = new_quantity * order.filled_price
660 new_unrealized_pnl = Decimal("0")
661 else:
662 # 卖出
663 if current_position:
664 new_quantity = current_position.quantity - order.filled_quantity
665 if new_quantity <= 0:
666 # 完全卖出
667 new_quantity = Decimal("0")
668 new_avg_price = Decimal("0")
669 new_market_value = Decimal("0")
670 new_unrealized_pnl = Decimal("0")
671 else:
672 # 部分卖出
673 new_avg_price = current_position.avg_price
674 new_market_value = new_quantity * order.filled_price
675 new_unrealized_pnl = new_market_value - (
676 new_avg_price * new_quantity
677 )
678 else:
679 # 没有持仓却要卖出,错误情况
680 return
682 # 创建持仓记录
683 position_data = PositionHistoryCreate(
684 session_id=order.session_id,
685 symbol=order.symbol,
686 quantity=new_quantity,
687 avg_price=new_avg_price,
688 market_value=new_market_value,
689 unrealized_pnl=new_unrealized_pnl,
690 realized_pnl=Decimal("0"),
691 )
693 self.create_position_history(position_data)
695 # ===== 交易引擎管理 =====
697 def _start_trading_engine(self, session_id: str, session: TradingSessionResponse):
698 """启动交易会话引擎"""
699 try:
700 # 创建交易会话引擎
701 session_engine = TradingSessionEngine(
702 user_id=self.user_id,
703 session_id=session_id,
704 session_config=session.config,
705 )
707 # 初始化交易会话引擎
708 if session_engine.initialize():
709 # 启动交易会话引擎
710 if session_engine.start():
711 self.session_engines[session_id] = session_engine
712 self._send_websocket_log(
713 f"✅ 交易会话引擎已启动: {session_id}", "info"
714 )
715 self._send_session_status_update(session_id, "running")
716 else:
717 self._send_websocket_log(
718 f"❌ 交易会话引擎启动失败: {session_id}", "error"
719 )
720 else:
721 self._send_websocket_log(
722 f"❌ 交易会话引擎初始化失败: {session_id}", "error"
723 )
725 except Exception as e:
726 self._send_websocket_log(f"❌ 启动交易会话引擎失败: {e}", "error")
728 def _stop_trading_engine(self, session_id: str):
729 """停止交易会话引擎"""
730 try:
731 # 停止交易会话引擎
732 if session_id in self.session_engines:
733 self.session_engines[session_id].stop()
734 del self.session_engines[session_id]
736 self._send_websocket_log(f"✅ 交易会话引擎已停止: {session_id}", "info")
737 self._send_session_status_update(session_id, "stopped")
739 except Exception as e:
740 self._send_websocket_log(f"❌ 停止交易会话引擎失败: {e}", "error")
742 def _pause_trading_engine(self, session_id: str):
743 """暂停交易会话引擎"""
744 try:
745 # 暂停交易会话引擎
746 if session_id in self.session_engines:
747 self.session_engines[session_id].pause()
749 self._send_websocket_log(f"⏸️ 交易会话引擎已暂停: {session_id}", "info")
750 self._send_session_status_update(session_id, "paused")
752 except Exception as e:
753 self._send_websocket_log(f"❌ 暂停交易会话引擎失败: {e}", "error")
755 def _resume_trading_engine(self, session_id: str):
756 """恢复交易会话引擎"""
757 try:
758 # 恢复交易会话引擎
759 if session_id in self.session_engines:
760 self.session_engines[session_id].resume()
762 self._send_websocket_log(f"▶️ 交易会话引擎已恢复: {session_id}", "info")
763 self._send_session_status_update(session_id, "running")
765 except Exception as e:
766 self._send_websocket_log(f"❌ 恢复交易会话引擎失败: {e}", "error")
768 def get_trading_engine_status(self, session_id: str) -> Dict[str, Any]:
769 """获取交易会话引擎状态"""
770 status = {"session_engine": False, "is_running": False, "is_paused": False}
772 if session_id in self.session_engines:
773 session_engine = self.session_engines[session_id]
774 status["session_engine"] = True
775 status["is_running"] = session_engine.is_running
776 status["is_paused"] = session_engine.is_paused
777 status["trading_mode"] = session_engine.trading_mode.value
778 status["asset_mode"] = session_engine.asset_mode.value
779 status["subscribed_symbols"] = session_engine.subscribed_symbols
781 # 获取性能指标(如果是模拟交易)
782 if session_engine.trading_engine and hasattr(
783 session_engine.trading_engine, "get_performance_metrics"
784 ):
785 status["performance_metrics"] = (
786 session_engine.trading_engine.get_performance_metrics()
787 )
789 # 获取策略信息
790 if session_engine.strategy_engine:
791 status["strategy_count"] = len(
792 session_engine.strategy_engine.strategies
793 )
795 return status
797 # ===== 风险管理 =====
799 def _create_risk_engine(self, session_id: str, session: TradingSessionResponse):
800 """创建风险管理引擎"""
801 try:
802 # 从会话配置中获取风险配置
803 risk_config = session.config.get("risk_config", {})
805 # 创建风险管理引擎
806 risk_engine = self.risk_service.create_risk_engine(session_id, risk_config)
808 # 通过WebSocket上报日志
809 self._send_websocket_log(f"✅ 风险管理引擎已创建: {session_id}", "info")
811 except Exception as e:
812 self._send_websocket_log(f"❌ 创建风险管理引擎失败: {e}", "error")
814 def _remove_risk_engine(self, session_id: str):
815 """移除风险管理引擎"""
816 try:
817 success = self.risk_service.remove_risk_engine(session_id)
818 if success:
819 self._send_websocket_log(f"✅ 风险管理引擎已移除: {session_id}", "info")
821 except Exception as e:
822 self._send_websocket_log(f"❌ 移除风险管理引擎失败: {e}", "error")
824 def _cleanup_session_engines(self, session_id: str):
825 """清理交易会话引擎"""
826 try:
827 if session_id in self.session_engines:
828 session_engine = self.session_engines[session_id]
830 # 停止交易会话引擎
831 if hasattr(session_engine, "stop"):
832 session_engine.stop()
834 # 从内存中移除
835 del self.session_engines[session_id]
836 self._send_websocket_log(f"✅ 已清理交易会话引擎: {session_id}", "info")
838 except Exception as e:
839 self._send_websocket_log(f"❌ 清理交易会话引擎失败: {e}", "error")
841 def _cleanup_risk_engines(self, session_id: str):
842 """清理风险管理引擎"""
843 try:
844 # 通过风险管理服务清理
845 self._remove_risk_engine(session_id)
846 self._send_websocket_log(f"✅ 已清理风险管理引擎: {session_id}", "info")
848 except Exception as e:
849 self._send_websocket_log(f"❌ 清理风险管理引擎失败: {e}", "error")
851 def get_risk_summary(self, session_id: str) -> Optional[Dict[str, Any]]:
852 """获取风险摘要"""
853 return self.risk_service.get_risk_summary(session_id)
855 def _send_websocket_log(self, message: str, log_type: str = "log"):
856 """发送WebSocket日志"""
857 try:
858 from core.services.websocket_service_factory import \
859 send_websocket_log
861 # 使用会话ID作为task_id
862 send_websocket_log("trading_service", message, log_type)
863 except Exception as e:
864 print(f"❌ WebSocket日志发送失败: {e}")
865 print(f"📝 {message}")
867 def _send_session_status_update(self, session_id: str, status: str):
868 """发送会话状态更新"""
869 try:
870 # 发送会话状态更新消息
871 import json
873 from core.services.websocket_service_factory import \
874 send_websocket_log
876 message = {
877 "type": "session_status_update",
878 "data": {"session_id": session_id, "status": status},
879 }
880 send_websocket_log(session_id, json.dumps(message), "status")
881 except Exception as e:
882 print(f"❌ 会话状态更新发送失败: {e}")
884 def get_risk_events(
885 self, session_id: str, limit: int = 100
886 ) -> List[Dict[str, Any]]:
887 """获取风险事件"""
888 return self.risk_service.get_risk_events(session_id, limit)
890 def update_risk_config(self, session_id: str, new_config: Dict[str, Any]) -> bool:
891 """更新风险配置"""
892 return self.risk_service.update_risk_config(session_id, new_config)
894 def get_risk_recommendations(self, session_id: str) -> List[str]:
895 """获取风险建议"""
896 return self.risk_service.get_risk_recommendations(session_id)
898 def reset_daily_risk_metrics(self, session_id: str) -> bool:
899 """重置日度风险指标"""
900 return self.risk_service.reset_daily_risk_metrics(session_id)