Coverage for core/data_source/factories/config_factory.py: 66.87%

163 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

1""" 

2统一数据源配置工厂 

3负责所有券商配置的统一管理和缓存 

4""" 

5 

6import hashlib 

7import threading 

8import time 

9from dataclasses import dataclass 

10from enum import Enum 

11from typing import Any, Dict, Optional 

12 

13from core.repositories.settings_repository import SettingsRepository 

14 

15 

16class DataSourceType(Enum): 

17 """数据源类型""" 

18 

19 LONGPORT = "longport" 

20 FUTU = "futu" 

21 TIGER = "tiger" 

22 

23 

24@dataclass 

25class UnifiedDataSourceConfig: 

26 """统一数据源配置""" 

27 

28 type: DataSourceType 

29 user_id: str 

30 config: Dict[str, Any] 

31 

32 def __post_init__(self): 

33 """生成配置哈希用于缓存键""" 

34 self.config_hash = self._generate_config_hash() 

35 

36 def _generate_config_hash(self) -> str: 

37 """生成配置哈希值""" 

38 config_str = ( 

39 f"{self.type.value}:{self.user_id}:{str(sorted(self.config.items()))}" 

40 ) 

41 return hashlib.md5(config_str.encode()).hexdigest() 

42 

43 

44class ConfigFactory: 

45 """统一数据源配置工厂 - 单例模式""" 

46 

47 _instance = None 

48 _lock = threading.Lock() 

49 

50 def __new__(cls): 

51 if cls._instance is None: 

52 with cls._lock: 

53 if cls._instance is None: 

54 cls._instance = super().__new__(cls) 

55 cls._instance._initialized = False 

56 return cls._instance 

57 

58 def __init__(self): 

59 if not self._initialized: 

60 self._settings_repo = SettingsRepository() 

61 

62 # 配置缓存:user_id -> UnifiedDataSourceConfig 

63 self._config_cache: Dict[str, UnifiedDataSourceConfig] = {} 

64 

65 # 缓存元数据:user_id -> {'last_used': timestamp, 'access_count': int} 

66 self._cache_metadata: Dict[str, Dict[str, Any]] = {} 

67 

68 self._max_cache_size = 200 # 最大缓存配置数 

69 self._cache_ttl = 300 # 5分钟缓存有效期 

70 

71 self._initialized = True 

72 

73 def get_data_source_config( 

74 self, user_id: str, force_refresh: bool = False 

75 ) -> Optional[UnifiedDataSourceConfig]: 

76 """ 

77 获取用户的数据源配置 

78 

79 Args: 

80 user_id: 用户ID 

81 force_refresh: 是否强制刷新缓存 

82 

83 Returns: 

84 统一数据源配置,失败返回 None 

85 """ 

86 current_time = time.time() 

87 

88 # 检查缓存 

89 if not force_refresh and user_id in self._config_cache: 

90 cached_config = self._config_cache[user_id] 

91 cached_meta = self._cache_metadata[user_id] 

92 

93 # 检查缓存是否过期 

94 if current_time - cached_meta["last_used"] < self._cache_ttl: 

95 cached_meta["last_used"] = current_time 

96 cached_meta["access_count"] += 1 

97 print( 

98 f"♻️ 复用数据源配置缓存,用户: {user_id}, 类型: {cached_config.type}" 

99 ) 

100 return cached_config 

101 else: 

102 # 缓存过期,移除 

103 del self._config_cache[user_id] 

104 del self._cache_metadata[user_id] 

105 print(f"⏰ 数据源配置缓存过期,用户: {user_id}") 

106 

107 # 从数据库获取配置 

108 print(f"📋 从数据库获取用户 {user_id} 的数据源配置") 

109 

110 try: 

111 # 获取用户设置 

112 user_settings = self._settings_repo.get_user_settings(user_id) 

113 if not user_settings: 

114 print(f"❌ 用户 {user_id} 不存在用户设置") 

115 # 尝试使用环境变量fallback(仅在非生产环境) 

116 print(f"尝试环境变量fallback") 

117 fallback_config = self._try_environment_fallback() 

118 if fallback_config: 

119 data_source_type = DataSourceType.LONGPORT 

120 config_dict = fallback_config 

121 print(f"✅ 使用环境变量配置,用户: {user_id}") 

122 else: 

123 print(f"❌ 用户 {user_id} 不存在且环境变量不可用") 

124 return None 

125 else: 

126 # 解析数据源配置 - 支持两种配置格式 

127 data_source_config = user_settings.data_source_config 

128 longport_config = user_settings.longport_config 

