Coverage for core/services/websocket_service.py: 54.61%

141 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

1""" 

2统一的WebSocket日志服务 

3提供全局的WebSocket日志发送功能 

4""" 

5 

6import asyncio 

7import json 

8import logging 

9import queue 

10import threading 

11import time 

12from datetime import datetime 

13from typing import Any, Dict, Optional 

14 

15from fastapi import WebSocket, WebSocketDisconnect 

16 

17logger = logging.getLogger(__name__) 

18 

19 

20class UnifiedWebSocketService: 

21 """统一的WebSocket服务""" 

22 

23 def __init__(self): 

24 # 存储活跃的WebSocket连接 

25 self.active_connections: Dict[str, WebSocket] = {} 

26 # 存储任务日志 

27 self.task_logs: Dict[str, list] = {} 

28 # 日志队列 

29 self.log_queue = queue.Queue() 

30 # 日志处理线程状态 

31 self.log_thread_running = False 

32 # 启动日志处理线程 

33 self._start_log_processor() 

34 

35 def _start_log_processor(self): 

36 """启动日志处理线程""" 

37 if not self.log_thread_running: 

38 self.log_thread_running = True 

39 thread = threading.Thread(target=self._process_log_queue, daemon=True) 

40 thread.start() 

41 logger.info("WebSocket日志处理线程已启动") 

42 

43 def _process_log_queue(self): 

44 """处理日志队列""" 

45 while self.log_thread_running: 

46 try: 

47 # 从队列获取日志消息 

48 task_id, message, log_type = self.log_queue.get(timeout=1) 

49 

50 # 检查连接是否存在 

51 if task_id not in self.active_connections: 

52 continue 

53 

54 # 发送日志 

55 try: 

56 # 创建新的事件循环来处理异步发送 

57 loop = asyncio.new_event_loop() 

58 asyncio.set_event_loop(loop) 

59 loop.run_until_complete( 

60 self._send_log_async(task_id, message, log_type) 

61 ) 

62 loop.close() 

63 except Exception as e: 

64 logger.error(f"WebSocket发送日志失败: {e}") 

65 

66 except queue.Empty: 

67 continue 

68 except Exception as e: 

69 logger.error(f"处理日志队列失败: {e}") 

70 time.sleep(1) 

71 

72 self.log_thread_running = False 

73 

74 async def _send_log_async(self, task_id: str, message: str, log_type: str = "log"): 

75 """异步发送日志""" 

76 if task_id not in self.active_connections: 

77 return 

78 

79 try: 

80 websocket = self.active_connections[task_id] 

81 

82 # 添加时间戳 

83 timestamp = datetime.now().isoformat() 

84 log_message = f"[{timestamp}] {message}" 

85 

86 # 添加到日志列表 

87 if task_id not in self.task_logs: 

88 self.task_logs[task_id] = [] 

89 self.task_logs[task_id].append(log_message) 

90 

91 # 发送到WebSocket 

92 await websocket.send_text( 

93 json.dumps( 

94 { 

95 "type": log_type, 

96 "message": log_message, 

97 "timestamp": timestamp, 

98 "task_id": task_id, 

99 } 

100 ) 

101 ) 

102 

103 logger.debug(f"WebSocket日志发送成功: {task_id} - {message[:50]}...") 

104 

105 except Exception as e: 

106 logger.error(f"WebSocket发送失败: {e}") 

107 # 如果发送失败,断开连接 

108 self.disconnect(task_id) 

109 

110 async def connect(self, websocket: WebSocket, task_id: str): 

111 """接受WebSocket连接""" 

112 await websocket.accept() 

113 self.active_connections[task_id] = websocket 

114 self.task_logs[task_id] = [] 

115 print(f"✅ WebSocket连接已建立,任务ID: {task_id}") 

116 logger.info(f"WebSocket连接已建立,任务ID: {task_id}") 

117 

118 def disconnect(self, task_id: str): 

119 """断开WebSocket连接""" 

120 if task_id in self.active_connections: 

121 del self.active_connections[task_id] 

122 if task_id in self.task_logs: 

123 del self.task_logs[task_id] 

124 logger.info(f"WebSocket连接已断开,任务ID: {task_id}") 

125 

126 def send_log(self, task_id: str, message: str, log_type: str = "log"): 

127 """发送日志消息(线程安全)""" 

128 try: 

129 # 将日志消息放入队列 

130 self.log_queue.put((task_id, message, log_type)) 

131 except Exception as e: 

132 logger.error(f"发送日志到队列失败: {e}") 

133 

134 def _sync_send_log(self, task_id: str, message: str, log_type: str = "log"): 

135 """同步发送日志(备用方案)""" 

136 try: 

