Coverage for core/services/websocket_service.py: 54.61%
141 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2统一的WebSocket日志服务
3提供全局的WebSocket日志发送功能
4"""
6import asyncio
7import json
8import logging
9import queue
10import threading
11import time
12from datetime import datetime
13from typing import Any, Dict, Optional
15from fastapi import WebSocket, WebSocketDisconnect
17logger = logging.getLogger(__name__)
20class UnifiedWebSocketService:
21 """统一的WebSocket服务"""
23 def __init__(self):
24 # 存储活跃的WebSocket连接
25 self.active_connections: Dict[str, WebSocket] = {}
26 # 存储任务日志
27 self.task_logs: Dict[str, list] = {}
28 # 日志队列
29 self.log_queue = queue.Queue()
30 # 日志处理线程状态
31 self.log_thread_running = False
32 # 启动日志处理线程
33 self._start_log_processor()
35 def _start_log_processor(self):
36 """启动日志处理线程"""
37 if not self.log_thread_running:
38 self.log_thread_running = True
39 thread = threading.Thread(target=self._process_log_queue, daemon=True)
40 thread.start()
41 logger.info("WebSocket日志处理线程已启动")
43 def _process_log_queue(self):
44 """处理日志队列"""
45 while self.log_thread_running:
46 try:
47 # 从队列获取日志消息
48 task_id, message, log_type = self.log_queue.get(timeout=1)
50 # 检查连接是否存在
51 if task_id not in self.active_connections:
52 continue
54 # 发送日志
55 try:
56 # 创建新的事件循环来处理异步发送
57 loop = asyncio.new_event_loop()
58 asyncio.set_event_loop(loop)
59 loop.run_until_complete(
60 self._send_log_async(task_id, message, log_type)
61 )
62 loop.close()
63 except Exception as e:
64 logger.error(f"WebSocket发送日志失败: {e}")
66 except queue.Empty:
67 continue
68 except Exception as e:
69 logger.error(f"处理日志队列失败: {e}")
70 time.sleep(1)
72 self.log_thread_running = False
74 async def _send_log_async(self, task_id: str, message: str, log_type: str = "log"):
75 """异步发送日志"""
76 if task_id not in self.active_connections:
77 return
79 try:
80 websocket = self.active_connections[task_id]
82 # 添加时间戳
83 timestamp = datetime.now().isoformat()
84 log_message = f"[{timestamp}] {message}"
86 # 添加到日志列表
87 if task_id not in self.task_logs:
88 self.task_logs[task_id] = []
89 self.task_logs[task_id].append(log_message)
91 # 发送到WebSocket
92 await websocket.send_text(
93 json.dumps(
94 {
95 "type": log_type,
96 "message": log_message,
97 "timestamp": timestamp,
98 "task_id": task_id,
99 }
100 )
101 )
103 logger.debug(f"WebSocket日志发送成功: {task_id} - {message[:50]}...")
105 except Exception as e:
106 logger.error(f"WebSocket发送失败: {e}")
107 # 如果发送失败,断开连接
108 self.disconnect(task_id)
110 async def connect(self, websocket: WebSocket, task_id: str):
111 """接受WebSocket连接"""
112 await websocket.accept()
113 self.active_connections[task_id] = websocket
114 self.task_logs[task_id] = []
115 print(f"✅ WebSocket连接已建立,任务ID: {task_id}")
116 logger.info(f"WebSocket连接已建立,任务ID: {task_id}")
118 def disconnect(self, task_id: str):
119 """断开WebSocket连接"""
120 if task_id in self.active_connections:
121 del self.active_connections[task_id]
122 if task_id in self.task_logs:
123 del self.task_logs[task_id]
124 logger.info(f"WebSocket连接已断开,任务ID: {task_id}")
126 def send_log(self, task_id: str, message: str, log_type: str = "log"):
127 """发送日志消息(线程安全)"""
128 try:
129 # 将日志消息放入队列
130 self.log_queue.put((task_id, message, log_type))
131 except Exception as e:
132 logger.error(f"发送日志到队列失败: {e}")
134 def _sync_send_log(self, task_id: str, message: str, log_type: str = "log"):
135 """同步发送日志(备用方案)"""
136 try:
137 # 直接发送到WebSocket,不经过队列
138 if task_id in self.active_connections:
139 websocket = self.active_connections[task_id]
140 timestamp = datetime.now().isoformat()
141 log_message = f"[{timestamp}] {message}"
143 # 添加到日志列表
144 if task_id not in self.task_logs:
145 self.task_logs[task_id] = []
146 self.task_logs[task_id].append(log_message)
148 # 尝试发送(可能会失败,但不影响主流程)
149 try:
150 import asyncio
152 loop = asyncio.new_event_loop()
153 asyncio.set_event_loop(loop)
154 loop.run_until_complete(
155 websocket.send_text(
156 json.dumps(
157 {
158 "type": log_type,
159 "message": log_message,
160 "timestamp": timestamp,
161 "task_id": task_id,
162 }
163 )
164 )
165 )
166 loop.close()
167 except Exception:
168 pass # WebSocket发送失败不影响主流程
170 except Exception as e:
171 logger.error(f"同步发送日志失败: {e}")
173 def send_status(
174 self, task_id: str, status: str, progress: int, data_count: int = 0
175 ):
176 """发送状态更新"""
177 status_message = f"状态: {status}, 进度: {progress}%, 数据量: {data_count}"
178 self.send_log(task_id, status_message, "status")
180 def send_error(self, task_id: str, error_message: str):
181 """发送错误消息"""
182 self.send_log(task_id, f"错误: {error_message}", "error")
184 def send_success(self, task_id: str, success_message: str):
185 """发送成功消息"""
186 self.send_log(task_id, f"成功: {success_message}", "success")
188 def get_logs(self, task_id: str) -> list:
189 """获取任务的日志列表"""
190 return self.task_logs.get(task_id, [])
192 def get_active_connections(self) -> list:
193 """获取活跃连接列表"""
194 return list(self.active_connections.keys())
196 def is_connected(self, task_id: str) -> bool:
197 """检查连接是否存在"""
198 return task_id in self.active_connections
201# 全局WebSocket服务实例
202websocket_service = UnifiedWebSocketService()
205# WebSocket端点
206async def websocket_endpoint(websocket: WebSocket, task_id: str):
207 """WebSocket端点"""
208 print(f"🔌 WebSocket端点被调用: {task_id}")
209 await websocket_service.connect(websocket, task_id)
210 print(f"✅ WebSocket连接已建立: {task_id}")
212 try:
213 # 保持连接活跃
214 while True:
215 try:
216 # 等待客户端发送消息
217 await asyncio.wait_for(websocket.receive_text(), timeout=60.0)
218 except asyncio.TimeoutError:
219 # 超时后发送ping消息保持连接
220 try:
221 await websocket.send_text(
222 json.dumps(
223 {
224 "type": "ping",
225 "timestamp": datetime.now().isoformat(),
226 "task_id": task_id,
227 }
228 )
229 )
230 print(f"🏓 发送ping消息到 {task_id}")
231 except Exception as e:
232 print(f"❌ 发送ping失败: {e}")
233 break
234 except WebSocketDisconnect:
235 print(f"🔌 WebSocket断开连接: {task_id}")
236 break
237 except Exception as e:
238 print(f"❌ WebSocket错误: {e}")
239 break
240 except WebSocketDisconnect:
241 print(f"🔌 WebSocket断开连接: {task_id}")
242 except Exception as e:
243 print(f"❌ WebSocket端点错误: {e}")
244 finally:
245 websocket_service.disconnect(task_id)
248# 便捷函数
249def send_websocket_log(task_id: str, message: str, log_type: str = "log"):
250 """发送WebSocket日志的便捷函数"""
251 websocket_service.send_log(task_id, message, log_type)
254def send_websocket_status(
255 task_id: str, status: str, progress: int, data_count: int = 0
256):
257 """发送WebSocket状态的便捷函数"""
258 websocket_service.send_status(task_id, status, progress, data_count)
261def send_websocket_error(task_id: str, error_message: str):
262 """发送WebSocket错误的便捷函数"""
263 websocket_service.send_error(task_id, error_message)
266def send_websocket_success(task_id: str, success_message: str):
267 """发送WebSocket成功的便捷函数"""
268 websocket_service.send_success(task_id, success_message)