🇨🇳 简体中文
🇺🇸 English
🇯🇵 日本語
Skip to the content.

Data Pipeline 模块架构文档

本文档描述 Data Pipeline 模块的架构设计、数据流程、组件结构和配置说明。

概述

Data Pipeline 模块是 Fire 量化交易系统的数据基础设施,负责从多源数据采集、处理、存储到最终消费的完整数据流转。系统采用适配器模式统一不同数据源接口,通过自动路由和缓存优化保证数据的实时性和可靠性。

核心功能

数据采集能力

数据处理能力

实时处理能力

性能特点

组件

组件架构图

graph TD
    subgraph "数据源层"
        LP[LongPort API<br/>券商接口]
        REDIS[(Redis<br/>数据存储)]
        WS[WebSocket<br/>实时推送]
    end

    subgraph "适配器层"
        CF[ClientFactory<br/>客户端工厂]
        ASAA[AssetDataSourceAdapter<br/>资产适配器]
        QSAA[QuoteDataSourceAdapter<br/>行情适配器]
        TSAA[TradeDataSourceAdapter<br/>交易适配器]
    end

    subgraph "处理层"
        HDM[HistoricalDataManager<br/>历史数据管理]
        TR[TimestampRegistry<br/>时间戳去重]
        MDB[MarketDataBatch<br/>批量容器]
        Cache[DataCache<br/>数据缓存]
    end

    subgraph "消费层"
        SE[StrategyEngine<br/>策略引擎]
        TE[TradingEngine<br/>交易引擎]
        BE[BacktestEngine<br/>回测引擎]
    end

    LP --> CF
    REDIS --> ASAA
    WS --> QSAA

    CF --> ASAA
    CF --> QSAA
    CF --> TSAA

    QSAA --> HDM
    HDM --> TR
    HDM --> MDB
    MDB --> Cache

    Cache --> SE
    Cache --> TE
    Cache --> BE

    style HDM fill:#e1f5ff
    style CF fill:#e8f5e9
    style Cache fill:#fff4e6

核心组件说明

ClientFactory (客户端工厂)

QuoteDataSourceAdapter (行情数据适配器)

HistoricalDataManager (历史数据管理器)

TimestampRegistry (时间戳注册表)

依赖关系

内部依赖

模块名 用途
core.models 数据模型定义
core.data_source.adapters 数据源适配器
core.data_source.factories 客户端和配置工厂
core.trading.engines.historical_data_manager 历史数据管理
core.trading.utils.timestamp_registry 时间戳去重
infrastructure.database.redis_client Redis 数据库连接

外部依赖

库名 版本要求 用途
pandas ≥2.3.0 数据处理
numpy ≥1.24.0 数值计算
redis ≥5.0.0 数据存储
longport ≥3.0.0 长桥证券API
asyncio 标准库 异步 IO

数据流

数据采集流程

sequenceDiagram
    participant Client as 客户端
    participant Factory as 工厂
    participant Adapter as 适配器
    participant Source as 数据源
    participant Cache as 缓存

    Client->>Factory: 请求数据
    Factory->>Adapter: 创建适配器
    Adapter->>Cache: 检查缓存

    alt 缓存命中
        Cache-->>Client: 返回缓存数据
    else 缓存未命中
        Adapter->>Source: 查询数据
        Source-->>Adapter: 原始数据
        Adapter->>Adapter: 数据转换
        Adapter->>Cache: 更新缓存
        Adapter-->>Client: 返回数据
    end

批量数据处理

flowchart LR
    Start([开始]) --> Collect[收集请求]
    Collect --> Check{达到批量<br/>阈值?}
    Check -->|否| Timer{超时?}
    Timer -->|否| Collect
    Timer -->|是| Batch[打包批量]
    Check -->|是| Batch
    Batch --> Query[批量查询]
    Query --> Parse[解析数据]
    Parse --> Distribute[分发结果]
    Distribute --> End([结束])

    style Batch fill:#e1f5ff
    style Query fill:#fff4e6

