🇨🇳 简体中文
🇺🇸 English
🇯🇵 日本語
Skip to the content.

策略模块架构文档

📋 概述

策略模块是量化交易系统的核心组件,负责实现各种交易策略逻辑。本文档重点描述策略模块的接口定义、模块依赖关系和交互方式,为策略开发和系统扩展提供清晰的架构指导。

🏗️ 架构设计

1. 模块依赖关系图

graph TB
    subgraph "量化交易系统"
        TSE[TradingSessionEngine<br/>统一调度中心<br/>异步初始化流程]
    end
    
    subgraph "策略模块"
        SE[StrategyEngine<br/>策略管理器+统一日志中心<br/>初始化生命周期管理]
        BS[BaseStrategy<br/>策略基类<br/>被动执行模式]
        SF[StrategyFactory<br/>策略工厂]
        
        subgraph "具体策略实现"
            MACD[MACDStrategy]
            RSI[RSIStrategy]
            BB[BollingerBandsStrategy]
            MA[MovingAverageCrossoverStrategy]
        end
    end
    
    subgraph "交易模块"
        TE[TradingEngine<br/>交易引擎]
        SE2[SimulationEngine<br/>模拟交易引擎]
    end
    
    subgraph "数据模块"
        QA[QuoteAdapter<br/>行情适配器]
        DA[DataAdapter<br/>数据适配器]
    end
    
    subgraph "外部系统"
        API[券商API]
        DB[(数据库)]
        WS[WebSocket]
    end
    
    %% 核心依赖关系
    TSE --> SE
    SE --> BS
    SE --> TE
    SE --> QA
    
    SF --> BS
    MACD --> BS
    RSI --> BS
    BB --> BS
    MA --> BS
    
    TE --> SE2
    QA --> DA
    DA --> DB
    TE --> API
    SE --> WS
    
    %% 数据流(虚线表示数据流向)
    TSE -.->|市场数据推送| SE
    SE -.->|数据分发| BS
    BS -.->|交易信号| TE
    TE -.->|成交确认| BS
    BS -.->|历史数据请求| QA
    QA -.->|历史数据返回| BS
    BS -.->|日志记录| SE
    SE -.->|日志推送| WS

2. 模块职责划分

模块 职责 依赖关系
TradingSessionEngine 统一调度数据流,协调各模块,数据源管理 依赖 StrategyEngine, TradingEngine
StrategyEngine 策略生命周期管理、数据分发、统一日志管理 依赖 TradingEngine, QuoteAdapter
BaseStrategy 策略抽象接口、通用功能、被动接收数据 依赖 TradingEngine, QuoteAdapter, StrategyEngine
具体策略 实现具体交易逻辑,生成交易信号 继承 BaseStrategy
StrategyFactory 策略创建和注册 依赖 BaseStrategy
TradingEngine 订单执行、持仓管理 依赖 SimulationEngine
QuoteAdapter 数据访问统一接口 依赖 DataAdapter

3. 核心接口层次

classDiagram
    class BaseStrategy {
        <<abstract>>
        +name: str
        +config: Dict
        +is_initialized: bool
        +context: StrategyContext
        +trading_engine: TradingEngine
        +quote_adapter: QuoteAdapter
        +strategy_engine: StrategyEngine
        
        +initialize(context)* void
        +on_market_data(market_data)* void
        +on_trade_update(trade)* void
        
        +get_historical_data(symbol, start, end) List[MarketData]
        +get_portfolio() Portfolio
        
        +submit_order(order) bool
        +calculate_position_size(symbol, price, risk_ratio) Decimal
        
        +check_risk_limits(symbol, quantity, price) Dict
        +get_risk_limits() RiskLimits
        
        +log_message(message, log_type) void
        +start() void
        +stop() void
    }
    
    class MACDStrategy {
        +fast_period: int
        +slow_period: int
        +signal_period: int
        +initialize(context) void
        +on_market_data(market_data) void
        +on_trade_update(trade) void
    }
    
    class RSIStrategy {
        +period: int
        +overbought_threshold: float
        +oversold_threshold: float
        +initialize(context) void
        +on_market_data(market_data) void
        +on_trade_update(trade) void
    }
    
    class BollingerBandsStrategy {
        +period: int
        +std_dev: float
        +initialize(context) void
        +on_market_data(market_data) void
        +on_trade_update(trade) void
    }
    
    class MovingAverageCrossoverStrategy {
        +fast_period: int
        +slow_period: int
        +initialize(context) void
        +on_market_data(market_data) void
        +on_trade_update(trade) void
    }
    
    BaseStrategy <|-- MACDStrategy
    BaseStrategy <|-- RSIStrategy
    BaseStrategy <|-- BollingerBandsStrategy
    BaseStrategy <|-- MovingAverageCrossoverStrategy

