Coverage for core/data_source/adapters/quote_adapter.py: 52.34%
128 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2行情业务适配器
3统一行情业务逻辑,替代API测试中的直接调用
4"""
6from datetime import date
7from decimal import Decimal
8from typing import Any, Dict, List, Optional
10from core.data_source.adapters.data_source_adapter import \
11 QuoteDataSourceAdapter
class QuoteAdapter:
    """Quote business adapter that unifies quote-related business logic.

    Every public method forwards to the wrapped ``QuoteDataSourceAdapter``
    (stored as ``self.quote_adapter``). Failures never propagate to the
    caller: when the data source reports itself unavailable, or the
    delegated call raises, the method returns a per-method fallback value
    (``[]``, ``None``, ``False`` or ``{}``) instead.
    """

    def __init__(self, user_id: str):
        """Create the adapter and its underlying data source for *user_id*."""
        self.user_id = user_id
        self.quote_adapter = QuoteDataSourceAdapter(user_id)

    def _safe_call(
        self, method_name: str, fallback: Any, error_label: str, *args: Any
    ) -> Any:
        """Invoke ``self.quote_adapter.<method_name>(*args)`` defensively.

        Args:
            method_name: Name of the data source method to call.
            fallback: Value returned when the data source is unavailable
                or the call raises.
            error_label: Text placed between the "❌ " prefix and the
                exception text in the printed error line.
            *args: Positional arguments forwarded unchanged.

        Returns:
            The delegated call's result, or *fallback*.
        """
        # The availability probe is intentionally outside the ``try``: an
        # error raised by ``is_available()`` itself propagates to the caller.
        if not self.quote_adapter.is_available():
            return fallback

        try:
            # Attribute lookup stays inside the ``try`` so that a missing
            # method on the data source is reported like any other failure.
            return getattr(self.quote_adapter, method_name)(*args)
        except Exception as e:
            print(f"❌ {error_label}: {e}")
            return fallback

    def get_static_info(self, symbols: List[str]) -> List[Any]:
        """Fetch static (basic) information for *symbols*; ``[]`` on failure."""
        return self._safe_call("get_static_info", [], "获取标的基础信息失败", symbols)

    def get_quote(self, symbols: List[str]):
        """Fetch real-time quotes for *symbols*; ``[]`` on failure."""
        return self._safe_call("get_quote", [], "获取实时行情失败", symbols)

    def get_depth(self, symbol: str):
        """Fetch order book depth for *symbol*; ``None`` on failure."""
        return self._safe_call("get_depth", None, "获取盘口信息失败", symbol)

    def get_trades(self, symbol: str, count: int):
        """Fetch up to *count* trade ticks for *symbol*; ``[]`` on failure."""
        return self._safe_call("get_trades", [], "获取成交明细失败", symbol, count)

    def get_candlesticks(
        self,
        symbol: str,
        period: str,
        count: int,
        adjust_type: str = "NoAdjust",
        trade_sessions: str = "Intraday",
    ):
        """Fetch candlestick (K-line) data for *symbol*; ``[]`` on failure.

        All five arguments are forwarded positionally, in signature order,
        to the data source's ``get_candlesticks``.
        """
        return self._safe_call(
            "get_candlesticks",
            [],
            "获取K线数据失败",
            symbol,
            period,
            count,
            adjust_type,
            trade_sessions,
        )

    def get_trading_days(self, market: str, begin: date, end: date):
        """Fetch trading days of *market* from *begin* to *end*; ``None`` on failure."""
        return self._safe_call(
            "get_trading_days", None, "获取交易日期失败", market, begin, end
        )

    def get_trading_session(self):
        """Fetch trading sessions; ``[]`` on failure."""
        return self._safe_call("get_trading_session", [], "获取交易时段失败")

    def get_calc_indexes(self, symbols: List[str], indexes: List[str]):
        """Fetch the calculated *indexes* for *symbols*; ``[]`` on failure."""
        return self._safe_call(
            "get_calc_indexes", [], "获取计算指标失败", symbols, indexes
        )

    def get_historical_data(self, symbol: str, start_time, end_time, log_callback=None):
        """Fetch historical data for *symbol* between *start_time* and *end_time*.

        Per the original author's note, this reads directly from the
        database, so the Longbridge client availability is deliberately
        NOT checked here (unlike every other method of this class).

        Args:
            symbol: Instrument identifier.
            start_time: Range start, forwarded unchanged.
            end_time: Range end, forwarded unchanged.
            log_callback: Optional callable forwarded to the data source.
                On failure it is also invoked here as
                ``log_callback(message, "error")``.

        Returns:
            The data source result, or ``[]`` if the call raises.
        """
        try:
            return self.quote_adapter.get_historical_data(
                symbol, start_time, end_time, log_callback
            )
        except Exception as e:
            error_message = f"❌ 获取历史数据失败: {e}"
            if log_callback:
                log_callback(error_message, "error")
            # No print() fallback on purpose: all logging for this path is
            # meant to go through the caller-supplied callback. Without a
            # callback the error is therefore dropped silently.
            return []

    def subscribe(
        self, symbols: List[str], sub_types: List[str], is_first_push: bool = False
    ):
        """Subscribe *symbols* to *sub_types* quote pushes; ``False`` on failure."""
        return self._safe_call(
            "subscribe", False, "订阅失败", symbols, sub_types, is_first_push
        )

    def unsubscribe(self, symbols: List[str], sub_types: List[str]):
        """Cancel the *sub_types* subscriptions of *symbols*; ``False`` on failure."""
        return self._safe_call(
            "unsubscribe", False, "取消订阅失败", symbols, sub_types
        )

    def get_realtime_quote(self, symbols: List[str]):
        """Fetch real-time quotes (WebSocket) for *symbols*; ``[]`` on failure."""
        return self._safe_call("get_realtime_quote", [], "获取实时行情失败", symbols)

    def get_realtime_depth(self, symbol: str):
        """Fetch the real-time order book for *symbol*; ``None`` on failure."""
        return self._safe_call("get_realtime_depth", None, "获取实时盘口失败", symbol)

    def get_realtime_trades(self, symbol: str, count: int):
        """Fetch up to *count* real-time trade ticks for *symbol*; ``[]`` on failure."""
        return self._safe_call(
            "get_realtime_trades", [], "获取实时成交明细失败", symbol, count
        )

    def get_subscription_summary(self):
        """Fetch the subscription overview; ``{}`` on failure."""
        return self._safe_call("get_subscription_summary", {}, "获取订阅概览失败")