Coverage for infrastructure/database/redis_connection_factory.py: 47.01%

134 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

1""" 

2Redis连接工厂 

3统一管理Redis连接,实现连接复用和缓存 

4""" 

5 

6import os 

7import threading 

8from typing import Any, Dict, Optional 

9 

10import redis 

11from dotenv import load_dotenv 

12 

13# 加载环境变量 

14load_dotenv() 

15 

16 

17class RedisConnectionFactory: 

18 """Redis连接工厂 - 单例模式""" 

19 

20 _instance = None 

21 _lock = threading.Lock() 

22 

23 def __new__(cls): 

24 """单例模式实现""" 

25 if cls._instance is None: 

26 with cls._lock: 

27 if cls._instance is None: 

28 cls._instance = super().__new__(cls) 

29 return cls._instance 

30 

31 def __init__(self): 

32 if not hasattr(self, "_initialized"): 

33 # 缓存结构: {connection_key: {client, last_used, config}} 

34 self._connections_cache: Dict[str, Dict[str, Any]] = {} 

35 self._max_connections = 10 # 最大连接数 

36 self._initialized = True 

37 

38 def get_connection( 

39 self, host: str = None, port: int = None, db: int = None, password: str = None 

40 ) -> Optional[redis.Redis]: 

41 """ 

42 获取Redis连接(复用或创建) 

43 

44 Args: 

45 host: Redis主机地址 

46 port: Redis端口 

47 db: Redis数据库编号 

48 password: Redis密码 

49 

50 Returns: 

51 Redis连接实例,失败返回 None 

52 """ 

53 try: 

54 # 使用默认配置 

55 if host is None: 

56 host = os.getenv("REDIS_HOST", "localhost") 

57 if port is None: 

58 port = int(os.getenv("REDIS_PORT", 6379)) 

59 if db is None: 

60 db = int(os.getenv("REDIS_DB", 0)) 

61 if password is None: 

62 password = os.getenv("REDIS_PASSWORD", "") 

63 

64 # 生成连接键 

65 connection_key = f"{host}:{port}:{db}:{password or 'no_password'}" 

66 

67 # 检查是否已有该配置的连接 

68 if connection_key in self._connections_cache: 

69 cached_connection = self._connections_cache[connection_key] 

70 # 更新最后使用时间 

71 cached_connection["last_used"] = self._get_current_timestamp() 

72 print(f"♻️ 复用Redis连接: {connection_key}") 

73 return cached_connection["client"] 

74 

75 # 检查连接数量限制 

76 self._enforce_connection_limit() 

77 

78 # 创建新的连接 

79 print(f"🆕 创建新的Redis连接: {connection_key}") 

80 client = self._create_connection(host, port, db, password) 

81 if client: 

82 # 添加最后使用时间和配置 

83 connection_data = { 

84 "client": client, 

85 "last_used": self._get_current_timestamp(), 

86 "config": { 

87 "host": host, 

88 "port": port, 

89 "db": db, 

90 "password": password, 

91 }, 

92 } 

93 self._connections_cache[connection_key] = connection_data 

94 print( 

95 f"✅ Redis连接创建成功并缓存,当前连接数量: {len(self._connections_cache)}" 

96 ) 

97 return client 

98 

99 return None 

100 

101 except Exception as e: 

102 print(f"❌ 获取Redis连接失败: {e}") 

103 return None 

104 

105 def _get_current_timestamp(self) -> float: 

106 """获取当前时间戳""" 

107 import time 

108 

109 return time.time() 

110 

111 def _enforce_connection_limit(self): 

112 """强制执行连接限制(LRU淘汰)""" 

113 if len(self._connections_cache) >= self._max_connections: 

114 # 按最后使用时间排序,删除最久未使用的 

115 sorted_connections = sorted( 

116 self._connections_cache.items(), key=lambda x: x[1].get("last_used", 0) 

117 ) 

118 

119 # 删除最久未使用的连接 

120 oldest_key, oldest_connection = sorted_connections[0] 

121 del self._connections_cache[oldest_key] 

122 print(f"🗑️ LRU淘汰最久未使用的Redis连接: {oldest_key}") 

123 

124 def _create_connection( 

125 self, host: str, port: int, db: int, password: str 

126 ) -> Optional[redis.Redis]: 

127 """创建Redis连接""" 

128 try: 

129 if password: 

130 client = redis.Redis( 

131 host=host, 

132 port=port, 

133 db=db, 

134 password=password, 

135 decode_responses=True, 

136 socket_connect_timeout=5, 

137 socket_timeout=5, 

138 health_check_interval=30, 

139 ) 

140 else: 

141 client = redis.Redis( 

142 host=host, 

143 port=port, 

144 db=db, 

145 decode_responses=True, 

146 socket_connect_timeout=5, 

147 socket_timeout=5, 

148 health_check_interval=30, 

149 ) 

150 

151 # 测试连接 

152 client.ping() 

153 print(f"✅ Redis连接测试成功: {host}:{port}") 

154 return client 

155 

156 except Exception as e: 

157 print(f"❌ Redis连接创建失败: {e}") 

