Fire 交易架构
业务架构全景
Fire 量化交易平台的业务架构分为五个层次:数据层、决策层、仓位层、执行层和分析层。
flowchart TB
subgraph "数据层"
MD[市场数据<br/>Market Data]
HD[历史数据<br/>Historical Data]
end
subgraph "交易核心层"
TSE[交易会话引擎<br/>TradingSessionEngine]
FR[Flow 运行时<br/>FlowRuntime]
end
subgraph "决策层"
DIR[Director<br/>战略方向]
EXE[Executor<br/>入场时机]
end
subgraph "仓位管理层"
SP[SizingPolicy<br/>仓位计算]
PM[PositionManager<br/>约束层叠]
end
subgraph "执行层"
TE[交易引擎<br/>TradingEngine]
OM[订单管理<br/>OrderManager]
end
subgraph "分析层"
BT[回测引擎<br/>BacktestEngine]
PA[绩效分析<br/>Performance]
end
MD --> TSE
HD --> TSE
TSE --> FR
FR --> DIR
FR --> EXE
DIR --> PM
EXE --> SP
SP --> PM
PM --> TE
TE --> OM
TSE --> BT
BT --> PA
Director-Executor-PositionManager 核心架构
架构概念
Fire 采用 Director-Executor-PositionManager (DEP) 三层决策架构,实现”大周期定方向、小周期找时机”的多时间框架策略。
flowchart LR
subgraph "数据层"
MS[MarketSource<br/>市场数据源]
DS4h[DataStream<br/>4h]
DS15m[DataStream<br/>15m]
end
subgraph "决策层"
DIR[Director<br/>战略方向判定]
EXE[Executor<br/>入场时机判定]
end
subgraph "仓位管理层"
SP[SizingPolicy<br/>仓位计算]
PM[PositionManager<br/>约束层叠]
end
subgraph "执行层"
TE[TradingEngine<br/>交易引擎]
end
MS --> DS4h
MS --> DS15m
DS4h --> DIR
DS15m --> EXE
DIR --> |max_long/max_short| PM
EXE --> SP
SP --> |suggested_target| PM
PM --> |final_target| TE
style DIR fill:#e1f5fe
style EXE fill:#e1f5fe
style PM fill:#fff3e0,stroke:#333,stroke-width:2px
style TE fill:#c8e6c9
组件职责
| 组件 |
时间框架 |
职责 |
输出 |
| Director |
大周期 (4h, 1d) |
判断市场状态和交易方向 |
max_long_ratio, max_short_ratio, urgency |
| Executor |
小周期 (1h, 15m) |
寻找入场时机 |
target_ratio, confidence |
| SizingPolicy |
- |
计算建议仓位 |
suggested_target, stop_loss |
| PositionManager |
- |
约束层叠和最终决策 |
final_target_ratio |
模块依赖关系
flowchart LR
subgraph "配置层"
FC[FlowConfig<br/>Redis 存储]
FE[FlowEditor<br/>可视化配置]
end
subgraph "运行层"
FR[FlowRuntime]
DS[DataStream]
end
subgraph "决策层"
DIR[Director]
EXE[Executor]
PM[PositionManager]
end
subgraph "执行层"
TE[TradingEngine]
end
FE --> FC
FC --> FR
DS --> FR
FR --> DIR
FR --> EXE
DIR --> PM
EXE --> PM
PM --> TE
style FR fill:#e1f5fe
style PM fill:#fff3e0
核心模块
TradingSessionEngine(会话引擎)
会话引擎管理交易会话的完整生命周期。
会话状态机
stateDiagram-v2
[*] --> Created: 创建会话
Created --> Ready: 初始化完成
Ready --> Running: start()
Running --> Paused: pause()
Paused --> Running: resume()
Running --> Stopped: stop()
Paused --> Stopped: stop()
Stopped --> [*]
会话状态
| 状态 |
说明 |
created |
会话已创建 |
ready |
初始化完成 |
running |
运行中 |
paused |
已暂停 |
stopped |
已停止 |
completed |
已完成(回测) |
error |
错误状态 |
FlowRuntime(节点图执行引擎)
FlowRuntime 负责执行 FlowConfig 定义的节点图,按拓扑顺序处理数据流。
执行流程
flowchart LR
Load[加载 FlowConfig] --> Validate[验证配置]
Validate --> Init[实例化节点]
Init --> Topo[拓扑排序]
Topo --> Execute[按序执行]
Execute --> Output[收集输出]
FlowRuntime 接口
class FlowRuntime:
def __init__(self, flow_config: FlowConfig, session_id: str):
"""初始化运行时"""
pass
def execute_tick(self, market_data: MarketData) -> None:
"""执行一个 tick 的数据处理"""
pass
def get_node_outputs(self) -> Dict[str, Any]:
"""获取所有节点的输出"""
pass
def start(self) -> None:
"""启动运行时"""
pass
def stop(self) -> None:
"""停止运行时"""
pass
验证规则
FlowValidator 执行以下验证:
- 连接有效性:源/目标节点必须存在
- 时间框架约束:Director 时间框架 ≥ Executor 时间框架
- Symbol 一致性:同一流程使用相同标的
- 循环检测:禁止循环依赖
TradingEngine(交易引擎)
TradingEngine 负责实际订单执行和持仓管理。
接口
class TradingEngine:
async def submit_order(self, order: Order) -> OrderResult:
"""提交订单"""
pass
async def cancel_order(self, order_id: str) -> bool:
"""取消订单"""
pass
def get_positions(self) -> List[Position]:
"""获取当前持仓"""
pass
def get_account_balance(self) -> AccountBalance:
"""获取账户余额"""
pass
订单执行流程
sequenceDiagram
participant PM as PositionManager
participant TE as TradingEngine
participant SIM as SimulationEngine
participant BA as BrokerAdapter
PM->>TE: final_target_ratio
TE->>TE: 计算订单数量
alt 模拟模式
TE->>SIM: 模拟执行
SIM-->>TE: 模拟成交
else 实盘模式
TE->>BA: 发送订单
BA-->>TE: 成交回报
end
TE->>TE: 更新持仓
SimulationEngine(模拟引擎)
SimulationEngine 提供模拟交易执行,包括滑点和手续费计算。
| 参数 |
默认值 |
说明 |
| 滑点率 |
0.1% |
模拟市场冲击 |
| 手续费率 |
0.03% |
交易佣金 |
Flow 系统
FlowConfig 结构
策略流程通过 Flow Editor 进行可视化配置,持久化为 FlowConfig 存储在 Redis。
flowchart TB
subgraph "FlowConfig"
Nodes[节点列表<br/>Director/Executor/Output]
Edges[连接关系<br/>数据流向]
Params[节点参数<br/>策略配置]
end
subgraph "节点类型"
DN[DirectorNode<br/>大周期策略]
EN[ExecutorNode<br/>小周期策略]
ON[OutputNode<br/>输出节点]
DBN[DebugNode<br/>调试节点]
end
Nodes --> DN
Nodes --> EN
Nodes --> ON
Nodes --> DBN
FlowConfig 数据模型
@dataclass
class FlowConfig:
id: str # Flow 唯一标识
name: str # 名称
description: str # 描述
nodes: List[FlowNode] # 节点列表
edges: List[FlowEdge] # 连线列表
metadata: FlowMetadata # 元数据
version: int # 版本号
created_at: datetime
updated_at: datetime
节点类型
| 节点类型 |
用途 |
输入 |
输出 |
marketSource |
市场数据源 |
无 |
ohlcv, tick |
director |
战略方向判定 |
data_stream |
constraints |
executor |
入场时机判定 |
data_stream |
signal |
sizingPolicy |
仓位计算 |
executor_output |
sizing_output |
positionManager |
约束层叠 |
director_constraints, sizing_output |
decision |
tradeExecutor |
订单执行 |
decisions |
无 |
consoleLog |
调试日志 |
input |
无 |
chartPlotter |
图表绘制 |
input |
无 |
metricGauge |
指标仪表 |
input |
无 |
Flow 持久化
Redis 存储结构
# 单个 Flow 的详细数据
key: "fire:flows"
type: Hash
field: flow_id
value: JSON 序列化的 FlowConfig
# 获取 Flow
HGET fire:flows "flow_abc123"
FlowConfigRepository 接口
class FlowConfigRepository:
def create(self, flow: FlowConfig) -> str:
"""创建 Flow,返回 flow_id"""
pass
def get_by_id(self, flow_id: str) -> Optional[FlowConfig]:
"""获取单个 Flow"""
pass
def get_all(self) -> List[FlowConfig]:
"""获取所有 Flow"""
pass
def update(self, flow_id: str, flow: FlowConfig) -> bool:
"""更新 Flow(版本号递增)"""
pass
def delete(self, flow_id: str) -> bool:
"""删除 Flow"""
pass
def duplicate(self, flow_id: str) -> Optional[str]:
"""复制 Flow,返回新 flow_id"""
pass
交易数据流
实时交易数据流
sequenceDiagram
participant Market as 市场数据源
participant TSE as TradingSessionEngine
participant FR as FlowRuntime
participant DIR as Director
participant EXE as Executor
participant PM as PositionManager
participant TE as TradingEngine
Market->>TSE: 推送行情数据
TSE->>FR: 分发数据
FR->>DIR: 4h K 线数据
FR->>EXE: 15m K 线数据
DIR-->>PM: max_long/max_short
EXE-->>PM: target_ratio
PM->>PM: 约束层叠计算
PM->>TE: final_target_ratio
TE->>TE: 生成订单
回测数据流
sequenceDiagram
participant HD as 历史数据
participant BT as BacktestEngine
participant FR as FlowRuntime
participant PM as PositionManager
participant PA as 绩效分析
HD->>BT: 加载历史数据
BT->>FR: 按时序推送
FR->>FR: 执行节点图
FR-->>PM: 生成信号
PM->>BT: 模拟交易
BT->>PA: 记录交易
PA-->>BT: 返回报告
REST API
Flow 配置管理
| 方法 |
路径 |
说明 |
| GET |
/api/v1/flows |
获取 Flow 列表 |
| POST |
/api/v1/flows |
创建 Flow |
| GET |
/api/v1/flows/{flow_id} |
获取 Flow 详情 |
| PUT |
/api/v1/flows/{flow_id} |
更新 Flow |
| DELETE |
/api/v1/flows/{flow_id} |
删除 Flow |
| POST |
/api/v1/flows/{flow_id}/duplicate |
复制 Flow |
| POST |
/api/v1/flows/{flow_id}/validate |
验证 Flow |
交易会话管理
| 方法 |
路径 |
说明 |
| POST |
/api/v1/trading/mtf/sessions |
创建会话 |
| GET |
/api/v1/trading/mtf/sessions |
获取会话列表 |
| POST |
/api/v1/trading/mtf/sessions/{session_id}/start |
启动会话 |
| POST |
/api/v1/trading/mtf/sessions/{session_id}/pause |
暂停会话 |
| POST |
/api/v1/trading/mtf/sessions/{session_id}/stop |
停止会话 |
| POST |
/api/v1/trading/mtf/sessions/{session_id}/step |
单步执行 |
| GET |
/api/v1/trading/mtf/sessions/{session_id}/outputs |
获取节点输出 |
数据回放控制
| 方法 |
路径 |
说明 |
| POST |
/api/v1/trading/sessions/{session_id}/replay/start |
启动回放 |
| POST |
/api/v1/trading/sessions/{session_id}/replay/control |
控制回放 |
| GET |
/api/v1/trading/sessions/{session_id}/replay/status |
获取回放状态 |
WebSocket API
端点
连接地址:/ws/trading/{session_id}
消息类型
| 类型 |
说明 |
director_update |
Director 输出更新 |
executor_signal |
Executor 信号 |
sizing_output |
仓位计算结果 |
position_decision |
PositionManager 决策 |
debug_output |
调试节点数据 |
调试节点消息
| 消息类型 |
用途 |
数据格式 |
console_log |
控制台日志输出 |
{level, message, source, timestamp} |
chart_plotter |
图表绘制数据 |
{symbol, timeframe, candle, indicators} |
metric_gauge |
指标仪表数据 |
{metric_name, value, min, max, threshold} |
director_state |
Director 状态变更 |
{market_state, constraints, urgency} |
executor_signal |
Executor 信号 |
{target_ratio, confidence, stop_loss} |
position_update |
持仓变更 |
{ratio, quantity, entry_price, pnl} |
replay_control |
回放控制响应 |
{action, current_time, speed, is_paused} |
依赖关系
内部依赖
| 模块 |
用途 |
core.trading.mtf |
多时间框架核心 |
core.repositories |
数据持久化 |
core.data_source |
市场数据接入 |
infrastructure.broker |
券商接口适配 |
infrastructure.cache |
Redis 缓存 |
外部依赖
| 库 |
版本 |
用途 |
| pandas |
≥2.3.0 |
数据处理 |
| pydantic |
≥2.6.0 |
数据验证 |
| redis |
≥5.0.0 |
状态持久化 |
相关文档
| 文档 |
内容 |
| 系统概览 |
系统简介、技术栈、核心概念 |
| 工程架构 |
分层架构、目录结构、数据层设计 |
| 策略与风控 |
策略体系、PositionManager 约束、风险控制、策略开发指南 |