🔌 核心接口定义

1. 策略基类接口 (BaseStrategy)

class BaseStrategy(ABC):
    """策略基类 - 提供通用功能和依赖注入"""
    
    # === 依赖注入 ===
    def __init__(self, name: str, config: Dict[str, Any]):
        self.name = name
        self.config = config
        self.is_initialized = False                              # 策略初始化状态
        self.context: Optional[StrategyContext] = None
        self.trading_engine: Optional[TradingEngine] = None      # 交易引擎依赖
        self.quote_adapter: Optional[QuoteAdapter] = None        # 行情适配器依赖
        self.strategy_engine: Optional[StrategyEngine] = None    # 策略引擎引用,用于统一日志管理
    
    # === 抽象方法 (必须实现) ===
    @abstractmethod
    def initialize(self, context: StrategyContext) -> None:
        """策略初始化 - 必须实现"""
        pass
    
    @abstractmethod
    def on_market_data(self, market_data: MarketData) -> None:
        """市场数据回调 - 必须实现"""
        pass
    
    @abstractmethod
    def on_trade_update(self, trade: Trade) -> None:
        """交易更新回调 - 必须实现"""
        pass
    
    # === 数据访问接口 ===
    def get_historical_data(self, symbol: str, start_time: datetime, end_time: datetime) -> List[MarketData]:
        """获取历史数据 - 通过行情适配器"""
        pass
    
    def get_portfolio(self) -> Portfolio:
        """获取投资组合 - 通过交易引擎"""
        pass
    
    # === 交易执行接口 ===
    def submit_order(self, order: Order) -> bool:
        """提交订单 - 通过交易引擎"""
        pass
    
    def calculate_position_size(self, symbol: str, price: Decimal, risk_ratio: float) -> Decimal:
        """计算仓位大小"""
        pass
    
    # === 风险管理接口 ===
    def check_risk_limits(self, symbol: str, quantity: Decimal, price: Decimal) -> Dict[str, Any]:
        """风险检查"""
        pass
    
    def get_risk_limits(self) -> RiskLimits:
        """获取风险限制"""
        pass
    
    # === 日志管理接口 ===
    def log_message(self, message: str, log_type: str = "log") -> None:
        """统一日志接口 - 通过策略引擎"""
        pass
    
    # === 生命周期管理 ===
    def start(self) -> None:
        """启动策略"""
        pass
    
    def stop(self) -> None:
        """停止策略"""
        pass

2. 策略引擎接口 (StrategyEngine)

