Coverage for core/repositories/asset_repository.py: 27.61%

355 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

"""
Asset data repository (fixed version).
"""

4 

import json
import logging
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple

from infrastructure.database.redis_client import get_redis

from ..models.asset import (AssetOverview, AssetType, CurrencyType, MarketType,
                            SimulatedAsset, SimulatedAssetCreate,
                            SimulatedAssetOverview, SimulatedAssetResponse,
                            SimulatedAssetUpdate, SimulatedPosition,
                            SimulatedPositionCreate, SimulatedPositionResponse,
                            SimulatedPositionUpdate, UserAsset,
                            UserAssetCreate, UserAssetResponse,
                            UserAssetUpdate, UserPosition, UserPositionCreate,
                            UserPositionResponse, UserPositionUpdate)

21 

22 

class AssetRepository:
    """Redis-backed repository for user and simulated assets and positions.

    Key layout used by this class:

    * ``user_asset:{user_id}:{currency}`` and
      ``simulated_asset:{user_id}:{currency}`` -- one hash per user and
      currency holding the cash balance.
    * ``user_position:{position_id}`` and
      ``simulated_position:{position_id}`` -- one hash per position.
    * ``user_positions:{user_id}`` and ``simulated_positions:{user_id}`` --
      a set with the ids of the positions owned by the user.

    Timestamps are stored as naive ``datetime.now()`` ISO-8601 strings.
    Replies from Redis may arrive as ``bytes`` or ``str``; every read goes
    through ``_read_hash`` / ``_member_ids`` / ``_owned_keys`` so that both
    forms are handled the same way.
    """

    _logger = logging.getLogger(__name__)

    # Fields an asset hash must contain before it can become a response.
    _ASSET_REQUIRED_FIELDS = frozenset(
        {"user_id", "cash_assets", "currency", "created_at", "updated_at"}
    )

    def __init__(self):
        self.redis_client = get_redis()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_text(value: Any) -> Any:
        """Decode ``bytes`` to ``str``; return any other value unchanged."""
        return value.decode("utf-8") if isinstance(value, bytes) else value

    @staticmethod
    def _without_none(mapping: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *mapping* without entries whose value is ``None``.

        redis-py refuses ``None`` as a hash value, so optional fields are
        simply left out of the stored hash.
        """
        return {key: value for key, value in mapping.items() if value is not None}

    def _read_hash(self, key: str) -> Dict[str, Any]:
        """Return the hash stored at *key* with keys and values as text.

        An absent key yields an empty dict.
        """
        raw = self.redis_client.client.hgetall(key)
        return {
            self._to_text(field): self._to_text(value)
            for field, value in (raw or {}).items()
        }

    def _member_ids(self, set_key: str) -> List[str]:
        """Return the members of the set at *set_key* as sorted text ids."""
        members = self.redis_client.client.smembers(set_key) or ()
        return sorted(self._to_text(member) for member in members)

    def _owned_keys(self, prefix: str, user_id: str) -> List[str]:
        """Return the keys ``{prefix}:{user_id}`` and ``{prefix}:{user_id}:*``.

        The Redis pattern ``{prefix}:{user_id}*`` is a plain prefix match and
        therefore also returns keys of users whose id merely starts with
        *user_id* (``1`` vs ``10``).  Those are filtered out here.  The result
        is sorted so that callers taking "the first key" are deterministic.
        """
        base = f"{prefix}:{user_id}"
        found = self.redis_client.client.keys(f"{base}*") or ()
        names = {self._to_text(key) for key in found}
        return sorted(
            name for name in names if name == base or name.startswith(f"{base}:")
        )

    def _find_latest_user_asset(
        self, user_id: str
    ) -> Optional[Tuple[str, Dict[str, Any]]]:
        """Return ``(redis_key, record)`` of the most recently updated asset.

        Records that lack a required field or whose ``updated_at`` cannot be
        parsed or compared are skipped.  Returns ``None`` when no usable
        record exists.
        """
        best: Optional[Tuple[str, Dict[str, Any]]] = None
        best_time: Optional[datetime] = None

        for key in self._owned_keys("user_asset", user_id):
            record = self._read_hash(key)
            if not self._ASSET_REQUIRED_FIELDS.issubset(record):
                continue
            try:
                updated_at = datetime.fromisoformat(record["updated_at"])
                # Comparing naive with aware datetimes raises TypeError.
                is_newer = best_time is None or updated_at > best_time
            except (TypeError, ValueError):
                continue
            if is_newer:
                best_time = updated_at
                best = (key, record)

        return best

    @staticmethod
    def _coerce_user_asset(record: Dict[str, Any]) -> Dict[str, Any]:
        """Convert the text fields of a user-asset hash to model types in place."""
        record.setdefault("id", f"asset_{record['user_id']}_{record['currency']}")
        record["total_assets"] = Decimal(str(record.get("total_assets", "0")))
        record["cash_assets"] = Decimal(str(record["cash_assets"]))
        record["position_assets"] = Decimal(str(record.get("position_assets", "0")))
        if record.get("today_pnl"):
            record["today_pnl"] = Decimal(str(record["today_pnl"]))
        record["currency"] = CurrencyType(record["currency"])
        record["created_at"] = datetime.fromisoformat(record["created_at"])
        record["updated_at"] = datetime.fromisoformat(record["updated_at"])
        return record

    @staticmethod
    def _coerce_position(record: Dict[str, Any]) -> Dict[str, Any]:
        """Convert the text fields of a position hash to model types in place.

        ``available_quantity`` is converted only when present (simulated
        positions do not store it).  A missing or empty ``current_price``
        becomes ``None``.
        """
        for name in ("quantity", "cost_price"):
            record[name] = Decimal(str(record[name]))
        if "available_quantity" in record:
            record["available_quantity"] = Decimal(str(record["available_quantity"]))
        price = record.get("current_price")
        record["current_price"] = (
            Decimal(str(price)) if price not in (None, "") else None
        )
        record["asset_type"] = AssetType(record["asset_type"])
        record["market"] = MarketType(record["market"])
        record["currency"] = CurrencyType(record["currency"])
        record["created_at"] = datetime.fromisoformat(record["created_at"])
        record["updated_at"] = datetime.fromisoformat(record["updated_at"])
        return record

    @staticmethod
    def _position_changes(position_update, numeric_fields) -> Dict[str, Any]:
        """Collect the non-``None`` fields of a position update as a hash mapping.

        *numeric_fields* names the attributes stored as ``float``.  The
        mapping always contains a fresh ``updated_at``.
        """
        changes: Dict[str, Any] = {}
        if position_update.symbol_name is not None:
            changes["symbol_name"] = position_update.symbol_name
        for name in numeric_fields:
            value = getattr(position_update, name)
            if value is not None:
                changes[name] = float(value)
        if position_update.currency is not None:
            changes["currency"] = position_update.currency.value
        changes["updated_at"] = datetime.now().isoformat()
        return changes

    def _cash_by_currency(self, prefix: str, user_id: str) -> Dict[Any, Decimal]:
        """Return ``{CurrencyType: cash}`` from the user's ``{prefix}`` hashes.

        Hashes without ``currency`` or ``cash_assets`` are ignored.
        """
        cash: Dict[Any, Decimal] = {}
        for key in self._owned_keys(prefix, user_id):
            record = self._read_hash(key)
            if "currency" not in record or "cash_assets" not in record:
                continue
            cash[CurrencyType(record["currency"])] = Decimal(
                str(record["cash_assets"])
            )
        return cash

    @staticmethod
    def _group_positions(positions) -> Tuple[Dict[Any, list], Dict[Any, Decimal]]:
        """Group *positions* by currency and value each group.

        Returns ``(positions_by_currency, position_value_by_currency)``.  A
        position is valued at ``quantity * current_price``; when no current
        price is known the cost price is used instead of failing.
        """
        grouped: Dict[Any, list] = {}
        values: Dict[Any, Decimal] = {}
        for position in positions:
            currency = position.currency
            grouped.setdefault(currency, []).append(position)
            price = position.current_price
            if price is None:
                price = position.cost_price
            values[currency] = (
                values.get(currency, Decimal("0")) + position.quantity * price
            )
        return grouped, values

    @staticmethod
    def _totals_by_currency(
        cash: Dict[Any, Decimal], position_values: Dict[Any, Decimal]
    ) -> Dict[Any, Dict[str, Decimal]]:
        """Combine cash and position value into per-currency totals."""
        zero = Decimal("0")
        totals: Dict[Any, Dict[str, Decimal]] = {}
        for currency in set(cash) | set(position_values):
            cash_part = cash.get(currency, zero)
            position_part = position_values.get(currency, zero)
            totals[currency] = {
                "total_assets": cash_part + position_part,
                "cash_assets": cash_part,
                "position_assets": position_part,
                # Placeholder: today's PnL is not computed here yet.
                "today_pnl": Decimal("0"),
            }
        return totals

    # ------------------------------------------------------------------
    # User assets
    # ------------------------------------------------------------------
    def create_user_asset(self, asset: UserAssetCreate) -> UserAssetResponse:
        """Create a user asset record (only cash is stored).

        The hash is written to ``user_asset:{user_id}:{currency}``; an
        existing record for the same user and currency is overwritten
        field by field.
        """
        now = datetime.now()
        stamp = now.isoformat()
        asset_data = {
            "id": f"asset_{asset.user_id}_{int(now.timestamp())}",
            "user_id": asset.user_id,
            "cash_assets": float(asset.cash_assets),
            "currency": asset.currency.value,
            "created_at": stamp,
            "updated_at": stamp,
        }

        self.redis_client.client.hset(
            f"user_asset:{asset.user_id}:{asset.currency.value}", mapping=asset_data
        )

        return UserAssetResponse(**asset_data)

    def get_user_asset(self, user_id: str) -> Optional[UserAssetResponse]:
        """Return the user's most recently updated asset record, or ``None``."""
        found = self._find_latest_user_asset(user_id)
        if found is None:
            return None
        _, record = found
        return UserAssetResponse(**self._coerce_user_asset(record))

    def get_latest_user_asset(self, user_id: str) -> Optional[UserAssetResponse]:
        """Return the latest asset record, built from the known fields only.

        Unlike ``get_user_asset`` any extra fields stored in the hash are
        not forwarded to the response model.  Datetimes stay naive.
        """
        found = self._find_latest_user_asset(user_id)
        if found is None:
            return None
        record = self._coerce_user_asset(found[1])
        return UserAssetResponse(
            id=record["id"],
            user_id=record["user_id"],
            total_assets=record["total_assets"],
            cash_assets=record["cash_assets"],
            position_assets=record["position_assets"],
            today_pnl=record.get("today_pnl"),
            currency=record["currency"],
            created_at=record["created_at"],
            updated_at=record["updated_at"],
        )

    def update_user_asset(
        self, user_id: str, asset_update: UserAssetUpdate
    ) -> Optional[UserAssetResponse]:
        """Apply the non-``None`` fields of *asset_update* to the latest record.

        The update is written to the same hash the latest record was read
        from, so no partial hash is created under a different key.  Returns
        the refreshed record, or ``None`` when the user has no asset.
        """
        found = self._find_latest_user_asset(user_id)
        if found is None:
            return None
        key, _ = found

        update_data: Dict[str, Any] = {}
        for name in ("total_assets", "cash_assets", "position_assets", "today_pnl"):
            value = getattr(asset_update, name)
            if value is not None:
                update_data[name] = float(value)
        if asset_update.currency is not None:
            update_data["currency"] = asset_update.currency.value
        update_data["updated_at"] = datetime.now().isoformat()

        self.redis_client.client.hset(key, mapping=update_data)

        return self.get_user_asset(user_id)

    def delete_user_asset(self, user_id: str) -> bool:
        """Delete all asset records (every currency) of exactly this user.

        Returns ``True`` when nothing had to be deleted or at least one key
        was removed.
        """
        keys = self._owned_keys("user_asset", user_id)
        if not keys:
            return True  # nothing stored counts as success

        client = self.redis_client.client
        removed = sum(1 for key in keys if client.delete(key) > 0)
        return removed > 0

    # ------------------------------------------------------------------
    # User positions
    # ------------------------------------------------------------------
    def create_user_position(
        self, position: UserPositionCreate
    ) -> UserPositionResponse:
        """Create a user position and register it in the user's position set."""
        now = datetime.now()
        stamp = now.isoformat()
        position_id = (
            f"position_{position.user_id}_{position.symbol}_{int(now.timestamp())}"
        )
        position_data = {
            "id": position_id,
            "user_id": position.user_id,
            "symbol": position.symbol,
            "symbol_name": position.symbol_name,
            "asset_type": position.asset_type.value,
            "quantity": float(position.quantity),
            "available_quantity": float(position.available_quantity),
            "cost_price": float(position.cost_price),
            # ``is not None`` keeps a legitimate price of 0.
            "current_price": (
                float(position.current_price)
                if position.current_price is not None
                else None
            ),
            "market": position.market.value,
            "currency": position.currency.value,
            "created_at": stamp,
            "updated_at": stamp,
        }

        client = self.redis_client.client
        # None values cannot be stored in a hash; omit them.
        client.hset(
            f"user_position:{position_id}",
            mapping=self._without_none(position_data),
        )
        client.sadd(f"user_positions:{position.user_id}", position_id)

        return UserPositionResponse(**position_data)

    def get_user_positions(self, user_id: str) -> List[UserPositionResponse]:
        """Return the user's positions; ids without a stored hash are skipped."""
        positions = []
        for position_id in self._member_ids(f"user_positions:{user_id}"):
            record = self._read_hash(f"user_position:{position_id}")
            if record:
                positions.append(
                    UserPositionResponse(**self._coerce_position(record))
                )
        return positions

    def clear_user_positions(self, user_id: str) -> bool:
        """Remove all positions of the user; never raises.

        Returns ``True`` on success (including "nothing to remove") and
        ``False`` when any Redis call failed.
        """
        try:
            client = self.redis_client.client
            for position_id in self._member_ids(f"user_positions:{user_id}"):
                client.delete(f"user_position:{position_id}")
            client.delete(f"user_positions:{user_id}")
            return True
        except Exception as exc:  # best-effort boundary: report, do not raise
            self._logger.exception("❌ 清除用户持仓失败: %s", exc)
            return False

    def update_user_position(
        self, position_id: str, position_update: UserPositionUpdate
    ) -> Optional[UserPositionResponse]:
        """Apply the non-``None`` fields of *position_update* to a position.

        Returns the refreshed position, or ``None`` when it does not exist.
        """
        key = f"user_position:{position_id}"
        client = self.redis_client.client
        if not client.hgetall(key):
            return None

        changes = self._position_changes(
            position_update,
            ("quantity", "available_quantity", "cost_price", "current_price"),
        )
        client.hset(key, mapping=changes)

        record = self._read_hash(key)
        if not record:
            return None
        return UserPositionResponse(**self._coerce_position(record))

    def delete_user_position(self, position_id: str) -> bool:
        """Delete one position and drop its id from the owner's position set.

        Returns ``False`` when the position does not exist.
        """
        key = f"user_position:{position_id}"
        record = self._read_hash(key)
        if not record:
            return False

        owner_id = record["user_id"]
        client = self.redis_client.client
        removed = client.delete(key)
        client.srem(f"user_positions:{owner_id}", position_id)

        return removed > 0

    def delete_user_positions(self, user_id: str) -> bool:
        """Delete every position of the user together with the position set.

        Returns ``True`` when the user had no positions or at least one
        position hash was removed.
        """
        set_key = f"user_positions:{user_id}"
        position_ids = self._member_ids(set_key)
        if not position_ids:
            return True  # nothing stored counts as success

        client = self.redis_client.client
        removed = sum(
            1 for pid in position_ids if client.delete(f"user_position:{pid}") > 0
        )
        client.delete(set_key)

        return removed > 0

    # ------------------------------------------------------------------
    # Simulated assets
    # ------------------------------------------------------------------
    def create_simulated_asset(
        self, asset: SimulatedAssetCreate
    ) -> SimulatedAssetResponse:
        """Create a simulated asset record (only cash is stored)."""
        now = datetime.now()
        stamp = now.isoformat()
        asset_data = {
            "id": f"sim_asset_{asset.user_id}_{int(now.timestamp())}",
            "user_id": asset.user_id,
            "cash_assets": float(asset.cash_assets),
            "currency": asset.currency.value,
            "created_at": stamp,
            "updated_at": stamp,
        }

        self.redis_client.client.hset(
            f"simulated_asset:{asset.user_id}:{asset.currency.value}",
            mapping=asset_data,
        )

        return SimulatedAssetResponse(**asset_data)

    def get_simulated_asset(
        self, user_id: str, currency: Optional[CurrencyType] = None
    ) -> Optional[SimulatedAssetResponse]:
        """Return the simulated asset for *currency*, or the first one found.

        Without *currency* the first of the user's simulated-asset keys in
        sorted order is used (kept for backward compatibility).  Returns
        ``None`` when nothing is stored.
        """
        if currency is not None:
            record = self._read_hash(f"simulated_asset:{user_id}:{currency.value}")
        else:
            keys = self._owned_keys("simulated_asset", user_id)
            if not keys:
                return None
            record = self._read_hash(keys[0])

        if not record:
            return None

        record["cash_assets"] = Decimal(str(record["cash_assets"]))
        record["currency"] = CurrencyType(record["currency"])
        record["created_at"] = datetime.fromisoformat(record["created_at"])
        record["updated_at"] = datetime.fromisoformat(record["updated_at"])

        return SimulatedAssetResponse(**record)

    def update_simulated_asset(
        self, user_id: str, asset_update: SimulatedAssetUpdate
    ) -> Optional[SimulatedAssetResponse]:
        """Set the cash of the simulated asset for ``asset_update.currency``.

        Updates the existing record for that currency, or creates it when it
        does not exist yet, and returns the stored result.
        """
        currency = asset_update.currency
        key = f"simulated_asset:{user_id}:{currency.value}"
        now = datetime.now()
        stamp = now.isoformat()

        if self.get_simulated_asset(user_id, currency) is not None:
            payload = {
                "cash_assets": float(asset_update.cash_assets),
                "updated_at": stamp,
            }
        else:
            payload = {
                "id": f"sim_asset_{user_id}_{int(now.timestamp())}",
                "user_id": user_id,
                "cash_assets": float(asset_update.cash_assets),
                "currency": currency.value,
                "created_at": stamp,
                "updated_at": stamp,
            }

        self.redis_client.client.hset(key, mapping=payload)

        return self.get_simulated_asset(user_id, currency)

    # ------------------------------------------------------------------
    # Simulated positions
    # ------------------------------------------------------------------
    def create_simulated_position(
        self, position: SimulatedPositionCreate
    ) -> SimulatedPositionResponse:
        """Create a simulated position and register it in the user's set."""
        now = datetime.now()
        stamp = now.isoformat()
        position_id = (
            f"sim_position_{position.user_id}_{position.symbol}"
            f"_{int(now.timestamp())}"
        )
        position_data = {
            "id": position_id,
            "user_id": position.user_id,
            "symbol": position.symbol,
            "symbol_name": position.symbol_name,
            "asset_type": position.asset_type.value,
            "quantity": float(position.quantity),
            "cost_price": float(position.cost_price),
            # ``is not None`` keeps a legitimate price of 0.
            "current_price": (
                float(position.current_price)
                if position.current_price is not None
                else None
            ),
            "market": position.market.value,
            "currency": position.currency.value,
            "created_at": stamp,
            "updated_at": stamp,
        }

        client = self.redis_client.client
        # None values cannot be stored in a hash; omit them.
        client.hset(
            f"simulated_position:{position_id}",
            mapping=self._without_none(position_data),
        )
        client.sadd(f"simulated_positions:{position.user_id}", position_id)

        return SimulatedPositionResponse(**position_data)

    def get_simulated_positions(self, user_id: str) -> List[SimulatedPositionResponse]:
        """Return the user's simulated positions; dangling ids are skipped."""
        positions = []
        for position_id in self._member_ids(f"simulated_positions:{user_id}"):
            record = self._read_hash(f"simulated_position:{position_id}")
            if record:
                positions.append(
                    SimulatedPositionResponse(**self._coerce_position(record))
                )
        return positions

    def update_simulated_position(
        self, position_id: str, position_update: SimulatedPositionUpdate
    ) -> Optional[SimulatedPositionResponse]:
        """Apply the non-``None`` fields of *position_update* to a position.

        Returns the refreshed position, or ``None`` when it does not exist.
        """
        key = f"simulated_position:{position_id}"
        client = self.redis_client.client
        if not client.hgetall(key):
            return None

        changes = self._position_changes(
            position_update, ("quantity", "cost_price", "current_price")
        )
        client.hset(key, mapping=changes)

        record = self._read_hash(key)
        if not record:
            return None
        return SimulatedPositionResponse(**self._coerce_position(record))

    def delete_simulated_position(self, position_id: str) -> bool:
        """Delete one simulated position and drop its id from the owner's set.

        Returns ``False`` when the position does not exist.
        """
        key = f"simulated_position:{position_id}"
        record = self._read_hash(key)
        if not record:
            return False

        owner_id = record["user_id"]
        client = self.redis_client.client
        removed = client.delete(key)
        client.srem(f"simulated_positions:{owner_id}", position_id)

        return removed > 0

    # ------------------------------------------------------------------
    # Overviews
    # ------------------------------------------------------------------
    def get_asset_overview(self, user_id: str) -> Optional[AssetOverview]:
        """Return a per-currency overview of the user's cash and positions.

        Total assets are computed as cash plus position value.  Returns
        ``None`` when the user has neither cash records nor positions.
        """
        cash = self._cash_by_currency("user_asset", user_id)
        positions = self.get_user_positions(user_id)
        grouped, position_values = self._group_positions(positions)
        totals = self._totals_by_currency(cash, position_values)

        if not totals:
            return None

        return AssetOverview(
            total_assets_by_currency=totals,
            cash_assets_by_currency=cash,
            position_assets_by_currency=position_values,
            positions_by_currency=grouped,
            positions=positions,
        )

    def get_simulated_asset_overview(
        self, user_id: str
    ) -> Optional[SimulatedAssetOverview]:
        """Return a per-currency overview of the user's simulated holdings.

        Total assets are computed as cash plus position value.  Returns
        ``None`` when the user has neither cash records nor positions.
        """
        cash = self._cash_by_currency("simulated_asset", user_id)
        positions = self.get_simulated_positions(user_id)
        grouped, position_values = self._group_positions(positions)
        totals = self._totals_by_currency(cash, position_values)

        if not totals:
            return None

        return SimulatedAssetOverview(
            total_assets_by_currency=totals,
            positions_by_currency=grouped,
            positions=positions,
        )

    def delete_simulated_asset(self, user_id: str) -> bool:
        """Delete all simulated asset records (every currency) of this user.

        Returns ``True`` when nothing had to be deleted or at least one key
        was removed.
        """
        keys = self._owned_keys("simulated_asset", user_id)
        if not keys:
            return True  # nothing stored counts as success

        client = self.redis_client.client
        removed = sum(1 for key in keys if client.delete(key) > 0)
        return removed > 0

    def delete_simulated_positions(self, user_id: str) -> bool:
        """Delete every simulated position of the user and the position set.

        Returns ``True`` when the user had no positions or at least one
        position hash was removed.
        """
        set_key = f"simulated_positions:{user_id}"
        position_ids = self._member_ids(set_key)
        if not position_ids:
            return True  # nothing stored counts as success

        client = self.redis_client.client
        removed = sum(
            1
            for pid in position_ids
            if client.delete(f"simulated_position:{pid}") > 0
        )
        client.delete(set_key)

        return removed > 0