🇨🇳 简体中文
🇺🇸 English
🇯🇵 日本語
Skip to the content.

Trading 模块架构文档

本文档描述 Trading 模块的架构设计、组件结构、接口定义和配置说明。

概述

Trading 模块是 Fire 量化交易系统的执行核心,负责处理策略信号、执行订单、管理持仓和多通道交易协调。系统支持真实交易和模拟交易两种模式,并实现了多通道虚拟持仓和净额计算的高级功能。

核心功能

基础能力

高级特性

业务特性

特性 描述 实现方式
策略隔离 各策略独立运行互不干扰 独立的策略实例和执行上下文
通道并行 多通道同时运行不同配置 通道管理器和虚拟仓位跟踪
信号聚合 多策略信号加权综合 层次化投票系统和权重计算
动态调仓 根据市场自动调整仓位 动态风险评估和仓位优化
实时监控 全流程状态实时可见 WebSocket推送和事件流
故障恢复 异常情况自动恢复 熔断机制和状态持久化

快速开始

# 基本使用示例
from backend.core.trading.engines.trading_engine import TradingEngine
from backend.core.trading.engines.channel_manager import ChannelManager
from backend.core.trading.models import Order, TradingMode

# 初始化交易引擎
trading_engine = TradingEngine(
    user_id="user123",
    session_id="session456",
    trading_mode=TradingMode.SIMULATION,
    asset_mode=AssetMode.STOCK
)

# 初始化通道管理器
channel_manager = ChannelManager(
    session_id="session456",
    user_id="user123",
    trading_engine=trading_engine,
    channel_configs=channel_configs
)

# 初始化并启动
trading_engine.initialize()
channel_manager.initialize(total_capital=Decimal("100000"))
channel_manager.start_all_channels()

# 提交订单
order = Order(
    symbol="AAPL",
    side=OrderSide.BUY,
    order_type=OrderType.LIMIT,
    quantity=100,
    price=150.0
)
result = trading_engine.submit_order(order)

组件

组件架构图

graph TD
    subgraph "交易引擎核心"
        TE[TradingEngine<br/>交易引擎基类]
        RTE[RealTradingEngine<br/>真实交易引擎]
        STE[SimulationTradingEngine<br/>模拟交易引擎]
        SIM[SimulationEngine<br/>模拟引擎核心]
    end

    subgraph "多通道管理"
        CM[ChannelManager<br/>通道管理器]
        TC[TradingChannel<br/>交易通道]
        CH1[AggressiveChannel<br/>激进通道]
        CH2[BalancedChannel<br/>平衡通道]
        CH3[ConservativeChannel<br/>保守通道]
    end

    subgraph "协调服务"
        VS[VotingService<br/>投票服务]
        VPT[VirtualPositionTracker<br/>虚拟持仓跟踪]
        PN[PositionNettingService<br/>净额计算服务]
    end

    subgraph "风险管理"
        RE[RiskEngine<br/>风险引擎]
        GRM[GlobalRiskManager<br/>全局风控]
        CB[CircuitBreaker<br/>熔断机制]
    end

    subgraph "数据适配器"
        TA[TradeAdapter<br/>交易适配器]
        AA[AssetAdapter<br/>资产适配器]
        QA[QuoteAdapter<br/>行情适配器]
    end

    TE --> RTE
    TE --> STE
    STE --> SIM

    CM --> TC
    TC --> CH1
    TC --> CH2
    TC --> CH3

    CM --> VS
    VS --> VPT
    VPT --> PN

    TE --> RE
    RE --> GRM
    GRM --> CB

    RTE --> TA
    SIM --> AA
    SIM --> QA

    style TE fill:#f9f,stroke:#333,stroke-width:2px
    style CM fill:#bbf,stroke:#333,stroke-width:2px
    style VS fill:#bfb,stroke:#333,stroke-width:2px
    style RE fill:#fbb,stroke:#333,stroke-width:2px

核心组件说明

TradingEngine (交易引擎基类)

ChannelManager (通道管理器)

VotingService (投票服务)

