交易模块架构文档
📋 概述
交易模块是量化交易系统的核心执行组件,负责处理策略生成的交易信号,执行订单提交、持仓管理和成交确认。本文档详细描述了交易模块的架构设计、接口定义、与其他模块的交互方式。
🏗️ 架构设计
1. 模块依赖关系图
graph TB
subgraph "量化交易系统"
TSE[TradingSessionEngine<br/>统一调度中心<br/>异步初始化流程]
end
subgraph "交易模块"
TE[TradingEngine<br/>交易引擎抽象基类<br/>统一日志管理]
RTE[RealTradingEngine<br/>真实交易引擎]
STE[SimulationTradingEngine<br/>模拟交易引擎]
SE[SimulationEngine<br/>模拟交易引擎<br/>统一日志管理]
end
subgraph "数据模块"
TA[TradeAdapter<br/>交易适配器]
AA[AssetAdapter<br/>资产适配器]
QA[QuoteAdapter<br/>行情适配器]
end
subgraph "外部系统"
API[券商API]
DB[(数据库)]
REDIS[(Redis缓存)]
end
%% 依赖关系
TSE --> TE
TE --> RTE
TE --> STE
STE --> SE
RTE --> TA
RTE --> AA
SE --> AA
SE --> QA
TA --> API
AA --> DB
AA --> REDIS
QA --> DB
2. 模块职责划分
模块 |
职责 |
依赖关系 |
TradingEngine |
交易引擎抽象基类,定义交易接口 |
依赖 AssetAdapter |
RealTradingEngine |
真实交易引擎,调用券商API |
依赖 TradeAdapter, AssetAdapter |
SimulationTradingEngine |
模拟交易引擎包装器 |
依赖 SimulationEngine |
SimulationEngine |
模拟交易核心逻辑 |
依赖 AssetAdapter, QuoteAdapter |
🔌 核心接口定义
1. 交易引擎抽象基类 (TradingEngine)
class TradingEngine(ABC):
"""交易引擎抽象基类"""
def __init__(self, user_id: str, session_id: str,
trading_mode: TradingMode, asset_mode: AssetMode):
self.user_id = user_id
self.session_id = session_id
self.trading_mode = trading_mode
self.asset_mode = asset_mode
self.is_running = False
self.start_time: Optional[datetime] = None
self.end_time: Optional[datetime] = None
# === 抽象方法 (必须实现) ===
@abstractmethod
def initialize(self) -> bool:
"""初始化交易引擎"""
pass
@abstractmethod
def start(self) -> bool:
"""启动交易引擎"""
pass
@abstractmethod
def stop(self) -> bool:
"""停止交易引擎"""
pass
@abstractmethod
def submit_order(self, order: Order) -> OrderResult:
"""提交交易订单"""
pass
@abstractmethod
def cancel_order(self, order_id: str) -> bool:
"""取消订单"""
pass
@abstractmethod
def get_positions(self) -> List[Position]:
"""获取持仓列表"""
pass
@abstractmethod
def get_account_balance(self) -> AccountBalance:
"""获取账户余额"""
pass
# === 通用方法 ===
def process_market_data(self, market_data: MarketData) -> None:
"""处理市场数据"""
pass
def set_strategy_engine(self, strategy_engine) -> None:
"""设置策略引擎引用,用于统一日志管理"""
pass
2. 真实交易引擎 (RealTradingEngine)
class RealTradingEngine(TradingEngine):
"""真实交易引擎 - 调用券商API"""
def __init__(self, user_id: str, session_id: str,
trading_mode: TradingMode, asset_mode: AssetMode):
super().__init__(user_id, session_id, trading_mode, asset_mode)
self.trade_adapter = TradeDataSourceAdapter(user_id)
self.orders: Dict[str, Order] = {}
self.positions: List[Position] = []
def initialize(self) -> bool:
"""初始化真实交易引擎"""
# 检查交易适配器是否可用
# 加载现有持仓
pass
def submit_order(self, order: Order) -> OrderResult:
"""提交真实交易订单"""
# 通过TradeAdapter调用券商API
pass
def get_positions(self) -> List[Position]:
"""获取真实持仓"""
# 通过AssetAdapter获取持仓
pass
def get_account_balance(self) -> AccountBalance:
"""获取真实账户余额"""
# 通过AssetAdapter获取余额
pass
3. 模拟交易引擎 (SimulationTradingEngine)
class SimulationTradingEngine(TradingEngine):
"""模拟交易引擎 - 包装SimulationEngine"""
def __init__(self, user_id: str, session_id: str,
trading_mode: TradingMode, asset_mode: AssetMode,
initial_capital: Decimal):
super().__init__(user_id, session_id, trading_mode, asset_mode)
self.initial_capital = initial_capital
self.simulation_engine: Optional[SimulationEngine] = None
self.strategy_engine = None
def initialize(self) -> bool:
"""初始化模拟交易引擎"""
# 创建SimulationEngine实例
pass
def submit_order(self, order: Order) -> OrderResult:
"""提交模拟交易订单"""
# 委托给SimulationEngine处理
pass
def get_positions(self) -> List[Position]:
"""获取模拟持仓"""
# 从SimulationEngine获取持仓
pass
def get_account_balance(self) -> AccountBalance:
"""获取模拟账户余额"""
# 从SimulationEngine获取余额
pass
4. 模拟交易核心引擎 (SimulationEngine)
class SimulationEngine:
"""模拟交易核心引擎"""
def __init__(self, user_id: str, session_id: str,
initial_capital: Decimal, strategy_engine=None):
self.user_id = user_id
self.session_id = session_id
self.initial_capital = initial_capital
self.current_capital = initial_capital
self.positions: Dict[str, Position] = {}
self.orders: Dict[str, Order] = {}
self.trades: List[Trade] = []
self.strategy_engine = strategy_engine
# === 订单管理 ===
def submit_order(self, order: Order) -> OrderResult:
"""提交模拟订单"""
# 1. 订单验证
# 2. 风险检查
# 3. 订单处理
# 4. 成交判定
pass
def cancel_order(self, order_id: str) -> bool:
"""取消订单"""
pass
# === 持仓管理 ===
def get_positions(self) -> List[Position]:
"""获取持仓列表"""
pass
def update_position(self, symbol: str, quantity: Decimal, price: Decimal) -> None:
"""更新持仓"""
pass
# === 资金管理 ===
def get_account_balance(self) -> AccountBalance:
"""获取账户余额"""
pass
def update_capital(self, amount: Decimal) -> None:
"""更新资金"""
pass
# === 市场数据处理 ===
def process_market_data(self, market_data: MarketData) -> None:
"""处理市场数据,检查订单成交"""
# 1. 更新持仓市值
# 2. 检查订单成交
# 3. 更新订单状态
pass
# === 日志管理 ===
def log_message(self, message: str, log_type: str = "log") -> None:
"""统一日志接口"""
pass
🔄 模块交互关系
1. 交易引擎工厂模式
sequenceDiagram
participant TSE as TradingSessionEngine
participant TEF as TradingEngineFactory
participant TE as TradingEngine
participant RTE as RealTradingEngine
participant STE as SimulationTradingEngine
participant SE as SimulationEngine
TSE->>TEF: create_trading_engine()
TEF->>TEF: 根据asset_mode选择引擎类型
alt 真实资产模式
TEF->>RTE: 创建RealTradingEngine
RTE->>RTE: 初始化TradeAdapter
else 模拟资产模式
TEF->>STE: 创建SimulationTradingEngine
STE->>SE: 创建SimulationEngine
SE->>SE: 初始化资金和持仓
end
TEF->>TSE: 返回TradingEngine实例
2. 订单执行流程
sequenceDiagram
participant BS as BaseStrategy
participant TE as TradingEngine
participant SE as SimulationEngine
participant RE as RiskEngine
participant API as 券商API
BS->>TE: submit_order(order)
TE->>RE: 风险检查
RE->>TE: 风险验证结果
alt 风险检查通过
alt 真实交易
TE->>API: 提交真实订单
API->>TE: 订单确认
else 模拟交易
TE->>SE: 处理模拟订单
SE->>SE: 订单验证和成交判定
SE->>TE: 成交结果
end
TE->>BS: 返回OrderResult
else 风险检查失败
TE->>BS: 返回失败结果
end
3. 市场数据处理流程
sequenceDiagram
participant TSE as TradingSessionEngine
participant TE as TradingEngine
participant SE as SimulationEngine
participant BS as BaseStrategy
TSE->>TE: process_market_data(market_data)
alt 模拟交易模式
TE->>SE: process_market_data(market_data)
SE->>SE: 更新持仓市值
SE->>SE: 检查订单成交
SE->>SE: 更新订单状态
SE->>BS: 推送成交确认
else 真实交易模式
TE->>TE: 更新持仓信息
TE->>BS: 推送成交确认
end
📊 数据流设计
1. 交易数据流
graph TD
subgraph "策略层"
BS[BaseStrategy]
end
subgraph "交易层"
TE[TradingEngine]
SE[SimulationEngine]
end
subgraph "数据层"
TA[TradeAdapter]
AA[AssetAdapter]
QA[QuoteAdapter]
end
subgraph "外部系统"
API[券商API]
DB[(数据库)]
REDIS[(Redis)]
end
%% 数据流
BS -->|订单请求| TE
TE -->|风险检查| RE
TE -->|订单执行| SE
SE -->|持仓更新| AA
SE -->|价格查询| QA
TE -->|真实订单| TA
TA -->|API调用| API
AA -->|数据存储| DB
AA -->|缓存更新| REDIS
2. 交易模式对比
交易模式 |
订单执行 |
持仓管理 |
资金管理 |
风险控制 |
真实交易 |
券商API |
真实持仓 |
真实资金 |
严格限制 |
模拟交易 |
模拟引擎 |
虚拟持仓 |
虚拟资金 |
可配置 |
🛠️ 开发指南
1. 交易引擎开发流程
graph TD
A[1. 继承TradingEngine] --> B[2. 实现抽象方法]
B --> C[3. 实现交易逻辑]
C --> D[4. 集成数据适配器]
D --> E[5. 注册到工厂]
A1[定义交易参数] --> A
B1[initialize] --> B
B2[submit_order] --> B
B3[get_positions] --> B
B4[get_account_balance] --> B
C1[订单验证] --> C
C2[风险检查] --> C
C3[成交处理] --> C
D1[TradeAdapter] --> D
D2[AssetAdapter] --> D
E1[工厂注册] --> E
2. 交易引擎开发模板
class MyCustomTradingEngine(TradingEngine):
"""自定义交易引擎模板"""
def __init__(self, user_id: str, session_id: str,
trading_mode: TradingMode, asset_mode: AssetMode):
super().__init__(user_id, session_id, trading_mode, asset_mode)
# 初始化特定参数
self.custom_config = {}
def initialize(self) -> bool:
"""初始化交易引擎 - 必须实现"""
try:
# 初始化数据适配器
# 加载配置
# 验证环境
return True
except Exception as e:
self.log_message(f"❌ 初始化失败: {e}", "error")
return False
def submit_order(self, order: Order) -> OrderResult:
"""提交订单 - 必须实现"""
try:
# 1. 订单验证
if not self._validate_order(order):
return OrderResult.failed("订单验证失败")
# 2. 风险检查
risk_check = self._check_risk(order)
if not risk_check.valid:
return OrderResult.failed(f"风险检查失败: {risk_check.reason}")
# 3. 执行交易
result = self._execute_order(order)
if result.success:
self.log_message(f"🟢 订单执行成功: {order.symbol}", "info")
else:
self.log_message(f"❌ 订单执行失败: {result.error}", "error")
return result
except Exception as e:
self.log_message(f"❌ 提交订单异常: {e}", "error")
return OrderResult.failed(str(e))
def get_positions(self) -> List[Position]:
"""获取持仓 - 必须实现"""
# 实现持仓获取逻辑
pass
def get_account_balance(self) -> AccountBalance:
"""获取账户余额 - 必须实现"""
# 实现余额获取逻辑
pass
3. 交易引擎注册
# trading_engine_factory.py
def create_trading_engine(user_id: str, session_id: str,
trading_mode: TradingMode,
asset_mode: AssetMode,
initial_capital: Decimal = Decimal("100000")) -> TradingEngine:
"""创建交易引擎工厂函数"""
if asset_mode == AssetMode.REAL:
return RealTradingEngine(user_id, session_id, trading_mode, asset_mode)
elif asset_mode == AssetMode.SIMULATION:
return SimulationTradingEngine(
user_id, session_id, trading_mode, asset_mode, initial_capital
)
else:
raise ValueError(f"不支持的资产模式: {asset_mode}")
🔧 最佳实践
1. 接口使用规范
接口类型 |
正确用法 |
错误用法 |
订单提交 |
trading_engine.submit_order() |
直接调用券商API |
持仓查询 |
trading_engine.get_positions() |
直接查询数据库 |
余额查询 |
trading_engine.get_account_balance() |
直接调用资产API |
风险检查 |
在订单提交前检查 |
跳过风险检查 |
日志记录 |
self.log_message() |
使用print() |
2. 错误处理模式
def submit_order(self, order: Order) -> OrderResult:
"""标准订单提交模式"""
try:
# 1. 参数验证
if not self._validate_order_params(order):
return OrderResult.failed("订单参数无效")
# 2. 风险检查
risk_result = self._check_order_risk(order)
if not risk_result.valid:
return OrderResult.failed(f"风险检查失败: {risk_result.reason}")
# 3. 执行交易
execution_result = self._execute_order_logic(order)
if execution_result.success:
self.log_message(f"🟢 订单成功: {order.symbol} {order.quantity}", "info")
return execution_result
else:
self.log_message(f"❌ 订单失败: {execution_result.error}", "error")
return execution_result
except Exception as e:
self.log_message(f"❌ 订单异常: {e}", "error")
return OrderResult.failed(f"系统异常: {e}")
3. 性能优化建议
优化类型 |
实现方式 |
适用场景 |
订单缓存 |
缓存活跃订单状态 |
频繁查询订单状态 |
持仓缓存 |
缓存持仓信息 |
频繁计算持仓价值 |
异步处理 |
异步处理非关键路径 |
日志记录、状态更新 |
批量操作 |
批量处理多个订单 |
高频交易场景 |
📈 现有实现分析
1. 交易引擎对比
引擎类型 |
实现复杂度 |
风险等级 |
适用场景 |
性能特点 |
RealTradingEngine |
高 |
高 |
真实交易 |
依赖网络延迟 |
SimulationTradingEngine |
中 |
低 |
回测/模拟 |
本地处理,速度快 |
2. 模拟交易特性
- 真实模拟: 包含滑点、手续费、市场冲击
- 风险控制: 完整的风险检查和限制
- 性能优化: 批量处理和缓存机制
- 数据一致性: 与真实交易数据格式一致
🚀 扩展建议
1. 交易引擎扩展
graph TB
subgraph "现有引擎"
RTE[RealTradingEngine]
STE[SimulationTradingEngine]
end
subgraph "扩展引擎"
HTE[HighFrequencyTradingEngine]
CTE[CrossExchangeTradingEngine]
ATE[AlgorithmicTradingEngine]
end
RTE --> HTE
RTE --> CTE
STE --> ATE
2. 高级功能扩展
- 高频交易引擎: 支持微秒级订单处理
- 跨交易所套利: 支持多交易所同时交易
- 算法交易: 支持TWAP、VWAP等算法
- 智能路由: 自动选择最优执行路径
📚 总结
核心架构特性
- 抽象设计: 通过TradingEngine抽象基类统一接口
- 工厂模式: 根据资产模式自动创建对应引擎
- 依赖注入: 通过适配器模式访问外部系统
- 风险控制: 内置完整的风险检查机制
- 日志统一: 通过StrategyEngine统一日志管理
开发指导原则
- 接口隔离: 交易引擎只依赖必要的接口
- 单一职责: 每个引擎只负责一种交易模式
- 开闭原则: 对扩展开放,对修改封闭
- 依赖倒置: 依赖抽象而非具体实现
- 统一标准: 所有引擎遵循相同的开发标准
通过遵循本文档的架构设计和开发规范,可以快速开发出高质量、可维护的交易引擎,并轻松集成到量化交易系统中。