Coverage for core/data_source/adapters/quote_adapter.py: 52.34%

128 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

1""" 

2行情业务适配器 

3统一行情业务逻辑,替代API测试中的直接调用 

4""" 

5 

6from datetime import date 

7from decimal import Decimal 

8from typing import Any, Dict, List, Optional 

9 

10from core.data_source.adapters.data_source_adapter import \ 

11 QuoteDataSourceAdapter 

12 

13 

class QuoteAdapter:
    """Quote business adapter that funnels quote calls through one place.

    Every public method delegates to the wrapped ``QuoteDataSourceAdapter``
    stored in ``self.quote_adapter``. All of them except
    ``get_historical_data`` first check ``is_available()`` and return a fixed
    fallback value when the data source is unavailable or when the delegated
    call raises; the failure is reported with ``print``.
    """

    def __init__(self, user_id: str):
        """Remember ``user_id`` and build the data-source adapter for it."""
        self.user_id = user_id
        self.quote_adapter = QuoteDataSourceAdapter(user_id)

    def _guarded_call(self, label: str, fallback: Any, call) -> Any:
        """Run ``call()`` behind the availability check and error handler.

        Args:
            label: Operation name inserted into the printed failure message.
            fallback: Value returned when the data source is unavailable or
                ``call`` raises. Call sites pass a fresh literal each time,
                so callers never share one mutable list/dict.
            call: Zero-argument callable performing the delegated request.
                It is a deferred callable (not a pre-bound method) so that
                the attribute lookup on ``self.quote_adapter`` happens after
                the availability check and inside the ``try``.

        Returns:
            Whatever ``call()`` returns, or ``fallback``.
        """
        if not self.quote_adapter.is_available():
            return fallback

        try:
            return call()
        except Exception as e:
            # Best-effort by design: report the failure and degrade to the
            # fallback instead of propagating to the caller.
            print(f"❌ {label}失败: {e}")
            return fallback

    def get_static_info(self, symbols: List[str]) -> List[Any]:
        """Fetch basic security information for ``symbols``; ``[]`` on failure."""
        return self._guarded_call(
            "获取标的基础信息",
            [],
            lambda: self.quote_adapter.get_static_info(symbols),
        )

    def get_quote(self, symbols: List[str]):
        """Fetch real-time quotes for ``symbols``; ``[]`` on failure."""
        return self._guarded_call(
            "获取实时行情",
            [],
            lambda: self.quote_adapter.get_quote(symbols),
        )

    def get_depth(self, symbol: str):
        """Fetch order-book depth for ``symbol``; ``None`` on failure."""
        return self._guarded_call(
            "获取盘口信息",
            None,
            lambda: self.quote_adapter.get_depth(symbol),
        )

    def get_trades(self, symbol: str, count: int):
        """Fetch trade ticks for ``symbol`` (``count`` is forwarded); ``[]`` on failure."""
        return self._guarded_call(
            "获取成交明细",
            [],
            lambda: self.quote_adapter.get_trades(symbol, count),
        )

    def get_candlesticks(
        self,
        symbol: str,
        period: str,
        count: int,
        adjust_type: str = "NoAdjust",
        trade_sessions: str = "Intraday",
    ):
        """Fetch candlestick (K-line) data; ``[]`` on failure.

        All five arguments are forwarded positionally, in this order, to the
        underlying adapter's ``get_candlesticks``.
        """
        return self._guarded_call(
            "获取K线数据",
            [],
            lambda: self.quote_adapter.get_candlesticks(
                symbol, period, count, adjust_type, trade_sessions
            ),
        )

    def get_trading_days(self, market: str, begin: date, end: date):
        """Fetch trading days for ``market`` between ``begin`` and ``end``; ``None`` on failure."""
        return self._guarded_call(
            "获取交易日期",
            None,
            lambda: self.quote_adapter.get_trading_days(market, begin, end),
        )

    def get_trading_session(self):
        """Fetch trading sessions; ``[]`` on failure."""
        return self._guarded_call(
            "获取交易时段",
            [],
            lambda: self.quote_adapter.get_trading_session(),
        )

    def get_calc_indexes(self, symbols: List[str], indexes: List[str]):
        """Fetch calculated indexes for ``symbols``; ``[]`` on failure."""
        return self._guarded_call(
            "获取计算指标",
            [],
            lambda: self.quote_adapter.get_calc_indexes(symbols, indexes),
        )

    def get_historical_data(self, symbol: str, start_time, end_time, log_callback=None):
        """Fetch historical data for ``symbol``; ``[]`` on failure.

        Unlike the other methods this one does NOT check ``is_available()``:
        per the original author's note the data comes straight from the
        database, so the Longbridge client's availability is irrelevant.

        Args:
            symbol: Security identifier, forwarded unchanged.
            start_time: Range start, forwarded unchanged.
            end_time: Range end, forwarded unchanged.
            log_callback: Optional callable invoked as
                ``log_callback(message, "error")`` when the lookup fails. It
                is also forwarded to the underlying adapter.

        Returns:
            The underlying adapter's result, or ``[]`` if it raised.
        """
        try:
            return self.quote_adapter.get_historical_data(
                symbol, start_time, end_time, log_callback
            )
        except Exception as e:
            error_message = f"❌ 获取历史数据失败: {e}"
            if log_callback:
                log_callback(error_message, "error")
            # The fallback print was removed on purpose so that every log
            # line goes through the unified log management. Consequence:
            # without a log_callback the failure is not reported anywhere.
            return []

    def subscribe(
        self, symbols: List[str], sub_types: List[str], is_first_push: bool = False
    ):
        """Subscribe to quote data; ``False`` on failure."""
        return self._guarded_call(
            "订阅",
            False,
            lambda: self.quote_adapter.subscribe(symbols, sub_types, is_first_push),
        )

    def unsubscribe(self, symbols: List[str], sub_types: List[str]):
        """Cancel a subscription; ``False`` on failure."""
        return self._guarded_call(
            "取消订阅",
            False,
            lambda: self.quote_adapter.unsubscribe(symbols, sub_types),
        )

    def get_realtime_quote(self, symbols: List[str]):
        """Fetch real-time quotes (WebSocket, per the original note); ``[]`` on failure."""
        return self._guarded_call(
            "获取实时行情",
            [],
            lambda: self.quote_adapter.get_realtime_quote(symbols),
        )

    def get_realtime_depth(self, symbol: str):
        """Fetch real-time order-book depth for ``symbol``; ``None`` on failure."""
        return self._guarded_call(
            "获取实时盘口",
            None,
            lambda: self.quote_adapter.get_realtime_depth(symbol),
        )

    def get_realtime_trades(self, symbol: str, count: int):
        """Fetch real-time trade ticks for ``symbol``; ``[]`` on failure."""
        return self._guarded_call(
            "获取实时成交明细",
            [],
            lambda: self.quote_adapter.get_realtime_trades(symbol, count),
        )

    def get_subscription_summary(self):
        """Fetch the subscription overview; ``{}`` on failure."""
        return self._guarded_call(
            "获取订阅概览",
            {},
            lambda: self.quote_adapter.get_subscription_summary(),
        )

197 return {}