158 return None 

159 

160 def is_connection_available( 

161 self, host: str = None, port: int = None, db: int = None, password: str = None 

162 ) -> bool: 

163 """检查连接是否可用""" 

164 connection = self.get_connection(host, port, db, password) 

165 return connection is not None 

166 

167 def refresh_connection_if_needed( 

168 self, host: str = None, port: int = None, db: int = None, password: str = None 

169 ) -> bool: 

170 """ 

171 检查并刷新连接(如果配置发生变化) 

172 

173 Args: 

174 host: Redis主机地址 

175 port: Redis端口 

176 db: Redis数据库编号 

177 password: Redis密码 

178 

179 Returns: 

180 是否需要刷新连接 

181 """ 

182 try: 

183 # 使用默认配置 

184 if host is None: 

185 host = os.getenv("REDIS_HOST", "localhost") 

186 if port is None: 

187 port = int(os.getenv("REDIS_PORT", 6379)) 

188 if db is None: 

189 db = int(os.getenv("REDIS_DB", 0)) 

190 if password is None: 

191 password = os.getenv("REDIS_PASSWORD", "") 

192 

193 connection_key = f"{host}:{port}:{db}:{password or 'no_password'}" 

194 

195 # 检查缓存中是否有该配置的连接 

196 if connection_key not in self._connections_cache: 

197 print(f"🔄 Redis配置变化,需要刷新连接: {connection_key}") 

198 return True 

199 

200 print(f"✅ Redis配置未变化,连接可用: {connection_key}") 

201 return False 

202 

203 except Exception as e: 

204 print(f"❌ 检查连接刷新需求失败: {e}") 

205 return True 

206 

207 def force_refresh_connection( 

208 self, host: str = None, port: int = None, db: int = None, password: str = None 

209 ) -> Optional[redis.Redis]: 

210 """ 

211 强制刷新连接(清理缓存后重新创建) 

212 

213 Args: 

214 host: Redis主机地址 

215 port: Redis端口 

216 db: Redis数据库编号 

217 password: Redis密码 

218 

219 Returns: 

220 新的连接实例,失败返回 None 

221 """ 

222 try: 

223 # 使用默认配置 

224 if host is None: 

225 host = os.getenv("REDIS_HOST", "localhost") 

226 if port is None: 

227 port = int(os.getenv("REDIS_PORT", 6379)) 

228 if db is None: 

229 db = int(os.getenv("REDIS_DB", 0)) 

230 if password is None: 

231 password = os.getenv("REDIS_PASSWORD", "") 

232 

233 connection_key = f"{host}:{port}:{db}:{password or 'no_password'}" 

234 

235 # 清理该配置的旧连接 

236 if connection_key in self._connections_cache: 

237 del self._connections_cache[connection_key] 

238 print(f"🗑️ 清理旧Redis连接缓存: {connection_key}") 

239 

240 # 创建新的连接 

241 print(f"🔄 强制刷新Redis连接: {connection_key}") 

242 return self.get_connection(host, port, db, password) 

243 

244 except Exception as e: 

245 print(f"❌ 强制刷新连接失败: {e}") 

246 return None 

247 

248 def get_connection_info(self) -> Dict[str, Any]: 

249 """获取连接信息(用于调试)""" 

250 return { 

251 "cached_connections": [key for key in self._connections_cache.keys()], 

252 "connection_count": len(self._connections_cache), 

253 "max_connections": self._max_connections, 

254 "cache_usage_percent": ( 

255 len(self._connections_cache) / self._max_connections 

256 ) 

257 * 100, 

258 } 

259 

260 def clear_connections(self): 

261 """清空所有连接缓存""" 

262 self._connections_cache.clear() 

263 print("🗑️ 已清空所有Redis连接缓存") 

264 

265 def remove_connection( 

266 self, host: str = None, port: int = None, db: int = None, password: str = None 

267 ) -> bool: 

268 """ 

269 移除指定的连接 

270 

271 Args: 

272 host: Redis主机地址 

273 port: Redis端口 

274 db: Redis数据库编号 

275 password: Redis密码 

276 

277 Returns: 

278 是否成功移除 

279 """ 

280 try: 

281 # 使用默认配置 

282 if host is None: 

283 host = os.getenv("REDIS_HOST", "localhost") 

284 if port is None: 

285 port = int(os.getenv("REDIS_PORT", 6379)) 

286 if db is None: 

287 db = int(os.getenv("REDIS_DB", 0)) 

288 if password is None: 

289 password = os.getenv("REDIS_PASSWORD", "") 

290 

291 connection_key = f"{host}:{port}:{db}:{password or 'no_password'}" 

292 

293 if connection_key in self._connections_cache: 

294 del self._connections_cache[connection_key] 

295 print(f"🗑️ 已移除Redis连接: {connection_key}") 

296 return True 

297 return False 

298 

299 except Exception as e: 

300 print(f"❌ 移除连接失败: {e}") 

301 return False 

302 

303 

304# 全局Redis连接工厂实例 

305redis_connection_factory = RedisConnectionFactory()