129 

130 # 确定数据源类型和有效配置 

131 if data_source_config and data_source_config.longport_config: 

132 # 新的统一配置格式 

133 data_source_type = DataSourceType( 

134 data_source_config.type or "longport" 

135 ) 

136 config_dict = data_source_config.longport_config.model_dump() 

137 elif longport_config: 

138 # 兼容旧的长桥配置格式 

139 data_source_type = DataSourceType.LONGPORT 

140 config_dict = ( 

141 longport_config.model_dump() 

142 if hasattr(longport_config, "model_dump") 

143 else longport_config 

144 ) 

145 print(f"📋 使用旧格式长桥配置,用户: {user_id}") 

146 else: 

147 # 尝试使用环境变量fallback(仅在非生产环境) 

148 print(f"❌ 用户 {user_id} 没有数据源配置,尝试环境变量fallback") 

149 fallback_config = self._try_environment_fallback() 

150 if fallback_config: 

151 data_source_type = DataSourceType.LONGPORT 

152 config_dict = fallback_config 

153 print(f"✅ 使用环境变量配置,用户: {user_id}") 

154 else: 

155 print(f"❌ 用户 {user_id} 没有数据源配置且环境变量不可用") 

156 return None 

157 

158 # 根据类型构建具体配置 

159 if data_source_type == DataSourceType.LONGPORT: 

160 config = self._build_longport_config_from_dict(config_dict) 

161 elif data_source_type == DataSourceType.FUTU: 

162 config = self._build_futu_config(user_settings) 

163 elif data_source_type == DataSourceType.TIGER: 

164 config = self._build_tiger_config(user_settings) 

165 else: 

166 print(f"❌ 不支持的数据源类型: {data_source_type}") 

167 return None 

168 

169 if not config: 

170 print(f"❌ 无法构建用户 {user_id} 的数据源配置") 

171 return None 

172 

173 # 创建统一配置对象 

174 unified_config = UnifiedDataSourceConfig( 

175 type=data_source_type, user_id=user_id, config=config 

176 ) 

177 

178 # 缓存配置 

179 self._enforce_cache_limit() 

180 self._config_cache[user_id] = unified_config 

181 self._cache_metadata[user_id] = { 

182 "last_used": current_time, 

183 "access_count": 1, 

184 } 

185 

186 print(f"✅ 数据源配置已缓存,用户: {user_id}, 类型: {data_source_type}") 

187 return unified_config 

188 

189 except Exception as e: 

190 print(f"❌ 获取数据源配置失败: {e}") 

191 return None 

192 

193 def _try_environment_fallback(self) -> Optional[Dict[str, Any]]: 

194 """尝试从环境变量获取配置(仅在非生产环境)""" 

195 from infrastructure.config.settings import settings 

196 

197 # 仅在非生产环境使用环境变量fallback 

198 if settings.environment == "production": 

199 print("🚫 生产环境不允许使用环境变量fallback") 

200 return None 

201 

202 # 检查环境变量 

203 app_key = getattr(settings, "longport_app_key", None) 

204 app_secret = getattr(settings, "longport_app_secret", None) 

205 access_token = getattr(settings, "longport_access_token", None) 

206 

207 if not app_key or not app_secret or not access_token: 

208 print("❌ 环境变量配置不完整") 

209 return None 

210 

211 config = { 

212 "app_key": app_key, 

213 "app_secret": app_secret, 

214 "access_token": access_token, 

215 "language": "zh-CN", 

216 "enable_overnight": True, 

217 } 

218 

219 print( 

220 f"✅ 环境变量配置可用: app_key={app_key[:10]}..., app_secret={app_secret[:10]}..., access_token={access_token[:10]}..." 

221 ) 

222 return config 

223 

224 def _build_longport_config_from_dict(self, config_dict) -> Optional[Dict[str, Any]]: 

225 """从配置字典构建长桥配置""" 

226 # print(f"🔧 构建长桥配置: {config_dict}") # 注释掉调试日志 

227 

228 # 检查必要配置是否存在 

229 required_fields = ["app_key", "app_secret", "access_token"] 

230 for field in required_fields: 

231 if not config_dict.get(field): 

232 print(f"❌ 长桥配置缺少必要字段: {field}") 

233 return None 

234 

235 config = { 

236 "app_key": config_dict["app_key"], 

237 "app_secret": config_dict["app_secret"], 

238 "access_token": config_dict["access_token"], 

239 "language": config_dict.get("language", "zh-CN"), 

240 "enable_overnight": config_dict.get("enable_overnight", True), 

241 } 

