Coverage for infrastructure/database/redis_client.py: 34.86%
284 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2Redis客户端模块
3提供Redis连接和基本操作功能
4"""
6import json
7import os
8from datetime import timedelta
9from typing import Any, Optional
11import redis
12from dotenv import load_dotenv
14from .redis_connection_factory import redis_connection_factory
16# 加载项目根目录的环境变量
17load_dotenv()
20class RedisClient:
21 """Redis客户端封装类"""
23 def __init__(self):
24 """初始化Redis连接(使用连接工厂)"""
25 self.redis_host = os.getenv("REDIS_HOST", "localhost")
26 self.redis_port = int(os.getenv("REDIS_PORT", 6379))
27 self.redis_db = int(os.getenv("REDIS_DB", 0))
28 self.redis_password = os.getenv("REDIS_PASSWORD", "")
30 # 使用连接工厂获取Redis连接
31 try:
32 self.client = redis_connection_factory.get_connection(
33 host=self.redis_host,
34 port=self.redis_port,
35 db=self.redis_db,
36 password=self.redis_password,
37 )
39 if self.client:
40 print(
41 f"✅ Redis连接成功(使用连接工厂): {self.redis_host}:{self.redis_port}"
42 )
43 else:
44 print(f"❌ Redis连接失败(连接工厂返回None)")
46 except Exception as e:
47 print(f"❌ Redis连接失败: {e}")
48 self.client = None
50 def is_connected(self) -> bool:
51 """检查Redis连接状态"""
52 if not self.client:
53 return False
54 try:
55 self.client.ping()
56 return True
57 except:
58 return False
60 def set(self, key: str, value: Any, expire: Optional[int] = None) -> bool:
61 """设置键值对"""
62 if not self.is_connected():
63 return False
65 try:
66 if isinstance(value, (dict, list)):
67 value = json.dumps(value, ensure_ascii=False)
69 if expire:
70 return self.client.setex(key, expire, value)
71 else:
72 return self.client.set(key, value)
73 except Exception as e:
74 print(f"Redis SET错误: {e}")
75 return False
77 def get(self, key: str) -> Optional[Any]:
78 """获取键值"""
79 if not self.is_connected():
80 return None
82 try:
83 value = self.client.get(key)
84 if value is None:
85 return None
87 # 尝试解析JSON
88 try:
89 return json.loads(value)
90 except:
91 return value
92 except Exception as e:
93 print(f"Redis GET错误: {e}")
94 return None
96 def delete(self, key: str) -> bool:
97 """删除键"""
98 if not self.is_connected():
99 return False
101 try:
102 return bool(self.client.delete(key))
103 except Exception as e:
104 print(f"Redis DELETE错误: {e}")
105 return False
107 def exists(self, key: str) -> bool:
108 """检查键是否存在"""
109 if not self.is_connected():
110 return False
112 try:
113 return bool(self.client.exists(key))
114 except Exception as e:
115 print(f"Redis EXISTS错误: {e}")
116 return False
118 def expire(self, key: str, seconds: int) -> bool:
119 """设置键过期时间"""
120 if not self.is_connected():
121 return False
123 try:
124 return bool(self.client.expire(key, seconds))
125 except Exception as e:
126 print(f"Redis EXPIRE错误: {e}")
127 return False
129 def ttl(self, key: str) -> int:
130 """获取键的剩余生存时间"""
131 if not self.is_connected():
132 return -1
134 try:
135 return self.client.ttl(key)
136 except Exception as e:
137 print(f"Redis TTL错误: {e}")
138 return -1
140 def keys(self, pattern: str = "*") -> list:
141 """获取匹配模式的键列表"""
142 if not self.is_connected():
143 return []
145 try:
146 return self.client.keys(pattern)
147 except Exception as e:
148 print(f"Redis KEYS错误: {e}")
149 return []
151 def flushdb(self) -> bool:
152 """清空当前数据库"""
153 if not self.is_connected():
154 return False
156 try:
157 return self.client.flushdb()
158 except Exception as e:
159 print(f"Redis FLUSHDB错误: {e}")
160 return False
162 def info(self) -> dict:
163 """获取Redis服务器信息"""
164 if not self.is_connected():
165 return {}
167 try:
168 return self.client.info()
169 except Exception as e:
170 print(f"Redis INFO错误: {e}")
171 return {}
173 def setex(self, key: str, time: int, value: Any) -> bool:
174 """设置键值对并设置过期时间"""
175 if not self.is_connected():
176 return False
178 try:
179 if isinstance(value, (dict, list)):
180 value = json.dumps(value, ensure_ascii=False)
181 return bool(self.client.setex(key, time, value))
182 except Exception as e:
183 print(f"Redis SETEX错误: {e}")
184 return False
186 def sadd(self, key: str, *values) -> int:
187 """向集合添加成员"""
188 if not self.is_connected():
189 return 0
191 try:
192 return self.client.sadd(key, *values)
193 except Exception as e:
194 print(f"Redis SADD错误: {e}")
195 return 0
197 def smembers(self, key: str) -> set:
198 """获取集合的所有成员"""
199 if not self.is_connected():
200 return set()
202 try:
203 return self.client.smembers(key)
204 except Exception as e:
205 print(f"Redis SMEMBERS错误: {e}")
206 return set()
208 def zadd(self, key: str, mapping: dict) -> int:
209 """向有序集合添加成员"""
210 if not self.is_connected():
211 return 0
213 try:
214 return self.client.zadd(key, mapping)
215 except Exception as e:
216 print(f"Redis ZADD错误: {e}")
217 return 0
219 def zrangebyscore(
220 self, key: str, min_score: float, max_score: float, withscores: bool = False
221 ) -> list:
222 """按分数范围获取有序集合成员"""
223 if not self.is_connected():
224 return []
226 try:
227 return self.client.zrangebyscore(
228 key, min_score, max_score, withscores=withscores
229 )
230 except Exception as e:
231 print(f"Redis ZRANGEBYSCORE错误: {e}")
232 return []
234 def zrem(self, key: str, *members) -> int:
235 """从有序集合删除成员"""
236 if not self.is_connected():
237 return 0
239 try:
240 return self.client.zrem(key, *members)
241 except Exception as e:
242 print(f"Redis ZREM错误: {e}")
243 return 0
245 def srem(self, key: str, *members) -> int:
246 """从集合中移除成员"""
247 if not self.is_connected():
248 return 0
250 try:
251 return self.client.srem(key, *members)
252 except Exception as e:
253 print(f"Redis SREM错误: {e}")
254 return 0
256 def hset(
257 self, key: str, field: str = None, value: Any = None, mapping: dict = None
258 ) -> int:
259 """设置哈希字段的值"""
260 try:
261 if self.is_connected():
262 if mapping is not None:
263 # 如果有mapping,使用批量设置
264 return self.client.hset(key, mapping=mapping)
265 elif field is not None and value is not None:
266 # 单个字段设置
267 return self.client.hset(key, field, value)
268 else:
269 print(f"❌ hset参数不完整")
270 return 0
271 else:
272 print(f"❌ Redis未连接,无法设置哈希字段")
273 return 0
274 except Exception as e:
275 print(f"❌ 设置哈希字段失败: {e}")
276 return 0
278 def hget(self, key: str, field: str) -> Optional[Any]:
279 """获取哈希字段的值"""
280 try:
281 if self.is_connected():
282 result = self.client.hget(key, field)
283 if result is not None:
284 # 尝试解析JSON
285 try:
286 return json.loads(result)
287 except:
288 return result
289 return None
290 else:
291 print(f"❌ Redis未连接,无法获取哈希字段")
292 return None
293 except Exception as e:
294 print(f"❌ 获取哈希字段失败: {e}")
295 return None
297 def hgetall(self, key: str) -> dict:
298 """获取哈希的所有字段和值"""
299 try:
300 if self.is_connected():
301 result = self.client.hgetall(key)
302 # 尝试解析每个值的JSON
303 parsed_result = {}
304 for field, value in result.items():
305 try:
306 parsed_result[field] = json.loads(value)
307 except:
308 parsed_result[field] = value
309 return parsed_result
310 else:
311 print(f"❌ Redis未连接,无法获取哈希所有字段")
312 return {}
313 except Exception as e:
314 print(f"❌ 获取哈希所有字段失败: {e}")
315 return {}
317 def hdel(self, key: str, *fields) -> int:
318 """删除哈希字段"""
319 try:
320 if self.is_connected():
321 return self.client.hdel(key, *fields)
322 else:
323 print(f"❌ Redis未连接,无法删除哈希字段")
324 return 0
325 except Exception as e:
326 print(f"❌ 删除哈希字段失败: {e}")
327 return 0
329 def lpush(self, key: str, *values) -> int:
330 """向列表左侧添加元素"""
331 try:
332 if self.is_connected():
333 return self.client.lpush(key, *values)
334 else:
335 print(f"❌ Redis未连接,无法向列表添加元素")
336 return 0
337 except Exception as e:
338 print(f"❌ 向列表添加元素失败: {e}")
339 return 0
341 def rpush(self, key: str, *values) -> int:
342 """向列表右侧添加元素"""
343 try:
344 if self.is_connected():
345 return self.client.rpush(key, *values)
346 else:
347 print(f"❌ Redis未连接,无法向列表添加元素")
348 return 0
349 except Exception as e:
350 print(f"❌ 向列表添加元素失败: {e}")
351 return 0
353 def lrange(self, key: str, start: int, end: int) -> list:
354 """获取列表指定范围的元素"""
355 try:
356 if self.is_connected():
357 result = self.client.lrange(key, start, end)
358 # 解码字节字符串
359 return [
360 item.decode("utf-8") if isinstance(item, bytes) else item
361 for item in result
362 ]
363 else:
364 print(f"❌ Redis未连接,无法获取列表元素")
365 return []
366 except Exception as e:
367 print(f"❌ 获取列表元素失败: {e}")
368 return []
370 def llen(self, key: str) -> int:
371 """获取列表长度"""
372 try:
373 if self.is_connected():
374 return self.client.llen(key)
375 else:
376 print(f"❌ Redis未连接,无法获取列表长度")
377 return 0
378 except Exception as e:
379 print(f"❌ 获取列表长度失败: {e}")
380 return 0
382 def lpop(self, key: str) -> Optional[str]:
383 """从列表左侧弹出元素"""
384 try:
385 if self.is_connected():
386 result = self.client.lpop(key)
387 return result.decode("utf-8") if isinstance(result, bytes) else result
388 else:
389 print(f"❌ Redis未连接,无法弹出列表元素")
390 return None
391 except Exception as e:
392 print(f"❌ 弹出列表元素失败: {e}")
393 return None
395 def rpop(self, key: str) -> Optional[str]:
396 """从列表右侧弹出元素"""
397 try:
398 if self.is_connected():
399 result = self.client.rpop(key)
400 return result.decode("utf-8") if isinstance(result, bytes) else result
401 else:
402 print(f"❌ Redis未连接,无法弹出列表元素")
403 return None
404 except Exception as e:
405 print(f"❌ 弹出列表元素失败: {e}")
406 return None
409# 全局Redis客户端实例
410redis_client = RedisClient()
413def get_redis() -> RedisClient:
414 """获取Redis客户端实例"""
415 return redis_client