🇨🇳 简体中文
🇺🇸 English
🇯🇵 日本語
Skip to the content.

WebSocket 模块架构文档

本文档描述 WebSocket 服务模块的架构设计、协议定义、消息格式和实时通信机制。

概述

WebSocket 模块是 Fire 量化交易系统的实时通信基础设施,负责提供双向实时数据推送、任务进度监控、日志流式传输和系统状态广播。系统采用任务隔离设计,每个长时间运行的任务拥有独立的 WebSocket 连接和消息队列,确保消息可靠传递和性能隔离。

核心功能

实时数据推送

任务进度监控

日志流式传输

设计原则

组件

组件架构图

graph TB
    subgraph "客户端层"
        RC[React Client]
        WSC[WebSocket Context]
        ARM[Auto Reconnect Manager]
    end

    subgraph "服务端层"
        WSF[WebSocketFactory<br/>连接工厂]
        TWS[TaskWebSocketService<br/>任务服务]
        CM[ConnectionManager<br/>连接管理]
        AQ[AsyncQueue<br/>异步队列]
    end

    subgraph "业务层"
        DI[Data Import<br/>数据导入]
        TM[Trading Monitor<br/>交易监控]
        SE[Strategy Engine<br/>策略引擎]
        BT[Backtest<br/>回测系统]
    end

    subgraph "存储层"
        MLS[Memory Log Store<br/>内存日志]
        RC2[(Redis Cache<br/>缓存)]
    end

    RC --> WSC
    WSC --> ARM
    ARM <--> WSF
    WSF --> TWS
    TWS --> CM
    TWS --> AQ
    TWS --> MLS
    MLS --> RC2

    DI --> WSF
    TM --> WSF
    SE --> WSF
    BT --> WSF

    style WSF fill:#e1f5ff
    style TWS fill:#e8f5e9
    style AQ fill:#fff4e6

核心组件说明

WebSocketServiceFactory (WebSocket 服务工厂)

TaskWebSocketService (任务 WebSocket 服务)

ConnectionManager (连接管理器)

AsyncQueue (异步队列)

依赖关系

内部依赖

模块名 版本 用途 是否必需
core.models.task - 任务模型
core.services.logging - 日志服务
infrastructure.redis - 缓存支持
utils.async_helpers - 异步工具

外部依赖

库名 版本要求 用途 License
fastapi >=0.100.0 WebSocket 支持 MIT
websockets >=11.0 WebSocket 协议 BSD
asyncio 标准库 异步支持 Python
json 标准库 消息序列化 Python

数据流

WebSocket 连接建立流程

sequenceDiagram
    participant Client as 客户端
    participant Server as 服务器
    participant Factory as 工厂
    participant Service as 任务服务
    participant Queue as 队列

    Client->>Server: WebSocket 连接请求
    Server->>Factory: 获取或创建服务
    Factory->>Service: 创建任务服务
    Service->>Queue: 初始化消息队列
    Service-->>Factory: 返回服务实例
    Factory-->>Server: 返回服务
    Server->>Service: 建立连接
    Service-->>Client: 连接确认
    Service->>Client: 发送历史日志

消息推送流程

flowchart LR
    Start([业务事件]) --> Generate[生成消息]
    Generate --> Enqueue[加入队列]
    Enqueue --> Check{队列满?}
    Check -->|是| Drop[丢弃旧消息]
    Check -->|否| Process[处理消息]
    Drop --> Process
    Process --> Batch[批量打包]
    Batch --> Send[发送客户端]
    Send --> Confirm{确认接收?}
    Confirm -->|是| End([完成])
    Confirm -->|否| Retry[重试机制]
    Retry --> Send

    style Generate fill:#e1f5ff
    style Batch fill:#fff4e6

数据结构

消息格式

WebSocket 消息结构:

@dataclass
class WebSocketMessage:
    type: str                    # 消息类型
    task_id: str                # 任务ID
    timestamp: datetime         # 时间戳
    data: Dict[str, Any]       # 消息数据

    # 消息类型枚举
    # - progress: 进度更新
    # - log: 日志消息
    # - status: 状态变更
    # - data: 数据推送
    # - error: 错误通知

协议定义

消息类型定义:

类型 用途 数据格式
progress 进度更新 {current: int, total: int, message: str}
log 日志消息 {level: str, message: str, source: str}
status 状态变更 {old_status: str, new_status: str, reason: str}
data 数据推送 {type: str, payload: any}
error 错误通知 {code: str, message: str, details: any}

接口

Python API

WebSocket 服务接口

class WebSocketService:
    """WebSocket 服务接口"""

    async def connect(self, websocket: WebSocket):
        """建立 WebSocket 连接"""
        pass

    async def disconnect(self):
        """断开 WebSocket 连接"""
        pass

    async def send_message(self, message: WebSocketMessage):
        """发送消息"""
        pass

    async def broadcast(self, message: WebSocketMessage):
        """广播消息到所有连接"""
        pass

客户端使用示例

// React 客户端示例
import { useWebSocket } from '@/contexts/WebSocketContext';

function TaskMonitor({ taskId }) {
  const { messages, status, reconnect } = useWebSocket(taskId);

  useEffect(() => {
    // 监听任务消息
    const filtered = messages.filter(m => m.task_id === taskId);
    // 处理消息...
  }, [messages]);

  return (
    <div>
      <ConnectionStatus status={status} />
      <MessageList messages={messages} />
    </div>
  );
}

WebSocket 端点

端点 描述 认证
/ws/task/{task_id} 任务专用 WebSocket JWT Token
/ws/market 市场数据 WebSocket JWT Token
/ws/trading 交易通知 WebSocket JWT Token
/ws/system 系统状态 WebSocket Admin Only

扩展点

自定义消息处理器

from backend.core.services.websocket import MessageHandler

class CustomMessageHandler(MessageHandler):
    """自定义消息处理器"""

    async def handle(self, message: WebSocketMessage):
        """处理自定义消息类型"""
        # 实现消息处理逻辑
        pass

注册自定义处理器

from backend.core.services.websocket import HandlerRegistry

# 注册处理器
HandlerRegistry.register(
    message_type="custom_type",
    handler_class=CustomMessageHandler
)

配置

实际配置方式

WebSocket 模块的配置通过以下方式管理:

  1. 连接配置: 通过环境变量配置
  2. 队列参数: 通过 infrastructure/config/settings.py
  3. 心跳设置: 运行时参数

配置参数

参数名 默认值 说明
WS_MAX_CONNECTIONS 100 最大并发连接数
WS_QUEUE_SIZE 1000 消息队列大小
WS_BATCH_SIZE 10 批量发送大小
WS_HEARTBEAT_INTERVAL 30 心跳间隔(秒)
WS_LOG_BUFFER_SIZE 500 日志缓冲大小
WS_RECONNECT_INTERVAL 5 重连间隔(秒)

客户端配置

// WebSocket 客户端配置
export const WS_CONFIG = {
  reconnect: true,              // 启用自动重连
  reconnectInterval: 5000,      // 重连间隔
  maxReconnectAttempts: 10,     // 最大重连次数
  heartbeatInterval: 30000,     // 心跳间隔
  messageQueueSize: 100,        // 客户端队列大小
};

相关文档