Coverage for core/data_source/adapters/asset_adapter.py: 42.91%
247 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2资产业务适配器
3统一资产业务逻辑,替代AssetDataService + LongPortService
4"""
6from datetime import datetime, timezone
7from decimal import Decimal
8from typing import Any, Dict, List, Optional
10from core.data_source.adapters.data_source_adapter import (
11 AssetDataSourceAdapter, QuoteDataSourceAdapter)
12from core.models.asset import (AssetOverview, AssetType, CurrencyType,
13 MarketType, SimulatedAssetCreate,
14 SimulatedAssetResponse, SimulatedAssetUpdate,
15 SimulatedPositionCreate,
16 SimulatedPositionResponse,
17 SimulatedPositionUpdate, UserAssetCreate,
18 UserPositionCreate, UserPositionResponse)
19from core.repositories.asset_repository import AssetRepository
22class AssetAdapter:
23 """资产业务适配器 - 统一资产业务逻辑"""
25 def __init__(self, user_id: str):
26 self.user_id = user_id
27 self.asset_adapter = AssetDataSourceAdapter(user_id)
28 self.quote_adapter = QuoteDataSourceAdapter(user_id)
29 self.asset_repo = AssetRepository()
31 # ===== 基础资产CRUD方法 =====
32 def create_user_asset(self, asset: UserAssetCreate):
33 """创建用户资产"""
34 # 注意:这里需要实现真正的用户资产创建逻辑
35 # 现在暂时抛出一个未实现错误,提示需要真正的实现
36 raise NotImplementedError("create_user_asset方法需要在具体业务逻辑中实现")
38 def get_user_asset(self, user_id: str):
39 """获取用户资产"""
40 # 直接调用AssetRepository的get_user_asset方法
41 return self.asset_repo.get_user_asset(user_id)
43 def update_user_asset(self, user_id: str, asset_update):
44 """更新用户资产"""
45 raise NotImplementedError("update_user_asset方法需要在具体业务逻辑中实现")
47 def delete_user_asset(self, user_id: str) -> bool:
48 """删除用户资产"""
49 raise NotImplementedError("delete_user_asset方法需要在具体业务逻辑中实现")
51 # ===== 模拟资产管理方法 =====
52 def create_simulated_asset(self, asset):
53 """创建模拟资产"""
54 raise NotImplementedError("create_simulated_asset方法需要在具体业务逻辑中实现")
56 def get_simulated_asset(self):
57 """获取模拟资产"""
58 raise NotImplementedError("get_simulated_asset方法需要在具体业务逻辑中实现")
60 def update_simulated_asset(
61 self, user_id: str, asset_update: SimulatedAssetUpdate
62 ) -> SimulatedAssetResponse:
63 """更新模拟资产"""
64 return self.asset_repo.update_simulated_asset(user_id, asset_update)
66 # ===== 模拟持仓管理方法 =====
67 def create_simulated_position(
68 self, position: SimulatedPositionCreate
69 ) -> SimulatedPositionResponse:
70 """创建模拟持仓"""
71 return self.asset_repo.create_simulated_position(position)
73 def get_simulated_positions(self, user_id: str) -> List[SimulatedPositionResponse]:
74 """获取模拟持仓列表"""
75 return self.asset_repo.get_simulated_positions(user_id)
77 def update_simulated_position(
78 self, position_id: str, position_update: SimulatedPositionUpdate
79 ) -> Optional[SimulatedPositionResponse]:
80 """更新模拟持仓"""
81 return self.asset_repo.update_simulated_position(position_id, position_update)
83 def delete_simulated_position(self, position_id: str) -> bool:
84 """删除模拟持仓"""
85 return self.asset_repo.delete_simulated_position(position_id)
87 # ===== 传统同步方法 =====
88 def sync_from_broker(self, user_id: str) -> bool:
89 """从券商同步资产数据 (传统接口)"""
90 # 实际上调用新的长桥同步方法
91 result = self.sync_from_longport(force_refresh=True)
92 return result.get("success", False) if result else False
94 def sync_to_simulated(self, user_id: str) -> bool:
95 """同步真实资产到模拟资产"""
96 try:
97 # 获取真实资产概览
98 real_overview = self.get_or_create_asset_overview()
99 if not real_overview:
100 print("❌ 无法获取真实资产概览")
101 return False
103 # 清理现有模拟资产和持仓
104 self.asset_repo.delete_simulated_asset(user_id)
105 # 清理现有模拟持仓
106 simulated_positions = self.asset_repo.get_simulated_positions(user_id)
107 for position in simulated_positions:
108 self.asset_repo.delete_simulated_position(position.id)
110 # 同步现金资产
111 for (
112 currency_str,
113 cash_assets,
114 ) in real_overview.cash_assets_by_currency.items():
115 simulated_asset_data = SimulatedAssetCreate(
116 user_id=user_id,
117 total_assets=cash_assets,
118 cash_assets=cash_assets,
119 position_assets=Decimal("0"),
120 currency=currency_str, # 添加货币字段
121 )
122 self.asset_repo.create_simulated_asset(simulated_asset_data)
124 # 同步持仓数据
125 for position in real_overview.positions:
126 simulated_position_data = SimulatedPositionCreate(
127 user_id=user_id,
128 symbol=position.symbol,
129 symbol_name=position.symbol_name,
130 asset_type=position.asset_type,
131 quantity=position.quantity,
132 cost_price=position.cost_price,
133 current_price=position.current_price,
134 market=position.market,
135 currency=position.currency,
136 )
137 self.asset_repo.create_simulated_position(simulated_position_data)
139 print(f"✅ 成功同步用户 {user_id} 的真实资产到模拟资产")
140 return True
142 except Exception as e:
143 print(f"❌ 同步到模拟资产失败: {e}")
144 return False
146 def sync_from_longport(
147 self, force_refresh: bool = False
148 ) -> Optional[Dict[str, Any]]:
149 """从长桥API同步真实资产数据(统一接口)"""
150 # 检查数据源是否可用
151 if not self.asset_adapter.is_available():
152 print("❌ 长桥数据源不可用,无法同步数据")
153 return None
155 try:
156 # 获取账户余额
157 balance_data = self.asset_adapter.get_account_balance()
158 if not balance_data:
159 print("❌ 无法获取账户余额")
160 return None
162 # 获取持仓信息
163 positions = self.asset_adapter.get_positions()
164 print(f"🔍 业务适配器获取的持仓数量: {len(positions)}")
165 if positions:
166 # 移除调试日志
167 pass
168 else:
169 print("❌ 没有获取到任何持仓数据")
171 # 按货币分组计算持仓资产
172 position_assets_by_currency = {}
173 parsed_positions = []
175 for pos in positions:
176 # 解析持仓数据
177 parsed_position = self._parse_position_data(pos)
178 if parsed_position:
179 parsed_positions.append(parsed_position)
181 # 如果有市值数据,计算持仓资产
182 if pos.get("market_value") is not None:
183 currency = CurrencyType(pos.get("currency", "USD"))
184 if currency not in position_assets_by_currency:
185 position_assets_by_currency[currency] = Decimal("0")
186 position_assets_by_currency[currency] += pos["market_value"]
187 else:
188 # 如果没有市值,用当前价格和数量计算
189 currency = CurrencyType(pos.get("currency", "USD"))
190 quantity = Decimal(str(pos.get("quantity") or "0"))
191 current_price = Decimal(str(pos.get("current_price") or "0"))
192 market_value = quantity * current_price
194 if currency not in position_assets_by_currency:
195 position_assets_by_currency[currency] = Decimal("0")
196 position_assets_by_currency[currency] += market_value
198 # 计算总资产(按货币分组)
199 total_assets_by_currency = {}
200 cash_assets_by_currency = {}
201 # position_assets_by_currency = {} # 移除此行,避免清空已计算的持仓资产
202 today_pnl = Decimal("0") # TODO: 从API获取真实P&L
204 # 先统计持仓资产
205 for position in parsed_positions:
206 currency = CurrencyType(position.get("currency", "USD"))
207 position_value = position.get(
208 "current_price", Decimal("0")
209 ) * position.get("quantity", Decimal("0"))
211 if currency not in position_assets_by_currency:
212 position_assets_by_currency[currency] = Decimal("0")
213 position_assets_by_currency[currency] += position_value
215 # 处理现金资产(从账户余额获取)
216 for cash_detail in balance_data.get("cash_details", []):
217 currency = CurrencyType(cash_detail["currency"])
218 cash_amount = cash_detail["available_cash"]
219 cash_assets_by_currency[currency] = cash_amount
221 # 按模型定义构建total_assets_by_currency
222 total_assets_by_currency[currency] = {
223 "total_assets": cash_amount
224 + position_assets_by_currency.get(currency, Decimal("0")),
225 "cash_assets": cash_amount,
226 "position_assets": position_assets_by_currency.get(
227 currency, Decimal("0")
228 ),
229 "today_pnl": Decimal("0"), # TODO: 计算真实今日盈亏
230 }
232 # 为只有持仓但没有现金的货币创建资产记录
233 for currency, position_value in position_assets_by_currency.items():
234 if currency not in total_assets_by_currency:
235 cash_assets_by_currency[currency] = Decimal("0")
236 total_assets_by_currency[currency] = {
237 "total_assets": position_value,
238 "cash_assets": Decimal("0"),
239 "position_assets": position_value,
240 "today_pnl": Decimal("0"),
241 }
243 # 先删除现有的资产和持仓数据(每次同步都清理)
244 print(f"清理用户 {self.user_id} 的现有资产数据...")
245 self.asset_repo.delete_user_asset(self.user_id)
246 self.asset_repo.delete_user_positions(self.user_id)
248 # 为每种货币创建资产记录
249 for currency, assets_data in total_assets_by_currency.items():
250 cash_assets = cash_assets_by_currency.get(currency, Decimal("0"))
251 position_assets = position_assets_by_currency.get(
252 currency, Decimal("0")
253 )
255 asset_data = UserAssetCreate(
256 user_id=self.user_id,
257 total_assets=assets_data["total_assets"],
258 cash_assets=cash_assets,
259 position_assets=position_assets,
260 today_pnl=today_pnl,
261 currency=currency,
262 )
264 # 创建新的资产记录
265 self.asset_repo.create_user_asset(asset_data)
267 # 清除之前的真实持仓数据
268 self.asset_repo.clear_user_positions(self.user_id)
269 # 已清除用户旧持仓数据
271 # 保存持仓数据
272 for position_data in parsed_positions:
273 position = UserPositionCreate(
274 user_id=self.user_id,
275 symbol=position_data["symbol"],
276 symbol_name=position_data["symbol_name"],
277 asset_type=AssetType.STOCK, # TODO: 根据持仓类型确定
278 quantity=position_data["quantity"],
279 available_quantity=position_data["available_quantity"],
280 cost_price=position_data["cost_price"],
281 current_price=position_data["current_price"],
282 market=position_data["market"],
283 currency=position_data["currency"],
284 )
285 self.asset_repo.create_user_position(position)
287 print(f"✅ 成功从长桥API同步用户 {self.user_id} 的资产数据")
289 return {
290 "total_assets_by_currency": total_assets_by_currency,
291 "cash_assets_by_currency": cash_assets_by_currency,
292 "position_assets_by_currency": position_assets_by_currency,
293 "today_pnl": today_pnl,
294 "positions": parsed_positions,
295 }
297 except Exception as e:
298 print(f"❌ 同步资产数据失败: {e}")
299 return None
301 def _parse_position_data(self, pos: Dict[str, Any]) -> Optional[Dict[str, Any]]:
302 """解析持仓数据"""
303 if not pos.get("symbol"):
304 return None
306 # 转换为AssetType
307 asset_type_map = {"STOCK": AssetType.STOCK, "FUND": AssetType.FUND}
309 # 转换为MarketType
310 market_map = {
311 "US": MarketType.US,
312 "HK": MarketType.HK,
313 "CN": MarketType.CN,
314 "SG": MarketType.SG,
315 "CRYPTO": MarketType.CRYPTO,
316 "UNKNOWN": MarketType.US, # 默认使用US市场
317 }
319 try:
320 return {
321 "symbol": pos["symbol"],
322 "symbol_name": pos.get("symbol_name", pos["symbol"]),
323 "asset_type": asset_type_map.get(
324 pos.get("asset_type", "STOCK"), AssetType.STOCK
325 ),
326 "quantity": Decimal(str(pos.get("quantity") or "0")),
327 "available_quantity": Decimal(
328 str(pos.get("available_quantity") or "0")
329 ),
330 "cost_price": Decimal(str(pos.get("cost_price") or "0")),
331 "current_price": Decimal(str(pos.get("current_price") or "0")),
332 "market": market_map.get(pos.get("market", "UNKNOWN"), MarketType.US),
333 "currency": CurrencyType(pos.get("currency", "USD")),
334 }
335 except Exception as e:
336 print(f"❌ 解析持仓数据失败: {e}")
337 return None
339 def get_or_create_asset_overview(
340 self, force_refresh: bool = False
341 ) -> Optional[AssetOverview]:
342 """获取或创建资产概览(带缓存机制)"""
343 # 如果强制刷新,直接从长桥API获取最新数据
344 if force_refresh:
345 print(f"🔄 强制刷新资产数据,用户: {self.user_id}")
346 real_data = self.sync_from_longport(force_refresh=True)
347 if real_data:
348 return self._build_asset_overview_from_data(real_data)
349 # 如果长桥API失败,返回现有数据
351 # 先尝试获取现有数据
352 overview = self.asset_repo.get_asset_overview(self.user_id)
353 if overview:
354 # 检查是否需要更新(超过1分钟)
355 if self._should_update_data():
356 print(f"⏰ 资产数据超过1分钟,从长桥API获取最新数据")
357 real_data = self.sync_from_longport(force_refresh=False)
358 if real_data:
359 return self._build_asset_overview_from_data(real_data)
360 # 如果长桥API失败,返回现有缓存数据
361 return overview
362 else:
363 print(f"♻️ 使用缓存资产数据,用户: {self.user_id}")
364 return overview
366 # 如果没有现有数据,尝试从长桥API获取
367 print(f"📊 初始化资产数据,用户: {self.user_id}")
368 real_data = self.sync_from_longport(force_refresh=False)
370 if real_data:
371 return self._build_asset_overview_from_data(real_data)
373 # 如果长桥API不可用,创建默认资产数据
374 return self._create_default_asset_overview()
376 def _build_asset_overview_from_data(
377 self, real_data: Dict[str, Any] = None
378 ) -> AssetOverview:
379 """从数据库构建资产概览(重新读取数据以保证完整性)"""
380 # 重新从数据库获取最新的持仓数据,不使用同步时的临时数据
381 positions = self.asset_repo.get_user_positions(self.user_id)
383 # 需要重新按货币分组构建概览数据
384 if real_data and real_data.get("total_assets_by_currency") is not None:
385 # 重新计算实际持仓资产(从数据库持仓数据)
386 position_assets_by_currency_recalculated = {}
387 for position in positions:
388 currency = position.currency
389 position_value = position.current_price * position.quantity
390 if currency not in position_assets_by_currency_recalculated:
391 position_assets_by_currency_recalculated[currency] = Decimal("0")
392 position_assets_by_currency_recalculated[currency] += position_value
394 # 重新计算总资产(现金+持仓资产)
395 total_assets_by_currency_recalculated = {}
396 for currency, cash_assets in real_data["cash_assets_by_currency"].items():
397 position_assets = position_assets_by_currency_recalculated.get(
398 currency, Decimal("0")
399 )
400 total_assets_by_currency_recalculated[currency] = {
401 "total_assets": cash_assets + position_assets,
402 "cash_assets": cash_assets,
403 "position_assets": position_assets,
404 "today_pnl": real_data["today_pnl"],
405 }
407 return AssetOverview(
408 total_assets_by_currency=total_assets_by_currency_recalculated,
409 cash_assets_by_currency=real_data["cash_assets_by_currency"],
410 position_assets_by_currency=position_assets_by_currency_recalculated,
411 today_pnl=real_data["today_pnl"],
412 currency=None, # 多币种模式
413 total_assets=None, # 多币种模式下不设总值
414 cash_assets=None,
415 position_assets=None,
416 positions_by_currency=self._group_positions_by_currency(positions),
417 positions=self._convert_to_position_responses(
418 real_data.get("positions", [])
419 ), # 转换数据格式
420 )
421 else:
422 # 如果同步数据不可用,从数据库资产记录构建 - 重新计算持仓资产
423 latest_assets = self.asset_repo.get_latest_user_asset(self.user_id)
425 # 重新计算真实的持仓资产(从当前持仓数据)
426 position_assets_by_currency = {}
427 for position in positions:
428 currency = position.currency
429 position_value = position.current_price * position.quantity
430 if currency not in position_assets_by_currency:
431 position_assets_by_currency[currency] = Decimal("0")
432 position_assets_by_currency[currency] += position_value
434 # 构建资产概览数据
435 if latest_assets:
436 currency = latest_assets.currency
437 cash_assets = latest_assets.cash_assets
438 position_assets = position_assets_by_currency.get(
439 currency, Decimal("0")
440 )
442 total_assets_by_currency = {
443 currency: {
444 "total_assets": cash_assets + position_assets,
445 "cash_assets": cash_assets,
446 "position_assets": position_assets,
447 "today_pnl": latest_assets.today_pnl or Decimal("0"),
448 }
449 }
450 cash_assets_by_currency = {currency: cash_assets}
451 else:
452 # 如果数据库没有资产记录,只有持仓数据
453 total_assets_by_currency = {}
454 cash_assets_by_currency = {}
456 for currency, position_value in position_assets_by_currency.items():
457 total_assets_by_currency[currency] = {
458 "total_assets": position_value,
459 "cash_assets": Decimal("0"),
460 "position_assets": position_value,
461 "today_pnl": Decimal("0"),
462 }
463 cash_assets_by_currency[currency] = Decimal("0")
465 return AssetOverview(
466 total_assets_by_currency=total_assets_by_currency,
467 cash_assets_by_currency=cash_assets_by_currency,
468 position_assets_by_currency=position_assets_by_currency,
469 positions_by_currency=self._group_positions_by_currency(positions),
470 today_pnl=latest_assets.today_pnl or Decimal("0"),
471 currency=None,
472 total_assets=None,
473 cash_assets=None,
474 position_assets=None,
475 positions=positions,
476 )
478 def _group_positions_by_currency(
479 self, positions: List[UserPositionResponse]
480 ) -> Dict[str, List[UserPositionResponse]]:
481 """按货币分组持仓"""
482 positions_by_currency = {}
483 for position in positions:
484 currency = position.currency
485 if currency not in positions_by_currency:
486 positions_by_currency[currency] = []
487 positions_by_currency[currency].append(position)
488 return positions_by_currency
490 def _convert_to_position_responses(
491 self, position_data_list: List[Dict[str, Any]]
492 ) -> List[UserPositionResponse]:
493 """将持仓字典列表转换为UserPositionResponse对象列表"""
494 position_responses = []
495 for pos_data in position_data_list:
496 try:
497 position_response = UserPositionResponse(
498 id=f"temp_{pos_data['symbol']}", # 临时ID
499 user_id=self.user_id,
500 symbol=pos_data["symbol"],
501 symbol_name=pos_data["symbol_name"],
502 asset_type=pos_data["asset_type"],
503 quantity=pos_data["quantity"],
504 available_quantity=pos_data["available_quantity"],
505 cost_price=pos_data["cost_price"],
506 current_price=pos_data["current_price"],
507 market=pos_data["market"],
508 currency=pos_data["currency"],
509 created_at=datetime.now().isoformat(),
510 updated_at=datetime.now().isoformat(),
511 )
512 position_responses.append(position_response)
513 except Exception as e:
514 print(f"❌ 转换持仓数据失败: {e}")
515 continue
516 return position_responses
518 def _create_default_asset_overview(self) -> AssetOverview:
519 """创建默认资产概览(长桥API不可用时)"""
520 print(f"⚠️ 长桥API不可用,创建默认资产概览,用户: {self.user_id}")
522 # 创建默认资产数据
523 asset_data = UserAssetCreate(
524 user_id=self.user_id,
525 total_assets=Decimal("100000.00"),
526 cash_assets=Decimal("50000.00"),
527 position_assets=Decimal("50000.00"),
528 today_pnl=Decimal("0.00"),
529 currency=CurrencyType.USD,
530 )
532 self.asset_repo.create_user_asset(asset_data)
534 # 返回默认概览
535 return AssetOverview(
536 total_assets_by_currency={
537 "USD": {
538 "total_assets": Decimal("100000.00"),
539 "cash_assets": Decimal("50000.00"),
540 "position_assets": Decimal("50000.00"),
541 "today_pnl": Decimal("0.00"),
542 }
543 },
544 cash_assets_by_currency={"USD": Decimal("50000.00")},
545 position_assets_by_currency={"USD": Decimal("50000.00")},
546 today_pnl=Decimal("0.00"),
547 currency=None,
548 total_assets=None,
549 cash_assets=None,
550 position_assets=None,
551 positions=[],
552 )
554 def _should_update_data(self) -> bool:
555 """检查是否需要更新数据(超过1分钟)"""
556 try:
557 # 使用AssetRepository的抽象接口获取最新资产记录
558 latest_asset = self.asset_repo.get_latest_user_asset(self.user_id)
559 if not latest_asset:
560 return True
562 # 检查更新时间
563 try:
564 updated_at = latest_asset.updated_at
565 if isinstance(updated_at, str):
566 updated_at = datetime.fromisoformat(updated_at)
568 # 计算时间差
569 now = datetime.now()
570 time_diff = (now - updated_at).total_seconds()
572 # 如果超过1分钟,需要更新
573 return time_diff > 60
574 except Exception:
575 return True
577 except Exception as e:
578 print(f"检查更新时间失败: {e}")
579 return True # 出错时默认更新
581 def get_positions(self) -> List[UserPositionResponse]:
582 """获取用户持仓列表"""
583 return self.asset_repo.get_user_positions(self.user_id)
585 def create_position(
586 self, position_data: UserPositionCreate
587 ) -> UserPositionResponse:
588 """创建持仓"""
589 return self.asset_repo.create_user_position(position_data)
591 def update_position(
592 self, position_id: str, position_data: UserPositionCreate
593 ) -> Optional[UserPositionResponse]:
594 """更新持仓"""
595 return self.asset_repo.update_user_position(position_id, position_data)
597 def delete_position(self, position_id: str) -> bool:
598 """删除持仓"""
599 return self.asset_repo.delete_user_position(position_id)
601 def get_asset_summary(self) -> Optional[Dict[str, Any]]:
602 """获取资产摘要"""
603 try:
604 overview = self.get_or_create_asset_overview()
605 if overview:
606 return {
607 "user_id": self.user_id,
608 "total_assets_by_currency": overview.total_assets_by_currency,
609 "today_pnl": overview.today_pnl,
610 "positions_count": len(overview.positions),
611 }
612 except Exception as e:
613 print(f"❌ 获取资产摘要失败: {e}")
615 return None