Data Pipeline 模块架构文档
本文档描述 Data Pipeline 模块的架构设计、数据流程、组件结构和配置说明。
概述
Data Pipeline 模块是 Fire 量化交易系统的数据基础设施,负责从多源数据采集、处理、存储到最终消费的完整数据流转。系统采用适配器模式统一不同数据源接口,通过自动路由和缓存优化保证数据的实时性和可靠性。
核心功能
数据采集能力
- 多源数据接入: 支持 LongPort API、Redis 等多种数据源
- 统一适配接口: 适配器模式屏蔽不同数据源的实现差异
- 自动路由选择: 根据交易模式(回测/实时)和资产模式自动选择数据源
- 容错与降级: 数据源故障时自动降级和重试机制
数据处理能力
- 历史数据管理: 策略预热所需的历史数据自动加载
- 时序一致性: 保证数据时间戳的顺序性和去重处理
- 批量优化: 将分散请求合并为批量操作,提升效率
- 内存管理: LRU 缓存策略控制内存占用
实时处理能力
- WebSocket 推送: 毫秒级行情数据推送
- 并发数据处理: 支持多策略并行消费数据
- 背压控制: 自适应流量控制防止系统过载
- 状态同步: 预热期间缓冲实时数据,保证数据完整性
性能特点
- 快速启动: 优化后系统启动时间从 13+ 分钟降至 <5 秒
- 内存可控: 通过 LRU 缓存和批量处理控制内存占用
- 低延迟: WebSocket 实时推送支持毫秒级数据延迟
- 并发处理: 支持多标的并行数据加载和处理
组件
组件架构图
graph TD
subgraph "数据源层"
LP[LongPort API<br/>券商接口]
REDIS[(Redis<br/>数据存储)]
WS[WebSocket<br/>实时推送]
end
subgraph "适配器层"
CF[ClientFactory<br/>客户端工厂]
ASAA[AssetDataSourceAdapter<br/>资产适配器]
QSAA[QuoteDataSourceAdapter<br/>行情适配器]
TSAA[TradeDataSourceAdapter<br/>交易适配器]
end
subgraph "处理层"
HDM[HistoricalDataManager<br/>历史数据管理]
TR[TimestampRegistry<br/>时间戳去重]
MDB[MarketDataBatch<br/>批量容器]
Cache[DataCache<br/>数据缓存]
end
subgraph "消费层"
SE[StrategyEngine<br/>策略引擎]
TE[TradingEngine<br/>交易引擎]
BE[BacktestEngine<br/>回测引擎]
end
LP --> CF
REDIS --> ASAA
WS --> QSAA
CF --> ASAA
CF --> QSAA
CF --> TSAA
QSAA --> HDM
HDM --> TR
HDM --> MDB
MDB --> Cache
Cache --> SE
Cache --> TE
Cache --> BE
style HDM fill:#e1f5ff
style CF fill:#e8f5e9
style Cache fill:#fff4e6
核心组件说明
ClientFactory (客户端工厂)
- 职责: 创建和管理不同数据源的客户端实例
- 实现类:
backend.core.data_source.factories.client_factory.ClientFactory - 主要方法:
get_or_create_client(): 获取或创建单例客户端close_client(): 关闭客户端连接clear_cache(): 清理客户端缓存
- 状态管理: 有状态(维护客户端连接池)
QuoteDataSourceAdapter (行情数据适配器)
- 职责: 统一不同数据源的行情数据接口
- 实现类:
backend.core.data_source.adapters.quote_adapter.QuoteDataSourceAdapter - 主要方法:
get_historical_data(): 获取历史行情数据subscribe_realtime(): 订阅实时行情get_batch_quotes(): 批量获取行情数据
- 状态管理: 无状态
HistoricalDataManager (历史数据管理器)
- 职责: 管理策略预热所需的历史数据加载和分发
- 实现类:
backend.core.trading.engines.historical_data_manager.HistoricalDataManager - 主要功能:
- 并行加载多个标的的历史数据
- 时间戳去重和排序
- 批量数据打包和分发
- 内存占用控制
- 性能优化:
- 使用
asyncio并发加载 - LRU 缓存减少重复查询
- 批量操作减少网络开销
- 使用
TimestampRegistry (时间戳注册表)
- 职责: 防止重复数据和保证时序一致性
- 实现类:
backend.core.trading.utils.timestamp_registry.TimestampRegistry - 主要功能:
- 记录已处理的时间戳
- 过滤重复数据
- 保证数据按时间顺序处理
依赖关系
内部依赖
| 模块名 | 用途 |
|---|---|
core.models |
数据模型定义 |
core.data_source.adapters |
数据源适配器 |
core.data_source.factories |
客户端和配置工厂 |
core.trading.engines.historical_data_manager |
历史数据管理 |
core.trading.utils.timestamp_registry |
时间戳去重 |
infrastructure.database.redis_client |
Redis 数据库连接 |
外部依赖
| 库名 | 版本要求 | 用途 |
|---|---|---|
pandas |
≥2.3.0 | 数据处理 |
numpy |
≥1.24.0 | 数值计算 |
redis |
≥5.0.0 | 数据存储 |
longport |
≥3.0.0 | 长桥证券API |
asyncio |
标准库 | 异步 IO |
数据流
数据采集流程
sequenceDiagram
participant Client as 客户端
participant Factory as 工厂
participant Adapter as 适配器
participant Source as 数据源
participant Cache as 缓存
Client->>Factory: 请求数据
Factory->>Adapter: 创建适配器
Adapter->>Cache: 检查缓存
alt 缓存命中
Cache-->>Client: 返回缓存数据
else 缓存未命中
Adapter->>Source: 查询数据
Source-->>Adapter: 原始数据
Adapter->>Adapter: 数据转换
Adapter->>Cache: 更新缓存
Adapter-->>Client: 返回数据
end
批量数据处理
flowchart LR
Start([开始]) --> Collect[收集请求]
Collect --> Check{达到批量<br/>阈值?}
Check -->|否| Timer{超时?}
Timer -->|否| Collect
Timer -->|是| Batch[打包批量]
Check -->|是| Batch
Batch --> Query[批量查询]
Query --> Parse[解析数据]
Parse --> Distribute[分发结果]
Distribute --> End([结束])
style Batch fill:#e1f5ff
style Query fill:#fff4e6
数据结构
输入数据格式
市场数据请求 (DataRequest):
@dataclass
class DataRequest:
symbols: List[str] # 标的列表
start_time: datetime # 开始时间
end_time: datetime # 结束时间
data_type: str # 数据类型 (kline, tick, quote)
interval: Optional[str] # K线周期
adjust_type: Optional[str] # 复权类型
输出数据格式
市场数据批次 (MarketDataBatch):
@dataclass
class MarketDataBatch:
symbol: str
timestamp: datetime
data: pd.DataFrame # 标准化的数据框
metadata: Dict[str, Any] # 元数据信息
# DataFrame 列结构
# - open, high, low, close
# - volume, turnover
# - timestamp, symbol
接口
Python API
DataSourceAdapter 接口
class DataSourceAdapter(ABC):
"""数据源适配器基类"""
@abstractmethod
async def get_historical_data(
self,
symbols: List[str],
start: datetime,
end: datetime,
interval: str = "1d"
) -> Dict[str, pd.DataFrame]:
"""获取历史数据"""
pass
@abstractmethod
async def subscribe_realtime(
self,
symbols: List[str],
callback: Callable
) -> None:
"""订阅实时数据"""
pass
HistoricalDataManager 使用示例
from backend.core.trading.engines.historical_data_manager import HistoricalDataManager
from backend.core.data_source.adapters.data_source_adapter import QuoteDataSourceAdapter
# 创建适配器和管理器
quote_adapter = QuoteDataSourceAdapter(config)
manager = HistoricalDataManager(
adapter=quote_adapter,
cache_size=1000,
batch_size=100
)
# 加载历史数据
await manager.load_historical_data(
symbols=["AAPL", "GOOGL"],
days_back=30,
interval="1d"
)
# 获取数据
data = manager.get_data("AAPL")
扩展点
自定义数据源适配器
from backend.core.data_source.adapters import DataSourceAdapter
class CustomDataAdapter(DataSourceAdapter):
"""自定义数据源适配器"""
def __init__(self, config: Dict[str, Any]):
self.config = config
# 初始化客户端
pass
async def get_historical_data(
self,
symbols: List[str],
start: datetime,
end: datetime,
interval: str = "1d"
) -> Dict[str, pd.DataFrame]:
"""实现自定义数据获取逻辑"""
# 自定义实现
pass
使用自定义适配器
from backend.core.data_source.factories.client_factory import ClientFactory
# 创建自定义适配器实例
custom_adapter = CustomDataAdapter(config={
"api_url": "https://api.custom.com",
"timeout": 30
})
# 在策略或交易引擎中使用
historical_data = await custom_adapter.get_historical_data(
symbols=["AAPL"],
start=datetime(2024, 1, 1),
end=datetime(2024, 12, 31),
interval="1d"
)
配置
实际配置方式
Data Pipeline 模块的配置通过以下方式管理:
- 数据源配置: 通过
ConfigFactory和unified_config_factory实例管理 - 环境变量: 通过
infrastructure/config/settings.py - 运行时参数: 通过 API 调用传递
支持的环境变量
基于实际的 settings.py:
| 环境变量 | 说明 | 默认值 |
|---|---|---|
REDIS_HOST |
Redis 主机 | localhost |
REDIS_PORT |
Redis 端口 | 6379 |
LONGPORT_APP_KEY |
长桥 API Key | None |
LONGPORT_APP_SECRET |
长桥 API Secret | None |
LONGPORT_ACCESS_TOKEN |
长桥访问令牌 | None |
配置示例
# 从配置工厂获取数据源配置
from backend.core.data_source.factories.config_factory import unified_config_factory
# 获取用户数据源配置
config = unified_config_factory.get_data_source_config(
user_id="user_123"
)
# 创建数据源客户端
from backend.core.data_source.factories.client_factory import unified_client_factory
client = unified_client_factory.get_or_create_client(
user_id="user_123",
config=config
)
相关文档
- Trading 模块架构 - 交易引擎详细设计
- Strategy 模块架构 - 策略系统详细设计
- 风险管理模块架构 - 风险控制详细设计
- 基础架构文档 - 系统基础架构