Coverage for core/repositories/trading_repository.py: 38.97%

331 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

1""" 

2交易数据访问层 

3""" 

4 

5import json 

6import logging 

7import uuid 

8from datetime import datetime 

9from decimal import Decimal 

10from typing import Any, Dict, List, Optional 

11 

12logger = logging.getLogger(__name__) 

13 

14 

class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that additionally serialises ``Decimal`` and ``datetime``.

    ``Decimal`` values are emitted as JSON numbers via ``float()`` (which can
    lose precision for values a binary float cannot represent exactly) and
    ``datetime`` values are emitted as ISO-8601 strings.
    """

    def default(self, obj):
        """Return a JSON-serialisable stand-in for ``obj``.

        Called by ``json`` only for objects it cannot encode natively.
        Anything that is neither a ``datetime`` nor a ``Decimal`` is passed to
        the base class, which raises ``TypeError``.
        """
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)

24 

25 

26from core.models.trading import (AssetMode, OrderSide, OrderStatus, OrderType, 

27 PositionHistoryCreate, 

28 PositionHistoryResponse, SessionStatus, 

29 TradingMode, TradingOrderCreate, 

30 TradingOrderResponse, TradingOrderUpdate, 

31 TradingSessionCreate, TradingSessionResponse, 

32 TradingSessionUpdate) 

33from infrastructure.database.redis_client import get_redis 

34 

35 

class TradingRepository:
    """Redis-backed data access layer for trading sessions, orders, positions and logs.

    Key layout used by this class:

    * ``trading_session:<session_id>``   hash holding one session
    * ``user_trading_sessions:<user_id>`` set of session ids owned by a user
    * ``trading_order:<order_id>``       hash holding one order
    * ``session_orders:<session_id>``    set of order ids of a session
    * ``position_history:<position_id>`` hash holding one position snapshot
    * ``session_positions:<session_id>`` set of position ids of a session
    * ``session_logs:<session_id>``      list of JSON log entries, newest first

    Hash values are stored as strings; nested structures are stored as JSON.
    Every read path tolerates a client that returns ``bytes`` instead of ``str``.
    """

    def __init__(self):
        """Bind the repository to the client object returned by ``get_redis()``."""
        self.redis_client = get_redis()

    # ===== Shared private helpers =====

    @staticmethod
    def _as_text(value: Any) -> Any:
        """Decode ``bytes`` to ``str``; return any other value unchanged."""
        return value.decode() if isinstance(value, bytes) else value

    @classmethod
    def _decode_hash(cls, raw: Any) -> Any:
        """Return a copy of a Redis hash with ``bytes`` keys/values decoded.

        Non-dict input is returned untouched, mirroring the ``isinstance``
        guard the individual converters used to carry.
        """
        if not isinstance(raw, dict):
            return raw
        return {cls._as_text(k): cls._as_text(v) for k, v in raw.items()}

    @staticmethod
    def _to_redis_mapping(fields: Dict[str, Any]) -> Dict[str, str]:
        """Build an ``HSET`` mapping from ``fields``.

        ``None`` values are dropped (the field is simply not stored), dict
        values are JSON-encoded with ``DecimalEncoder`` and everything else is
        converted with ``str()``.
        """
        mapping = {}
        for name, value in fields.items():
            if value is None:
                continue
            if isinstance(value, dict):
                mapping[name] = json.dumps(value, cls=DecimalEncoder)
            else:
                mapping[name] = str(value)
        return mapping

    @staticmethod
    def _parse_iso_utc(value: str) -> datetime:
        """Parse an ISO-8601 string into a timezone-aware ``datetime``.

        A trailing ``Z`` is rewritten to ``+00:00`` so the value also parses on
        interpreters whose ``fromisoformat`` rejects ``Z``. Naive values are
        tagged as UTC so that they can be compared with aware ones.

        Raises:
            ValueError: if ``value`` is not a valid ISO-8601 string.
        """
        from datetime import timezone

        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def _delete_keys(self, keys: List[str]) -> int:
        """Delete ``keys`` one by one and return how many actually existed."""
        client = self.redis_client.client
        return sum(1 for key in keys if client.delete(key))

    # ===== Trading session management =====

    def create_trading_session(
        self, session_data: TradingSessionCreate, user_id: str
    ) -> TradingSessionResponse:
        """Create a trading session, persist it and return it.

        The session starts in ``SessionStatus.CREATED``. The final capital
        fields are initialised to the initial capital values. The new session
        id is also added to the owner's session set.

        Args:
            session_data: Validated creation payload.
            user_id: Owner of the new session.

        Returns:
            The session as it was stored.
        """
        session_id = str(uuid.uuid4())
        created_at = datetime.now().isoformat()

        initial_capital = str(session_data.initial_capital)
        # Serialised once and reused for both the initial and the final field.
        capital_by_currency = (
            json.dumps(session_data.initial_capital_by_currency, cls=DecimalEncoder)
            if session_data.initial_capital_by_currency
            else None
        )

        session_dict = {
            "id": session_id,
            "user_id": user_id,
            "session_name": session_data.session_name,
            "trading_mode": session_data.trading_mode.value,
            "asset_mode": session_data.asset_mode.value,
            "strategy_name": session_data.strategy_name,
            "status": SessionStatus.CREATED.value,
            "start_time": None,
            "end_time": None,
            "initial_capital": initial_capital,
            "final_capital": initial_capital,
            "initial_capital_by_currency": capital_by_currency,
            "final_capital_by_currency": capital_by_currency,
            "config": session_data.model_dump(),
            "strategy_params": json.dumps(
                session_data.strategy_params, cls=DecimalEncoder
            ),
            "tradable_symbols": (
                json.dumps(session_data.tradable_symbols, cls=DecimalEncoder)
                if session_data.tradable_symbols
                else None
            ),
            "performance_metrics": None,
            "created_at": created_at,
            "updated_at": created_at,
        }

        client = self.redis_client.client
        client.hset(
            f"trading_session:{session_id}",
            mapping=self._to_redis_mapping(session_dict),
        )
        client.sadd(f"user_trading_sessions:{user_id}", session_id)

        return self._dict_to_session_response(session_dict)

    def get_trading_session(self, session_id: str) -> Optional[TradingSessionResponse]:
        """Return the session stored under ``session_id``, or ``None`` if absent."""
        session_dict = self.redis_client.client.hgetall(
            f"trading_session:{session_id}"
        )
        if not session_dict:
            return None
        return self._dict_to_session_response(session_dict)

    def update_trading_session(
        self, session_id: str, update_data: TradingSessionUpdate
    ) -> Optional[TradingSessionResponse]:
        """Apply the non-``None`` fields of ``update_data`` to a stored session.

        ``updated_at`` is refreshed on every call, even when no other field
        changes. A new ``risk_config`` is merged into the stored ``config``
        JSON; if the session has no stored config the risk config is skipped.

        Returns:
            The session after the update, or ``None`` if it does not exist.
        """
        key = f"trading_session:{session_id}"
        client = self.redis_client.client

        if not client.exists(key):
            return None

        update_dict = {}
        if update_data.session_name is not None:
            update_dict["session_name"] = update_data.session_name
        if update_data.status is not None:
            update_dict["status"] = update_data.status.value
        if update_data.risk_config is not None:
            current_config = client.hget(key, "config")
            if current_config:
                risk_config = update_data.risk_config
                # Prefer pydantic v2's model_dump() (already used when the
                # session is created); fall back to the legacy .dict().
                dump = getattr(risk_config, "model_dump", None) or risk_config.dict
                config = json.loads(current_config)
                config["risk_config"] = dump()
                # DecimalEncoder keeps Decimal/datetime values serialisable,
                # exactly as when the config was first written.
                update_dict["config"] = json.dumps(config, cls=DecimalEncoder)

        update_dict["updated_at"] = datetime.now().isoformat()
        client.hset(key, mapping=update_dict)

        return self.get_trading_session(session_id)

    def delete_trading_session(self, session_id: str) -> bool:
        """Delete a session together with all data keyed by its id.

        Returns:
            ``False`` if the session does not exist, ``True`` once the session
            hash and every related key have been removed.
        """
        key = f"trading_session:{session_id}"
        client = self.redis_client.client

        raw_session = client.hgetall(key)
        if not raw_session:
            return False

        # Decode first: with a bytes-returning client the field is b"user_id"
        # and a plain str lookup would miss it, leaving the id in the user's set.
        user_id = self._decode_hash(raw_session).get("user_id")

        print(f"🗑️ 开始删除交易会话: {session_id}")

        # 1. The session hash itself.
        client.delete(key)
        print(f"✅ 已删除会话基本信息: {key}")

        # 2. The owner's session index.
        if user_id:
            user_sessions_key = f"user_trading_sessions:{user_id}"
            client.srem(user_sessions_key, session_id)
            print(f"✅ 已从用户会话列表中移除: {user_sessions_key}")

        # 3-11. Everything else keyed by the session id, in the original order.
        self._delete_session_orders(session_id)
        self._delete_session_positions(session_id)
        self._delete_session_logs(session_id)
        self._delete_session_risk_data(session_id)
        self._delete_session_performance_data(session_id)
        self._delete_session_monitoring_data(session_id)
        self._delete_session_simulation_data(session_id)
        self._cleanup_websocket_connections(session_id)
        self._delete_session_backtest_data(session_id)

        print(f"🎉 交易会话删除完成: {session_id}")
        return True

    def get_user_trading_sessions(
        self,
        user_id: str,
        status: Optional[SessionStatus] = None,
        limit: int = 10,
        offset: int = 0,
    ) -> List[TradingSessionResponse]:
        """List a user's sessions, newest first.

        Args:
            user_id: Owner whose sessions are listed.
            status: When given, only sessions in this status are returned.
            limit: Maximum number of sessions to return.
            offset: Number of sessions to skip after sorting.
        """
        session_ids = self.redis_client.client.smembers(
            f"user_trading_sessions:{user_id}"
        )

        sessions = []
        for session_id in session_ids:
            session = self.get_trading_session(self._as_text(session_id))
            # Ids whose hash no longer exists are silently skipped.
            if session and (status is None or session.status == status):
                sessions.append(session)

        sessions.sort(key=lambda x: x.created_at, reverse=True)
        return sessions[offset : offset + limit]

    # ===== Trading order management =====

    def create_trading_order(
        self, order_data: TradingOrderCreate
    ) -> TradingOrderResponse:
        """Create an order in ``OrderStatus.PENDING``, persist it and return it.

        The order id is also added to the owning session's order set.
        """
        order_id = str(uuid.uuid4())

        order_dict = {
            "id": order_id,
            "session_id": order_data.session_id,
            "symbol": order_data.symbol,
            "side": order_data.side.value,
            "order_type": order_data.order_type.value,
            "quantity": str(order_data.quantity),
            # A falsy price (None or zero) is stored as "no price".
            "price": str(order_data.price) if order_data.price else None,
            "status": OrderStatus.PENDING.value,
            "submitted_at": datetime.now().isoformat(),
            "filled_at": None,
            "cancelled_at": None,
            "filled_price": None,
            "filled_quantity": None,
            "commission": "0",
            "trading_fee": (
                json.dumps(order_data.trading_fee, cls=DecimalEncoder)
                if order_data.trading_fee
                else None
            ),
        }

        client = self.redis_client.client
        client.hset(
            f"trading_order:{order_id}", mapping=self._to_redis_mapping(order_dict)
        )
        client.sadd(f"session_orders:{order_data.session_id}", order_id)

        return self._dict_to_order_response(order_dict)

    def get_trading_order(self, order_id: str) -> Optional[TradingOrderResponse]:
        """Return the order stored under ``order_id``, or ``None`` if absent."""
        order_dict = self.redis_client.client.hgetall(f"trading_order:{order_id}")
        if not order_dict:
            return None
        return self._dict_to_order_response(order_dict)

    def update_trading_order(
        self, order_id: str, update_data: TradingOrderUpdate
    ) -> Optional[TradingOrderResponse]:
        """Apply the non-``None`` fields of ``update_data`` to a stored order.

        Moving to ``FILLED`` stamps ``filled_at``; moving to ``CANCELLED``
        stamps ``cancelled_at``.

        Returns:
            The order after the update, or ``None`` if it does not exist.
        """
        key = f"trading_order:{order_id}"
        client = self.redis_client.client

        if not client.exists(key):
            return None

        update_dict = {}
        if update_data.status is not None:
            update_dict["status"] = update_data.status.value
            if update_data.status == OrderStatus.FILLED:
                update_dict["filled_at"] = datetime.now().isoformat()
            elif update_data.status == OrderStatus.CANCELLED:
                update_dict["cancelled_at"] = datetime.now().isoformat()

        if update_data.filled_price is not None:
            update_dict["filled_price"] = str(update_data.filled_price)
        if update_data.filled_quantity is not None:
            update_dict["filled_quantity"] = str(update_data.filled_quantity)
        if update_data.commission is not None:
            update_dict["commission"] = str(update_data.commission)
        if update_data.trading_fee is not None:
            update_dict["trading_fee"] = json.dumps(
                update_data.trading_fee, cls=DecimalEncoder
            )

        # An update with no fields set must not issue an empty HSET.
        if update_dict:
            client.hset(key, mapping=update_dict)

        return self.get_trading_order(order_id)

    def get_session_orders(
        self,
        session_id: str,
        status: Optional[OrderStatus] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> List[TradingOrderResponse]:
        """List a session's orders, most recently submitted first.

        Args:
            session_id: Session whose orders are listed.
            status: When given, only orders in this status are returned.
            limit: Maximum number of orders to return.
            offset: Number of orders to skip after sorting.
        """
        order_ids = self.redis_client.client.smembers(f"session_orders:{session_id}")

        orders = []
        for order_id in order_ids:
            order = self.get_trading_order(self._as_text(order_id))
            if order and (status is None or order.status == status):
                orders.append(order)

        orders.sort(key=lambda x: x.submitted_at, reverse=True)
        return orders[offset : offset + limit]

    def get_pending_orders(self, session_id: str) -> List[TradingOrderResponse]:
        """Return the session's ``PENDING`` orders (default page of ``get_session_orders``)."""
        return self.get_session_orders(session_id, OrderStatus.PENDING)

    # ===== Position history management =====

    def create_position_history(
        self, position_data: PositionHistoryCreate
    ) -> PositionHistoryResponse:
        """Persist a position snapshot stamped with the current time and return it.

        The snapshot id is also added to the owning session's position set.
        """
        position_id = str(uuid.uuid4())

        position_dict = {
            "id": position_id,
            "session_id": position_data.session_id,
            "symbol": position_data.symbol,
            "quantity": str(position_data.quantity),
            "avg_price": str(position_data.avg_price),
            "market_value": str(position_data.market_value),
            "unrealized_pnl": str(position_data.unrealized_pnl),
            "realized_pnl": str(position_data.realized_pnl),
            "timestamp": datetime.now().isoformat(),
        }

        client = self.redis_client.client
        client.hset(f"position_history:{position_id}", mapping=position_dict)
        client.sadd(f"session_positions:{position_data.session_id}", position_id)

        return self._dict_to_position_response(position_dict)

    def get_position_history(
        self, position_id: str
    ) -> Optional[PositionHistoryResponse]:
        """Return the snapshot stored under ``position_id``, or ``None`` if absent."""
        position_dict = self.redis_client.client.hgetall(
            f"position_history:{position_id}"
        )
        if not position_dict:
            return None
        return self._dict_to_position_response(position_dict)

    def get_session_positions(
        self, session_id: str, limit: int = 100, offset: int = 0
    ) -> List[PositionHistoryResponse]:
        """List a session's position snapshots, newest first, with paging."""
        position_ids = self.redis_client.client.smembers(
            f"session_positions:{session_id}"
        )

        positions = []
        for position_id in position_ids:
            position = self.get_position_history(self._as_text(position_id))
            if position:
                positions.append(position)

        positions.sort(key=lambda x: x.timestamp, reverse=True)
        return positions[offset : offset + limit]

    def get_latest_positions(
        self, session_id: str
    ) -> Dict[str, PositionHistoryResponse]:
        """Return the newest snapshot per symbol.

        Only the 1000 most recent snapshots of the session are considered.
        """
        latest_positions = {}
        for position in self.get_session_positions(session_id, limit=1000):
            current = latest_positions.get(position.symbol)
            if current is None or position.timestamp > current.timestamp:
                latest_positions[position.symbol] = position
        return latest_positions

    # ===== Session logs =====

    def add_session_log(self, session_id: str, log_entry: Dict[str, Any]) -> bool:
        """Prepend a log entry to the session's log list.

        ``log_entry`` is modified in place: missing ``id`` and ``timestamp``
        fields are filled in before it is stored. The list is trimmed to the
        100000 most recent entries.

        Returns:
            ``True`` on success, ``False`` if anything failed (the error is logged).
        """
        try:
            logs_key = f"session_logs:{session_id}"

            if "id" not in log_entry:
                log_entry["id"] = str(uuid.uuid4())
            if "timestamp" not in log_entry:
                log_entry["timestamp"] = datetime.now().isoformat(
                    timespec="milliseconds"
                )

            client = self.redis_client.client
            client.lpush(logs_key, json.dumps(log_entry, cls=DecimalEncoder))
            client.ltrim(logs_key, 0, 99999)

            return True
        except Exception as e:
            logger.error(f"添加会话日志失败: {e}")
            return False

    def get_session_logs(
        self,
        session_id: str,
        page: int = 1,
        page_size: int = 20,
        search: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        component: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Return one page of a session's log entries after filtering.

        Args:
            session_id: Session whose logs are read.
            page: 1-based page number.
            page_size: Entries per page.
            search: Case-insensitive substring that ``message`` must contain.
            start_date: ISO-8601 lower bound (inclusive) on the entry timestamp.
            end_date: ISO-8601 upper bound (inclusive) on the entry timestamp.
            component: Exact ``component`` value an entry must have.

        Returns:
            ``{"logs": [...], "total": <matches>, "page": page,
            "page_size": page_size}``. Entries are ordered by descending
            ``sequence``; entries with equal sequence keep their stored order.
            On any failure an empty result is returned and the error is logged.
        """
        try:
            logs_key = f"session_logs:{session_id}"

            # NOTE(review): this is the only call that goes through the wrapper
            # (`redis_client.lrange`) rather than `redis_client.client` -
            # confirm the wrapper exposes `lrange`.
            all_logs = self.redis_client.lrange(logs_key, 0, -1)

            # The bounds are loop-invariant, so parse them once up front.
            start_dt = self._parse_iso_utc(start_date) if start_date else None
            end_dt = self._parse_iso_utc(end_date) if end_date else None
            needle = search.lower() if search else None

            logs = []
            for log_str in all_logs:
                try:
                    log_data = json.loads(log_str)

                    if needle and needle not in log_data.get("message", "").lower():
                        continue
                    if component and component != log_data.get("component", ""):
                        continue

                    if start_dt is not None or end_dt is not None:
                        # Entries with a missing/invalid timestamp raise here
                        # and are skipped by the handler below.
                        log_time = self._parse_iso_utc(log_data.get("timestamp", ""))
                        if start_dt is not None and log_time < start_dt:
                            continue
                        if end_dt is not None and log_time > end_dt:
                            continue

                    logs.append(
                        {
                            # Entries stored without an id get a fresh one.
                            "id": log_data.get("id", "") or str(uuid.uuid4()),
                            "sequence": log_data.get("sequence", 0),
                            "timestamp": log_data.get("timestamp", ""),
                            "level": log_data.get("level", "INFO"),
                            "component": log_data.get("component", "unknown"),
                            "message": log_data.get("message", ""),
                            "data": log_data.get("data", {}),
                        }
                    )
                except Exception as e:
                    # One malformed entry must not hide the rest.
                    logger.error(f"解析日志数据失败: {e}")
                    continue

            logs.sort(key=lambda x: x.get("sequence", 0), reverse=True)

            start_idx = (page - 1) * page_size
            return {
                "logs": logs[start_idx : start_idx + page_size],
                "total": len(logs),
                "page": page,
                "page_size": page_size,
            }

        except Exception as e:
            logger.error(f"获取会话日志失败: {e}")
            return {"logs": [], "total": 0, "page": page, "page_size": page_size}

    # ===== Conversion helpers =====

    def _dict_to_session_response(
        self, session_dict: Dict[str, Any]
    ) -> TradingSessionResponse:
        """Build a ``TradingSessionResponse`` from a stored or freshly built dict.

        Accepts both the Redis form (all strings, possibly ``bytes``) and the
        in-memory form produced by ``create_trading_session`` (``None`` values
        present, ``config`` still a dict).
        """
        session_dict = self._decode_hash(session_dict)

        def _json_or(field: str, fallback: Any) -> Any:
            # Absent or empty fields map to the fallback instead of failing.
            raw = session_dict.get(field)
            return json.loads(raw) if raw else fallback

        def _datetime_or_none(field: str) -> Optional[datetime]:
            raw = session_dict.get(field)
            return datetime.fromisoformat(raw) if raw else None

        config = session_dict["config"]
        if isinstance(config, str):
            config = json.loads(config)

        return TradingSessionResponse(
            id=session_dict["id"],
            user_id=session_dict["user_id"],
            session_name=session_dict["session_name"],
            trading_mode=TradingMode(session_dict["trading_mode"]),
            asset_mode=AssetMode(session_dict["asset_mode"]),
            strategy_name=session_dict["strategy_name"],
            status=SessionStatus(session_dict["status"]),
            start_time=_datetime_or_none("start_time"),
            end_time=_datetime_or_none("end_time"),
            initial_capital=Decimal(session_dict["initial_capital"]),
            final_capital=(
                Decimal(session_dict["final_capital"])
                if session_dict.get("final_capital")
                else None
            ),
            initial_capital_by_currency=_json_or("initial_capital_by_currency", None),
            final_capital_by_currency=_json_or("final_capital_by_currency", None),
            config=config,
            strategy_params=_json_or("strategy_params", {}),
            tradable_symbols=_json_or("tradable_symbols", None),
            performance_metrics=_json_or("performance_metrics", None),
            created_at=datetime.fromisoformat(session_dict["created_at"]),
            updated_at=datetime.fromisoformat(session_dict["updated_at"]),
        )

    def _dict_to_order_response(
        self, order_dict: Dict[str, Any]
    ) -> TradingOrderResponse:
        """Build a ``TradingOrderResponse`` from a stored or freshly built dict.

        An unparsable ``trading_fee`` is treated as absent rather than raising.
        """
        order_dict = self._decode_hash(order_dict)

        def _decimal_or_none(field: str) -> Optional[Decimal]:
            raw = order_dict.get(field)
            return Decimal(raw) if raw else None

        def _datetime_or_none(field: str) -> Optional[datetime]:
            raw = order_dict.get(field)
            return datetime.fromisoformat(raw) if raw else None

        trading_fee = None
        raw_fee = order_dict.get("trading_fee")
        if raw_fee:
            try:
                trading_fee = json.loads(raw_fee) if isinstance(raw_fee, str) else raw_fee
            except (json.JSONDecodeError, TypeError):
                trading_fee = None

        return TradingOrderResponse(
            id=order_dict["id"],
            session_id=order_dict["session_id"],
            symbol=order_dict["symbol"],
            side=OrderSide(order_dict["side"]),
            order_type=OrderType(order_dict["order_type"]),
            quantity=Decimal(order_dict["quantity"]),
            price=_decimal_or_none("price"),
            status=OrderStatus(order_dict["status"]),
            submitted_at=datetime.fromisoformat(order_dict["submitted_at"]),
            filled_at=_datetime_or_none("filled_at"),
            cancelled_at=_datetime_or_none("cancelled_at"),
            filled_price=_decimal_or_none("filled_price"),
            filled_quantity=_decimal_or_none("filled_quantity"),
            commission=Decimal(order_dict.get("commission", "0")),
            trading_fee=trading_fee,
        )

    def _dict_to_position_response(
        self, position_dict: Dict[str, Any]
    ) -> PositionHistoryResponse:
        """Build a ``PositionHistoryResponse`` from a stored or freshly built dict."""
        position_dict = self._decode_hash(position_dict)

        return PositionHistoryResponse(
            id=position_dict["id"],
            session_id=position_dict["session_id"],
            symbol=position_dict["symbol"],
            quantity=Decimal(position_dict["quantity"]),
            avg_price=Decimal(position_dict["avg_price"]),
            market_value=Decimal(position_dict["market_value"]),
            unrealized_pnl=Decimal(position_dict["unrealized_pnl"]),
            realized_pnl=Decimal(position_dict["realized_pnl"]),
            timestamp=datetime.fromisoformat(position_dict["timestamp"]),
        )

    # ===== Deletion helpers =====

    def _delete_session_orders(self, session_id: str):
        """Delete every order hash of the session, then the session's order set."""
        session_orders_key = f"session_orders:{session_id}"
        order_ids = self.redis_client.client.smembers(session_orders_key)

        self._delete_keys(
            [f"trading_order:{self._as_text(order_id)}" for order_id in order_ids]
        )
        self.redis_client.client.delete(session_orders_key)

    def _delete_session_positions(self, session_id: str):
        """Delete every position hash of the session, then the session's position set."""
        session_positions_key = f"session_positions:{session_id}"
        position_ids = self.redis_client.client.smembers(session_positions_key)

        deleted_count = self._delete_keys(
            [
                f"position_history:{self._as_text(position_id)}"
                for position_id in position_ids
            ]
        )

        self.redis_client.client.delete(session_positions_key)
        print(f"✅ 已删除持仓记录: {deleted_count}")

    def _delete_session_logs(self, session_id: str):
        """Delete the session's log list.

        ``add_session_log`` / ``get_session_logs`` use ``session_logs:<id>``,
        so that key must be removed. ``trading_session:<id>:logs`` is the key
        this method targeted before and is still removed in case anything
        else writes to it.
        """
        for logs_key in (
            f"session_logs:{session_id}",
            f"trading_session:{session_id}:logs",
        ):
            if self.redis_client.client.delete(logs_key):
                print(f"✅ 已删除策略日志: {logs_key}")

    def _delete_session_risk_data(self, session_id: str):
        """Delete the session's risk-related keys and report how many existed."""
        deleted_count = self._delete_keys(
            [
                f"risk_config:{session_id}",
                f"risk_events:{session_id}",
                f"risk_summary:{session_id}",
                f"risk_metrics:{session_id}",
                f"risk_violations:{session_id}",
                f"risk_alerts:{session_id}",
            ]
        )
        print(f"✅ 已删除风险数据: {deleted_count} 个键")

    def _delete_session_performance_data(self, session_id: str):
        """Delete the session's performance keys and report how many existed."""
        deleted_count = self._delete_keys(
            [
                f"performance_metrics:{session_id}",
                f"daily_returns:{session_id}",
                f"trade_history:{session_id}",
                f"backtest_results:{session_id}",
                f"strategy_performance:{session_id}",
            ]
        )
        print(f"✅ 已删除性能数据: {deleted_count} 个键")

    def _delete_session_monitoring_data(self, session_id: str):
        """Delete the session's monitoring keys and report how many existed."""
        deleted_count = self._delete_keys(
            [
                f"monitoring:{session_id}",
                f"realtime_data:{session_id}",
                f"market_data_cache:{session_id}",
                f"websocket_connections:{session_id}",
                f"session_status:{session_id}",
            ]
        )
        print(f"✅ 已删除监控数据: {deleted_count} 个键")

    def _delete_session_simulation_data(self, session_id: str):
        """Delete the session's simulation-engine keys and report how many existed."""
        deleted_count = self._delete_keys(
            [
                f"simulation_engine:{session_id}",
                f"simulation_orders:{session_id}",
                f"simulation_positions:{session_id}",
                f"simulation_trades:{session_id}",
                f"simulation_balance:{session_id}",
                f"simulation_metrics:{session_id}",
            ]
        )
        print(f"✅ 已删除模拟交易数据: {deleted_count} 个键")

    def _cleanup_websocket_connections(self, session_id: str):
        """Disconnect the session's WebSocket, if any (best effort).

        Failures, including the service module being unavailable, are
        reported and swallowed so that they cannot abort a session deletion.
        """
        try:
            # Imported lazily, inside the best-effort guard.
            from core.services.websocket_service import websocket_service

            if websocket_service.is_connected(session_id):
                websocket_service.disconnect(session_id)
                print(f"✅ 已断开WebSocket连接: {session_id}")

        except Exception as e:
            print(f"❌ 清理WebSocket连接失败: {e}")

    def _delete_session_backtest_data(self, session_id: str):
        """Delete the session's backtest keys and report how many existed."""
        deleted_count = self._delete_keys(
            [
                f"backtest_engine:{session_id}",
                f"backtest_progress:{session_id}",
                f"backtest_status:{session_id}",
                f"backtest_config:{session_id}",
                f"backtest_history:{session_id}",
                f"backtest_metrics:{session_id}",
                f"backtest_trades:{session_id}",
                f"backtest_daily_returns:{session_id}",
                f"backtest_time_series:{session_id}",
                f"backtest_candlestick_data:{session_id}",
            ]
        )
        print(f"✅ 已删除回测数据: {deleted_count} 个键")