Trading 模块架构文档
本文档描述 Trading 模块的架构设计、组件结构、接口定义和配置说明。
概述
Trading 模块是 Fire 量化交易系统的执行核心,负责处理策略信号、执行订单、管理持仓和多通道交易协调。系统支持真实交易和模拟交易两种模式,并实现了多通道虚拟持仓和净额计算的高级功能。
核心功能
基础能力
- 双模式支持: 真实交易和模拟交易使用统一接口,无缝切换
- 订单管理: 完整的订单生命周期管理,包括提交、取消、状态跟踪
- 持仓跟踪: 实时持仓和资金状态监控,支持多市场多品种
- 风险控制: 多层次风险检查机制,包含前置、中置、后置检查
- 性能优化: 低延迟执行和批量处理,订单响应时间 <100ms
高级特性
- 多通道交易: 支持激进/平衡/保守三种风险级别通道,各通道独立运行
- 虚拟持仓: 每个通道独立维护虚拟持仓,支持仓位隔离和风险隔离
- 净额计算: 多通道信号聚合和净额处理,优化实际交易执行
- 分层投票: 策略组内投票(权重投票)和通道间协调(优先级协调)
- 配置热更新: 运行时动态调整通道配置,无需重启系统
业务特性
| 特性 | 描述 | 实现方式 |
|---|---|---|
| 策略隔离 | 各策略独立运行互不干扰 | 独立的策略实例和执行上下文 |
| 通道并行 | 多通道同时运行不同配置 | 通道管理器和虚拟仓位跟踪 |
| 信号聚合 | 多策略信号加权综合 | 层次化投票系统和权重计算 |
| 动态调仓 | 根据市场自动调整仓位 | 动态风险评估和仓位优化 |
| 实时监控 | 全流程状态实时可见 | WebSocket推送和事件流 |
| 故障恢复 | 异常情况自动恢复 | 熔断机制和状态持久化 |
快速开始
# 基本使用示例
from backend.core.trading.engines.trading_engine import TradingEngine
from backend.core.trading.engines.channel_manager import ChannelManager
from backend.core.trading.models import Order, TradingMode
# 初始化交易引擎
trading_engine = TradingEngine(
user_id="user123",
session_id="session456",
trading_mode=TradingMode.SIMULATION,
asset_mode=AssetMode.STOCK
)
# 初始化通道管理器
channel_manager = ChannelManager(
session_id="session456",
user_id="user123",
trading_engine=trading_engine,
channel_configs=channel_configs
)
# 初始化并启动
trading_engine.initialize()
channel_manager.initialize(total_capital=Decimal("100000"))
channel_manager.start_all_channels()
# 提交订单
order = Order(
symbol="AAPL",
side=OrderSide.BUY,
order_type=OrderType.LIMIT,
quantity=100,
price=150.0
)
result = trading_engine.submit_order(order)
组件
组件架构图
graph TD
subgraph "交易引擎核心"
TE[TradingEngine<br/>交易引擎基类]
RTE[RealTradingEngine<br/>真实交易引擎]
STE[SimulationTradingEngine<br/>模拟交易引擎]
SIM[SimulationEngine<br/>模拟引擎核心]
end
subgraph "多通道管理"
CM[ChannelManager<br/>通道管理器]
TC[TradingChannel<br/>交易通道]
CH1[AggressiveChannel<br/>激进通道]
CH2[BalancedChannel<br/>平衡通道]
CH3[ConservativeChannel<br/>保守通道]
end
subgraph "协调服务"
VS[VotingService<br/>投票服务]
VPT[VirtualPositionTracker<br/>虚拟持仓跟踪]
PN[PositionNettingService<br/>净额计算服务]
end
subgraph "风险管理"
RE[RiskEngine<br/>风险引擎]
GRM[GlobalRiskManager<br/>全局风控]
CB[CircuitBreaker<br/>熔断机制]
end
subgraph "数据适配器"
TA[TradeAdapter<br/>交易适配器]
AA[AssetAdapter<br/>资产适配器]
QA[QuoteAdapter<br/>行情适配器]
end
TE --> RTE
TE --> STE
STE --> SIM
CM --> TC
TC --> CH1
TC --> CH2
TC --> CH3
CM --> VS
VS --> VPT
VPT --> PN
TE --> RE
RE --> GRM
GRM --> CB
RTE --> TA
SIM --> AA
SIM --> QA
style TE fill:#f9f,stroke:#333,stroke-width:2px
style CM fill:#bbf,stroke:#333,stroke-width:2px
style VS fill:#bfb,stroke:#333,stroke-width:2px
style RE fill:#fbb,stroke:#333,stroke-width:2px
核心组件说明
TradingEngine (交易引擎基类)
- 职责: 定义交易引擎统一接口,管理订单和持仓状态
- 实现类:
backend.core.trading.engines.trading_engine.TradingEngine - 主要方法:
initialize(): 初始化交易引擎submit_order(): 提交交易订单cancel_order(): 取消订单get_positions(): 获取持仓列表get_account_balance(): 获取账户余额
- 状态管理: 有状态(维护订单和持仓信息)
ChannelManager (通道管理器)
- 职责: 管理多个交易通道的生命周期,协调通道间的交互
- 实现类:
backend.core.trading.engines.channel_manager.ChannelManager - 主要方法:
initialize(): 初始化所有通道start_all_channels(): 启动所有通道distribute_market_data(): 分发市场数据collect_signals(): 收集通道信号calculate_net_position(): 计算净额持仓
- 状态管理: 有状态(维护通道实例和配置)
VotingService (投票服务)
- 职责: 实现分层投票机制,协调策略组内和通道间的决策
- 实现类:
backend.core.trading.services.voting_service.VotingService - 主要方法:
vote(): 执行分层投票intra_channel_vote(): 策略组内投票inter_channel_coordination(): 通道间协调
- 状态管理: 无状态
VirtualPositionTracker (虚拟持仓跟踪器)
- 职责: 跟踪和管理每个通道的虚拟持仓
- 实现类:
backend.core.trading.services.virtual_position_tracker.VirtualPositionTracker - 主要方法:
update_position(): 更新虚拟持仓get_channel_positions(): 获取通道持仓calculate_pnl(): 计算盈亏get_aggregated_positions(): 获取聚合持仓
- 状态管理: 有状态(维护虚拟持仓数据)
SimulationEngine (模拟引擎核心)
- 职责: 实现模拟交易的核心逻辑,包括成交模拟和成本计算
- 实现类:
backend.core.trading.engines.simulation_engine.SimulationEngine - 主要方法:
submit_order(): 模拟订单执行simulate_execution_price(): 模拟成交价格(含滑点)calculate_commission(): 计算手续费update_position(): 更新模拟持仓
- 状态管理: 有状态(维护模拟账户和持仓)
- 成本模型:
- 默认滑点率: 0.1%
- 默认手续费率: 0.03%
- 支持自定义成本参数
RiskEngine (风险引擎)
- 职责: 执行多层次风险控制,包括前置、中置和后置检查
- 实现类:
backend.core.trading.risk.risk_engine.RiskEngine - 风控层次:
- 前置检查: 订单提交前的参数验证和限制检查
- 中置检查: 订单执行中的实时风险监控
- 后置检查: 订单成交后的风险指标更新
- 风控维度:
- 仓位风险: 单品种仓位限制、总仓位限制
- 资金风险: 最大损失限制、保证金要求
- 市场风险: 流动性检查、价格异常检测
- 操作风险: 订单频率限制、操作权限验证
TradingChannel (交易通道)
- 职责: 单个交易通道的独立运行单元,维护独立的策略组和虚拟持仓
- 实现类:
backend.core.trading.engines.trading_channel.TradingChannel - 通道类型:
- AggressiveChannel: 激进通道(高风险高收益)
- BalancedChannel: 平衡通道(风险收益均衡)
- ConservativeChannel: 保守通道(低风险稳健)
- 主要功能:
- 独立的策略引擎实例
- 独立的虚拟持仓管理
- 通道级别的风险控制
- 信号生成和投票聚合
ConfigValidator (配置验证服务)
- 职责: 验证通道配置和自定义模板的合法性
- 实现类:
backend.core.services.config_validator.ConfigValidator - 主要方法:
validate_channels_config(): 验证通道配置(返回 (是否有效, 错误列表))validate_channel(): 验证单个通道配置validate_strategy(): 验证策略配置(策略ID、权重、参数)
- 验证规则:
- 通道数量限制: 1-3个通道
- 资金分配总和: 必须为100%
- 策略权重总和: 每个通道内必须为1.0
- 策略ID有效性: 从策略注册表验证
- 参数范围验证: 根据策略元数据检查
- 状态管理: 无状态(纯验证逻辑)
依赖关系
内部依赖
| 模块名 | 版本 | 用途 | 是否必需 |
|---|---|---|---|
core.trading.strategies |
- | 策略引擎和基类 | 是 |
core.trading.models |
- | 数据模型定义 | 是 |
core.trading.risk |
- | 风险控制组件 | 是 |
core.repositories |
- | 数据持久化 | 是 |
core.data_source |
- | 市场数据接入 | 是 |
infrastructure.broker |
- | 券商接口适配 | 是(真实交易) |
infrastructure.cache |
- | Redis 缓存 | 是 |
utils.logging |
- | 日志记录 | 是 |
外部依赖
| 库名 | 版本要求 | 用途 |
|---|---|---|
pandas |
≥2.3.0 | 数据处理 |
numpy |
≥1.24.0 | 数值计算 |
pydantic |
≥2.6.0 | 数据验证 |
redis |
≥5.0.0 | 数据存储 |
longport |
≥3.0.0 | 券商接口 |
asyncio |
标准库 | 异步支持 |
数据流
订单执行流程图
flowchart LR
Start([策略生成信号]) --> CM[通道管理器]
CM --> VS[投票服务]
VS --> |多通道信号| Vote{分层投票}
Vote --> PN[净额计算]
PN --> CheckRisk{风险检查}
CheckRisk -->|通过| TE[交易引擎]
CheckRisk -->|拒绝| Reject[拒绝订单]
TE --> Mode{交易模式}
Mode -->|真实| RTE[真实交易引擎]
Mode -->|模拟| STE[模拟交易引擎]
RTE --> API[券商API]
API --> Confirm[成交确认]
STE --> SIM[模拟成交]
SIM --> Update[更新持仓]
Confirm --> Result[返回结果]
Update --> Result
Reject --> Result
Result --> End([结束])
style Start fill:#e1f5fe
style End fill:#e1f5fe
style Reject fill:#ffebee
style Confirm fill:#c8e6c9
style Update fill:#c8e6c9
市场数据处理流程
sequenceDiagram
participant MD as 市场数据源
participant CM as 通道管理器
participant CH as 交易通道
participant SE as 策略引擎
participant VPT as 虚拟持仓
participant VS as 投票服务
participant PN as 净额计算
MD->>CM: 推送行情数据
CM->>CH: 分发到各通道
loop 每个通道
CH->>SE: 处理市场数据
SE->>SE: 生成交易信号
SE->>CH: 返回信号
CH->>VPT: 更新虚拟持仓
end
CM->>VS: 收集所有信号
VS->>VS: 执行分层投票
VS->>PN: 投票结果
PN->>PN: 计算净额持仓
PN->>CM: 返回调仓需求
TE-->>CM: 执行结果
CM->>VPT: 更新实际持仓
数据结构
输入数据格式
市场数据 (MarketData):
@dataclass
class MarketData:
symbol: str
price: Decimal
volume: int
bid: Decimal
ask: Decimal
timestamp: datetime
# 可选字段
open: Optional[Decimal] = None
high: Optional[Decimal] = None
low: Optional[Decimal] = None
close: Optional[Decimal] = None
交易信号 (Signal):
@dataclass
class Signal:
symbol: str
action: ActionType # BUY, SELL, HOLD
confidence: float # 0.0 - 1.0
strategy_id: str
strategy_weight: float
metadata: Dict[str, Any]
timestamp: datetime
输出数据格式
订单结果 (OrderResult):
@dataclass
class OrderResult:
success: bool
order_id: Optional[str]
execution_price: Optional[Decimal]
executed_quantity: Optional[int]
commission: Optional[Decimal]
error_message: Optional[str]
timestamp: datetime
净额持仓 (NetPosition):
@dataclass
class NetPosition:
symbol: str
target_quantity: Decimal # 目标持仓
current_quantity: Decimal # 当前持仓
adjustment_needed: Decimal # 需要调整的数量
channels_involved: List[str] # 涉及的通道
confidence: float # 综合置信度
接口
Python API
TradingEngine 接口
class TradingEngine(ABC):
"""交易引擎基类接口"""
@abstractmethod
def initialize(self) -> bool:
"""初始化交易引擎"""
pass
@abstractmethod
def submit_order(self, order: Order) -> OrderResult:
"""提交订单"""
pass
@abstractmethod
def cancel_order(self, order_id: str) -> bool:
"""取消订单"""
pass
@abstractmethod
def get_positions(self) -> List[Position]:
"""获取当前持仓"""
pass
@abstractmethod
def get_account_balance(self) -> AccountBalance:
"""获取账户余额"""
pass
ChannelManager 接口
class ChannelManager:
"""通道管理器接口"""
def initialize(self, total_capital: Decimal) -> bool:
"""初始化通道管理器"""
pass
def start_all_channels(self) -> None:
"""启动所有通道"""
pass
def stop_all_channels(self) -> None:
"""停止所有通道"""
pass
def distribute_market_data(self, market_data: MarketData) -> None:
"""分发市场数据到各通道"""
pass
def collect_signals(self) -> Dict[str, List[Signal]]:
"""收集所有通道信号"""
pass
def calculate_net_positions(self) -> Dict[str, NetPosition]:
"""计算净额持仓"""
pass
REST API
自定义模板管理
列出用户模板
GET /api/v1/custom-templates/
查询参数:
sort_by: 排序字段 (created_at 或 updated_at)order: 排序方向 (asc 或 desc)
响应:
{
"templates": [
{
"id": "uuid-1234",
"name": "My Aggressive Setup",
"channels_config": [...],
"created_at": "2025-11-22T10:00:00Z",
"updated_at": "2025-11-22T12:00:00Z"
}
]
}
创建模板
POST /api/v1/custom-templates/
请求参数:
{
"name": "My Strategy Template",
"channels_config": [
{
"channel_id": "aggressive_1",
"channel_type": "aggressive",
"capital_allocation": 0.4,
"strategies": [
{"strategy_id": "momentum_breakout", "weight": 0.6},
{"strategy_id": "rsi_momentum", "weight": 0.4}
]
}
]
}
验证规则 (通过 ConfigValidator):
- 通道数量: 1-3个
- 资金分配总和: 100%
- 策略权重总和: 每通道1.0
- 策略ID: 必须在注册表中
响应:
{
"id": "uuid-5678",
"name": "My Strategy Template",
"created_at": "2025-11-22T10:00:00Z"
}
更新模板
PUT /api/v1/custom-templates/{template_id}
删除模板
DELETE /api/v1/custom-templates/{template_id}
提交订单
POST /api/v1/trading/orders
请求参数:
{
"symbol": "AAPL",
"side": "BUY",
"order_type": "LIMIT",
"quantity": 100,
"price": 150.00,
"channel_id": "balanced_channel",
"strategy_id": "ma_crossover"
}
响应:
{
"success": true,
"order_id": "ORD_20250101_001",
"status": "SUBMITTED",
"message": "Order successfully submitted"
}
获取持仓
GET /api/v1/trading/positions
响应:
{
"positions": [
{
"symbol": "AAPL",
"quantity": 100,
"average_price": 150.00,
"current_price": 151.00,
"unrealized_pnl": 100.00,
"channels": {
"aggressive": 30,
"balanced": 50,
"conservative": 20
}
}
],
"total_value": 15100.00,
"total_pnl": 100.00
}
获取通道状态
GET /api/v1/trading/channels
响应:
{
"channels": [
{
"channel_id": "aggressive_channel",
"status": "RUNNING",
"allocated_capital": 30000,
"virtual_positions": 5,
"total_pnl": 1200.50,
"strategies_count": 3
}
]
}
数据模型
CustomTemplate (自定义通道配置模板)
用户保存的通道配置模板,持久化在Redis中。
模型定义: backend.core.models.custom_template.py
字段:
id: UUID - 模板唯一标识user_id: UUID - 所属用户IDname: str - 模板名称 (最大100字符)channels_config: List[Dict] - 通道配置JSONcreated_at: datetime - 创建时间updated_at: datetime - 更新时间
Repository: backend.core.repositories.custom_template_repository.CustomTemplateRepository
存储结构 (Redis):
# Hash: user的模板索引
key: f"custom_templates:user:{user_id}"
field: template_id
value: template_name
# Hash: 模板详细数据
key: f"custom_template:{template_id}"
fields: {
"name": str,
"user_id": str,
"channels_config": json_string,
"created_at": iso_timestamp,
"updated_at": iso_timestamp
}
CRUD操作:
create(): 创建新模板,验证配置合法性get(): 根据ID获取模板list_by_user(): 获取用户所有模板update(): 更新模板(重新验证配置)delete(): 删除模板及索引
扩展点
自定义交易引擎
实现 TradingEngine 基类来创建自定义交易引擎:
from backend.core.trading.engines.trading_engine import TradingEngine
class CustomTradingEngine(TradingEngine):
"""自定义交易引擎实现"""
def __init__(self, config: Dict[str, Any]):
super().__init__()
self.config = config
def submit_order(self, order: Order) -> OrderResult:
"""自定义订单执行逻辑"""
# 实现自定义逻辑
pass
def cancel_order(self, order_id: str) -> bool:
"""自定义取消逻辑"""
# 实现自定义逻辑
pass
自定义风险控制
扩展风险引擎添加自定义风控规则:
from backend.core.trading.risk.risk_engine import RiskEngine
class CustomRiskEngine(RiskEngine):
"""自定义风险引擎"""
def add_custom_rule(self, rule: RiskRule):
"""添加自定义风控规则"""
# 实现规则添加逻辑
pass
def check_custom_risk(self, order: Order) -> RiskCheckResult:
"""执行自定义风险检查"""
# 实现自定义风险检查逻辑
pass
自定义通道类型
创建新的通道类型以支持不同的交易风格:
from backend.core.trading.engines.trading_channel import TradingChannel
class CustomChannel(TradingChannel):
"""自定义交易通道"""
def __init__(self, channel_id: str, config: ChannelConfig):
super().__init__(channel_id, config)
self.custom_params = config.custom_params
def process_signal(self, signal: Signal) -> Decision:
"""自定义信号处理逻辑"""
# 实现特定的信号处理
pass
def calculate_position_size(self, signal: Signal) -> int:
"""自定义仓位计算"""
# 实现特定的仓位算法
pass
注册扩展
# 注册自定义组件
from backend.core.trading.registry import TradingRegistry
# 注册自定义交易引擎
TradingRegistry.register_engine("custom", CustomTradingEngine)
# 注册自定义通道
TradingRegistry.register_channel("custom", CustomChannel)
# 注册自定义风险引擎
TradingRegistry.register_risk_engine("custom", CustomRiskEngine)
@trading_engine.hook("before_order_submit")
def validate_order(order: Order) -> bool:
"""订单提交前的验证"""
if order.quantity > 1000:
logger.warning(f"Large order detected: {order.quantity}")
return True
@trading_engine.hook("on_order_filled")
def notify_filled(order: Order, result: OrderResult):
"""订单成交通知"""
notification_service.send(
f"Order {order.order_id} filled at {result.execution_price}"
)
配置
实际配置方式
Trading 模块的配置通过以下方式管理:
- 数据库配置:通过
SettingsRepository存储用户级配置 - 环境变量:通过
.env文件和infrastructure/config/settings.py - 运行时配置:通过 API 参数传递
支持的环境变量
根据 infrastructure/config/settings.py 的实际实现:
| 环境变量 | 说明 | 默认值 |
|---|---|---|
REDIS_HOST |
Redis主机 | localhost |
REDIS_PORT |
Redis端口 | 6379 |
TRADING_ENABLED |
启用交易 | False |
RISK_LIMIT |
风险限制 | 10000.0 |
LOG_LEVEL |
日志级别 | INFO |
LONGPORT_APP_KEY |
长桥API密钥 | None |
LONGPORT_APP_SECRET |
长桥API密钥 | None |
LONGPORT_ACCESS_TOKEN |
长桥访问令牌 | None |
配置示例
# 实际使用的配置方式
from infrastructure.config.settings import settings
# 访问配置
redis_host = settings.redis_host
trading_enabled = settings.trading_enabled
# 通过工厂获取数据源配置
from core.data_source.factories.config_factory import unified_config_factory
config = unified_config_factory.get_data_source_config(user_id)