🇨🇳 简体中文
🇺🇸 English
🇯🇵 日本語
Skip to the content.

Fire 交易架构

业务架构全景

Fire 量化交易平台的业务架构分为五个层次:数据层、决策层、仓位层、执行层和分析层。

flowchart TB
    subgraph "数据层"
        MD[市场数据<br/>Market Data]
        HD[历史数据<br/>Historical Data]
    end

    subgraph "交易核心层"
        TSE[交易会话引擎<br/>TradingSessionEngine]
        FR[Flow 运行时<br/>FlowRuntime]
    end

    subgraph "决策层"
        DIR[Director<br/>战略方向]
        EXE[Executor<br/>入场时机]
    end

    subgraph "仓位管理层"
        SP[SizingPolicy<br/>仓位计算]
        PM[PositionManager<br/>约束层叠]
    end

    subgraph "执行层"
        TE[交易引擎<br/>TradingEngine]
        OM[订单管理<br/>OrderManager]
    end

    subgraph "分析层"
        BT[回测引擎<br/>BacktestEngine]
        PA[绩效分析<br/>Performance]
    end

    MD --> TSE
    HD --> TSE
    TSE --> FR
    FR --> DIR
    FR --> EXE
    DIR --> PM
    EXE --> SP
    SP --> PM
    PM --> TE
    TE --> OM
    TSE --> BT
    BT --> PA

Director-Executor-PositionManager 核心架构

架构概念

Fire 采用 Director-Executor-PositionManager (DEP) 三层决策架构,实现”大周期定方向、小周期找时机”的多时间框架策略。

flowchart LR
    subgraph "数据层"
        MS[MarketSource<br/>市场数据源]
        DS4h[DataStream<br/>4h]
        DS15m[DataStream<br/>15m]
    end

    subgraph "决策层"
        DIR[Director<br/>战略方向判定]
        EXE[Executor<br/>入场时机判定]
    end

    subgraph "仓位管理层"
        SP[SizingPolicy<br/>仓位计算]
        PM[PositionManager<br/>约束层叠]
    end

    subgraph "执行层"
        TE[TradingEngine<br/>交易引擎]
    end

    MS --> DS4h
    MS --> DS15m
    DS4h --> DIR
    DS15m --> EXE
    DIR --> |max_long/max_short| PM
    EXE --> SP
    SP --> |suggested_target| PM
    PM --> |final_target| TE

    style DIR fill:#e1f5fe
    style EXE fill:#e1f5fe
    style PM fill:#fff3e0,stroke:#333,stroke-width:2px
    style TE fill:#c8e6c9

组件职责

组件 时间框架 职责 输出
Director 大周期 (4h, 1d) 判断市场状态和交易方向 max_long_ratio, max_short_ratio, urgency
Executor 小周期 (1h, 15m) 寻找入场时机 target_ratio, confidence
SizingPolicy - 计算建议仓位 suggested_target, stop_loss
PositionManager - 约束层叠和最终决策 final_target_ratio

模块依赖关系

flowchart LR
    subgraph "配置层"
        FC[FlowConfig<br/>Redis 存储]
        FE[FlowEditor<br/>可视化配置]
    end

    subgraph "运行层"
        FR[FlowRuntime]
        DS[DataStream]
    end

    subgraph "决策层"
        DIR[Director]
        EXE[Executor]
        PM[PositionManager]
    end

    subgraph "执行层"
        TE[TradingEngine]
    end

    FE --> FC
    FC --> FR
    DS --> FR
    FR --> DIR
    FR --> EXE
    DIR --> PM
    EXE --> PM
    PM --> TE

    style FR fill:#e1f5fe
    style PM fill:#fff3e0

核心模块

TradingSessionEngine(会话引擎)

会话引擎管理交易会话的完整生命周期。

会话状态机

stateDiagram-v2
    [*] --> Created: 创建会话
    Created --> Ready: 初始化完成
    Ready --> Running: start()
    Running --> Paused: pause()
    Paused --> Running: resume()
    Running --> Stopped: stop()
    Paused --> Stopped: stop()
    Stopped --> [*]

会话状态

状态 说明
created 会话已创建
ready 初始化完成
running 运行中
paused 已暂停
stopped 已停止
completed 已完成(回测)
error 错误状态

FlowRuntime(节点图执行引擎)

FlowRuntime 负责执行 FlowConfig 定义的节点图,按拓扑顺序处理数据流。

