Coverage for core/data_source/factories/config_factory.py: 66.87%
163 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2统一数据源配置工厂
3负责所有券商配置的统一管理和缓存
4"""
6import hashlib
7import threading
8import time
9from dataclasses import dataclass
10from enum import Enum
11from typing import Any, Dict, Optional
13from core.repositories.settings_repository import SettingsRepository
16class DataSourceType(Enum):
17 """数据源类型"""
19 LONGPORT = "longport"
20 FUTU = "futu"
21 TIGER = "tiger"
24@dataclass
25class UnifiedDataSourceConfig:
26 """统一数据源配置"""
28 type: DataSourceType
29 user_id: str
30 config: Dict[str, Any]
32 def __post_init__(self):
33 """生成配置哈希用于缓存键"""
34 self.config_hash = self._generate_config_hash()
36 def _generate_config_hash(self) -> str:
37 """生成配置哈希值"""
38 config_str = (
39 f"{self.type.value}:{self.user_id}:{str(sorted(self.config.items()))}"
40 )
41 return hashlib.md5(config_str.encode()).hexdigest()
44class ConfigFactory:
45 """统一数据源配置工厂 - 单例模式"""
47 _instance = None
48 _lock = threading.Lock()
50 def __new__(cls):
51 if cls._instance is None:
52 with cls._lock:
53 if cls._instance is None:
54 cls._instance = super().__new__(cls)
55 cls._instance._initialized = False
56 return cls._instance
58 def __init__(self):
59 if not self._initialized:
60 self._settings_repo = SettingsRepository()
62 # 配置缓存:user_id -> UnifiedDataSourceConfig
63 self._config_cache: Dict[str, UnifiedDataSourceConfig] = {}
65 # 缓存元数据:user_id -> {'last_used': timestamp, 'access_count': int}
66 self._cache_metadata: Dict[str, Dict[str, Any]] = {}
68 self._max_cache_size = 200 # 最大缓存配置数
69 self._cache_ttl = 300 # 5分钟缓存有效期
71 self._initialized = True
73 def get_data_source_config(
74 self, user_id: str, force_refresh: bool = False
75 ) -> Optional[UnifiedDataSourceConfig]:
76 """
77 获取用户的数据源配置
79 Args:
80 user_id: 用户ID
81 force_refresh: 是否强制刷新缓存
83 Returns:
84 统一数据源配置,失败返回 None
85 """
86 current_time = time.time()
88 # 检查缓存
89 if not force_refresh and user_id in self._config_cache:
90 cached_config = self._config_cache[user_id]
91 cached_meta = self._cache_metadata[user_id]
93 # 检查缓存是否过期
94 if current_time - cached_meta["last_used"] < self._cache_ttl:
95 cached_meta["last_used"] = current_time
96 cached_meta["access_count"] += 1
97 print(
98 f"♻️ 复用数据源配置缓存,用户: {user_id}, 类型: {cached_config.type}"
99 )
100 return cached_config
101 else:
102 # 缓存过期,移除
103 del self._config_cache[user_id]
104 del self._cache_metadata[user_id]
105 print(f"⏰ 数据源配置缓存过期,用户: {user_id}")
107 # 从数据库获取配置
108 print(f"📋 从数据库获取用户 {user_id} 的数据源配置")
110 try:
111 # 获取用户设置
112 user_settings = self._settings_repo.get_user_settings(user_id)
113 if not user_settings:
114 print(f"❌ 用户 {user_id} 不存在用户设置")
115 # 尝试使用环境变量fallback(仅在非生产环境)
116 print(f"尝试环境变量fallback")
117 fallback_config = self._try_environment_fallback()
118 if fallback_config:
119 data_source_type = DataSourceType.LONGPORT
120 config_dict = fallback_config
121 print(f"✅ 使用环境变量配置,用户: {user_id}")
122 else:
123 print(f"❌ 用户 {user_id} 不存在且环境变量不可用")
124 return None
125 else:
126 # 解析数据源配置 - 支持两种配置格式
127 data_source_config = user_settings.data_source_config
128 longport_config = user_settings.longport_config
130 # 确定数据源类型和有效配置
131 if data_source_config and data_source_config.longport_config:
132 # 新的统一配置格式
133 data_source_type = DataSourceType(
134 data_source_config.type or "longport"
135 )
136 config_dict = data_source_config.longport_config.model_dump()
137 elif longport_config:
138 # 兼容旧的长桥配置格式
139 data_source_type = DataSourceType.LONGPORT
140 config_dict = (
141 longport_config.model_dump()
142 if hasattr(longport_config, "model_dump")
143 else longport_config
144 )
145 print(f"📋 使用旧格式长桥配置,用户: {user_id}")
146 else:
147 # 尝试使用环境变量fallback(仅在非生产环境)
148 print(f"❌ 用户 {user_id} 没有数据源配置,尝试环境变量fallback")
149 fallback_config = self._try_environment_fallback()
150 if fallback_config:
151 data_source_type = DataSourceType.LONGPORT
152 config_dict = fallback_config
153 print(f"✅ 使用环境变量配置,用户: {user_id}")
154 else:
155 print(f"❌ 用户 {user_id} 没有数据源配置且环境变量不可用")
156 return None
158 # 根据类型构建具体配置
159 if data_source_type == DataSourceType.LONGPORT:
160 config = self._build_longport_config_from_dict(config_dict)
161 elif data_source_type == DataSourceType.FUTU:
162 config = self._build_futu_config(user_settings)
163 elif data_source_type == DataSourceType.TIGER:
164 config = self._build_tiger_config(user_settings)
165 else:
166 print(f"❌ 不支持的数据源类型: {data_source_type}")
167 return None
169 if not config:
170 print(f"❌ 无法构建用户 {user_id} 的数据源配置")
171 return None
173 # 创建统一配置对象
174 unified_config = UnifiedDataSourceConfig(
175 type=data_source_type, user_id=user_id, config=config
176 )
178 # 缓存配置
179 self._enforce_cache_limit()
180 self._config_cache[user_id] = unified_config
181 self._cache_metadata[user_id] = {
182 "last_used": current_time,
183 "access_count": 1,
184 }
186 print(f"✅ 数据源配置已缓存,用户: {user_id}, 类型: {data_source_type}")
187 return unified_config
189 except Exception as e:
190 print(f"❌ 获取数据源配置失败: {e}")
191 return None
193 def _try_environment_fallback(self) -> Optional[Dict[str, Any]]:
194 """尝试从环境变量获取配置(仅在非生产环境)"""
195 from infrastructure.config.settings import settings
197 # 仅在非生产环境使用环境变量fallback
198 if settings.environment == "production":
199 print("🚫 生产环境不允许使用环境变量fallback")
200 return None
202 # 检查环境变量
203 app_key = getattr(settings, "longport_app_key", None)
204 app_secret = getattr(settings, "longport_app_secret", None)
205 access_token = getattr(settings, "longport_access_token", None)
207 if not app_key or not app_secret or not access_token:
208 print("❌ 环境变量配置不完整")
209 return None
211 config = {
212 "app_key": app_key,
213 "app_secret": app_secret,
214 "access_token": access_token,
215 "language": "zh-CN",
216 "enable_overnight": True,
217 }
219 print(
220 f"✅ 环境变量配置可用: app_key={app_key[:10]}..., app_secret={app_secret[:10]}..., access_token={access_token[:10]}..."
221 )
222 return config
224 def _build_longport_config_from_dict(self, config_dict) -> Optional[Dict[str, Any]]:
225 """从配置字典构建长桥配置"""
226 # print(f"🔧 构建长桥配置: {config_dict}") # 注释掉调试日志
228 # 检查必要配置是否存在
229 required_fields = ["app_key", "app_secret", "access_token"]
230 for field in required_fields:
231 if not config_dict.get(field):
232 print(f"❌ 长桥配置缺少必要字段: {field}")
233 return None
235 config = {
236 "app_key": config_dict["app_key"],
237 "app_secret": config_dict["app_secret"],
238 "access_token": config_dict["access_token"],
239 "language": config_dict.get("language", "zh-CN"),
240 "enable_overnight": config_dict.get("enable_overnight", True),
241 }
243 print(f"✅ 长桥配置构建成功")
244 return config
246 def _build_longport_config(self, user_settings) -> Optional[Dict[str, Any]]:
247 """构建长桥配置"""
248 from infrastructure.config.settings import settings
250 # 优先使用用户配置,其次使用环境变量fallback
251 longport_config = user_settings.longport_config
252 fallback_to_env = user_settings.data_source_config.get("fallback_to_env", True)
254 config = {}
256 # App Key
257 config["app_key"] = (
258 longport_config.app_key
259 if longport_config and longport_config.app_key
260 else (
261 settings.longport_app_key
262 if fallback_to_env and hasattr(settings, "longport_app_key")
263 else None
264 )
265 )
267 # App Secret
268 config["app_secret"] = (
269 longport_config.app_secret
270 if longport_config and longport_config.app_secret
271 else (
272 settings.longport_app_secret
273 if fallback_to_env and hasattr(settings, "longport_app_secret")
274 else None
275 )
276 )
278 # Access Token
279 config["access_token"] = (
280 longport_config.access_token
281 if longport_config and longport_config.access_token
282 else (
283 settings.longport_access_token
284 if fallback_to_env and hasattr(settings, "longport_access_token")
285 else None
286 )
287 )
289 # Language (默认中文)
290 config["language"] = (
291 longport_config.language
292 if longport_config and longport_config.language
293 else "zh-CN"
294 )
296 # Enable Overnight (默认关闭)
297 config["enable_overnight"] = (
298 longport_config.enable_overnight
299 if longport_config and hasattr(longport_config, "enable_overnight")
300 else False
301 )
303 # 检查必要配置是否存在
304 if (
305 not config["app_key"]
306 or not config["app_secret"]
307 or not config["access_token"]
308 ):
309 print(
310 f"❌ 长桥配置不完整: app_key={bool(config['app_key'])}, app_secret={bool(config['app_secret'])}, access_token={bool(config['access_token'])}"
311 )
312 return None
314 return config
316 def _build_futu_config(self, user_settings) -> Optional[Dict[str, Any]]:
317 """构建富途配置(预留)"""
318 # TODO: 实现富途配置构建
319 print("🚧 富途配置构建功能待实现")
320 return None
322 def _build_tiger_config(self, user_settings) -> Optional[Dict[str, Any]]:
323 """构建老虎配置(预留)"""
324 # TODO: 实现老虎配置构建
325 print("🚧 老虎配置构建功能待实现")
326 return None
328 def _enforce_cache_limit(self):
329 """强制执行缓存限制(LRU淘汰)"""
330 if len(self._config_cache) >= self._max_cache_size:
331 # 找到最久未使用的配置
332 oldest_user = min(
333 self._cache_metadata.keys(),
334 key=lambda k: self._cache_metadata[k]["last_used"],
335 )
337 # 移除缓存
338 del self._config_cache[oldest_user]
339 del self._cache_metadata[oldest_user]
340 print(f"🗑️ LRU淘汰数据源配置缓存,用户: {oldest_user}")
342 def clear_cache(self, user_id: Optional[str] = None):
343 """
344 清理配置缓存
346 Args:
347 user_id: 特定用户ID,None表示清理所有缓存
348 """
349 if user_id:
350 if user_id in self._config_cache:
351 del self._config_cache[user_id]
352 del self._cache_metadata[user_id]
353 print(f"🗑️ 已清理用户 {user_id} 的配置缓存")
354 else:
355 self._config_cache.clear()
356 self._cache_metadata.clear()
357 print("🗑️ 已清理所有配置缓存")
359 def get_cache_info(self) -> Dict[str, Any]:
360 """获取缓存信息(用于调试)"""
361 return {
362 "cached_users": list(self._config_cache.keys()),
363 "cache_count": len(self._config_cache),
364 "max_cache_size": self._max_cache_size,
365 "cache_usage_percent": (len(self._config_cache) / self._max_cache_size)
366 * 100,
367 }
370# 全局单例实例
371unified_config_factory = ConfigFactory()