Coverage for core/repositories/asset_repository.py: 27.61%
355 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2资产数据仓库(修复版本)
3"""
5import json
6from datetime import datetime, timezone
7from decimal import Decimal
8from typing import List, Optional
10from infrastructure.database.redis_client import get_redis
12from ..models.asset import (AssetOverview, AssetType, CurrencyType, MarketType,
13 SimulatedAsset, SimulatedAssetCreate,
14 SimulatedAssetOverview, SimulatedAssetResponse,
15 SimulatedAssetUpdate, SimulatedPosition,
16 SimulatedPositionCreate, SimulatedPositionResponse,
17 SimulatedPositionUpdate, UserAsset,
18 UserAssetCreate, UserAssetResponse,
19 UserAssetUpdate, UserPosition, UserPositionCreate,
20 UserPositionResponse, UserPositionUpdate)
23class AssetRepository:
24 """资产数据仓库"""
26 def __init__(self):
27 self.redis_client = get_redis()
29 # 用户资产相关方法
30 def create_user_asset(self, asset: UserAssetCreate) -> UserAssetResponse:
31 """创建用户资产(只存储现金,总资产动态计算)"""
32 asset_id = f"asset_{asset.user_id}_{int(datetime.now().timestamp())}"
34 asset_data = {
35 "id": asset_id,
36 "user_id": asset.user_id,
37 "cash_assets": float(asset.cash_assets),
38 "currency": asset.currency.value,
39 "created_at": datetime.now().isoformat(),
40 "updated_at": datetime.now().isoformat(),
41 }
43 # 存储到 Redis(包含货币信息)
44 self.redis_client.client.hset(
45 f"user_asset:{asset.user_id}:{asset.currency.value}", mapping=asset_data
46 )
48 # 确保有id字段
49 asset_data["id"] = asset_data.get(
50 "id", f"asset_{asset_data['user_id']}_{asset_data['currency']}"
51 )
53 return UserAssetResponse(**asset_data)
55 def get_user_asset(self, user_id: str) -> Optional[UserAssetResponse]:
56 """获取用户资产"""
57 # 获取所有资产记录
58 asset_keys = self.redis_client.client.keys(f"user_asset:{user_id}*")
59 if not asset_keys:
60 return None
62 # 获取最新的资产记录
63 latest_asset = None
64 latest_time = None
66 for asset_key in asset_keys:
67 asset_data = self.redis_client.client.hgetall(asset_key)
68 if asset_data and asset_data.get("updated_at"):
69 try:
70 # 解码字节字符串
71 asset_data = {
72 k.decode("utf-8") if isinstance(k, bytes) else k: (
73 v.decode("utf-8") if isinstance(v, bytes) else v
74 )
75 for k, v in asset_data.items()
76 }
77 updated_at = datetime.fromisoformat(asset_data["updated_at"])
78 if latest_time is None or updated_at > latest_time:
79 latest_time = updated_at
80 latest_asset = asset_data
81 except Exception:
82 continue
84 if not latest_asset:
85 return None
87 # 转换数据类型
88 latest_asset["id"] = latest_asset.get(
89 "id", f"asset_{latest_asset['user_id']}_{latest_asset['currency']}"
90 )
91 latest_asset["total_assets"] = Decimal(
92 str(latest_asset.get("total_assets", "0"))
93 )
94 latest_asset["cash_assets"] = Decimal(str(latest_asset["cash_assets"]))
95 latest_asset["position_assets"] = Decimal(
96 str(latest_asset.get("position_assets", "0"))
97 )
98 if latest_asset.get("today_pnl"):
99 latest_asset["today_pnl"] = Decimal(str(latest_asset["today_pnl"]))
100 latest_asset["currency"] = CurrencyType(latest_asset["currency"])
101 latest_asset["created_at"] = datetime.fromisoformat(latest_asset["created_at"])
102 latest_asset["updated_at"] = datetime.fromisoformat(latest_asset["updated_at"])
104 return UserAssetResponse(**latest_asset)
106 def get_latest_user_asset(self, user_id: str) -> Optional[UserAssetResponse]:
107 """获取最新用户资产记录(按更新时间排序)"""
108 # 获取所有资产记录
109 asset_keys = self.redis_client.client.keys(f"user_asset:{user_id}*")
110 if not asset_keys:
111 return None
113 latest_asset = None
114 latest_time = None
116 for asset_key in asset_keys:
117 asset_data = self.redis_client.client.hgetall(asset_key)
118 if asset_data and asset_data.get("updated_at"):
119 try:
120 # 解码字节字符串
121 asset_data = {
122 k.decode("utf-8") if isinstance(k, bytes) else k: (
123 v.decode("utf-8") if isinstance(v, bytes) else v
124 )
125 for k, v in asset_data.items()
126 }
127 updated_at = datetime.fromisoformat(asset_data["updated_at"])
128 if latest_time is None or updated_at > latest_time:
129 latest_time = updated_at
130 latest_asset = asset_data
131 except Exception:
132 continue
134 if not latest_asset:
135 return None
137 # 调试:打印latest_asset的内容
139 # 转换数据类型
140 latest_asset["id"] = latest_asset.get(
141 "id", f"asset_{latest_asset['user_id']}_{latest_asset['currency']}"
142 )
143 latest_asset["total_assets"] = Decimal(
144 str(latest_asset.get("total_assets", "0"))
145 )
146 latest_asset["cash_assets"] = Decimal(str(latest_asset["cash_assets"]))
147 latest_asset["position_assets"] = Decimal(
148 str(latest_asset.get("position_assets", "0"))
149 )
150 if latest_asset.get("today_pnl"):
151 latest_asset["today_pnl"] = Decimal(str(latest_asset["today_pnl"]))
152 latest_asset["currency"] = CurrencyType(latest_asset["currency"])
153 latest_asset["created_at"] = datetime.fromisoformat(latest_asset["created_at"])
154 latest_asset["updated_at"] = datetime.fromisoformat(latest_asset["updated_at"])
156 # 创建UserAssetResponse时保持naive datetime
157 return UserAssetResponse(
158 id=latest_asset.get(
159 "id", f"asset_{latest_asset['user_id']}_{latest_asset['currency']}"
160 ),
161 user_id=latest_asset["user_id"],
162 total_assets=latest_asset["total_assets"],
163 cash_assets=latest_asset["cash_assets"],
164 position_assets=latest_asset["position_assets"],
165 today_pnl=latest_asset.get("today_pnl"),
166 currency=latest_asset["currency"],
167 created_at=latest_asset["created_at"],
168 updated_at=latest_asset["updated_at"],
169 )
171 def update_user_asset(
172 self, user_id: str, asset_update: UserAssetUpdate
173 ) -> Optional[UserAssetResponse]:
174 """更新用户资产"""
175 existing_asset = self.get_user_asset(user_id)
176 if not existing_asset:
177 return None
179 # 更新字段
180 update_data = {}
181 if asset_update.total_assets is not None:
182 update_data["total_assets"] = float(asset_update.total_assets)
183 if asset_update.cash_assets is not None:
184 update_data["cash_assets"] = float(asset_update.cash_assets)
185 if asset_update.position_assets is not None:
186 update_data["position_assets"] = float(asset_update.position_assets)
187 if asset_update.today_pnl is not None:
188 update_data["today_pnl"] = float(asset_update.today_pnl)
189 if asset_update.currency is not None:
190 update_data["currency"] = asset_update.currency.value
192 update_data["updated_at"] = datetime.now().isoformat()
194 # 更新到 Redis
195 self.redis_client.client.hset(f"user_asset:{user_id}", mapping=update_data)
197 return self.get_user_asset(user_id)
199 def delete_user_asset(self, user_id: str) -> bool:
200 """删除用户资产(所有货币)"""
201 # 获取所有资产记录
202 asset_keys = self.redis_client.client.keys(f"user_asset:{user_id}*")
203 if not asset_keys:
204 return True # 没有资产记录,认为删除成功
206 # 删除所有资产记录
207 deleted_count = 0
208 for asset_key in asset_keys:
209 result = self.redis_client.client.delete(asset_key)
210 if result > 0:
211 deleted_count += 1
213 return deleted_count > 0
215 # 用户持仓相关方法
216 def create_user_position(
217 self, position: UserPositionCreate
218 ) -> UserPositionResponse:
219 """创建用户持仓"""
220 position_id = f"position_{position.user_id}_{position.symbol}_{int(datetime.now().timestamp())}"
222 position_data = {
223 "id": position_id,
224 "user_id": position.user_id,
225 "symbol": position.symbol,
226 "symbol_name": position.symbol_name,
227 "asset_type": position.asset_type.value,
228 "quantity": float(position.quantity),
229 "available_quantity": float(position.available_quantity),
230 "cost_price": float(position.cost_price),
231 "current_price": (
232 float(position.current_price) if position.current_price else None
233 ),
234 "market": position.market.value,
235 "currency": position.currency.value,
236 "created_at": datetime.now().isoformat(),
237 "updated_at": datetime.now().isoformat(),
238 }
240 # 存储到 Redis
241 self.redis_client.client.hset(
242 f"user_position:{position_id}", mapping=position_data
243 )
245 # 添加到用户持仓列表
246 self.redis_client.client.sadd(f"user_positions:{position.user_id}", position_id)
248 return UserPositionResponse(**position_data)
250 def get_user_positions(self, user_id: str) -> List[UserPositionResponse]:
251 """获取用户持仓列表"""
252 position_ids = self.redis_client.client.smembers(f"user_positions:{user_id}")
253 positions = []
255 for position_id in position_ids:
256 position_data = self.redis_client.client.hgetall(
257 f"user_position:{position_id}"
258 )
259 if position_data:
260 # 转换数据类型
261 position_data["quantity"] = Decimal(str(position_data["quantity"]))
262 position_data["available_quantity"] = Decimal(
263 str(position_data["available_quantity"])
264 )
265 position_data["cost_price"] = Decimal(str(position_data["cost_price"]))
266 if position_data.get("current_price"):
267 position_data["current_price"] = Decimal(
268 str(position_data["current_price"])
269 )
270 position_data["asset_type"] = AssetType(position_data["asset_type"])
271 position_data["market"] = MarketType(position_data["market"])
272 position_data["currency"] = CurrencyType(position_data["currency"])
273 position_data["created_at"] = datetime.fromisoformat(
274 position_data["created_at"]
275 )
276 position_data["updated_at"] = datetime.fromisoformat(
277 position_data["updated_at"]
278 )
280 positions.append(UserPositionResponse(**position_data))
282 return positions
284 def clear_user_positions(self, user_id: str) -> bool:
285 """清除用户的所有持仓数据"""
286 try:
287 # 获取用户的所有持仓ID
288 position_ids = self.redis_client.client.smembers(
289 f"user_positions:{user_id}"
290 )
292 # 删除每个持仓的详细数据
293 for position_id in position_ids:
294 position_id = (
295 position_id.decode("utf-8")
296 if isinstance(position_id, bytes)
297 else position_id
298 )
299 self.redis_client.client.delete(f"user_position:{position_id}")
301 # 删除用户的持仓列表
302 self.redis_client.client.delete(f"user_positions:{user_id}")
304 return True
305 except Exception as e:
306 print(f"❌ 清除用户持仓失败: {e}")
307 return False
309 def update_user_position(
310 self, position_id: str, position_update: UserPositionUpdate
311 ) -> Optional[UserPositionResponse]:
312 """更新用户持仓"""
313 existing_position = self.redis_client.client.hgetall(
314 f"user_position:{position_id}"
315 )
316 if not existing_position:
317 return None
319 # 更新字段
320 update_data = {}
321 if position_update.symbol_name is not None:
322 update_data["symbol_name"] = position_update.symbol_name
323 if position_update.quantity is not None:
324 update_data["quantity"] = float(position_update.quantity)
325 if position_update.available_quantity is not None:
326 update_data["available_quantity"] = float(
327 position_update.available_quantity
328 )
329 if position_update.cost_price is not None:
330 update_data["cost_price"] = float(position_update.cost_price)
331 if position_update.current_price is not None:
332 update_data["current_price"] = float(position_update.current_price)
333 if position_update.currency is not None:
334 update_data["currency"] = position_update.currency.value
336 update_data["updated_at"] = datetime.now().isoformat()
338 # 更新到 Redis
339 self.redis_client.client.hset(
340 f"user_position:{position_id}", mapping=update_data
341 )
343 # 重新获取更新后的数据
344 updated_data = self.redis_client.client.hgetall(f"user_position:{position_id}")
345 if updated_data:
346 # 转换数据类型
347 updated_data["quantity"] = Decimal(str(updated_data["quantity"]))
348 updated_data["available_quantity"] = Decimal(
349 str(updated_data["available_quantity"])
350 )
351 updated_data["cost_price"] = Decimal(str(updated_data["cost_price"]))
352 if updated_data.get("current_price"):
353 updated_data["current_price"] = Decimal(
354 str(updated_data["current_price"])
355 )
356 updated_data["asset_type"] = AssetType(updated_data["asset_type"])
357 updated_data["market"] = MarketType(updated_data["market"])
358 updated_data["currency"] = CurrencyType(updated_data["currency"])
359 updated_data["created_at"] = datetime.fromisoformat(
360 updated_data["created_at"]
361 )
362 updated_data["updated_at"] = datetime.fromisoformat(
363 updated_data["updated_at"]
364 )
366 return UserPositionResponse(**updated_data)
368 return None
370 def delete_user_position(self, position_id: str) -> bool:
371 """删除用户持仓"""
372 # 获取持仓信息以获取用户ID
373 position_data = self.redis_client.client.hgetall(f"user_position:{position_id}")
374 if not position_data:
375 return False
377 user_id = position_data["user_id"]
379 # 删除持仓数据
380 result = self.redis_client.client.delete(f"user_position:{position_id}")
382 # 从用户持仓列表中移除
383 self.redis_client.client.srem(f"user_positions:{user_id}", position_id)
385 return result > 0
387 def delete_user_positions(self, user_id: str) -> bool:
388 """删除用户的所有持仓"""
389 # 获取用户的所有持仓ID
390 position_ids = self.redis_client.client.smembers(f"user_positions:{user_id}")
392 if not position_ids:
393 return True # 没有持仓数据,认为删除成功
395 # 删除所有持仓数据
396 deleted_count = 0
397 for position_id in position_ids:
398 position_id_str = (
399 position_id.decode() if isinstance(position_id, bytes) else position_id
400 )
401 result = self.redis_client.client.delete(f"user_position:{position_id_str}")
402 if result > 0:
403 deleted_count += 1
405 # 删除用户持仓列表
406 self.redis_client.client.delete(f"user_positions:{user_id}")
408 return deleted_count > 0
410 # 模拟资产相关方法
411 def create_simulated_asset(
412 self, asset: SimulatedAssetCreate
413 ) -> SimulatedAssetResponse:
414 """创建模拟资产(只存储现金,总资产动态计算)"""
415 asset_id = f"sim_asset_{asset.user_id}_{int(datetime.now().timestamp())}"
417 asset_data = {
418 "id": asset_id,
419 "user_id": asset.user_id,
420 "cash_assets": float(asset.cash_assets),
421 "currency": asset.currency.value,
422 "created_at": datetime.now().isoformat(),
423 "updated_at": datetime.now().isoformat(),
424 }
426 # 存储到 Redis(包含货币信息)
427 self.redis_client.client.hset(
428 f"simulated_asset:{asset.user_id}:{asset.currency.value}",
429 mapping=asset_data,
430 )
432 return SimulatedAssetResponse(**asset_data)
434 def get_simulated_asset(
435 self, user_id: str, currency: Optional[CurrencyType] = None
436 ) -> Optional[SimulatedAssetResponse]:
437 """获取模拟资产(支持多货币)"""
438 if currency:
439 # 获取特定货币的资产
440 asset_data = self.redis_client.client.hgetall(
441 f"simulated_asset:{user_id}:{currency.value}"
442 )
443 else:
444 # 获取第一个可用的资产(向后兼容)
445 asset_keys = self.redis_client.client.keys(f"simulated_asset:{user_id}*")
446 if not asset_keys:
447 return None
448 asset_data = self.redis_client.client.hgetall(asset_keys[0])
450 if not asset_data:
451 return None
453 # 转换数据类型
454 asset_data["cash_assets"] = Decimal(str(asset_data["cash_assets"]))
455 asset_data["currency"] = CurrencyType(asset_data["currency"])
456 asset_data["created_at"] = datetime.fromisoformat(asset_data["created_at"])
457 asset_data["updated_at"] = datetime.fromisoformat(asset_data["updated_at"])
459 return SimulatedAssetResponse(**asset_data)
461 def update_simulated_asset(
462 self, user_id: str, asset_update: SimulatedAssetUpdate
463 ) -> Optional[SimulatedAssetResponse]:
464 """更新模拟资产(支持多货币现金编辑)"""
465 # 检查该货币的资产是否存在
466 existing_asset = self.get_simulated_asset(user_id, asset_update.currency)
468 if existing_asset:
469 # 更新现有资产
470 update_data = {
471 "cash_assets": float(asset_update.cash_assets),
472 "updated_at": datetime.now().isoformat(),
473 }
475 # 更新到 Redis
476 self.redis_client.client.hset(
477 f"simulated_asset:{user_id}:{asset_update.currency.value}",
478 mapping=update_data,
479 )
480 else:
481 # 创建新的资产记录
482 asset_data = {
483 "id": f"sim_asset_{user_id}_{int(datetime.now().timestamp())}",
484 "user_id": user_id,
485 "cash_assets": float(asset_update.cash_assets),
486 "currency": asset_update.currency.value,
487 "created_at": datetime.now().isoformat(),
488 "updated_at": datetime.now().isoformat(),
489 }
491 # 存储到 Redis
492 self.redis_client.client.hset(
493 f"simulated_asset:{user_id}:{asset_update.currency.value}",
494 mapping=asset_data,
495 )
497 # 返回更新后的数据
498 return self.get_simulated_asset(user_id, asset_update.currency)
500 # 模拟持仓相关方法
501 def create_simulated_position(
502 self, position: SimulatedPositionCreate
503 ) -> SimulatedPositionResponse:
504 """创建模拟持仓"""
505 position_id = f"sim_position_{position.user_id}_{position.symbol}_{int(datetime.now().timestamp())}"
507 position_data = {
508 "id": position_id,
509 "user_id": position.user_id,
510 "symbol": position.symbol,
511 "symbol_name": position.symbol_name,
512 "asset_type": position.asset_type.value,
513 "quantity": float(position.quantity),
514 "cost_price": float(position.cost_price),
515 "current_price": (
516 float(position.current_price) if position.current_price else None
517 ),
518 "market": position.market.value,
519 "currency": position.currency.value,
520 "created_at": datetime.now().isoformat(),
521 "updated_at": datetime.now().isoformat(),
522 }
524 # 存储到 Redis
525 self.redis_client.client.hset(
526 f"simulated_position:{position_id}", mapping=position_data
527 )
529 # 添加到用户模拟持仓列表
530 self.redis_client.client.sadd(
531 f"simulated_positions:{position.user_id}", position_id
532 )
534 return SimulatedPositionResponse(**position_data)
536 def get_simulated_positions(self, user_id: str) -> List[SimulatedPositionResponse]:
537 """获取模拟持仓列表"""
538 position_ids = self.redis_client.client.smembers(
539 f"simulated_positions:{user_id}"
540 )
541 positions = []
543 for position_id in position_ids:
544 position_data = self.redis_client.client.hgetall(
545 f"simulated_position:{position_id}"
546 )
547 if position_data:
548 # 转换数据类型
549 position_data["quantity"] = Decimal(str(position_data["quantity"]))
550 position_data["cost_price"] = Decimal(str(position_data["cost_price"]))
551 if position_data.get("current_price"):
552 position_data["current_price"] = Decimal(
553 str(position_data["current_price"])
554 )
555 position_data["asset_type"] = AssetType(position_data["asset_type"])
556 position_data["market"] = MarketType(position_data["market"])
557 position_data["currency"] = CurrencyType(position_data["currency"])
558 position_data["created_at"] = datetime.fromisoformat(
559 position_data["created_at"]
560 )
561 position_data["updated_at"] = datetime.fromisoformat(
562 position_data["updated_at"]
563 )
565 positions.append(SimulatedPositionResponse(**position_data))
567 return positions
569 def update_simulated_position(
570 self, position_id: str, position_update: SimulatedPositionUpdate
571 ) -> Optional[SimulatedPositionResponse]:
572 """更新模拟持仓"""
573 existing_position = self.redis_client.client.hgetall(
574 f"simulated_position:{position_id}"
575 )
576 if not existing_position:
577 return None
579 # 更新字段
580 update_data = {}
581 if position_update.symbol_name is not None:
582 update_data["symbol_name"] = position_update.symbol_name
583 if position_update.quantity is not None:
584 update_data["quantity"] = float(position_update.quantity)
585 if position_update.cost_price is not None:
586 update_data["cost_price"] = float(position_update.cost_price)
587 if position_update.current_price is not None:
588 update_data["current_price"] = float(position_update.current_price)
589 if position_update.currency is not None:
590 update_data["currency"] = position_update.currency.value
592 update_data["updated_at"] = datetime.now().isoformat()
594 # 更新到 Redis
595 self.redis_client.client.hset(
596 f"simulated_position:{position_id}", mapping=update_data
597 )
599 # 重新获取更新后的数据
600 updated_data = self.redis_client.client.hgetall(
601 f"simulated_position:{position_id}"
602 )
603 if updated_data:
604 # 转换数据类型
605 updated_data["quantity"] = Decimal(str(updated_data["quantity"]))
606 updated_data["cost_price"] = Decimal(str(updated_data["cost_price"]))
607 if updated_data.get("current_price"):
608 updated_data["current_price"] = Decimal(
609 str(updated_data["current_price"])
610 )
611 updated_data["asset_type"] = AssetType(updated_data["asset_type"])
612 updated_data["market"] = MarketType(updated_data["market"])
613 updated_data["currency"] = CurrencyType(updated_data["currency"])
614 updated_data["created_at"] = datetime.fromisoformat(
615 updated_data["created_at"]
616 )
617 updated_data["updated_at"] = datetime.fromisoformat(
618 updated_data["updated_at"]
619 )
621 return SimulatedPositionResponse(**updated_data)
623 return None
625 def delete_simulated_position(self, position_id: str) -> bool:
626 """删除模拟持仓"""
627 # 获取持仓信息以获取用户ID
628 position_data = self.redis_client.client.hgetall(
629 f"simulated_position:{position_id}"
630 )
631 if not position_data:
632 return False
634 user_id = position_data["user_id"]
636 # 删除持仓数据
637 result = self.redis_client.client.delete(f"simulated_position:{position_id}")
639 # 从用户模拟持仓列表中移除
640 self.redis_client.client.srem(f"simulated_positions:{user_id}", position_id)
642 return result > 0
644 # 资产概览方法
645 def get_asset_overview(self, user_id: str) -> Optional[AssetOverview]:
646 """获取资产概览(支持多货币,动态计算总资产)"""
647 # 获取用户的所有资产记录(按货币分组)
648 cash_assets_by_currency = {}
650 # 获取所有资产记录(只获取现金部分)
651 asset_keys = self.redis_client.client.keys(f"user_asset:{user_id}*")
652 for asset_key in asset_keys:
653 asset_data = self.redis_client.client.hgetall(asset_key)
654 if asset_data:
655 currency = CurrencyType(asset_data["currency"])
656 cash_assets_by_currency[currency] = Decimal(
657 str(asset_data["cash_assets"])
658 )
660 # 获取所有持仓
661 positions = self.get_user_positions(user_id)
663 # 按货币分组持仓并计算持仓资产
664 positions_by_currency = {}
665 position_assets_by_currency = {}
667 for position in positions:
668 currency = position.currency
669 if currency not in positions_by_currency:
670 positions_by_currency[currency] = []
671 position_assets_by_currency[currency] = Decimal("0")
672 positions_by_currency[currency].append(position)
674 # 计算持仓资产(数量 × 当前价格)
675 position_value = position.quantity * position.current_price
676 position_assets_by_currency[currency] += position_value
678 # 动态计算总资产(现金 + 持仓资产)
679 total_assets_by_currency = {}
680 for currency in set(
681 list(cash_assets_by_currency.keys())
682 + list(position_assets_by_currency.keys())
683 ):
684 cash = cash_assets_by_currency.get(currency, Decimal("0"))
685 position_assets = position_assets_by_currency.get(currency, Decimal("0"))
686 total_assets = cash + position_assets
688 total_assets_by_currency[currency] = {
689 "total_assets": total_assets,
690 "cash_assets": cash,
691 "position_assets": position_assets,
692 "today_pnl": Decimal("0"), # 暂时设为0,后续可以从API获取
693 }
695 if not total_assets_by_currency:
696 return None
698 # 创建多货币概览
699 return AssetOverview(
700 total_assets_by_currency=total_assets_by_currency,
701 cash_assets_by_currency=cash_assets_by_currency,
702 position_assets_by_currency=position_assets_by_currency,
703 positions_by_currency=positions_by_currency,
704 positions=positions,
705 )
707 def get_simulated_asset_overview(
708 self, user_id: str
709 ) -> Optional[SimulatedAssetOverview]:
710 """获取模拟资产概览(支持多货币,动态计算总资产)"""
711 # 获取用户的所有模拟资产记录(按货币分组)
712 cash_assets_by_currency = {}
714 # 获取所有模拟资产记录(只获取现金部分)
715 asset_keys = self.redis_client.client.keys(f"simulated_asset:{user_id}*")
716 for asset_key in asset_keys:
717 asset_data = self.redis_client.client.hgetall(asset_key)
718 if asset_data:
719 currency = CurrencyType(asset_data["currency"])
720 cash_assets_by_currency[currency] = Decimal(
721 str(asset_data["cash_assets"])
722 )
724 # 获取所有模拟持仓
725 positions = self.get_simulated_positions(user_id)
727 # 按货币分组持仓并计算持仓资产
728 positions_by_currency = {}
729 position_assets_by_currency = {}
731 for position in positions:
732 currency = position.currency
733 if currency not in positions_by_currency:
734 positions_by_currency[currency] = []
735 position_assets_by_currency[currency] = Decimal("0")
736 positions_by_currency[currency].append(position)
738 # 计算持仓资产(数量 × 当前价格)
739 position_value = position.quantity * position.current_price
740 position_assets_by_currency[currency] += position_value
742 # 动态计算总资产(现金 + 持仓资产)
743 total_assets_by_currency = {}
744 for currency in set(
745 list(cash_assets_by_currency.keys())
746 + list(position_assets_by_currency.keys())
747 ):
748 cash = cash_assets_by_currency.get(currency, Decimal("0"))
749 position_assets = position_assets_by_currency.get(currency, Decimal("0"))
750 total_assets = cash + position_assets
752 total_assets_by_currency[currency] = {
753 "total_assets": total_assets,
754 "cash_assets": cash,
755 "position_assets": position_assets,
756 "today_pnl": Decimal("0"), # 暂时设为0,后续可以从API获取
757 }
759 if not total_assets_by_currency:
760 return None
762 # 创建多货币概览
763 return SimulatedAssetOverview(
764 total_assets_by_currency=total_assets_by_currency,
765 positions_by_currency=positions_by_currency,
766 positions=positions,
767 )
769 def delete_simulated_asset(self, user_id: str) -> bool:
770 """删除模拟资产(所有货币)"""
771 # 获取所有模拟资产记录
772 asset_keys = self.redis_client.client.keys(f"simulated_asset:{user_id}*")
773 if not asset_keys:
774 return True # 没有资产记录,认为删除成功
776 # 删除所有资产记录
777 deleted_count = 0
778 for asset_key in asset_keys:
779 result = self.redis_client.client.delete(asset_key)
780 if result > 0:
781 deleted_count += 1
783 return deleted_count > 0
785 def delete_simulated_positions(self, user_id: str) -> bool:
786 """删除用户的所有模拟持仓"""
787 # 获取用户的所有模拟持仓ID
788 position_ids = self.redis_client.client.smembers(
789 f"simulated_positions:{user_id}"
790 )
792 if not position_ids:
793 return True # 没有持仓数据,认为删除成功
795 # 删除所有持仓数据
796 deleted_count = 0
797 for position_id in position_ids:
798 position_id_str = (
799 position_id.decode() if isinstance(position_id, bytes) else position_id
800 )
801 result = self.redis_client.client.delete(
802 f"simulated_position:{position_id_str}"
803 )
804 if result > 0:
805 deleted_count += 1
807 # 删除用户持仓列表
808 self.redis_client.client.delete(f"simulated_positions:{user_id}")
810 return deleted_count > 0