137 # 直接发送到WebSocket,不经过队列 

138 if task_id in self.active_connections: 

139 websocket = self.active_connections[task_id] 

140 timestamp = datetime.now().isoformat() 

141 log_message = f"[{timestamp}] {message}" 

142 

143 # 添加到日志列表 

144 if task_id not in self.task_logs: 

145 self.task_logs[task_id] = [] 

146 self.task_logs[task_id].append(log_message) 

147 

148 # 尝试发送(可能会失败,但不影响主流程) 

149 try: 

150 import asyncio 

151 

152 loop = asyncio.new_event_loop() 

153 asyncio.set_event_loop(loop) 

154 loop.run_until_complete( 

155 websocket.send_text( 

156 json.dumps( 

157 { 

158 "type": log_type, 

159 "message": log_message, 

160 "timestamp": timestamp, 

161 "task_id": task_id, 

162 } 

163 ) 

164 ) 

165 ) 

166 loop.close() 

167 except Exception: 

168 pass # WebSocket发送失败不影响主流程 

169 

170 except Exception as e: 

171 logger.error(f"同步发送日志失败: {e}") 

172 

173 def send_status( 

174 self, task_id: str, status: str, progress: int, data_count: int = 0 

175 ): 

176 """发送状态更新""" 

177 status_message = f"状态: {status}, 进度: {progress}%, 数据量: {data_count}" 

178 self.send_log(task_id, status_message, "status") 

179 

180 def send_error(self, task_id: str, error_message: str): 

181 """发送错误消息""" 

182 self.send_log(task_id, f"错误: {error_message}", "error") 

183 

184 def send_success(self, task_id: str, success_message: str): 

185 """发送成功消息""" 

186 self.send_log(task_id, f"成功: {success_message}", "success") 

187 

188 def get_logs(self, task_id: str) -> list: 

189 """获取任务的日志列表""" 

190 return self.task_logs.get(task_id, []) 

191 

192 def get_active_connections(self) -> list: 

193 """获取活跃连接列表""" 

194 return list(self.active_connections.keys()) 

195 

196 def is_connected(self, task_id: str) -> bool: 

197 """检查连接是否存在""" 

198 return task_id in self.active_connections 

199 

200 

201# 全局WebSocket服务实例 

202websocket_service = UnifiedWebSocketService() 

203 

204 

205# WebSocket端点 

206async def websocket_endpoint(websocket: WebSocket, task_id: str): 

207 """WebSocket端点""" 

208 print(f"🔌 WebSocket端点被调用: {task_id}") 

209 await websocket_service.connect(websocket, task_id) 

210 print(f"✅ WebSocket连接已建立: {task_id}") 

211 

212 try: 

213 # 保持连接活跃 

214 while True: 

215 try: 

216 # 等待客户端发送消息 

217 await asyncio.wait_for(websocket.receive_text(), timeout=60.0) 

218 except asyncio.TimeoutError: 

219 # 超时后发送ping消息保持连接 

220 try: 

221 await websocket.send_text( 

222 json.dumps( 

223 { 

224 "type": "ping", 

225 "timestamp": datetime.now().isoformat(), 

226 "task_id": task_id, 

227 } 

228 ) 

229 ) 

230 print(f"🏓 发送ping消息到 {task_id}") 

231 except Exception as e: 

232 print(f"❌ 发送ping失败: {e}") 

233 break 

234 except WebSocketDisconnect: 

235 print(f"🔌 WebSocket断开连接: {task_id}") 

236 break 

237 except Exception as e: 

238 print(f"❌ WebSocket错误: {e}") 

239 break 

240 except WebSocketDisconnect: 

241 print(f"🔌 WebSocket断开连接: {task_id}") 

242 except Exception as e: 

243 print(f"❌ WebSocket端点错误: {e}") 

244 finally: 

245 websocket_service.disconnect(task_id) 

246 

247 

248# 便捷函数 

249def send_websocket_log(task_id: str, message: str, log_type: str = "log"): 

250 """发送WebSocket日志的便捷函数""" 

251 websocket_service.send_log(task_id, message, log_type) 

252 

253 

254def send_websocket_status( 

255 task_id: str, status: str, progress: int, data_count: int = 0 

256): 

257 """发送WebSocket状态的便捷函数""" 

258 websocket_service.send_status(task_id, status, progress, data_count) 

259 

260 

261def send_websocket_error(task_id: str, error_message: str): 

262 """发送WebSocket错误的便捷函数""" 

263 websocket_service.send_error(task_id, error_message) 

264 

265 

266def send_websocket_success(task_id: str, success_message: str): 

267 """发送WebSocket成功的便捷函数""" 

268 websocket_service.send_success(task_id, success_message)