Coverage for core/repositories/trading_repository.py: 38.97%
331 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2交易数据访问层
3"""
5import json
6import logging
7import uuid
8from datetime import datetime
9from decimal import Decimal
10from typing import Any, Dict, List, Optional
12logger = logging.getLogger(__name__)
15class DecimalEncoder(json.JSONEncoder):
16 """Decimal类型JSON编码器"""
18 def default(self, obj):
19 if isinstance(obj, Decimal):
20 return float(obj)
21 elif isinstance(obj, datetime):
22 return obj.isoformat()
23 return super().default(obj)
26from core.models.trading import (AssetMode, OrderSide, OrderStatus, OrderType,
27 PositionHistoryCreate,
28 PositionHistoryResponse, SessionStatus,
29 TradingMode, TradingOrderCreate,
30 TradingOrderResponse, TradingOrderUpdate,
31 TradingSessionCreate, TradingSessionResponse,
32 TradingSessionUpdate)
33from infrastructure.database.redis_client import get_redis
36class TradingRepository:
37 """交易数据访问层"""
39 def __init__(self):
40 self.redis_client = get_redis()
42 # ===== 交易会话管理 =====
44 def create_trading_session(
45 self, session_data: TradingSessionCreate, user_id: str
46 ) -> TradingSessionResponse:
47 """创建交易会话"""
48 session_id = str(uuid.uuid4())
49 now = datetime.now()
51 session_dict = {
52 "id": session_id,
53 "user_id": user_id,
54 "session_name": session_data.session_name,
55 "trading_mode": session_data.trading_mode.value,
56 "asset_mode": session_data.asset_mode.value,
57 "strategy_name": session_data.strategy_name,
58 "status": SessionStatus.CREATED.value,
59 "start_time": None,
60 "end_time": None,
61 "initial_capital": str(session_data.initial_capital),
62 "final_capital": str(
63 session_data.initial_capital
64 ), # 期末总资产初始等于期初总资产
65 "initial_capital_by_currency": (
66 json.dumps(session_data.initial_capital_by_currency, cls=DecimalEncoder)
67 if session_data.initial_capital_by_currency
68 else None
69 ),
70 "final_capital_by_currency": (
71 json.dumps(session_data.initial_capital_by_currency, cls=DecimalEncoder)
72 if session_data.initial_capital_by_currency
73 else None
74 ), # 期末总资产初始等于期初总资产
75 "config": session_data.model_dump(),
76 "strategy_params": json.dumps(
77 session_data.strategy_params, cls=DecimalEncoder
78 ),
79 "tradable_symbols": (
80 json.dumps(session_data.tradable_symbols, cls=DecimalEncoder)
81 if session_data.tradable_symbols
82 else None
83 ),
84 "performance_metrics": None,
85 "created_at": now.isoformat(),
86 "updated_at": now.isoformat(),
87 }
89 # 存储到Redis
90 key = f"trading_session:{session_id}"
91 # 过滤掉None值并转换字典为JSON字符串
92 filtered_dict = {}
93 for k, v in session_dict.items():
94 if v is not None:
95 if isinstance(v, dict):
96 filtered_dict[k] = json.dumps(v, cls=DecimalEncoder)
97 else:
98 filtered_dict[k] = str(v)
99 self.redis_client.client.hset(key, mapping=filtered_dict)
101 # 添加到用户会话列表
102 user_sessions_key = f"user_trading_sessions:{user_id}"
103 self.redis_client.client.sadd(user_sessions_key, session_id)
105 return self._dict_to_session_response(session_dict)
107 def get_trading_session(self, session_id: str) -> Optional[TradingSessionResponse]:
108 """获取交易会话"""
109 key = f"trading_session:{session_id}"
110 session_dict = self.redis_client.client.hgetall(key)
112 if not session_dict:
113 return None
115 return self._dict_to_session_response(session_dict)
117 def update_trading_session(
118 self, session_id: str, update_data: TradingSessionUpdate
119 ) -> Optional[TradingSessionResponse]:
120 """更新交易会话"""
121 key = f"trading_session:{session_id}"
123 # 检查会话是否存在
124 if not self.redis_client.client.exists(key):
125 return None
127 # 准备更新数据
128 update_dict = {}
129 if update_data.session_name is not None:
130 update_dict["session_name"] = update_data.session_name
131 if update_data.status is not None:
132 update_dict["status"] = update_data.status.value
133 if update_data.risk_config is not None:
134 # 更新配置中的风险配置
135 current_config = self.redis_client.client.hget(key, "config")
136 if current_config:
137 config = json.loads(current_config)
138 config["risk_config"] = update_data.risk_config.dict()
139 update_dict["config"] = json.dumps(config)
141 update_dict["updated_at"] = datetime.now().isoformat()
143 # 更新Redis
144 if update_dict:
145 self.redis_client.client.hset(key, mapping=update_dict)
147 return self.get_trading_session(session_id)
149 def delete_trading_session(self, session_id: str) -> bool:
150 """删除交易会话及其所有关联数据"""
151 key = f"trading_session:{session_id}"
153 # 获取会话信息
154 session_dict = self.redis_client.client.hgetall(key)
155 if not session_dict:
156 return False
158 user_id = session_dict.get("user_id")
160 print(f"🗑️ 开始删除交易会话: {session_id}")
162 # 1. 删除会话基本信息
163 self.redis_client.client.delete(key)
164 print(f"✅ 已删除会话基本信息: {key}")
166 # 2. 从用户会话列表中移除
167 if user_id:
168 user_sessions_key = f"user_trading_sessions:{user_id}"
169 self.redis_client.client.srem(user_sessions_key, session_id)
170 print(f"✅ 已从用户会话列表中移除: {user_sessions_key}")
172 # 3. 删除相关订单记录
173 self._delete_session_orders(session_id)
175 # 4. 删除相关持仓记录
176 self._delete_session_positions(session_id)
178 # 5. 删除策略日志
179 self._delete_session_logs(session_id)
181 # 6. 删除风险相关数据
182 self._delete_session_risk_data(session_id)
184 # 7. 删除性能指标数据
185 self._delete_session_performance_data(session_id)
187 # 8. 删除实时监控数据
188 self._delete_session_monitoring_data(session_id)
190 # 9. 删除模拟交易引擎数据
191 self._delete_session_simulation_data(session_id)
193 # 10. 清理WebSocket连接
194 self._cleanup_websocket_connections(session_id)
196 # 11. 删除回测相关数据
197 self._delete_session_backtest_data(session_id)
199 print(f"🎉 交易会话删除完成: {session_id}")
200 return True
202 def get_user_trading_sessions(
203 self,
204 user_id: str,
205 status: Optional[SessionStatus] = None,
206 limit: int = 10,
207 offset: int = 0,
208 ) -> List[TradingSessionResponse]:
209 """获取用户交易会话列表"""
210 user_sessions_key = f"user_trading_sessions:{user_id}"
211 session_ids = self.redis_client.client.smembers(user_sessions_key)
213 sessions = []
214 for session_id in session_ids:
215 session = self.get_trading_session(
216 session_id.decode() if isinstance(session_id, bytes) else session_id
217 )
218 if session:
219 if status is None or session.status == status:
220 sessions.append(session)
222 # 按创建时间倒序排序
223 sessions.sort(key=lambda x: x.created_at, reverse=True)
225 # 分页
226 return sessions[offset : offset + limit]
228 # ===== 交易订单管理 =====
230 def create_trading_order(
231 self, order_data: TradingOrderCreate
232 ) -> TradingOrderResponse:
233 """创建交易订单"""
234 order_id = str(uuid.uuid4())
235 now = datetime.now()
237 order_dict = {
238 "id": order_id,
239 "session_id": order_data.session_id,
240 "symbol": order_data.symbol,
241 "side": order_data.side.value,
242 "order_type": order_data.order_type.value,
243 "quantity": str(order_data.quantity),
244 "price": str(order_data.price) if order_data.price else None,
245 "status": OrderStatus.PENDING.value,
246 "submitted_at": now.isoformat(),
247 "filled_at": None,
248 "cancelled_at": None,
249 "filled_price": None,
250 "filled_quantity": None,
251 "commission": "0",
252 "trading_fee": (
253 json.dumps(order_data.trading_fee, cls=DecimalEncoder)
254 if order_data.trading_fee
255 else None
256 ),
257 }
259 # 存储到Redis
260 key = f"trading_order:{order_id}"
261 # 过滤掉None值并转换字典为JSON字符串
262 filtered_dict = {}
263 for k, v in order_dict.items():
264 if v is not None:
265 if isinstance(v, dict):
266 filtered_dict[k] = json.dumps(v, cls=DecimalEncoder)
267 else:
268 filtered_dict[k] = str(v)
269 self.redis_client.client.hset(key, mapping=filtered_dict)
271 # 添加到会话订单列表
272 session_orders_key = f"session_orders:{order_data.session_id}"
273 self.redis_client.client.sadd(session_orders_key, order_id)
275 return self._dict_to_order_response(order_dict)
277 def get_trading_order(self, order_id: str) -> Optional[TradingOrderResponse]:
278 """获取交易订单"""
279 key = f"trading_order:{order_id}"
280 order_dict = self.redis_client.client.hgetall(key)
282 if not order_dict:
283 return None
285 return self._dict_to_order_response(order_dict)
287 def update_trading_order(
288 self, order_id: str, update_data: TradingOrderUpdate
289 ) -> Optional[TradingOrderResponse]:
290 """更新交易订单"""
291 key = f"trading_order:{order_id}"
293 # 检查订单是否存在
294 if not self.redis_client.client.exists(key):
295 return None
297 # 准备更新数据
298 update_dict = {}
299 if update_data.status is not None:
300 update_dict["status"] = update_data.status.value
301 if update_data.status == OrderStatus.FILLED:
302 update_dict["filled_at"] = datetime.now().isoformat()
303 elif update_data.status == OrderStatus.CANCELLED:
304 update_dict["cancelled_at"] = datetime.now().isoformat()
306 if update_data.filled_price is not None:
307 update_dict["filled_price"] = str(update_data.filled_price)
308 if update_data.filled_quantity is not None:
309 update_dict["filled_quantity"] = str(update_data.filled_quantity)
310 if update_data.commission is not None:
311 update_dict["commission"] = str(update_data.commission)
312 if update_data.trading_fee is not None:
313 update_dict["trading_fee"] = json.dumps(
314 update_data.trading_fee, cls=DecimalEncoder
315 )
317 # 更新Redis
318 if update_dict:
319 self.redis_client.client.hset(key, mapping=update_dict)
321 return self.get_trading_order(order_id)
323 def get_session_orders(
324 self,
325 session_id: str,
326 status: Optional[OrderStatus] = None,
327 limit: int = 100,
328 offset: int = 0,
329 ) -> List[TradingOrderResponse]:
330 """获取会话订单列表"""
331 session_orders_key = f"session_orders:{session_id}"
332 order_ids = self.redis_client.client.smembers(session_orders_key)
334 orders = []
335 for order_id in order_ids:
336 order = self.get_trading_order(
337 order_id.decode() if isinstance(order_id, bytes) else order_id
338 )
339 if order:
340 if status is None or order.status == status:
341 orders.append(order)
343 # 按提交时间倒序排序
344 orders.sort(key=lambda x: x.submitted_at, reverse=True)
346 # 分页
347 return orders[offset : offset + limit]
349 def get_pending_orders(self, session_id: str) -> List[TradingOrderResponse]:
350 """获取待处理订单列表"""
351 return self.get_session_orders(session_id, OrderStatus.PENDING)
353 # ===== 持仓记录管理 =====
355 def create_position_history(
356 self, position_data: PositionHistoryCreate
357 ) -> PositionHistoryResponse:
358 """创建持仓记录"""
359 position_id = str(uuid.uuid4())
360 now = datetime.now()
362 position_dict = {
363 "id": position_id,
364 "session_id": position_data.session_id,
365 "symbol": position_data.symbol,
366 "quantity": str(position_data.quantity),
367 "avg_price": str(position_data.avg_price),
368 "market_value": str(position_data.market_value),
369 "unrealized_pnl": str(position_data.unrealized_pnl),
370 "realized_pnl": str(position_data.realized_pnl),
371 "timestamp": now.isoformat(),
372 }
374 # 存储到Redis
375 key = f"position_history:{position_id}"
376 self.redis_client.client.hset(key, mapping=position_dict)
378 # 添加到会话持仓列表
379 session_positions_key = f"session_positions:{position_data.session_id}"
380 self.redis_client.client.sadd(session_positions_key, position_id)
382 return self._dict_to_position_response(position_dict)
384 def get_position_history(
385 self, position_id: str
386 ) -> Optional[PositionHistoryResponse]:
387 """获取持仓记录"""
388 key = f"position_history:{position_id}"
389 position_dict = self.redis_client.client.hgetall(key)
391 if not position_dict:
392 return None
394 return self._dict_to_position_response(position_dict)
396 def get_session_positions(
397 self, session_id: str, limit: int = 100, offset: int = 0
398 ) -> List[PositionHistoryResponse]:
399 """获取会话持仓记录列表"""
400 session_positions_key = f"session_positions:{session_id}"
401 position_ids = self.redis_client.client.smembers(session_positions_key)
403 positions = []
404 for position_id in position_ids:
405 position = self.get_position_history(
406 position_id.decode() if isinstance(position_id, bytes) else position_id
407 )
408 if position:
409 positions.append(position)
411 # 按时间戳倒序排序
412 positions.sort(key=lambda x: x.timestamp, reverse=True)
414 # 分页
415 return positions[offset : offset + limit]
417 def get_latest_positions(
418 self, session_id: str
419 ) -> Dict[str, PositionHistoryResponse]:
420 """获取最新持仓快照"""
421 positions = self.get_session_positions(session_id, limit=1000)
423 # 按股票代码分组,取最新的记录
424 latest_positions = {}
425 for position in positions:
426 if (
427 position.symbol not in latest_positions
428 or position.timestamp > latest_positions[position.symbol].timestamp
429 ):
430 latest_positions[position.symbol] = position
432 return latest_positions
434 def add_session_log(self, session_id: str, log_entry: Dict[str, Any]) -> bool:
435 """添加会话日志"""
436 try:
437 # 使用统一的日志键格式
438 logs_key = f"session_logs:{session_id}"
440 # 确保日志条目有必要的字段
441 if "id" not in log_entry:
442 log_entry["id"] = str(uuid.uuid4())
443 if "timestamp" not in log_entry:
444 log_entry["timestamp"] = datetime.now().isoformat(
445 timespec="milliseconds"
446 )
448 # 保存到Redis
449 self.redis_client.client.lpush(
450 logs_key, json.dumps(log_entry, cls=DecimalEncoder)
451 )
452 self.redis_client.client.ltrim(logs_key, 0, 99999) # 保留最近10万条
454 return True
455 except Exception as e:
456 logger.error(f"添加会话日志失败: {e}")
457 return False
459 def get_session_logs(
460 self,
461 session_id: str,
462 page: int = 1,
463 page_size: int = 20,
464 search: Optional[str] = None,
465 start_date: Optional[str] = None,
466 end_date: Optional[str] = None,
467 component: Optional[str] = None,
468 ) -> Dict[str, Any]:
469 """获取会话策略日志"""
470 try:
471 # 从Redis获取日志数据
472 logs_key = f"session_logs:{session_id}"
474 # 获取所有日志
475 all_logs = self.redis_client.lrange(logs_key, 0, -1)
477 # 解析日志数据
478 logs = []
479 for log_str in all_logs:
480 try:
481 log_data = json.loads(log_str)
483 # 应用筛选条件
484 if (
485 search
486 and search.lower() not in log_data.get("message", "").lower()
487 ):
488 continue
490 if component and component != log_data.get("component", ""):
491 continue
493 if start_date:
494 log_time = datetime.fromisoformat(log_data.get("timestamp", ""))
495 start_dt = datetime.fromisoformat(
496 start_date.replace("Z", "+00:00")
497 )
498 # 确保两个datetime对象都有时区信息
499 if log_time.tzinfo is None:
500 from datetime import timezone
502 log_time = log_time.replace(tzinfo=timezone.utc)
503 if start_dt.tzinfo is None:
504 from datetime import timezone
506 start_dt = start_dt.replace(tzinfo=timezone.utc)
507 if log_time < start_dt:
508 continue
510 if end_date:
511 log_time = datetime.fromisoformat(log_data.get("timestamp", ""))
512 end_dt = datetime.fromisoformat(end_date.replace("Z", "+00:00"))
513 # 确保两个datetime对象都有时区信息
514 if log_time.tzinfo is None:
515 from datetime import timezone
517 log_time = log_time.replace(tzinfo=timezone.utc)
518 if end_dt.tzinfo is None:
519 from datetime import timezone
521 end_dt = end_dt.replace(tzinfo=timezone.utc)
522 if log_time > end_dt:
523 continue
525 # 为没有id的日志条目生成唯一id
526 log_id = log_data.get("id", "")
527 if not log_id:
528 import uuid
530 log_id = str(uuid.uuid4())
532 logs.append(
533 {
534 "id": log_id,
535 "sequence": log_data.get("sequence", 0),
536 "timestamp": log_data.get("timestamp", ""),
537 "level": log_data.get("level", "INFO"),
538 "component": log_data.get("component", "unknown"),
539 "message": log_data.get("message", ""),
540 "data": log_data.get("data", {}),
541 }
542 )
544 except Exception as e:
545 logger.error(f"解析日志数据失败: {e}")
546 continue
548 # 按序号倒序排序(最新的日志序号最大)
549 logs.sort(key=lambda x: x.get("sequence", 0), reverse=True)
551 # 分页
552 total = len(logs)
553 start_idx = (page - 1) * page_size
554 end_idx = start_idx + page_size
555 paginated_logs = logs[start_idx:end_idx]
557 return {
558 "logs": paginated_logs,
559 "total": total,
560 "page": page,
561 "page_size": page_size,
562 }
564 except Exception as e:
565 logger.error(f"获取会话日志失败: {e}")
566 return {"logs": [], "total": 0, "page": page, "page_size": page_size}
568 # ===== 辅助方法 =====
570 def _dict_to_session_response(
571 self, session_dict: Dict[str, Any]
572 ) -> TradingSessionResponse:
573 """将字典转换为会话响应对象"""
574 # 处理字节字符串
575 if isinstance(session_dict, dict):
576 session_dict = {
577 k.decode() if isinstance(k, bytes) else k: (
578 v.decode() if isinstance(v, bytes) else v
579 )
580 for k, v in session_dict.items()
581 }
583 # 获取按货币分组的资产信息
584 initial_capital_by_currency = None
585 final_capital_by_currency = None
587 # 暂时设为None,避免循环导入和性能问题
588 # TODO: 后续可以通过缓存或其他方式优化
590 return TradingSessionResponse(
591 id=session_dict["id"],
592 user_id=session_dict["user_id"],
593 session_name=session_dict["session_name"],
594 trading_mode=TradingMode(session_dict["trading_mode"]),
595 asset_mode=AssetMode(session_dict["asset_mode"]),
596 strategy_name=session_dict["strategy_name"],
597 status=SessionStatus(session_dict["status"]),
598 start_time=(
599 datetime.fromisoformat(session_dict["start_time"])
600 if session_dict.get("start_time")
601 else None
602 ),
603 end_time=(
604 datetime.fromisoformat(session_dict["end_time"])
605 if session_dict.get("end_time")
606 else None
607 ),
608 initial_capital=Decimal(session_dict["initial_capital"]),
609 final_capital=(
610 Decimal(session_dict["final_capital"])
611 if session_dict.get("final_capital")
612 else None
613 ),
614 initial_capital_by_currency=(
615 json.loads(session_dict["initial_capital_by_currency"])
616 if session_dict.get("initial_capital_by_currency")
617 else None
618 ),
619 final_capital_by_currency=(
620 json.loads(session_dict["final_capital_by_currency"])
621 if session_dict.get("final_capital_by_currency")
622 else None
623 ),
624 config=(
625 json.loads(session_dict["config"])
626 if isinstance(session_dict["config"], str)
627 else session_dict["config"]
628 ),
629 strategy_params=(
630 json.loads(session_dict["strategy_params"])
631 if session_dict.get("strategy_params")
632 else {}
633 ),
634 tradable_symbols=(
635 json.loads(session_dict["tradable_symbols"])
636 if session_dict.get("tradable_symbols")
637 else None
638 ),
639 performance_metrics=(
640 json.loads(session_dict["performance_metrics"])
641 if session_dict.get("performance_metrics")
642 else None
643 ),
644 created_at=datetime.fromisoformat(session_dict["created_at"]),
645 updated_at=datetime.fromisoformat(session_dict["updated_at"]),
646 )
648 def _dict_to_order_response(
649 self, order_dict: Dict[str, Any]
650 ) -> TradingOrderResponse:
651 """将字典转换为订单响应对象"""
652 # 处理字节字符串
653 if isinstance(order_dict, dict):
654 order_dict = {
655 k.decode() if isinstance(k, bytes) else k: (
656 v.decode() if isinstance(v, bytes) else v
657 )
658 for k, v in order_dict.items()
659 }
661 # 处理trading_fee字段
662 trading_fee = None
663 if order_dict.get("trading_fee"):
664 try:
665 import json
667 trading_fee = (
668 json.loads(order_dict["trading_fee"])
669 if isinstance(order_dict["trading_fee"], str)
670 else order_dict["trading_fee"]
671 )
672 except (json.JSONDecodeError, TypeError):
673 trading_fee = None
675 return TradingOrderResponse(
676 id=order_dict["id"],
677 session_id=order_dict["session_id"],
678 symbol=order_dict["symbol"],
679 side=OrderSide(order_dict["side"]),
680 order_type=OrderType(order_dict["order_type"]),
681 quantity=Decimal(order_dict["quantity"]),
682 price=Decimal(order_dict["price"]) if order_dict.get("price") else None,
683 status=OrderStatus(order_dict["status"]),
684 submitted_at=datetime.fromisoformat(order_dict["submitted_at"]),
685 filled_at=(
686 datetime.fromisoformat(order_dict["filled_at"])
687 if order_dict.get("filled_at")
688 else None
689 ),
690 cancelled_at=(
691 datetime.fromisoformat(order_dict["cancelled_at"])
692 if order_dict.get("cancelled_at")
693 else None
694 ),
695 filled_price=(
696 Decimal(order_dict["filled_price"])
697 if order_dict.get("filled_price")
698 else None
699 ),
700 filled_quantity=(
701 Decimal(order_dict["filled_quantity"])
702 if order_dict.get("filled_quantity")
703 else None
704 ),
705 commission=Decimal(order_dict.get("commission", "0")),
706 trading_fee=trading_fee,
707 )
709 def _dict_to_position_response(
710 self, position_dict: Dict[str, Any]
711 ) -> PositionHistoryResponse:
712 """将字典转换为持仓响应对象"""
713 # 处理字节字符串
714 if isinstance(position_dict, dict):
715 position_dict = {
716 k.decode() if isinstance(k, bytes) else k: (
717 v.decode() if isinstance(v, bytes) else v
718 )
719 for k, v in position_dict.items()
720 }
722 return PositionHistoryResponse(
723 id=position_dict["id"],
724 session_id=position_dict["session_id"],
725 symbol=position_dict["symbol"],
726 quantity=Decimal(position_dict["quantity"]),
727 avg_price=Decimal(position_dict["avg_price"]),
728 market_value=Decimal(position_dict["market_value"]),
729 unrealized_pnl=Decimal(position_dict["unrealized_pnl"]),
730 realized_pnl=Decimal(position_dict["realized_pnl"]),
731 timestamp=datetime.fromisoformat(position_dict["timestamp"]),
732 )
734 def _delete_session_orders(self, session_id: str):
735 """删除会话相关订单"""
736 session_orders_key = f"session_orders:{session_id}"
737 order_ids = self.redis_client.client.smembers(session_orders_key)
739 for order_id in order_ids:
740 order_id_str = (
741 order_id.decode() if isinstance(order_id, bytes) else order_id
742 )
743 self.redis_client.client.delete(f"trading_order:{order_id_str}")
745 self.redis_client.client.delete(session_orders_key)
747 def _delete_session_positions(self, session_id: str):
748 """删除会话相关持仓记录"""
749 session_positions_key = f"session_positions:{session_id}"
750 position_ids = self.redis_client.client.smembers(session_positions_key)
752 deleted_count = 0
753 for position_id in position_ids:
754 position_id_str = (
755 position_id.decode() if isinstance(position_id, bytes) else position_id
756 )
757 if self.redis_client.client.delete(f"position_history:{position_id_str}"):
758 deleted_count += 1
760 self.redis_client.client.delete(session_positions_key)
761 print(f"✅ 已删除持仓记录: {deleted_count} 条")
763 def _delete_session_logs(self, session_id: str):
764 """删除会话策略日志"""
765 logs_key = f"trading_session:{session_id}:logs"
766 if self.redis_client.client.delete(logs_key):
767 print(f"✅ 已删除策略日志: {logs_key}")
769 def _delete_session_risk_data(self, session_id: str):
770 """删除会话风险相关数据"""
771 risk_keys = [
772 f"risk_config:{session_id}",
773 f"risk_events:{session_id}",
774 f"risk_summary:{session_id}",
775 f"risk_metrics:{session_id}",
776 f"risk_violations:{session_id}",
777 f"risk_alerts:{session_id}",
778 ]
780 deleted_count = 0
781 for risk_key in risk_keys:
782 if self.redis_client.client.delete(risk_key):
783 deleted_count += 1
785 print(f"✅ 已删除风险数据: {deleted_count} 个键")
787 def _delete_session_performance_data(self, session_id: str):
788 """删除会话性能指标数据"""
789 performance_keys = [
790 f"performance_metrics:{session_id}",
791 f"daily_returns:{session_id}",
792 f"trade_history:{session_id}",
793 f"backtest_results:{session_id}",
794 f"strategy_performance:{session_id}",
795 ]
797 deleted_count = 0
798 for perf_key in performance_keys:
799 if self.redis_client.client.delete(perf_key):
800 deleted_count += 1
802 print(f"✅ 已删除性能数据: {deleted_count} 个键")
804 def _delete_session_monitoring_data(self, session_id: str):
805 """删除会话实时监控数据"""
806 monitoring_keys = [
807 f"monitoring:{session_id}",
808 f"realtime_data:{session_id}",
809 f"market_data_cache:{session_id}",
810 f"websocket_connections:{session_id}",
811 f"session_status:{session_id}",
812 ]
814 deleted_count = 0
815 for monitor_key in monitoring_keys:
816 if self.redis_client.client.delete(monitor_key):
817 deleted_count += 1
819 print(f"✅ 已删除监控数据: {deleted_count} 个键")
821 def _delete_session_simulation_data(self, session_id: str):
822 """删除会话模拟交易引擎数据"""
823 simulation_keys = [
824 f"simulation_engine:{session_id}",
825 f"simulation_orders:{session_id}",
826 f"simulation_positions:{session_id}",
827 f"simulation_trades:{session_id}",
828 f"simulation_balance:{session_id}",
829 f"simulation_metrics:{session_id}",
830 ]
832 deleted_count = 0
833 for sim_key in simulation_keys:
834 if self.redis_client.client.delete(sim_key):
835 deleted_count += 1
837 print(f"✅ 已删除模拟交易数据: {deleted_count} 个键")
839 def _cleanup_websocket_connections(self, session_id: str):
840 """清理WebSocket连接"""
841 try:
842 # 导入WebSocket服务
843 from core.services.websocket_service import websocket_service
845 # 断开WebSocket连接
846 if websocket_service.is_connected(session_id):
847 websocket_service.disconnect(session_id)
848 print(f"✅ 已断开WebSocket连接: {session_id}")
850 except Exception as e:
851 print(f"❌ 清理WebSocket连接失败: {e}")
853 def _delete_session_backtest_data(self, session_id: str):
854 """删除会话回测相关数据"""
855 backtest_keys = [
856 f"backtest_engine:{session_id}",
857 f"backtest_progress:{session_id}",
858 f"backtest_status:{session_id}",
859 f"backtest_config:{session_id}",
860 f"backtest_history:{session_id}",
861 f"backtest_metrics:{session_id}",
862 f"backtest_trades:{session_id}",
863 f"backtest_daily_returns:{session_id}",
864 f"backtest_time_series:{session_id}",
865 f"backtest_candlestick_data:{session_id}",
866 ]
868 deleted_count = 0
869 for backtest_key in backtest_keys:
870 if self.redis_client.client.delete(backtest_key):
871 deleted_count += 1
873 print(f"✅ 已删除回测数据: {deleted_count} 个键")