VirtualPositionTracker (虚拟持仓跟踪器)

SimulationEngine (模拟引擎核心)

RiskEngine (风险引擎)

TradingChannel (交易通道)

ConfigValidator (配置验证服务)

依赖关系

内部依赖

模块名 版本 用途 是否必需
core.trading.strategies - 策略引擎和基类
core.trading.models - 数据模型定义
core.trading.risk - 风险控制组件
core.repositories - 数据持久化
core.data_source - 市场数据接入
infrastructure.broker - 券商接口适配 是(真实交易)
infrastructure.cache - Redis 缓存
utils.logging - 日志记录

外部依赖

库名 版本要求 用途
pandas ≥2.3.0 数据处理
numpy ≥1.24.0 数值计算
pydantic ≥2.6.0 数据验证
redis ≥5.0.0 数据存储
longport ≥3.0.0 券商接口
asyncio 标准库 异步支持

数据流

订单执行流程图

flowchart LR
    Start([策略生成信号]) --> CM[通道管理器]
    CM --> VS[投票服务]
    VS --> |多通道信号| Vote{分层投票}
    Vote --> PN[净额计算]
    PN --> CheckRisk{风险检查}
    CheckRisk -->|通过| TE[交易引擎]
    CheckRisk -->|拒绝| Reject[拒绝订单]

    TE --> Mode{交易模式}
    Mode -->|真实| RTE[真实交易引擎]
    Mode -->|模拟| STE[模拟交易引擎]

    RTE --> API[券商API]
    API --> Confirm[成交确认]

    STE --> SIM[模拟成交]
    SIM --> Update[更新持仓]

    Confirm --> Result[返回结果]
    Update --> Result
    Reject --> Result
    Result --> End([结束])

    style Start fill:#e1f5fe
    style End fill:#e1f5fe
    style Reject fill:#ffebee
    style Confirm fill:#c8e6c9
    style Update fill:#c8e6c9

市场数据处理流程

sequenceDiagram
    participant MD as 市场数据源
    participant CM as 通道管理器
    participant CH as 交易通道
    participant SE as 策略引擎
    participant VPT as 虚拟持仓
    participant VS as 投票服务
    participant PN as 净额计算

    MD->>CM: 推送行情数据
    CM->>CH: 分发到各通道

    loop 每个通道
        CH->>SE: 处理市场数据
        SE->>SE: 生成交易信号
        SE->>CH: 返回信号
        CH->>VPT: 更新虚拟持仓
    end

    CM->>VS: 收集所有信号
    VS->>VS: 执行分层投票
    VS->>PN: 投票结果
    PN->>PN: 计算净额持仓
    PN->>CM: 返回调仓需求
    TE-->>CM: 执行结果
    CM->>VPT: 更新实际持仓

数据结构

输入数据格式

市场数据 (MarketData):

@dataclass
class MarketData:
    symbol: str
    price: Decimal
    volume: int
    bid: Decimal
    ask: Decimal
    timestamp: datetime

    # 可选字段
    open: Optional[Decimal] = None
    high: Optional[Decimal] = None
    low: Optional[Decimal] = None
    close: Optional[Decimal] = None

交易信号 (Signal):

@dataclass
class Signal:
    symbol: str
    action: ActionType  # BUY, SELL, HOLD
    confidence: float    # 0.0 - 1.0
    strategy_id: str
    strategy_weight: float
    metadata: Dict[str, Any]
    timestamp: datetime

输出数据格式

订单结果 (OrderResult):

@dataclass
class OrderResult:
    success: bool
    order_id: Optional[str]
    execution_price: Optional[Decimal]
    executed_quantity: Optional[int]
    commission: Optional[Decimal]
    error_message: Optional[str]
    timestamp: datetime

净额持仓 (NetPosition):

@dataclass
class NetPosition:
    symbol: str
    target_quantity: Decimal      # 目标持仓
    current_quantity: Decimal      # 当前持仓
    adjustment_needed: Decimal     # 需要调整的数量
    channels_involved: List[str]   # 涉及的通道
    confidence: float              # 综合置信度

