Coverage for infrastructure/database/redis_connection_factory.py: 47.01%
134 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2Redis连接工厂
3统一管理Redis连接,实现连接复用和缓存
4"""
6import os
7import threading
8from typing import Any, Dict, Optional
10import redis
11from dotenv import load_dotenv
13# 加载环境变量
14load_dotenv()
17class RedisConnectionFactory:
18 """Redis连接工厂 - 单例模式"""
20 _instance = None
21 _lock = threading.Lock()
23 def __new__(cls):
24 """单例模式实现"""
25 if cls._instance is None:
26 with cls._lock:
27 if cls._instance is None:
28 cls._instance = super().__new__(cls)
29 return cls._instance
31 def __init__(self):
32 if not hasattr(self, "_initialized"):
33 # 缓存结构: {connection_key: {client, last_used, config}}
34 self._connections_cache: Dict[str, Dict[str, Any]] = {}
35 self._max_connections = 10 # 最大连接数
36 self._initialized = True
38 def get_connection(
39 self, host: str = None, port: int = None, db: int = None, password: str = None
40 ) -> Optional[redis.Redis]:
41 """
42 获取Redis连接(复用或创建)
44 Args:
45 host: Redis主机地址
46 port: Redis端口
47 db: Redis数据库编号
48 password: Redis密码
50 Returns:
51 Redis连接实例,失败返回 None
52 """
53 try:
54 # 使用默认配置
55 if host is None:
56 host = os.getenv("REDIS_HOST", "localhost")
57 if port is None:
58 port = int(os.getenv("REDIS_PORT", 6379))
59 if db is None:
60 db = int(os.getenv("REDIS_DB", 0))
61 if password is None:
62 password = os.getenv("REDIS_PASSWORD", "")
64 # 生成连接键
65 connection_key = f"{host}:{port}:{db}:{password or 'no_password'}"
67 # 检查是否已有该配置的连接
68 if connection_key in self._connections_cache:
69 cached_connection = self._connections_cache[connection_key]
70 # 更新最后使用时间
71 cached_connection["last_used"] = self._get_current_timestamp()
72 print(f"♻️ 复用Redis连接: {connection_key}")
73 return cached_connection["client"]
75 # 检查连接数量限制
76 self._enforce_connection_limit()
78 # 创建新的连接
79 print(f"🆕 创建新的Redis连接: {connection_key}")
80 client = self._create_connection(host, port, db, password)
81 if client:
82 # 添加最后使用时间和配置
83 connection_data = {
84 "client": client,
85 "last_used": self._get_current_timestamp(),
86 "config": {
87 "host": host,
88 "port": port,
89 "db": db,
90 "password": password,
91 },
92 }
93 self._connections_cache[connection_key] = connection_data
94 print(
95 f"✅ Redis连接创建成功并缓存,当前连接数量: {len(self._connections_cache)}"
96 )
97 return client
99 return None
101 except Exception as e:
102 print(f"❌ 获取Redis连接失败: {e}")
103 return None
105 def _get_current_timestamp(self) -> float:
106 """获取当前时间戳"""
107 import time
109 return time.time()
111 def _enforce_connection_limit(self):
112 """强制执行连接限制(LRU淘汰)"""
113 if len(self._connections_cache) >= self._max_connections:
114 # 按最后使用时间排序,删除最久未使用的
115 sorted_connections = sorted(
116 self._connections_cache.items(), key=lambda x: x[1].get("last_used", 0)
117 )
119 # 删除最久未使用的连接
120 oldest_key, oldest_connection = sorted_connections[0]
121 del self._connections_cache[oldest_key]
122 print(f"🗑️ LRU淘汰最久未使用的Redis连接: {oldest_key}")
124 def _create_connection(
125 self, host: str, port: int, db: int, password: str
126 ) -> Optional[redis.Redis]:
127 """创建Redis连接"""
128 try:
129 if password:
130 client = redis.Redis(
131 host=host,
132 port=port,
133 db=db,
134 password=password,
135 decode_responses=True,
136 socket_connect_timeout=5,
137 socket_timeout=5,
138 health_check_interval=30,
139 )
140 else:
141 client = redis.Redis(
142 host=host,
143 port=port,
144 db=db,
145 decode_responses=True,
146 socket_connect_timeout=5,
147 socket_timeout=5,
148 health_check_interval=30,
149 )
151 # 测试连接
152 client.ping()
153 print(f"✅ Redis连接测试成功: {host}:{port}")
154 return client
156 except Exception as e:
157 print(f"❌ Redis连接创建失败: {e}")
158 return None
160 def is_connection_available(
161 self, host: str = None, port: int = None, db: int = None, password: str = None
162 ) -> bool:
163 """检查连接是否可用"""
164 connection = self.get_connection(host, port, db, password)
165 return connection is not None
167 def refresh_connection_if_needed(
168 self, host: str = None, port: int = None, db: int = None, password: str = None
169 ) -> bool:
170 """
171 检查并刷新连接(如果配置发生变化)
173 Args:
174 host: Redis主机地址
175 port: Redis端口
176 db: Redis数据库编号
177 password: Redis密码
179 Returns:
180 是否需要刷新连接
181 """
182 try:
183 # 使用默认配置
184 if host is None:
185 host = os.getenv("REDIS_HOST", "localhost")
186 if port is None:
187 port = int(os.getenv("REDIS_PORT", 6379))
188 if db is None:
189 db = int(os.getenv("REDIS_DB", 0))
190 if password is None:
191 password = os.getenv("REDIS_PASSWORD", "")
193 connection_key = f"{host}:{port}:{db}:{password or 'no_password'}"
195 # 检查缓存中是否有该配置的连接
196 if connection_key not in self._connections_cache:
197 print(f"🔄 Redis配置变化,需要刷新连接: {connection_key}")
198 return True
200 print(f"✅ Redis配置未变化,连接可用: {connection_key}")
201 return False
203 except Exception as e:
204 print(f"❌ 检查连接刷新需求失败: {e}")
205 return True
207 def force_refresh_connection(
208 self, host: str = None, port: int = None, db: int = None, password: str = None
209 ) -> Optional[redis.Redis]:
210 """
211 强制刷新连接(清理缓存后重新创建)
213 Args:
214 host: Redis主机地址
215 port: Redis端口
216 db: Redis数据库编号
217 password: Redis密码
219 Returns:
220 新的连接实例,失败返回 None
221 """
222 try:
223 # 使用默认配置
224 if host is None:
225 host = os.getenv("REDIS_HOST", "localhost")
226 if port is None:
227 port = int(os.getenv("REDIS_PORT", 6379))
228 if db is None:
229 db = int(os.getenv("REDIS_DB", 0))
230 if password is None:
231 password = os.getenv("REDIS_PASSWORD", "")
233 connection_key = f"{host}:{port}:{db}:{password or 'no_password'}"
235 # 清理该配置的旧连接
236 if connection_key in self._connections_cache:
237 del self._connections_cache[connection_key]
238 print(f"🗑️ 清理旧Redis连接缓存: {connection_key}")
240 # 创建新的连接
241 print(f"🔄 强制刷新Redis连接: {connection_key}")
242 return self.get_connection(host, port, db, password)
244 except Exception as e:
245 print(f"❌ 强制刷新连接失败: {e}")
246 return None
248 def get_connection_info(self) -> Dict[str, Any]:
249 """获取连接信息(用于调试)"""
250 return {
251 "cached_connections": [key for key in self._connections_cache.keys()],
252 "connection_count": len(self._connections_cache),
253 "max_connections": self._max_connections,
254 "cache_usage_percent": (
255 len(self._connections_cache) / self._max_connections
256 )
257 * 100,
258 }
260 def clear_connections(self):
261 """清空所有连接缓存"""
262 self._connections_cache.clear()
263 print("🗑️ 已清空所有Redis连接缓存")
265 def remove_connection(
266 self, host: str = None, port: int = None, db: int = None, password: str = None
267 ) -> bool:
268 """
269 移除指定的连接
271 Args:
272 host: Redis主机地址
273 port: Redis端口
274 db: Redis数据库编号
275 password: Redis密码
277 Returns:
278 是否成功移除
279 """
280 try:
281 # 使用默认配置
282 if host is None:
283 host = os.getenv("REDIS_HOST", "localhost")
284 if port is None:
285 port = int(os.getenv("REDIS_PORT", 6379))
286 if db is None:
287 db = int(os.getenv("REDIS_DB", 0))
288 if password is None:
289 password = os.getenv("REDIS_PASSWORD", "")
291 connection_key = f"{host}:{port}:{db}:{password or 'no_password'}"
293 if connection_key in self._connections_cache:
294 del self._connections_cache[connection_key]
295 print(f"🗑️ 已移除Redis连接: {connection_key}")
296 return True
297 return False
299 except Exception as e:
300 print(f"❌ 移除连接失败: {e}")
301 return False
304# 全局Redis连接工厂实例
305redis_connection_factory = RedisConnectionFactory()