class StrategyEngine:
    """策略引擎 - 管理策略生命周期、数据分发和统一日志管理"""
    
    def __init__(self, user_id: str, session_id: str, 
                 trading_engine: TradingEngine, 
                 session_config: Dict[str, Any],
                 trading_session_engine=None):
        self.trading_engine = trading_engine      # 交易引擎依赖
        self.quote_adapter = QuoteAdapter(user_id) # 行情适配器依赖
        self.strategies: Dict[str, BaseStrategy] = {}
        self.trading_session_engine = trading_session_engine  # 量化交易系统引用
        
        # 初始化生命周期管理
        self.is_initialized = False
        self.initialization_lock = threading.Lock()
        self.initialization_event = threading.Event()
        self.initialization_error = None
        
        # 策略执行同步控制
        self.strategy_execution_lock = threading.Lock()
        self.strategy_execution_event = threading.Event()
        
        # 统一日志管理
        self.log_sequence = 0
    
    # === 策略管理接口 ===
    def add_strategy(self, strategy: BaseStrategy) -> bool:
        """添加策略 - 注入依赖并初始化"""
        pass
    
    def remove_strategy(self, strategy_name: str) -> bool:
        """移除策略"""
        pass
    
    # === 数据分发接口 ===
    def process_market_data(self, market_data: MarketData) -> None:
        """处理市场数据 - 分发给所有策略(被动模式)"""
        pass
    
    # === 初始化生命周期管理 ===
    def start(self) -> bool:
        """启动策略引擎 - 异步初始化"""
        pass
    
    def wait_for_initialization(self, timeout: float = 30.0) -> bool:
        """等待策略初始化完成"""
        pass
    
    def _start_initialization(self) -> None:
        """异步初始化流程"""
        pass
    
    def _load_strategies_historical_data(self) -> None:
        """加载所有策略的历史数据"""
        pass
    
    # === 策略执行同步控制 ===
    def wait_for_strategy_execution(self, timeout: float = 5.0) -> bool:
        """等待策略执行完成(回测模式)"""
        pass
    
    # === 日志管理接口 ===
    def log_message(self, message: str, log_type: str = "log", component: str = "strategy_engine") -> None:
        """统一日志管理 - 保存到Redis并推送到WebSocket"""
        pass
    
    def get_historical_data(self, symbol: str, start_time: datetime, end_time: datetime) -> List[MarketData]:
        """获取历史数据 - 供策略调用"""
        pass

3. 策略工厂接口 (StrategyFactory)