数据结构

输入数据格式

市场数据请求 (DataRequest):

@dataclass
class DataRequest:
    symbols: List[str]          # 标的列表
    start_time: datetime         # 开始时间
    end_time: datetime          # 结束时间
    data_type: str              # 数据类型 (kline, tick, quote)
    interval: Optional[str]      # K线周期
    adjust_type: Optional[str]   # 复权类型

输出数据格式

市场数据批次 (MarketDataBatch):

@dataclass
class MarketDataBatch:
    symbol: str
    timestamp: datetime
    data: pd.DataFrame          # 标准化的数据框
    metadata: Dict[str, Any]    # 元数据信息

    # DataFrame 列结构
    # - open, high, low, close
    # - volume, turnover
    # - timestamp, symbol

接口

Python API

DataSourceAdapter 接口

class DataSourceAdapter(ABC):
    """数据源适配器基类"""

    @abstractmethod
    async def get_historical_data(
        self,
        symbols: List[str],
        start: datetime,
        end: datetime,
        interval: str = "1d"
    ) -> Dict[str, pd.DataFrame]:
        """获取历史数据"""
        pass

    @abstractmethod
    async def subscribe_realtime(
        self,
        symbols: List[str],
        callback: Callable
    ) -> None:
        """订阅实时数据"""
        pass

HistoricalDataManager 使用示例

from backend.core.trading.engines.historical_data_manager import HistoricalDataManager
from backend.core.data_source.adapters.data_source_adapter import QuoteDataSourceAdapter

# 创建适配器和管理器
quote_adapter = QuoteDataSourceAdapter(config)
manager = HistoricalDataManager(
    adapter=quote_adapter,
    cache_size=1000,
    batch_size=100
)

# 加载历史数据
await manager.load_historical_data(
    symbols=["AAPL", "GOOGL"],
    days_back=30,
    interval="1d"
)

# 获取数据
data = manager.get_data("AAPL")

扩展点

自定义数据源适配器

from backend.core.data_source.adapters import DataSourceAdapter

class CustomDataAdapter(DataSourceAdapter):
    """自定义数据源适配器"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        # 初始化客户端
        pass

    async def get_historical_data(
        self,
        symbols: List[str],
        start: datetime,
        end: datetime,
        interval: str = "1d"
    ) -> Dict[str, pd.DataFrame]:
        """实现自定义数据获取逻辑"""
        # 自定义实现
        pass

使用自定义适配器

from backend.core.data_source.factories.client_factory import ClientFactory

# 创建自定义适配器实例
custom_adapter = CustomDataAdapter(config={
    "api_url": "https://api.custom.com",
    "timeout": 30
})

# 在策略或交易引擎中使用
historical_data = await custom_adapter.get_historical_data(
    symbols=["AAPL"],
    start=datetime(2024, 1, 1),
    end=datetime(2024, 12, 31),
    interval="1d"
)

配置

实际配置方式

Data Pipeline 模块的配置通过以下方式管理:

  1. 数据源配置: 通过 ConfigFactoryunified_config_factory 实例管理
  2. 环境变量: 通过 infrastructure/config/settings.py
  3. 运行时参数: 通过 API 调用传递

支持的环境变量

基于实际的 settings.py:

环境变量 说明 默认值
REDIS_HOST Redis 主机 localhost
REDIS_PORT Redis 端口 6379
LONGPORT_APP_KEY 长桥 API Key None
LONGPORT_APP_SECRET 长桥 API Secret None
LONGPORT_ACCESS_TOKEN 长桥访问令牌 None

配置示例

# 从配置工厂获取数据源配置
from backend.core.data_source.factories.config_factory import unified_config_factory

# 获取用户数据源配置
config = unified_config_factory.get_data_source_config(
    user_id="user_123"
)

# 创建数据源客户端
from backend.core.data_source.factories.client_factory import unified_client_factory

client = unified_client_factory.get_or_create_client(
    user_id="user_123",
    config=config
)

相关文档