接口

Python API

TradingEngine 接口

class TradingEngine(ABC):
    """交易引擎基类接口"""

    @abstractmethod
    def initialize(self) -> bool:
        """初始化交易引擎"""
        pass

    @abstractmethod
    def submit_order(self, order: Order) -> OrderResult:
        """提交订单"""
        pass

    @abstractmethod
    def cancel_order(self, order_id: str) -> bool:
        """取消订单"""
        pass

    @abstractmethod
    def get_positions(self) -> List[Position]:
        """获取当前持仓"""
        pass

    @abstractmethod
    def get_account_balance(self) -> AccountBalance:
        """获取账户余额"""
        pass

ChannelManager 接口

class ChannelManager:
    """通道管理器接口"""

    def initialize(self, total_capital: Decimal) -> bool:
        """初始化通道管理器"""
        pass

    def start_all_channels(self) -> None:
        """启动所有通道"""
        pass

    def stop_all_channels(self) -> None:
        """停止所有通道"""
        pass

    def distribute_market_data(self, market_data: MarketData) -> None:
        """分发市场数据到各通道"""
        pass

    def collect_signals(self) -> Dict[str, List[Signal]]:
        """收集所有通道信号"""
        pass

    def calculate_net_positions(self) -> Dict[str, NetPosition]:
        """计算净额持仓"""
        pass

REST API

自定义模板管理

列出用户模板

GET /api/v1/custom-templates/

查询参数:

响应:

{
  "templates": [
    {
      "id": "uuid-1234",
      "name": "My Aggressive Setup",
      "channels_config": [...],
      "created_at": "2025-11-22T10:00:00Z",
      "updated_at": "2025-11-22T12:00:00Z"
    }
  ]
}
创建模板

POST /api/v1/custom-templates/

请求参数:

{
  "name": "My Strategy Template",
  "channels_config": [
    {
      "channel_id": "aggressive_1",
      "channel_type": "aggressive",
      "capital_allocation": 0.4,
      "strategies": [
        {"strategy_id": "momentum_breakout", "weight": 0.6},
        {"strategy_id": "rsi_momentum", "weight": 0.4}
      ]
    }
  ]
}

验证规则 (通过 ConfigValidator):

响应:

{
  "id": "uuid-5678",
  "name": "My Strategy Template",
  "created_at": "2025-11-22T10:00:00Z"
}
更新模板

PUT /api/v1/custom-templates/{template_id}

删除模板

DELETE /api/v1/custom-templates/{template_id}

提交订单

POST /api/v1/trading/orders

请求参数:

{
  "symbol": "AAPL",
  "side": "BUY",
  "order_type": "LIMIT",
  "quantity": 100,
  "price": 150.00,
  "channel_id": "balanced_channel",
  "strategy_id": "ma_crossover"
}

响应:

{
  "success": true,
  "order_id": "ORD_20250101_001",
  "status": "SUBMITTED",
  "message": "Order successfully submitted"
}

获取持仓

GET /api/v1/trading/positions

响应:

{
  "positions": [
    {
      "symbol": "AAPL",
      "quantity": 100,
      "average_price": 150.00,
      "current_price": 151.00,
      "unrealized_pnl": 100.00,
      "channels": {
        "aggressive": 30,
        "balanced": 50,
        "conservative": 20
      }
    }
  ],
  "total_value": 15100.00,
  "total_pnl": 100.00
}

获取通道状态

GET /api/v1/trading/channels

响应:

{
  "channels": [
    {
      "channel_id": "aggressive_channel",
      "status": "RUNNING",
      "allocated_capital": 30000,
      "virtual_positions": 5,
      "total_pnl": 1200.50,
      "strategies_count": 3
    }
  ]
}

数据模型

CustomTemplate (自定义通道配置模板)

用户保存的通道配置模板,持久化在Redis中。

模型定义: backend.core.models.custom_template.py

字段:

Repository: backend.core.repositories.custom_template_repository.CustomTemplateRepository

存储结构 (Redis):