执行流程

flowchart LR
    Load[加载 FlowConfig] --> Validate[验证配置]
    Validate --> Init[实例化节点]
    Init --> Topo[拓扑排序]
    Topo --> Execute[按序执行]
    Execute --> Output[收集输出]

FlowRuntime 接口

class FlowRuntime:
    def __init__(self, flow_config: FlowConfig, session_id: str):
        """初始化运行时"""
        pass

    def execute_tick(self, market_data: MarketData) -> None:
        """执行一个 tick 的数据处理"""
        pass

    def get_node_outputs(self) -> Dict[str, Any]:
        """获取所有节点的输出"""
        pass

    def start(self) -> None:
        """启动运行时"""
        pass

    def stop(self) -> None:
        """停止运行时"""
        pass

验证规则

FlowValidator 执行以下验证:

  1. 连接有效性:源/目标节点必须存在
  2. 时间框架约束:Director 时间框架 ≥ Executor 时间框架
  3. Symbol 一致性:同一流程使用相同标的
  4. 循环检测:禁止循环依赖

TradingEngine(交易引擎)

TradingEngine 负责实际订单执行和持仓管理。

接口

class TradingEngine:
    async def submit_order(self, order: Order) -> OrderResult:
        """提交订单"""
        pass

    async def cancel_order(self, order_id: str) -> bool:
        """取消订单"""
        pass

    def get_positions(self) -> List[Position]:
        """获取当前持仓"""
        pass

    def get_account_balance(self) -> AccountBalance:
        """获取账户余额"""
        pass

订单执行流程

sequenceDiagram
    participant PM as PositionManager
    participant TE as TradingEngine
    participant SIM as SimulationEngine
    participant BA as BrokerAdapter

    PM->>TE: final_target_ratio
    TE->>TE: 计算订单数量

    alt 模拟模式
        TE->>SIM: 模拟执行
        SIM-->>TE: 模拟成交
    else 实盘模式
        TE->>BA: 发送订单
        BA-->>TE: 成交回报
    end

    TE->>TE: 更新持仓

SimulationEngine(模拟引擎)

SimulationEngine 提供模拟交易执行,包括滑点和手续费计算。

参数 默认值 说明
滑点率 0.1% 模拟市场冲击
手续费率 0.03% 交易佣金

Flow 系统

FlowConfig 结构

策略流程通过 Flow Editor 进行可视化配置,持久化为 FlowConfig 存储在 Redis。

flowchart TB
    subgraph "FlowConfig"
        Nodes[节点列表<br/>Director/Executor/Output]
        Edges[连接关系<br/>数据流向]
        Params[节点参数<br/>策略配置]
    end

    subgraph "节点类型"
        DN[DirectorNode<br/>大周期策略]
        EN[ExecutorNode<br/>小周期策略]
        ON[OutputNode<br/>输出节点]
        DBN[DebugNode<br/>调试节点]
    end

    Nodes --> DN
    Nodes --> EN
    Nodes --> ON
    Nodes --> DBN

FlowConfig 数据模型

@dataclass
class FlowConfig:
    id: str                       # Flow 唯一标识
    name: str                     # 名称
    description: str              # 描述
    nodes: List[FlowNode]         # 节点列表
    edges: List[FlowEdge]         # 连线列表
    metadata: FlowMetadata        # 元数据
    version: int                  # 版本号
    created_at: datetime
    updated_at: datetime

节点类型

节点类型 用途 输入 输出
marketSource 市场数据源 ohlcv, tick
director 战略方向判定 data_stream constraints
executor 入场时机判定 data_stream signal
sizingPolicy 仓位计算 executor_output sizing_output
positionManager 约束层叠 director_constraints, sizing_output decision
tradeExecutor 订单执行 decisions
consoleLog 调试日志 input
chartPlotter 图表绘制 input
metricGauge 指标仪表 input

Flow 持久化

Redis 存储结构

# 单个 Flow 的详细数据
key: "fire:flows"
type: Hash
field: flow_id
value: JSON 序列化的 FlowConfig

# 获取 Flow
HGET fire:flows "flow_abc123"

FlowConfigRepository 接口