class StrategyFactory:
    """策略工厂 - 创建和注册策略"""
    
    @staticmethod
    def create_strategy(strategy_name: str, config: Dict[str, Any]) -> Optional[BaseStrategy]:
        """创建策略实例"""
        pass
    
    @staticmethod
    def get_available_strategies() -> Dict[str, Dict[str, Any]]:
        """获取可用策略列表"""
        pass
    
    @staticmethod
    def validate_strategy_config(strategy_name: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """验证策略配置"""
        pass

🔄 模块交互关系

1. 依赖注入关系

sequenceDiagram
    participant TSE as TradingSessionEngine
    participant SE as StrategyEngine
    participant BS as BaseStrategy
    participant TE as TradingEngine
    participant QA as QuoteAdapter
    
    TSE->>SE: 创建策略引擎
    TSE->>SE: 注入 TradingEngine
    TSE->>SE: 注入 session_config
    TSE->>SE: 注入 TradingSessionEngine (引用)
    
    SE->>BS: 创建策略实例
    SE->>BS: 注入 TradingEngine
    SE->>BS: 注入 QuoteAdapter
    SE->>BS: 注入 StrategyEngine (日志)
    SE->>BS: 调用 initialize()
    SE->>BS: 设置 is_initialized = True
    
    Note over BS: 策略完全被动化<br/>不主动获取数据
    
    BS->>QA: 获取历史数据 (按需)
    BS->>TE: 提交订单 (被动)
    BS->>SE: 记录日志 (统一)

2. 数据流交互

sequenceDiagram
    participant TSE as TradingSessionEngine
    participant SE as StrategyEngine
    participant BS as BaseStrategy
    participant TE as TradingEngine
    
    Note over TSE: 统一调度数据流
    
    TSE->>SE: process_market_data(market_data)
    
    alt 回测模式
        SE->>SE: 等待策略执行完成
    end
    
    SE->>BS: on_market_data(market_data)
    
    Note over BS: 策略被动接收数据<br/>生成交易信号
    
    BS->>TE: submit_order(order)
    TE->>BS: 返回成交结果
    BS->>SE: log_message("交易完成")
    SE->>SE: 生成序号并保存到Redis
    SE->>SE: 推送日志到WebSocket
    
    alt 回测模式
        SE->>SE: 设置策略执行完成事件
    end
    
    Note over TSE: 统一日志管理

3. 接口调用关系

调用方 被调用方 接口方法 目的
TradingSessionEngine StrategyEngine process_market_data() 推送市场数据
TradingSessionEngine StrategyEngine wait_for_initialization() 等待策略初始化完成
StrategyEngine BaseStrategy on_market_data() 分发市场数据
StrategyEngine BaseStrategy initialize() 初始化策略
StrategyEngine BaseStrategy _load_historical_data() 加载历史数据
BaseStrategy TradingEngine submit_order() 执行交易
BaseStrategy StrategyEngine get_historical_data() 获取历史数据
BaseStrategy StrategyEngine log_message() 记录日志
StrategyFactory BaseStrategy create_strategy() 创建策略实例

关键设计原则

4. 模块边界和职责

graph LR
    subgraph "策略模块边界"
        SE[StrategyEngine]
        BS[BaseStrategy]
        SF[StrategyFactory]
    end
    
    subgraph "外部依赖"
        TE[TradingEngine]
        QA[QuoteAdapter]
        TSE[TradingSessionEngine]
    end
    
    SE -.->|依赖| TE
    SE -.->|依赖| QA
    BS -.->|依赖| TE
    BS -.->|依赖| QA
    BS -.->|依赖| SE
    TSE -.->|使用| SE
    SF -.->|创建| BS

模块边界规则

📊 数据流设计

1. 完整数据流图

graph TD
    subgraph "数据源层"
        API[券商API]
        DB[(数据库)]
    end
    
    subgraph "适配器层"
        QA[QuoteAdapter]
        DA[DataAdapter]
    end
    
    subgraph "系统层"
        TSE[TradingSessionEngine]
        SE[StrategyEngine]
    end
    
    subgraph "策略层"
        BS[BaseStrategy]
        CS[具体策略]
    end
    
    subgraph "交易层"
        TE[TradingEngine]
        SE2[SimulationEngine]
    end
    
    subgraph "输出层"
        WS[WebSocket]
        LOG[日志系统]
    end
    
    %% 数据流
    API --> QA
    DB --> DA
    DA --> QA
    
    TSE --> SE
    SE --> BS
    BS --> CS
    
    CS --> TE
    TE --> SE2
    
    SE --> WS
    SE --> LOG
    
    %% 数据流向
    API -.->|实时数据| QA
    DB -.->|历史数据| DA
    QA -.->|市场数据| SE
    SE -.->|数据分发| CS
    CS -.->|交易信号| TE
    TE -.->|成交确认| CS
    CS -.->|日志| SE
    SE -.->|日志推送| WS

2. 数据流类型

数据流类型 方向 数据内容 触发方式
市场数据流 下行 MarketData 定时推送/实时推送
交易信号流 上行 Order 策略生成信号时
成交确认流 下行 Trade 订单成交时
历史数据流 双向 List[MarketData] 策略请求时
日志流 上行 LogMessage 事件发生时
状态流 双向 StrategyStatus 状态变更时

🛠️ 策略开发指南

1. 策略开发流程

graph TD
    A[1. 继承BaseStrategy] --> B[2. 实现抽象方法]
    B --> C[3. 实现策略逻辑]
    C --> D[4. 注册策略]
    D --> E[5. 配置模板]
    E --> F[6. 测试验证]
    
    A1[定义策略参数] --> A
    B1[initialize] --> B
    B2[on_market_data] --> B
    B3[on_trade_update] --> B
    B4[on_timer] --> B
    
    C1[信号生成逻辑] --> C
    C2[风险检查] --> C
    C3[订单执行] --> C
    
    D1[StrategyFactory注册] --> D
    E1[参数定义] --> E
    E2[风险配置] --> E
    F1[单元测试] --> F
    F2[回测验证] --> F

2. 策略开发模板

class MyCustomStrategy(BaseStrategy):
    """自定义策略模板"""
    
    def __init__(self, name: str, config: Dict[str, Any]):
        super().__init__(name, config)
        # 策略参数
        self.fast_period = config.get('fast_period', 12)
        self.slow_period = config.get('slow_period', 26)
        self.position_size = config.get('position_size', 0.1)
    
    def initialize(self, context: StrategyContext) -> None:
        """策略初始化 - 必须实现"""
        self.context = context
        self.is_active = True
        self._load_historical_data()
        self.log_message(f"✅ {self.name}策略已初始化")
    
    def on_market_data(self, market_data: MarketData) -> None:
        """市场数据回调 - 必须实现"""
        if not market_data.is_market_open:
            return
        
        # 生成交易信号
        signal = self._generate_signal(market_data)
        if signal:
            self._execute_signal(signal, market_data)
    
    def on_trade_update(self, trade: Trade) -> None:
        """交易更新回调 - 必须实现"""
        self.log_message(f"📊 交易更新: {trade}")
    
# 注意:策略完全被动化,不再需要定时器回调
# 所有逻辑都在 on_market_data 中处理
    
    # === 策略核心逻辑 ===
    def _generate_signal(self, market_data: MarketData) -> Optional[Dict]:
        """生成交易信号"""
        # 实现信号生成逻辑
        pass
    
    def _execute_signal(self, signal: Dict, market_data: MarketData) -> None:
        """执行交易信号"""
        # 风险检查
        risk_check = self.check_risk_limits(signal['symbol'], signal['quantity'], signal['price'])
        if not risk_check["valid"]:
            self.log_message(f"❌ 风险检查失败: {risk_check['errors']}", "warn")
            return
        
        # 提交订单
        order = self._create_order(signal)
        success = self.submit_order(order)
        
        if success:
            self.log_message(f"🟢 订单提交成功: {signal['symbol']}", "info")
        else:
            self.log_message(f"❌ 订单提交失败: {signal['symbol']}", "error")

3. 策略注册

# strategy_factory.py
def create_strategy(strategy_name: str, config: Dict[str, Any]) -> Optional[BaseStrategy]:
    strategy_mapping = {
        "macd": MACDStrategy,
        "rsi": RSIStrategy,
        "bollinger": BollingerBandsStrategy,
        "ma_crossover": MovingAverageCrossoverStrategy,
        "my_custom": MyCustomStrategy,  # 注册新策略
    }
    
    strategy_class = strategy_mapping.get(strategy_name.lower())
    if strategy_class:
        return strategy_class(strategy_name, config)
    return None

4. 配置模板

# strategy_config.py
class MyCustomStrategyConfig(StrategyConfigTemplate):
    def __init__(self):
        super().__init__(
            strategy_name="my_custom",
            display_name="自定义策略",
            description="基于自定义技术指标的交易策略",
            parameters=[
                StrategyParameter(
                    name="fast_period",
                    type=ParameterType.INTEGER,
                    default_value=12,
                    description="快速周期",
                    min_value=5,
                    max_value=50,
                    required=True
                ),
                StrategyParameter(
                    name="position_size",
                    type=ParameterType.FLOAT,
                    default_value=0.1,
                    description="仓位大小比例",
                    min_value=0.01,
                    max_value=1.0,
                    required=True
                )
            ],
            recommended_risk_config={
                "max_position_ratio": 0.3,
                "stop_loss_ratio": 0.1,
                "max_drawdown": 0.2
            }
        )

🔧 最佳实践

1. 接口使用规范

接口类型 正确用法 错误用法
数据获取 self.get_historical_data() 直接调用外部API
风险管理 self.check_risk_limits() 跳过风险检查
日志记录 self.log_message() 使用print()
订单提交 self.submit_order() 直接调用交易引擎
投资组合 self.get_portfolio() 直接访问持仓数据

2. 依赖注入规范

graph LR
    subgraph "策略模块"
        BS[BaseStrategy]
    end
    
    subgraph "外部依赖"
        TE[TradingEngine]
        QA[QuoteAdapter]
        SE[StrategyEngine]
    end
    
    BS -.->|注入| TE
    BS -.->|注入| QA
    BS -.->|注入| SE
    
    BS -->|使用| TE
    BS -->|使用| QA
    BS -->|使用| SE

依赖注入规则

3. 错误处理模式

# 标准错误处理模式
def _execute_signal(self, signal: Dict, market_data: MarketData) -> None:
    try:
        # 1. 参数验证
        if not self._validate_signal(signal):
            self.log_message("❌ 信号参数无效", "error")
            return
        
        # 2. 风险检查
        risk_check = self.check_risk_limits(signal['symbol'], signal['quantity'], signal['price'])
        if not risk_check["valid"]:
            self.log_message(f"❌ 风险检查失败: {risk_check['errors']}", "warn")
            return
        
        # 3. 执行交易
        order = self._create_order(signal)
        success = self.submit_order(order)
        
        if success:
            self.log_message(f"🟢 订单提交成功: {signal['symbol']}", "info")
        else:
            self.log_message(f"❌ 订单提交失败: {signal['symbol']}", "error")
            
    except Exception as e:
        self.log_message(f"❌ 执行信号异常: {e}", "error")

4. 性能优化建议

优化类型 实现方式 适用场景
数据缓存 缓存技术指标计算结果 频繁计算相同指标
批量处理 累积多个数据点后批量处理 高频数据场景
异步处理 使用异步方法处理非关键路径 网络请求、文件IO
内存管理 及时清理不需要的数据 长时间运行的策略

📈 现有策略分析

1. 策略对比表

策略名称 技术指标 信号逻辑 适用场景 风险特征 实现复杂度
MACD MACD线、信号线 金叉死叉 趋势跟踪 中等风险 中等
RSI RSI指标 超买超卖反转 震荡市场 低风险 简单
布林带 布林带 突破边界 波动性交易 高风险 中等
移动平均 双均线 均线交叉 趋势确认 低风险 简单

2. 策略接口实现对比

graph TD
    subgraph "策略实现层次"
        BS[BaseStrategy]
        
        subgraph "简单策略"
            RSI[RSIStrategy]
            MA[MovingAverageStrategy]
        end
        
        subgraph "中等复杂度"
            MACD[MACDStrategy]
            BB[BollingerBandsStrategy]
        end
        
        subgraph "复杂策略"
            ML[MLStrategy]
            MF[MultiFactorStrategy]
        end
    end
    
    BS --> RSI
    BS --> MA
    BS --> MACD
    BS --> BB
    BS --> ML
    BS --> MF

🚀 扩展建议

1. 策略扩展架构

graph TB
    subgraph "策略类型扩展"
        A[技术指标策略]
        B[基本面策略]
        C[量化策略]
        D[机器学习策略]
    end
    
    subgraph "策略组合扩展"
        E[策略轮动]
        F[多策略并行]
        G[策略权重]
    end
    
    subgraph "高级功能扩展"
        H[动态参数调整]
        I[自适应风险管理]
        J[实时策略优化]
    end
    
    A --> E
    B --> F
    C --> G
    D --> H
    E --> I
    F --> J

2. 扩展接口设计

# 高级策略接口
class AdvancedStrategy(BaseStrategy):
    """高级策略接口"""
    
    @abstractmethod
    def adapt_parameters(self, market_condition: Dict) -> None:
        """动态参数调整"""
        pass
    
    @abstractmethod
    def optimize_risk(self, performance: Dict) -> None:
        """自适应风险管理"""
        pass

# 策略组合接口
class StrategyPortfolio:
    """策略组合管理"""
    
    def add_strategy(self, strategy: BaseStrategy, weight: float) -> None:
        """添加策略并设置权重"""
        pass
    
    def rebalance(self, market_data: MarketData) -> None:
        """策略权重再平衡"""
        pass

📚 总结

核心架构特性

  1. 分层设计: 清晰的模块分层和职责划分
  2. 接口驱动: 基于接口的模块交互和依赖注入
  3. 统一管理: 策略引擎统一管理策略生命周期
  4. 标准化: 标准化的数据访问和日志管理
  5. 可扩展: 支持快速添加新策略类型

开发指导原则

通过遵循本文档的架构设计和开发规范,可以快速开发出高质量、可维护的交易策略,并轻松集成到量化交易系统中。