242 

243 print(f"✅ 长桥配置构建成功") 

244 return config 

245 

246 def _build_longport_config(self, user_settings) -> Optional[Dict[str, Any]]: 

247 """构建长桥配置""" 

248 from infrastructure.config.settings import settings 

249 

250 # 优先使用用户配置,其次使用环境变量fallback 

251 longport_config = user_settings.longport_config 

252 fallback_to_env = user_settings.data_source_config.get("fallback_to_env", True) 

253 

254 config = {} 

255 

256 # App Key 

257 config["app_key"] = ( 

258 longport_config.app_key 

259 if longport_config and longport_config.app_key 

260 else ( 

261 settings.longport_app_key 

262 if fallback_to_env and hasattr(settings, "longport_app_key") 

263 else None 

264 ) 

265 ) 

266 

267 # App Secret 

268 config["app_secret"] = ( 

269 longport_config.app_secret 

270 if longport_config and longport_config.app_secret 

271 else ( 

272 settings.longport_app_secret 

273 if fallback_to_env and hasattr(settings, "longport_app_secret") 

274 else None 

275 ) 

276 ) 

277 

278 # Access Token 

279 config["access_token"] = ( 

280 longport_config.access_token 

281 if longport_config and longport_config.access_token 

282 else ( 

283 settings.longport_access_token 

284 if fallback_to_env and hasattr(settings, "longport_access_token") 

285 else None 

286 ) 

287 ) 

288 

289 # Language (默认中文) 

290 config["language"] = ( 

291 longport_config.language 

292 if longport_config and longport_config.language 

293 else "zh-CN" 

294 ) 

295 

296 # Enable Overnight (默认关闭) 

297 config["enable_overnight"] = ( 

298 longport_config.enable_overnight 

299 if longport_config and hasattr(longport_config, "enable_overnight") 

300 else False 

301 ) 

302 

303 # 检查必要配置是否存在 

304 if ( 

305 not config["app_key"] 

306 or not config["app_secret"] 

307 or not config["access_token"] 

308 ): 

309 print( 

310 f"❌ 长桥配置不完整: app_key={bool(config['app_key'])}, app_secret={bool(config['app_secret'])}, access_token={bool(config['access_token'])}" 

311 ) 

312 return None 

313 

314 return config 

315 

316 def _build_futu_config(self, user_settings) -> Optional[Dict[str, Any]]: 

317 """构建富途配置(预留)""" 

318 # TODO: 实现富途配置构建 

319 print("🚧 富途配置构建功能待实现") 

320 return None 

321 

322 def _build_tiger_config(self, user_settings) -> Optional[Dict[str, Any]]: 

323 """构建老虎配置(预留)""" 

324 # TODO: 实现老虎配置构建 

325 print("🚧 老虎配置构建功能待实现") 

326 return None 

327 

328 def _enforce_cache_limit(self): 

329 """强制执行缓存限制(LRU淘汰)""" 

330 if len(self._config_cache) >= self._max_cache_size: 

331 # 找到最久未使用的配置 

332 oldest_user = min( 

333 self._cache_metadata.keys(), 

334 key=lambda k: self._cache_metadata[k]["last_used"], 

335 ) 

336 

337 # 移除缓存 

338 del self._config_cache[oldest_user] 

339 del self._cache_metadata[oldest_user] 

340 print(f"🗑️ LRU淘汰数据源配置缓存,用户: {oldest_user}") 

341 

342 def clear_cache(self, user_id: Optional[str] = None): 

343 """ 

344 清理配置缓存 

345 

346 Args: 

347 user_id: 特定用户ID,None表示清理所有缓存 

348 """ 

349 if user_id: 

350 if user_id in self._config_cache: 

351 del self._config_cache[user_id] 

352 del self._cache_metadata[user_id] 

353 print(f"🗑️ 已清理用户 {user_id} 的配置缓存") 

354 else: 

355 self._config_cache.clear() 

356 self._cache_metadata.clear() 

357 print("🗑️ 已清理所有配置缓存") 

358 

359 def get_cache_info(self) -> Dict[str, Any]: 

360 """获取缓存信息(用于调试)""" 

361 return { 

362 "cached_users": list(self._config_cache.keys()), 

363 "cache_count": len(self._config_cache), 

364 "max_cache_size": self._max_cache_size, 

365 "cache_usage_percent": (len(self._config_cache) / self._max_cache_size) 

366 * 100, 

367 } 

368 

369 

370# 全局单例实例 

371unified_config_factory = ConfigFactory()