class FlowConfigRepository:
    def create(self, flow: FlowConfig) -> str:
        """创建 Flow,返回 flow_id"""
        pass

    def get_by_id(self, flow_id: str) -> Optional[FlowConfig]:
        """获取单个 Flow"""
        pass

    def get_all(self) -> List[FlowConfig]:
        """获取所有 Flow"""
        pass

    def update(self, flow_id: str, flow: FlowConfig) -> bool:
        """更新 Flow(版本号递增)"""
        pass

    def delete(self, flow_id: str) -> bool:
        """删除 Flow"""
        pass

    def duplicate(self, flow_id: str) -> Optional[str]:
        """复制 Flow,返回新 flow_id"""
        pass

交易数据流

实时交易数据流

sequenceDiagram
    participant Market as 市场数据源
    participant TSE as TradingSessionEngine
    participant FR as FlowRuntime
    participant DIR as Director
    participant EXE as Executor
    participant PM as PositionManager
    participant TE as TradingEngine

    Market->>TSE: 推送行情数据
    TSE->>FR: 分发数据
    FR->>DIR: 4h K 线数据
    FR->>EXE: 15m K 线数据
    DIR-->>PM: max_long/max_short
    EXE-->>PM: target_ratio
    PM->>PM: 约束层叠计算
    PM->>TE: final_target_ratio
    TE->>TE: 生成订单

回测数据流

sequenceDiagram
    participant HD as 历史数据
    participant BT as BacktestEngine
    participant FR as FlowRuntime
    participant PM as PositionManager
    participant PA as 绩效分析

    HD->>BT: 加载历史数据
    BT->>FR: 按时序推送
    FR->>FR: 执行节点图
    FR-->>PM: 生成信号
    PM->>BT: 模拟交易
    BT->>PA: 记录交易
    PA-->>BT: 返回报告

REST API

Flow 配置管理

方法 路径 说明
GET /api/v1/flows 获取 Flow 列表
POST /api/v1/flows 创建 Flow
GET /api/v1/flows/{flow_id} 获取 Flow 详情
PUT /api/v1/flows/{flow_id} 更新 Flow
DELETE /api/v1/flows/{flow_id} 删除 Flow
POST /api/v1/flows/{flow_id}/duplicate 复制 Flow
POST /api/v1/flows/{flow_id}/validate 验证 Flow

交易会话管理

方法 路径 说明
POST /api/v1/trading/mtf/sessions 创建会话
GET /api/v1/trading/mtf/sessions 获取会话列表
POST /api/v1/trading/mtf/sessions/{session_id}/start 启动会话
POST /api/v1/trading/mtf/sessions/{session_id}/pause 暂停会话
POST /api/v1/trading/mtf/sessions/{session_id}/stop 停止会话
POST /api/v1/trading/mtf/sessions/{session_id}/step 单步执行
GET /api/v1/trading/mtf/sessions/{session_id}/outputs 获取节点输出

数据回放控制

方法 路径 说明
POST /api/v1/trading/sessions/{session_id}/replay/start 启动回放
POST /api/v1/trading/sessions/{session_id}/replay/control 控制回放
GET /api/v1/trading/sessions/{session_id}/replay/status 获取回放状态

WebSocket API

端点

连接地址/ws/trading/{session_id}

消息类型

类型 说明
director_update Director 输出更新
executor_signal Executor 信号
sizing_output 仓位计算结果
position_decision PositionManager 决策
debug_output 调试节点数据

调试节点消息

消息类型 用途 数据格式
console_log 控制台日志输出 {level, message, source, timestamp}
chart_plotter 图表绘制数据 {symbol, timeframe, candle, indicators}
metric_gauge 指标仪表数据 {metric_name, value, min, max, threshold}
director_state Director 状态变更 {market_state, constraints, urgency}
executor_signal Executor 信号 {target_ratio, confidence, stop_loss}
position_update 持仓变更 {ratio, quantity, entry_price, pnl}
replay_control 回放控制响应 {action, current_time, speed, is_paused}

依赖关系

内部依赖

模块 用途
core.trading.mtf 多时间框架核心
core.repositories 数据持久化
core.data_source 市场数据接入
infrastructure.broker 券商接口适配
infrastructure.cache Redis 缓存

外部依赖

版本 用途
pandas ≥2.3.0 数据处理
pydantic ≥2.6.0 数据验证
redis ≥5.0.0 状态持久化

相关文档

文档 内容
系统概览 系统简介、技术栈、核心概念
工程架构 分层架构、目录结构、数据层设计
策略与风控 策略体系、PositionManager 约束、风险控制、策略开发指南