WebSocket 模块架构文档
本文档描述 WebSocket 服务模块的架构设计、协议定义、消息格式和实时通信机制。
概述
WebSocket 模块是 Fire 量化交易系统的实时通信基础设施,负责提供双向实时数据推送、任务进度监控、日志流式传输和系统状态广播。系统采用任务隔离设计,每个长时间运行的任务拥有独立的 WebSocket 连接和消息队列,确保消息可靠传递和性能隔离。
核心功能
实时数据推送
- 市场行情推送: 毫秒级行情数据分发
- 交易状态更新: 订单状态、成交通知实时推送
- 策略信号广播: 策略产生的买卖信号实时分发
- 风险告警推送: 风控触发事件立即通知
任务进度监控
- 导入进度追踪: 大量数据导入的实时进度
- 回测状态更新: 策略回测进度和结果推送
- 训练进度监控: 模型训练过程实时反馈
- 批处理任务跟踪: 批量操作的进度和状态
日志流式传输
- 策略运行日志: 策略执行过程的详细日志
- 系统调试信息: Debug 级别日志实时查看
- 错误告警推送: 异常和错误立即推送
- 审计日志流: 操作审计日志实时传输
设计原则
- 任务隔离: 每个任务独立连接和队列
- 异步非阻塞: 全异步架构,避免阻塞
- 自动重连: 连接断开自动恢复
- 背压控制: 队列满时的流量控制
- 内存安全: 自动清理过期连接和日志
组件
组件架构图
graph TB
subgraph "客户端层"
RC[React Client]
WSC[WebSocket Context]
ARM[Auto Reconnect Manager]
end
subgraph "服务端层"
WSF[WebSocketFactory<br/>连接工厂]
TWS[TaskWebSocketService<br/>任务服务]
CM[ConnectionManager<br/>连接管理]
AQ[AsyncQueue<br/>异步队列]
end
subgraph "业务层"
DI[Data Import<br/>数据导入]
TM[Trading Monitor<br/>交易监控]
SE[Strategy Engine<br/>策略引擎]
BT[Backtest<br/>回测系统]
end
subgraph "存储层"
MLS[Memory Log Store<br/>内存日志]
RC2[(Redis Cache<br/>缓存)]
end
RC --> WSC
WSC --> ARM
ARM <--> WSF
WSF --> TWS
TWS --> CM
TWS --> AQ
TWS --> MLS
MLS --> RC2
DI --> WSF
TM --> WSF
SE --> WSF
BT --> WSF
style WSF fill:#e1f5ff
style TWS fill:#e8f5e9
style AQ fill:#fff4e6
核心组件说明
WebSocketServiceFactory (WebSocket 服务工厂)
- 职责: 管理多个任务专用的 WebSocket 服务实例
- 实现类:
backend.core.services.websocket_service_factory.WebSocketServiceFactory - 主要方法:
get_or_create_service(): 获取或创建任务服务remove_service(): 移除任务服务broadcast(): 广播消息到所有连接cleanup(): 清理过期连接
- 状态管理: 有状态(维护服务实例池)
TaskWebSocketService (任务 WebSocket 服务)
- 职责: 为单个任务提供独立的 WebSocket 连接和消息队列
- 实现类:
backend.core.services.websocket_service_factory.TaskWebSocketService - 主要功能:
- 独立的消息队列
- 连接生命周期管理
- 日志缓冲和批量发送
- 自动重连机制
- 状态管理: 有状态(维护连接和队列)
ConnectionManager (连接管理器)
- 职责: 管理 WebSocket 连接的生命周期
- 主要功能:
- 连接建立和关闭
- 心跳检测
- 异常处理
- 重连策略
AsyncQueue (异步队列)
- 职责: 处理消息队列,避免事件循环冲突
- 主要功能:
- 异步消息入队
- 批量消息处理
- 背压控制
- 队列溢出处理
依赖关系
内部依赖
| 模块名 | 版本 | 用途 | 是否必需 |
|---|---|---|---|
core.models.task |
- | 任务模型 | 是 |
core.services.logging |
- | 日志服务 | 是 |
infrastructure.redis |
- | 缓存支持 | 否 |
utils.async_helpers |
- | 异步工具 | 是 |
外部依赖
| 库名 | 版本要求 | 用途 | License |
|---|---|---|---|
fastapi |
>=0.100.0 | WebSocket 支持 | MIT |
websockets |
>=11.0 | WebSocket 协议 | BSD |
asyncio |
标准库 | 异步支持 | Python |
json |
标准库 | 消息序列化 | Python |
数据流
WebSocket 连接建立流程
sequenceDiagram
participant Client as 客户端
participant Server as 服务器
participant Factory as 工厂
participant Service as 任务服务
participant Queue as 队列
Client->>Server: WebSocket 连接请求
Server->>Factory: 获取或创建服务
Factory->>Service: 创建任务服务
Service->>Queue: 初始化消息队列
Service-->>Factory: 返回服务实例
Factory-->>Server: 返回服务
Server->>Service: 建立连接
Service-->>Client: 连接确认
Service->>Client: 发送历史日志
消息推送流程
flowchart LR
Start([业务事件]) --> Generate[生成消息]
Generate --> Enqueue[加入队列]
Enqueue --> Check{队列满?}
Check -->|是| Drop[丢弃旧消息]
Check -->|否| Process[处理消息]
Drop --> Process
Process --> Batch[批量打包]
Batch --> Send[发送客户端]
Send --> Confirm{确认接收?}
Confirm -->|是| End([完成])
Confirm -->|否| Retry[重试机制]
Retry --> Send
style Generate fill:#e1f5ff
style Batch fill:#fff4e6
数据结构
消息格式
WebSocket 消息结构:
@dataclass
class WebSocketMessage:
type: str # 消息类型
task_id: str # 任务ID
timestamp: datetime # 时间戳
data: Dict[str, Any] # 消息数据
# 消息类型枚举
# - progress: 进度更新
# - log: 日志消息
# - status: 状态变更
# - data: 数据推送
# - error: 错误通知
协议定义
消息类型定义:
| 类型 | 用途 | 数据格式 |
|---|---|---|
progress |
进度更新 | {current: int, total: int, message: str} |
log |
日志消息 | {level: str, message: str, source: str} |
status |
状态变更 | {old_status: str, new_status: str, reason: str} |
data |
数据推送 | {type: str, payload: any} |
error |
错误通知 | {code: str, message: str, details: any} |
接口
Python API
WebSocket 服务接口
class WebSocketService:
"""WebSocket 服务接口"""
async def connect(self, websocket: WebSocket):
"""建立 WebSocket 连接"""
pass
async def disconnect(self):
"""断开 WebSocket 连接"""
pass
async def send_message(self, message: WebSocketMessage):
"""发送消息"""
pass
async def broadcast(self, message: WebSocketMessage):
"""广播消息到所有连接"""
pass
客户端使用示例
// React 客户端示例
import { useWebSocket } from '@/contexts/WebSocketContext';
function TaskMonitor({ taskId }) {
const { messages, status, reconnect } = useWebSocket(taskId);
useEffect(() => {
// 监听任务消息
const filtered = messages.filter(m => m.task_id === taskId);
// 处理消息...
}, [messages]);
return (
<div>
<ConnectionStatus status={status} />
<MessageList messages={messages} />
</div>
);
}
WebSocket 端点
| 端点 | 描述 | 认证 |
|---|---|---|
/ws/task/{task_id} |
任务专用 WebSocket | JWT Token |
/ws/market |
市场数据 WebSocket | JWT Token |
/ws/trading |
交易通知 WebSocket | JWT Token |
/ws/system |
系统状态 WebSocket | Admin Only |
扩展点
自定义消息处理器
from backend.core.services.websocket import MessageHandler
class CustomMessageHandler(MessageHandler):
"""自定义消息处理器"""
async def handle(self, message: WebSocketMessage):
"""处理自定义消息类型"""
# 实现消息处理逻辑
pass
注册自定义处理器
from backend.core.services.websocket import HandlerRegistry
# 注册处理器
HandlerRegistry.register(
message_type="custom_type",
handler_class=CustomMessageHandler
)
配置
实际配置方式
WebSocket 模块的配置通过以下方式管理:
- 连接配置: 通过环境变量配置
- 队列参数: 通过
infrastructure/config/settings.py - 心跳设置: 运行时参数
配置参数
| 参数名 | 默认值 | 说明 |
|---|---|---|
WS_MAX_CONNECTIONS |
100 | 最大并发连接数 |
WS_QUEUE_SIZE |
1000 | 消息队列大小 |
WS_BATCH_SIZE |
10 | 批量发送大小 |
WS_HEARTBEAT_INTERVAL |
30 | 心跳间隔(秒) |
WS_LOG_BUFFER_SIZE |
500 | 日志缓冲大小 |
WS_RECONNECT_INTERVAL |
5 | 重连间隔(秒) |
客户端配置
// WebSocket 客户端配置
export const WS_CONFIG = {
reconnect: true, // 启用自动重连
reconnectInterval: 5000, // 重连间隔
maxReconnectAttempts: 10, // 最大重连次数
heartbeatInterval: 30000, // 心跳间隔
messageQueueSize: 100, // 客户端队列大小
};