策略模块架构文档
📋 概述
策略模块是量化交易系统的核心组件,负责实现各种交易策略逻辑。本文档重点描述策略模块的接口定义、模块依赖关系和交互方式,为策略开发和系统扩展提供清晰的架构指导。
🏗️ 架构设计
1. 模块依赖关系图
graph TB
subgraph "量化交易系统"
TSE[TradingSessionEngine<br/>统一调度中心<br/>异步初始化流程]
end
subgraph "策略模块"
SE[StrategyEngine<br/>策略管理器+统一日志中心<br/>初始化生命周期管理]
BS[BaseStrategy<br/>策略基类<br/>被动执行模式]
SF[StrategyFactory<br/>策略工厂]
subgraph "具体策略实现"
MACD[MACDStrategy]
RSI[RSIStrategy]
BB[BollingerBandsStrategy]
MA[MovingAverageCrossoverStrategy]
end
end
subgraph "交易模块"
TE[TradingEngine<br/>交易引擎]
SE2[SimulationEngine<br/>模拟交易引擎]
end
subgraph "数据模块"
QA[QuoteAdapter<br/>行情适配器]
DA[DataAdapter<br/>数据适配器]
end
subgraph "外部系统"
API[券商API]
DB[(数据库)]
WS[WebSocket]
end
%% 核心依赖关系
TSE --> SE
SE --> BS
SE --> TE
SE --> QA
SF --> BS
MACD --> BS
RSI --> BS
BB --> BS
MA --> BS
TE --> SE2
QA --> DA
DA --> DB
TE --> API
SE --> WS
%% 数据流(虚线表示数据流向)
TSE -.->|市场数据推送| SE
SE -.->|数据分发| BS
BS -.->|交易信号| TE
TE -.->|成交确认| BS
BS -.->|历史数据请求| QA
QA -.->|历史数据返回| BS
BS -.->|日志记录| SE
SE -.->|日志推送| WS
2. 模块职责划分
模块 |
职责 |
依赖关系 |
TradingSessionEngine |
统一调度数据流,协调各模块,数据源管理 |
依赖 StrategyEngine, TradingEngine |
StrategyEngine |
策略生命周期管理、数据分发、统一日志管理 |
依赖 TradingEngine, QuoteAdapter |
BaseStrategy |
策略抽象接口、通用功能、被动接收数据 |
依赖 TradingEngine, QuoteAdapter, StrategyEngine |
具体策略 |
实现具体交易逻辑,生成交易信号 |
继承 BaseStrategy |
StrategyFactory |
策略创建和注册 |
依赖 BaseStrategy |
TradingEngine |
订单执行、持仓管理 |
依赖 SimulationEngine |
QuoteAdapter |
数据访问统一接口 |
依赖 DataAdapter |
3. 核心接口层次
classDiagram
class BaseStrategy {
<<abstract>>
+name: str
+config: Dict
+is_initialized: bool
+context: StrategyContext
+trading_engine: TradingEngine
+quote_adapter: QuoteAdapter
+strategy_engine: StrategyEngine
+initialize(context)* void
+on_market_data(market_data)* void
+on_trade_update(trade)* void
+get_historical_data(symbol, start, end) List[MarketData]
+get_portfolio() Portfolio
+submit_order(order) bool
+calculate_position_size(symbol, price, risk_ratio) Decimal
+check_risk_limits(symbol, quantity, price) Dict
+get_risk_limits() RiskLimits
+log_message(message, log_type) void
+start() void
+stop() void
}
class MACDStrategy {
+fast_period: int
+slow_period: int
+signal_period: int
+initialize(context) void
+on_market_data(market_data) void
+on_trade_update(trade) void
}
class RSIStrategy {
+period: int
+overbought_threshold: float
+oversold_threshold: float
+initialize(context) void
+on_market_data(market_data) void
+on_trade_update(trade) void
}
class BollingerBandsStrategy {
+period: int
+std_dev: float
+initialize(context) void
+on_market_data(market_data) void
+on_trade_update(trade) void
}
class MovingAverageCrossoverStrategy {
+fast_period: int
+slow_period: int
+initialize(context) void
+on_market_data(market_data) void
+on_trade_update(trade) void
}
BaseStrategy <|-- MACDStrategy
BaseStrategy <|-- RSIStrategy
BaseStrategy <|-- BollingerBandsStrategy
BaseStrategy <|-- MovingAverageCrossoverStrategy
🔌 核心接口定义
1. 策略基类接口 (BaseStrategy)
class BaseStrategy(ABC):
"""策略基类 - 提供通用功能和依赖注入"""
# === 依赖注入 ===
def __init__(self, name: str, config: Dict[str, Any]):
self.name = name
self.config = config
self.is_initialized = False # 策略初始化状态
self.context: Optional[StrategyContext] = None
self.trading_engine: Optional[TradingEngine] = None # 交易引擎依赖
self.quote_adapter: Optional[QuoteAdapter] = None # 行情适配器依赖
self.strategy_engine: Optional[StrategyEngine] = None # 策略引擎引用,用于统一日志管理
# === 抽象方法 (必须实现) ===
@abstractmethod
def initialize(self, context: StrategyContext) -> None:
"""策略初始化 - 必须实现"""
pass
@abstractmethod
def on_market_data(self, market_data: MarketData) -> None:
"""市场数据回调 - 必须实现"""
pass
@abstractmethod
def on_trade_update(self, trade: Trade) -> None:
"""交易更新回调 - 必须实现"""
pass
# === 数据访问接口 ===
def get_historical_data(self, symbol: str, start_time: datetime, end_time: datetime) -> List[MarketData]:
"""获取历史数据 - 通过行情适配器"""
pass
def get_portfolio(self) -> Portfolio:
"""获取投资组合 - 通过交易引擎"""
pass
# === 交易执行接口 ===
def submit_order(self, order: Order) -> bool:
"""提交订单 - 通过交易引擎"""
pass
def calculate_position_size(self, symbol: str, price: Decimal, risk_ratio: float) -> Decimal:
"""计算仓位大小"""
pass
# === 风险管理接口 ===
def check_risk_limits(self, symbol: str, quantity: Decimal, price: Decimal) -> Dict[str, Any]:
"""风险检查"""
pass
def get_risk_limits(self) -> RiskLimits:
"""获取风险限制"""
pass
# === 日志管理接口 ===
def log_message(self, message: str, log_type: str = "log") -> None:
"""统一日志接口 - 通过策略引擎"""
pass
# === 生命周期管理 ===
def start(self) -> None:
"""启动策略"""
pass
def stop(self) -> None:
"""停止策略"""
pass
2. 策略引擎接口 (StrategyEngine)
class StrategyEngine:
"""策略引擎 - 管理策略生命周期、数据分发和统一日志管理"""
def __init__(self, user_id: str, session_id: str,
trading_engine: TradingEngine,
session_config: Dict[str, Any],
trading_session_engine=None):
self.trading_engine = trading_engine # 交易引擎依赖
self.quote_adapter = QuoteAdapter(user_id) # 行情适配器依赖
self.strategies: Dict[str, BaseStrategy] = {}
self.trading_session_engine = trading_session_engine # 量化交易系统引用
# 初始化生命周期管理
self.is_initialized = False
self.initialization_lock = threading.Lock()
self.initialization_event = threading.Event()
self.initialization_error = None
# 策略执行同步控制
self.strategy_execution_lock = threading.Lock()
self.strategy_execution_event = threading.Event()
# 统一日志管理
self.log_sequence = 0
# === 策略管理接口 ===
def add_strategy(self, strategy: BaseStrategy) -> bool:
"""添加策略 - 注入依赖并初始化"""
pass
def remove_strategy(self, strategy_name: str) -> bool:
"""移除策略"""
pass
# === 数据分发接口 ===
def process_market_data(self, market_data: MarketData) -> None:
"""处理市场数据 - 分发给所有策略(被动模式)"""
pass
# === 初始化生命周期管理 ===
def start(self) -> bool:
"""启动策略引擎 - 异步初始化"""
pass
def wait_for_initialization(self, timeout: float = 30.0) -> bool:
"""等待策略初始化完成"""
pass
def _start_initialization(self) -> None:
"""异步初始化流程"""
pass
def _load_strategies_historical_data(self) -> None:
"""加载所有策略的历史数据"""
pass
# === 策略执行同步控制 ===
def wait_for_strategy_execution(self, timeout: float = 5.0) -> bool:
"""等待策略执行完成(回测模式)"""
pass
# === 日志管理接口 ===
def log_message(self, message: str, log_type: str = "log", component: str = "strategy_engine") -> None:
"""统一日志管理 - 保存到Redis并推送到WebSocket"""
pass
def get_historical_data(self, symbol: str, start_time: datetime, end_time: datetime) -> List[MarketData]:
"""获取历史数据 - 供策略调用"""
pass
3. 策略工厂接口 (StrategyFactory)
class StrategyFactory:
"""策略工厂 - 创建和注册策略"""
@staticmethod
def create_strategy(strategy_name: str, config: Dict[str, Any]) -> Optional[BaseStrategy]:
"""创建策略实例"""
pass
@staticmethod
def get_available_strategies() -> Dict[str, Dict[str, Any]]:
"""获取可用策略列表"""
pass
@staticmethod
def validate_strategy_config(strategy_name: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""验证策略配置"""
pass
🔄 模块交互关系
1. 依赖注入关系
sequenceDiagram
participant TSE as TradingSessionEngine
participant SE as StrategyEngine
participant BS as BaseStrategy
participant TE as TradingEngine
participant QA as QuoteAdapter
TSE->>SE: 创建策略引擎
TSE->>SE: 注入 TradingEngine
TSE->>SE: 注入 session_config
TSE->>SE: 注入 TradingSessionEngine (引用)
SE->>BS: 创建策略实例
SE->>BS: 注入 TradingEngine
SE->>BS: 注入 QuoteAdapter
SE->>BS: 注入 StrategyEngine (日志)
SE->>BS: 调用 initialize()
SE->>BS: 设置 is_initialized = True
Note over BS: 策略完全被动化<br/>不主动获取数据
BS->>QA: 获取历史数据 (按需)
BS->>TE: 提交订单 (被动)
BS->>SE: 记录日志 (统一)
2. 数据流交互
sequenceDiagram
participant TSE as TradingSessionEngine
participant SE as StrategyEngine
participant BS as BaseStrategy
participant TE as TradingEngine
Note over TSE: 统一调度数据流
TSE->>SE: process_market_data(market_data)
alt 回测模式
SE->>SE: 等待策略执行完成
end
SE->>BS: on_market_data(market_data)
Note over BS: 策略被动接收数据<br/>生成交易信号
BS->>TE: submit_order(order)
TE->>BS: 返回成交结果
BS->>SE: log_message("交易完成")
SE->>SE: 生成序号并保存到Redis
SE->>SE: 推送日志到WebSocket
alt 回测模式
SE->>SE: 设置策略执行完成事件
end
Note over TSE: 统一日志管理
3. 接口调用关系
调用方 |
被调用方 |
接口方法 |
目的 |
TradingSessionEngine |
StrategyEngine |
process_market_data() |
推送市场数据 |
TradingSessionEngine |
StrategyEngine |
wait_for_initialization() |
等待策略初始化完成 |
StrategyEngine |
BaseStrategy |
on_market_data() |
分发市场数据 |
StrategyEngine |
BaseStrategy |
initialize() |
初始化策略 |
StrategyEngine |
BaseStrategy |
_load_historical_data() |
加载历史数据 |
BaseStrategy |
TradingEngine |
submit_order() |
执行交易 |
BaseStrategy |
StrategyEngine |
get_historical_data() |
获取历史数据 |
BaseStrategy |
StrategyEngine |
log_message() |
记录日志 |
StrategyFactory |
BaseStrategy |
create_strategy() |
创建策略实例 |
关键设计原则:
- 策略完全被动化:策略不主动获取数据,只处理推送的数据
- 统一数据调度:所有数据流由
TradingSessionEngine
统一调度
- 统一日志管理:所有日志通过
StrategyEngine
统一管理
- 初始化生命周期:完整的策略初始化状态管理
- 异步初始化:API立即返回,后台异步完成初始化
- 策略执行同步:回测模式下等待策略执行完成
4. 模块边界和职责
graph LR
subgraph "策略模块边界"
SE[StrategyEngine]
BS[BaseStrategy]
SF[StrategyFactory]
end
subgraph "外部依赖"
TE[TradingEngine]
QA[QuoteAdapter]
TSE[TradingSessionEngine]
end
SE -.->|依赖| TE
SE -.->|依赖| QA
BS -.->|依赖| TE
BS -.->|依赖| QA
BS -.->|依赖| SE
TSE -.->|使用| SE
SF -.->|创建| BS
模块边界规则:
- 策略模块不直接依赖外部API或数据库
- 所有外部访问通过注入的适配器进行
- 策略模块通过接口与外部系统交互
- 日志统一通过StrategyEngine管理
📊 数据流设计
1. 完整数据流图
graph TD
subgraph "数据源层"
API[券商API]
DB[(数据库)]
end
subgraph "适配器层"
QA[QuoteAdapter]
DA[DataAdapter]
end
subgraph "系统层"
TSE[TradingSessionEngine]
SE[StrategyEngine]
end
subgraph "策略层"
BS[BaseStrategy]
CS[具体策略]
end
subgraph "交易层"
TE[TradingEngine]
SE2[SimulationEngine]
end
subgraph "输出层"
WS[WebSocket]
LOG[日志系统]
end
%% 数据流
API --> QA
DB --> DA
DA --> QA
TSE --> SE
SE --> BS
BS --> CS
CS --> TE
TE --> SE2
SE --> WS
SE --> LOG
%% 数据流向
API -.->|实时数据| QA
DB -.->|历史数据| DA
QA -.->|市场数据| SE
SE -.->|数据分发| CS
CS -.->|交易信号| TE
TE -.->|成交确认| CS
CS -.->|日志| SE
SE -.->|日志推送| WS
2. 数据流类型
数据流类型 |
方向 |
数据内容 |
触发方式 |
市场数据流 |
下行 |
MarketData |
定时推送/实时推送 |
交易信号流 |
上行 |
Order |
策略生成信号时 |
成交确认流 |
下行 |
Trade |
订单成交时 |
历史数据流 |
双向 |
List[MarketData] |
策略请求时 |
日志流 |
上行 |
LogMessage |
事件发生时 |
状态流 |
双向 |
StrategyStatus |
状态变更时 |
🛠️ 策略开发指南
1. 策略开发流程
graph TD
A[1. 继承BaseStrategy] --> B[2. 实现抽象方法]
B --> C[3. 实现策略逻辑]
C --> D[4. 注册策略]
D --> E[5. 配置模板]
E --> F[6. 测试验证]
A1[定义策略参数] --> A
B1[initialize] --> B
B2[on_market_data] --> B
B3[on_trade_update] --> B
B4[on_timer] --> B
C1[信号生成逻辑] --> C
C2[风险检查] --> C
C3[订单执行] --> C
D1[StrategyFactory注册] --> D
E1[参数定义] --> E
E2[风险配置] --> E
F1[单元测试] --> F
F2[回测验证] --> F
2. 策略开发模板
class MyCustomStrategy(BaseStrategy):
"""自定义策略模板"""
def __init__(self, name: str, config: Dict[str, Any]):
super().__init__(name, config)
# 策略参数
self.fast_period = config.get('fast_period', 12)
self.slow_period = config.get('slow_period', 26)
self.position_size = config.get('position_size', 0.1)
def initialize(self, context: StrategyContext) -> None:
"""策略初始化 - 必须实现"""
self.context = context
self.is_active = True
self._load_historical_data()
self.log_message(f"✅ {self.name}策略已初始化")
def on_market_data(self, market_data: MarketData) -> None:
"""市场数据回调 - 必须实现"""
if not market_data.is_market_open:
return
# 生成交易信号
signal = self._generate_signal(market_data)
if signal:
self._execute_signal(signal, market_data)
def on_trade_update(self, trade: Trade) -> None:
"""交易更新回调 - 必须实现"""
self.log_message(f"📊 交易更新: {trade}")
# 注意:策略完全被动化,不再需要定时器回调
# 所有逻辑都在 on_market_data 中处理
# === 策略核心逻辑 ===
def _generate_signal(self, market_data: MarketData) -> Optional[Dict]:
"""生成交易信号"""
# 实现信号生成逻辑
pass
def _execute_signal(self, signal: Dict, market_data: MarketData) -> None:
"""执行交易信号"""
# 风险检查
risk_check = self.check_risk_limits(signal['symbol'], signal['quantity'], signal['price'])
if not risk_check["valid"]:
self.log_message(f"❌ 风险检查失败: {risk_check['errors']}", "warn")
return
# 提交订单
order = self._create_order(signal)
success = self.submit_order(order)
if success:
self.log_message(f"🟢 订单提交成功: {signal['symbol']}", "info")
else:
self.log_message(f"❌ 订单提交失败: {signal['symbol']}", "error")
3. 策略注册
# strategy_factory.py
def create_strategy(strategy_name: str, config: Dict[str, Any]) -> Optional[BaseStrategy]:
strategy_mapping = {
"macd": MACDStrategy,
"rsi": RSIStrategy,
"bollinger": BollingerBandsStrategy,
"ma_crossover": MovingAverageCrossoverStrategy,
"my_custom": MyCustomStrategy, # 注册新策略
}
strategy_class = strategy_mapping.get(strategy_name.lower())
if strategy_class:
return strategy_class(strategy_name, config)
return None
4. 配置模板
# strategy_config.py
class MyCustomStrategyConfig(StrategyConfigTemplate):
def __init__(self):
super().__init__(
strategy_name="my_custom",
display_name="自定义策略",
description="基于自定义技术指标的交易策略",
parameters=[
StrategyParameter(
name="fast_period",
type=ParameterType.INTEGER,
default_value=12,
description="快速周期",
min_value=5,
max_value=50,
required=True
),
StrategyParameter(
name="position_size",
type=ParameterType.FLOAT,
default_value=0.1,
description="仓位大小比例",
min_value=0.01,
max_value=1.0,
required=True
)
],
recommended_risk_config={
"max_position_ratio": 0.3,
"stop_loss_ratio": 0.1,
"max_drawdown": 0.2
}
)
🔧 最佳实践
1. 接口使用规范
接口类型 |
正确用法 |
错误用法 |
数据获取 |
self.get_historical_data() |
直接调用外部API |
风险管理 |
self.check_risk_limits() |
跳过风险检查 |
日志记录 |
self.log_message() |
使用print() |
订单提交 |
self.submit_order() |
直接调用交易引擎 |
投资组合 |
self.get_portfolio() |
直接访问持仓数据 |
2. 依赖注入规范
graph LR
subgraph "策略模块"
BS[BaseStrategy]
end
subgraph "外部依赖"
TE[TradingEngine]
QA[QuoteAdapter]
SE[StrategyEngine]
end
BS -.->|注入| TE
BS -.->|注入| QA
BS -.->|注入| SE
BS -->|使用| TE
BS -->|使用| QA
BS -->|使用| SE
依赖注入规则:
- 策略不直接创建外部依赖
- 所有依赖通过构造函数或setter注入
- 策略只使用注入的接口,不关心具体实现
3. 错误处理模式
# 标准错误处理模式
def _execute_signal(self, signal: Dict, market_data: MarketData) -> None:
try:
# 1. 参数验证
if not self._validate_signal(signal):
self.log_message("❌ 信号参数无效", "error")
return
# 2. 风险检查
risk_check = self.check_risk_limits(signal['symbol'], signal['quantity'], signal['price'])
if not risk_check["valid"]:
self.log_message(f"❌ 风险检查失败: {risk_check['errors']}", "warn")
return
# 3. 执行交易
order = self._create_order(signal)
success = self.submit_order(order)
if success:
self.log_message(f"🟢 订单提交成功: {signal['symbol']}", "info")
else:
self.log_message(f"❌ 订单提交失败: {signal['symbol']}", "error")
except Exception as e:
self.log_message(f"❌ 执行信号异常: {e}", "error")
4. 性能优化建议
优化类型 |
实现方式 |
适用场景 |
数据缓存 |
缓存技术指标计算结果 |
频繁计算相同指标 |
批量处理 |
累积多个数据点后批量处理 |
高频数据场景 |
异步处理 |
使用异步方法处理非关键路径 |
网络请求、文件IO |
内存管理 |
及时清理不需要的数据 |
长时间运行的策略 |
📈 现有策略分析
1. 策略对比表
策略名称 |
技术指标 |
信号逻辑 |
适用场景 |
风险特征 |
实现复杂度 |
MACD |
MACD线、信号线 |
金叉死叉 |
趋势跟踪 |
中等风险 |
中等 |
RSI |
RSI指标 |
超买超卖反转 |
震荡市场 |
低风险 |
简单 |
布林带 |
布林带 |
突破边界 |
波动性交易 |
高风险 |
中等 |
移动平均 |
双均线 |
均线交叉 |
趋势确认 |
低风险 |
简单 |
2. 策略接口实现对比
graph TD
subgraph "策略实现层次"
BS[BaseStrategy]
subgraph "简单策略"
RSI[RSIStrategy]
MA[MovingAverageStrategy]
end
subgraph "中等复杂度"
MACD[MACDStrategy]
BB[BollingerBandsStrategy]
end
subgraph "复杂策略"
ML[MLStrategy]
MF[MultiFactorStrategy]
end
end
BS --> RSI
BS --> MA
BS --> MACD
BS --> BB
BS --> ML
BS --> MF
🚀 扩展建议
1. 策略扩展架构
graph TB
subgraph "策略类型扩展"
A[技术指标策略]
B[基本面策略]
C[量化策略]
D[机器学习策略]
end
subgraph "策略组合扩展"
E[策略轮动]
F[多策略并行]
G[策略权重]
end
subgraph "高级功能扩展"
H[动态参数调整]
I[自适应风险管理]
J[实时策略优化]
end
A --> E
B --> F
C --> G
D --> H
E --> I
F --> J
2. 扩展接口设计
# 高级策略接口
class AdvancedStrategy(BaseStrategy):
"""高级策略接口"""
@abstractmethod
def adapt_parameters(self, market_condition: Dict) -> None:
"""动态参数调整"""
pass
@abstractmethod
def optimize_risk(self, performance: Dict) -> None:
"""自适应风险管理"""
pass
# 策略组合接口
class StrategyPortfolio:
"""策略组合管理"""
def add_strategy(self, strategy: BaseStrategy, weight: float) -> None:
"""添加策略并设置权重"""
pass
def rebalance(self, market_data: MarketData) -> None:
"""策略权重再平衡"""
pass
📚 总结
核心架构特性
- 分层设计: 清晰的模块分层和职责划分
- 接口驱动: 基于接口的模块交互和依赖注入
- 统一管理: 策略引擎统一管理策略生命周期
- 标准化: 标准化的数据访问和日志管理
- 可扩展: 支持快速添加新策略类型
开发指导原则
- 依赖注入: 所有外部依赖通过注入获得
- 接口隔离: 策略只依赖必要的接口
- 单一职责: 每个模块只负责一个核心功能
- 开闭原则: 对扩展开放,对修改封闭
- 统一标准: 所有策略遵循相同的开发标准
通过遵循本文档的架构设计和开发规范,可以快速开发出高质量、可维护的交易策略,并轻松集成到量化交易系统中。