数据源模块架构文档
📋 概述
数据源模块是量化交易系统的基础设施组件,负责统一管理各种外部数据源的访问,提供标准化的数据接口。本文档详细描述了数据源模块的架构设计、接口定义、与其他模块的交互方式。
🏗️ 架构设计
1. 模块依赖关系图
graph TB
subgraph "业务层"
TE[TradingEngine]
SE[StrategyEngine]
RE[RiskEngine]
end
subgraph "数据源模块"
AA[AssetAdapter<br/>资产适配器]
QA[QuoteAdapter<br/>行情适配器<br/>日志回调支持]
TA[TradeAdapter<br/>交易适配器]
DA[DataAdapter<br/>数据适配器<br/>日志回调支持]
end
subgraph "数据源适配器层"
ASAA[AssetDataSourceAdapter]
QSAA[QuoteDataSourceAdapter]
TSAA[TradeDataSourceAdapter]
DSAA[DataSourceAdapter]
end
subgraph "客户端工厂"
UCF[UnifiedClientFactory<br/>统一客户端工厂]
end
subgraph "外部数据源"
LP[LongPort API<br/>券商接口]
DB[(数据库)]
REDIS[(Redis缓存)]
end
%% 依赖关系
TE --> AA
SE --> QA
RE --> AA
AA --> ASAA
QA --> QSAA
TA --> TSAA
DA --> DSAA
ASAA --> UCF
QSAA --> UCF
TSAA --> UCF
UCF --> LP
ASAA --> DB
ASAA --> REDIS
QSAA --> DB
QSAA --> REDIS
2. 模块职责划分
模块 |
职责 |
依赖关系 |
AssetAdapter |
资产数据业务适配器 |
依赖 AssetDataSourceAdapter |
QuoteAdapter |
行情数据业务适配器 |
依赖 QuoteDataSourceAdapter |
TradeAdapter |
交易数据业务适配器 |
依赖 TradeDataSourceAdapter |
DataAdapter |
数据导入业务适配器 |
依赖 DataSourceAdapter |
UnifiedClientFactory |
统一客户端工厂 |
依赖外部API |
🔌 核心接口定义
1. 数据源适配器基类 (DataSourceAdapter)
class DataSourceAdapter(ABC):
"""数据源适配器抽象基类"""
def __init__(self, user_id: str):
self.user_id = user_id
self._client: Optional[Dict[str, Any]] = None
def _get_client(self, force_refresh: bool = False) -> Optional[Dict[str, Any]]:
"""获取数据源客户端"""
if not self._client or force_refresh:
self._client = unified_client_factory.get_client(self.user_id, force_refresh)
return self._client
@abstractmethod
def is_available(self) -> bool:
"""检查适配器是否可用"""
pass
2. 资产数据适配器 (AssetDataSourceAdapter)
class AssetDataSourceAdapter(DataSourceAdapter):
"""资产数据源适配器"""
def is_available(self) -> bool:
"""检查资产适配器是否可用"""
return self._get_client() is not None
# === 账户信息 ===
def get_account_balance(self) -> Optional[Dict[str, Any]]:
"""获取账户余额"""
pass
def get_stock_positions(self) -> List[Any]:
"""获取股票持仓"""
pass
def get_cash_flow(self, start_date: date, end_date: date) -> List[Any]:
"""获取资金流水"""
pass
# === 持仓管理 ===
def get_position_details(self, symbol: str) -> Optional[Dict[str, Any]]:
"""获取持仓详情"""
pass
def update_position_cache(self, symbol: str, position_data: Dict[str, Any]) -> None:
"""更新持仓缓存"""
pass
3. 行情数据适配器 (QuoteDataSourceAdapter)
class QuoteDataSourceAdapter(DataSourceAdapter):
"""行情数据源适配器"""
def is_available(self) -> bool:
"""检查行情适配器是否可用"""
return self._get_client() is not None
# === 实时行情 ===
def get_quote(self, symbols: List[str]) -> List[Any]:
"""获取实时行情"""
pass
def get_depth(self, symbol: str) -> Optional[Dict[str, Any]]:
"""获取盘口信息"""
pass
def get_trades(self, symbol: str, count: int) -> List[Any]:
"""获取成交明细"""
pass
# === 历史数据 ===
def get_historical_data(self, symbol: str, start_time: datetime, end_time: datetime, log_callback=None) -> List[MarketData]:
"""获取历史数据 - 从数据库获取分钟级数据,支持日志回调"""
pass
def get_candlesticks(self, symbol: str, period: str, count: int,
adjust_type: str = "NoAdjust", trade_sessions: str = "Intraday") -> List[Any]:
"""获取K线数据"""
pass
# === 订阅管理 ===
def subscribe(self, symbols: List[str], sub_types: List[str], is_first_push: bool = False) -> bool:
"""订阅行情数据"""
pass
def unsubscribe(self, symbols: List[str], sub_types: List[str]) -> bool:
"""取消订阅"""
pass
4. 交易数据适配器 (TradeDataSourceAdapter)
class TradeDataSourceAdapter(DataSourceAdapter):
"""交易数据源适配器"""
def is_available(self) -> bool:
"""检查交易适配器是否可用"""
return self._get_client() is not None
# === 订单管理 ===
def submit_order(self, order: Order) -> OrderResult:
"""提交交易订单"""
pass
def cancel_order(self, order_id: str) -> bool:
"""取消订单"""
pass
def get_order_status(self, order_id: str) -> Optional[Dict[str, Any]]:
"""获取订单状态"""
pass
# === 交易历史 ===
def get_trade_history(self, start_date: date, end_date: date) -> List[Any]:
"""获取交易历史"""
pass
def get_order_history(self, start_date: date, end_date: date) -> List[Any]:
"""获取订单历史"""
pass
5. 统一客户端工厂 (UnifiedClientFactory)
class UnifiedClientFactory:
"""统一客户端工厂"""
def __init__(self):
self._client_cache: Dict[str, Dict[str, Any]] = {}
def get_client(self, user_id: str, force_refresh: bool = False) -> Optional[Dict[str, Any]]:
"""获取用户的数据源客户端"""
cache_key = f"client_{user_id}"
if not force_refresh and cache_key in self._client_cache:
return self._client_cache[cache_key]
try:
# 获取用户数据源配置
config = self._get_user_data_source_config(user_id)
# 创建客户端
client = self._create_client(config)
# 缓存客户端
self._client_cache[cache_key] = client
return client
except Exception as e:
print(f"❌ 创建客户端失败: {e}")
return None
def _get_user_data_source_config(self, user_id: str) -> Dict[str, Any]:
"""获取用户数据源配置"""
pass
def _create_client(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""创建数据源客户端"""
pass
🔄 模块交互关系
1. 数据源初始化流程
sequenceDiagram
participant UCF as UnifiedClientFactory
participant DSAA as DataSourceAdapter
participant LP as LongPort API
participant DB as 数据库
UCF->>UCF: get_client(user_id)
UCF->>DB: 获取用户配置
DB->>UCF: 返回配置信息
UCF->>LP: 创建LongPort客户端
LP->>UCF: 返回客户端实例
UCF->>UCF: 缓存客户端
UCF->>DSAA: 返回客户端
DSAA->>DSAA: 初始化适配器
2. 数据访问流程
sequenceDiagram
participant BS as BaseStrategy
participant QA as QuoteAdapter
participant QSAA as QuoteDataSourceAdapter
participant UCF as UnifiedClientFactory
participant DB as 数据库
BS->>QA: get_historical_data(symbol, start_time, end_time)
QA->>QSAA: get_historical_data()
QSAA->>UCF: get_client()
UCF->>QSAA: 返回客户端
QSAA->>DB: 查询历史数据
DB->>QSAA: 返回数据
QSAA->>QA: 返回MarketData列表
QA->>BS: 返回历史数据
3. 实时数据订阅流程
sequenceDiagram
participant SE as StrategyEngine
participant QA as QuoteAdapter
participant QSAA as QuoteDataSourceAdapter
participant LP as LongPort API
participant WS as WebSocket
SE->>QA: subscribe(symbols, sub_types)
QA->>QSAA: subscribe()
QSAA->>LP: 订阅实时数据
LP->>QSAA: 确认订阅
LP->>QSAA: 推送实时数据
QSAA->>QA: 处理数据
QA->>SE: 推送市场数据
SE->>WS: 推送到前端
📊 数据流设计
1. 数据访问模式
graph TD
subgraph "业务层"
A[业务逻辑]
end
subgraph "适配器层"
B[业务适配器]
C[数据源适配器]
end
subgraph "工厂层"
D[统一客户端工厂]
end
subgraph "数据源"
E[LongPort API]
F[数据库]
G[Redis缓存]
end
%% 数据流
A --> B
B --> C
C --> D
D --> E
C --> F
C --> G
%% 数据返回
E -.-> D
F -.-> C
G -.-> C
D -.-> C
C -.-> B
B -.-> A
2. 数据缓存策略
graph LR
subgraph "数据访问"
A[业务请求]
end
subgraph "缓存层"
B[Redis缓存]
C[内存缓存]
end
subgraph "数据源"
D[数据库]
E[外部API]
end
A --> B
B --> C
C --> D
C --> E
D --> C
E --> C
C --> B
B --> A
🛠️ 开发指南
1. 数据适配器开发流程
graph TD
A[1. 继承DataSourceAdapter] --> B[2. 实现抽象方法]
B --> C[3. 实现数据访问逻辑]
C --> D[4. 集成客户端工厂]
D --> E[5. 注册到系统]
A1[定义数据源类型] --> A
B1[is_available] --> B
B2[数据访问方法] --> B
C1[API调用逻辑] --> C
C2[数据转换逻辑] --> C
C3[错误处理逻辑] --> C
D1[客户端获取] --> D
D2[配置管理] --> D
E1[系统注册] --> E
2. 数据适配器开发模板
class MyCustomDataSourceAdapter(DataSourceAdapter):
"""自定义数据源适配器模板"""
def is_available(self) -> bool:
"""检查适配器是否可用"""
client = self._get_client()
return client is not None
def get_custom_data(self, symbol: str) -> Optional[Dict[str, Any]]:
"""获取自定义数据"""
client = self._get_client()
if not client:
return None
try:
# 实现数据获取逻辑
return self._fetch_data_from_source(client, symbol)
except Exception as e:
print(f"❌ 获取自定义数据失败: {e}")
return None
def _fetch_data_from_source(self, client: Dict[str, Any], symbol: str) -> Dict[str, Any]:
"""从数据源获取数据"""
# 实现具体的数据获取逻辑
pass
3. 客户端工厂扩展
class ExtendedUnifiedClientFactory(UnifiedClientFactory):
"""扩展的统一客户端工厂"""
def _create_client(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""创建扩展客户端"""
data_source_type = config.get("type")
if data_source_type == "LONGPORT":
return self._create_longport_client(config)
elif data_source_type == "CUSTOM":
return self._create_custom_client(config)
else:
raise ValueError(f"不支持的数据源类型: {data_source_type}")
def _create_custom_client(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""创建自定义客户端"""
# 实现自定义客户端创建逻辑
pass
🔧 最佳实践
1. 数据访问规范
访问类型 |
正确用法 |
错误用法 |
实时数据 |
通过适配器订阅 |
直接调用外部API |
历史数据 |
从数据库获取 |
调用实时API获取历史数据 |
缓存数据 |
使用Redis缓存 |
每次都查询数据库 |
错误处理 |
完善的异常处理 |
忽略异常 |
2. 性能优化建议
优化类型 |
实现方式 |
适用场景 |
客户端缓存 |
缓存客户端实例 |
频繁创建客户端 |
数据缓存 |
Redis缓存热点数据 |
频繁查询相同数据 |
连接池 |
使用连接池管理连接 |
高并发数据访问 |
异步处理 |
异步数据获取 |
非关键路径数据访问 |
3. 错误处理模式
def get_data(self, symbol: str) -> Optional[Dict[str, Any]]:
"""标准数据获取模式"""
try:
# 1. 检查客户端可用性
client = self._get_client()
if not client:
self.log_message(f"❌ 客户端不可用: {symbol}", "error")
return None
# 2. 尝试从缓存获取
cached_data = self._get_from_cache(symbol)
if cached_data:
return cached_data
# 3. 从数据源获取
data = self._fetch_from_source(client, symbol)
if data:
# 4. 缓存数据
self._cache_data(symbol, data)
return data
else:
self.log_message(f"❌ 未获取到数据: {symbol}", "warn")
return None
except Exception as e:
self.log_message(f"❌ 获取数据异常: {symbol} - {e}", "error")
return None
📈 现有实现分析
1. 数据适配器对比
适配器类型 |
数据源 |
实现复杂度 |
性能特点 |
可靠性 |
AssetDataSourceAdapter |
LongPort API + 数据库 |
中等 |
依赖网络 |
高 |
QuoteDataSourceAdapter |
LongPort API + 数据库 |
高 |
实时性好 |
高 |
TradeDataSourceAdapter |
LongPort API |
中等 |
依赖网络 |
中 |
DataAdapter |
LongPort API + 数据库 |
高 |
批量处理 |
高 |
2. 数据访问特性
- 统一接口: 所有数据访问通过适配器统一接口
- 缓存优化: Redis缓存热点数据,提高访问速度
- 错误处理: 完善的异常处理和降级机制
- 模式隔离: 回测模式不调用外部API
🚀 扩展建议
1. 数据源扩展
graph TB
subgraph "现有数据源"
A[LongPort API]
B[数据库]
C[Redis缓存]
end
subgraph "扩展数据源"
D[其他券商API]
E[第三方数据源]
F[实时数据流]
G[机器学习数据]
end
A --> D
B --> E
C --> F
E --> G
2. 高级功能扩展
- 多数据源融合: 支持多个数据源的数据融合
- 实时数据流: 支持流式数据处理
- 数据质量监控: 实时监控数据质量和完整性
- 智能缓存: 基于访问模式的智能缓存策略
📚 总结
核心架构特性
- 统一抽象: 通过适配器模式统一不同数据源
- 工厂模式: 统一客户端工厂管理客户端创建
- 缓存优化: 多级缓存提高数据访问性能
- 错误处理: 完善的异常处理和降级机制
- 模式隔离: 不同交易模式使用不同的数据访问策略
开发指导原则
- 接口统一: 所有数据访问通过统一接口
- 缓存优先: 优先使用缓存数据,减少外部调用
- 错误处理: 完善的异常处理和降级机制
- 性能优化: 合理使用缓存和连接池
- 模式隔离: 根据交易模式选择合适的数据访问策略
通过遵循本文档的架构设计和开发规范,可以构建出高效、可靠、可扩展的数据源管理系统,为量化交易提供稳定的数据支撑。