# Hash: user的模板索引
key: f"custom_templates:user:{user_id}"
field: template_id
value: template_name

# Hash: 模板详细数据
key: f"custom_template:{template_id}"
fields: {
    "name": str,
    "user_id": str,
    "channels_config": json_string,
    "created_at": iso_timestamp,
    "updated_at": iso_timestamp
}

CRUD操作:

扩展点

自定义交易引擎

实现 TradingEngine 基类来创建自定义交易引擎:

from backend.core.trading.engines.trading_engine import TradingEngine

class CustomTradingEngine(TradingEngine):
    """自定义交易引擎实现"""

    def __init__(self, config: Dict[str, Any]):
        super().__init__()
        self.config = config

    def submit_order(self, order: Order) -> OrderResult:
        """自定义订单执行逻辑"""
        # 实现自定义逻辑
        pass

    def cancel_order(self, order_id: str) -> bool:
        """自定义取消逻辑"""
        # 实现自定义逻辑
        pass

自定义风险控制

扩展风险引擎添加自定义风控规则:

from backend.core.trading.risk.risk_engine import RiskEngine

class CustomRiskEngine(RiskEngine):
    """自定义风险引擎"""

    def add_custom_rule(self, rule: RiskRule):
        """添加自定义风控规则"""
        # 实现规则添加逻辑
        pass

    def check_custom_risk(self, order: Order) -> RiskCheckResult:
        """执行自定义风险检查"""
        # 实现自定义风险检查逻辑
        pass

自定义通道类型

创建新的通道类型以支持不同的交易风格:

from backend.core.trading.engines.trading_channel import TradingChannel

class CustomChannel(TradingChannel):
    """自定义交易通道"""

    def __init__(self, channel_id: str, config: ChannelConfig):
        super().__init__(channel_id, config)
        self.custom_params = config.custom_params

    def process_signal(self, signal: Signal) -> Decision:
        """自定义信号处理逻辑"""
        # 实现特定的信号处理
        pass

    def calculate_position_size(self, signal: Signal) -> int:
        """自定义仓位计算"""
        # 实现特定的仓位算法
        pass

注册扩展

# 注册自定义组件
from backend.core.trading.registry import TradingRegistry

# 注册自定义交易引擎
TradingRegistry.register_engine("custom", CustomTradingEngine)

# 注册自定义通道
TradingRegistry.register_channel("custom", CustomChannel)

# 注册自定义风险引擎
TradingRegistry.register_risk_engine("custom", CustomRiskEngine)
@trading_engine.hook("before_order_submit")
def validate_order(order: Order) -> bool:
    """订单提交前的验证"""
    if order.quantity > 1000:
        logger.warning(f"Large order detected: {order.quantity}")
    return True

@trading_engine.hook("on_order_filled")
def notify_filled(order: Order, result: OrderResult):
    """订单成交通知"""
    notification_service.send(
        f"Order {order.order_id} filled at {result.execution_price}"
    )

配置

实际配置方式

Trading 模块的配置通过以下方式管理:

  1. 数据库配置:通过 SettingsRepository 存储用户级配置
  2. 环境变量:通过 .env 文件和 infrastructure/config/settings.py
  3. 运行时配置:通过 API 参数传递

支持的环境变量

根据 infrastructure/config/settings.py 的实际实现:

环境变量 说明 默认值
REDIS_HOST Redis主机 localhost
REDIS_PORT Redis端口 6379
TRADING_ENABLED 启用交易 False
RISK_LIMIT 风险限制 10000.0
LOG_LEVEL 日志级别 INFO
LONGPORT_APP_KEY 长桥API密钥 None
LONGPORT_APP_SECRET 长桥API密钥 None
LONGPORT_ACCESS_TOKEN 长桥访问令牌 None

配置示例

# 实际使用的配置方式
from infrastructure.config.settings import settings

# 访问配置
redis_host = settings.redis_host
trading_enabled = settings.trading_enabled

# 通过工厂获取数据源配置
from core.data_source.factories.config_factory import unified_config_factory

config = unified_config_factory.get_data_source_config(user_id)

相关文档