🇨🇳 简体中文
🇺🇸 English
🇯🇵 日本語
Skip to the content.

数据源模块架构文档

📋 概述

数据源模块是量化交易系统的基础设施组件,负责统一管理各种外部数据源的访问,提供标准化的数据接口。本文档详细描述了数据源模块的架构设计、接口定义、与其他模块的交互方式。

🏗️ 架构设计

1. 模块依赖关系图

graph TB
    subgraph "业务层"
        TE[TradingEngine]
        SE[StrategyEngine]
        RE[RiskEngine]
    end
    
    subgraph "数据源模块"
        AA[AssetAdapter<br/>资产适配器]
        QA[QuoteAdapter<br/>行情适配器<br/>日志回调支持]
        TA[TradeAdapter<br/>交易适配器]
        DA[DataAdapter<br/>数据适配器<br/>日志回调支持]
    end
    
    subgraph "数据源适配器层"
        ASAA[AssetDataSourceAdapter]
        QSAA[QuoteDataSourceAdapter]
        TSAA[TradeDataSourceAdapter]
        DSAA[DataSourceAdapter]
    end
    
    subgraph "客户端工厂"
        UCF[UnifiedClientFactory<br/>统一客户端工厂]
    end
    
    subgraph "外部数据源"
        LP[LongPort API<br/>券商接口]
        DB[(数据库)]
        REDIS[(Redis缓存)]
    end
    
    %% 依赖关系
    TE --> AA
    SE --> QA
    RE --> AA
    
    AA --> ASAA
    QA --> QSAA
    TA --> TSAA
    DA --> DSAA
    
    ASAA --> UCF
    QSAA --> UCF
    TSAA --> UCF
    
    UCF --> LP
    ASAA --> DB
    ASAA --> REDIS
    QSAA --> DB
    QSAA --> REDIS

2. 模块职责划分

模块 职责 依赖关系
AssetAdapter 资产数据业务适配器 依赖 AssetDataSourceAdapter
QuoteAdapter 行情数据业务适配器 依赖 QuoteDataSourceAdapter
TradeAdapter 交易数据业务适配器 依赖 TradeDataSourceAdapter
DataAdapter 数据导入业务适配器 依赖 DataSourceAdapter
UnifiedClientFactory 统一客户端工厂 依赖外部API

🔌 核心接口定义

1. 数据源适配器基类 (DataSourceAdapter)

class DataSourceAdapter(ABC):
    """数据源适配器抽象基类"""
    
    def __init__(self, user_id: str):
        self.user_id = user_id
        self._client: Optional[Dict[str, Any]] = None
    
    def _get_client(self, force_refresh: bool = False) -> Optional[Dict[str, Any]]:
        """获取数据源客户端"""
        if not self._client or force_refresh:
            self._client = unified_client_factory.get_client(self.user_id, force_refresh)
        return self._client
    
    @abstractmethod
    def is_available(self) -> bool:
        """检查适配器是否可用"""
        pass

2. 资产数据适配器 (AssetDataSourceAdapter)

class AssetDataSourceAdapter(DataSourceAdapter):
    """资产数据源适配器"""
    
    def is_available(self) -> bool:
        """检查资产适配器是否可用"""
        return self._get_client() is not None
    
    # === 账户信息 ===
    def get_account_balance(self) -> Optional[Dict[str, Any]]:
        """获取账户余额"""
        pass
    
    def get_stock_positions(self) -> List[Any]:
        """获取股票持仓"""
        pass
    
    def get_cash_flow(self, start_date: date, end_date: date) -> List[Any]:
        """获取资金流水"""
        pass
    
    # === 持仓管理 ===
    def get_position_details(self, symbol: str) -> Optional[Dict[str, Any]]:
        """获取持仓详情"""
        pass
    
    def update_position_cache(self, symbol: str, position_data: Dict[str, Any]) -> None:
        """更新持仓缓存"""
        pass

3. 行情数据适配器 (QuoteDataSourceAdapter)

class QuoteDataSourceAdapter(DataSourceAdapter):
    """行情数据源适配器"""
    
    def is_available(self) -> bool:
        """检查行情适配器是否可用"""
        return self._get_client() is not None
    
    # === 实时行情 ===
    def get_quote(self, symbols: List[str]) -> List[Any]:
        """获取实时行情"""
        pass
    
    def get_depth(self, symbol: str) -> Optional[Dict[str, Any]]:
        """获取盘口信息"""
        pass
    
    def get_trades(self, symbol: str, count: int) -> List[Any]:
        """获取成交明细"""
        pass
    
    # === 历史数据 ===
    def get_historical_data(self, symbol: str, start_time: datetime, end_time: datetime, log_callback=None) -> List[MarketData]:
        """获取历史数据 - 从数据库获取分钟级数据,支持日志回调"""
        pass
    
    def get_candlesticks(self, symbol: str, period: str, count: int, 
                        adjust_type: str = "NoAdjust", trade_sessions: str = "Intraday") -> List[Any]:
        """获取K线数据"""
        pass
    
    # === 订阅管理 ===
    def subscribe(self, symbols: List[str], sub_types: List[str], is_first_push: bool = False) -> bool:
        """订阅行情数据"""
        pass
    
    def unsubscribe(self, symbols: List[str], sub_types: List[str]) -> bool:
        """取消订阅"""
        pass

4. 交易数据适配器 (TradeDataSourceAdapter)

class TradeDataSourceAdapter(DataSourceAdapter):
    """交易数据源适配器"""
    
    def is_available(self) -> bool:
        """检查交易适配器是否可用"""
        return self._get_client() is not None
    
    # === 订单管理 ===
    def submit_order(self, order: Order) -> OrderResult:
        """提交交易订单"""
        pass
    
    def cancel_order(self, order_id: str) -> bool:
        """取消订单"""
        pass
    
    def get_order_status(self, order_id: str) -> Optional[Dict[str, Any]]:
        """获取订单状态"""
        pass
    
    # === 交易历史 ===
    def get_trade_history(self, start_date: date, end_date: date) -> List[Any]:
        """获取交易历史"""
        pass
    
    def get_order_history(self, start_date: date, end_date: date) -> List[Any]:
        """获取订单历史"""
        pass

5. 统一客户端工厂 (UnifiedClientFactory)

class UnifiedClientFactory:
    """统一客户端工厂"""
    
    def __init__(self):
        self._client_cache: Dict[str, Dict[str, Any]] = {}
    
    def get_client(self, user_id: str, force_refresh: bool = False) -> Optional[Dict[str, Any]]:
        """获取用户的数据源客户端"""
        cache_key = f"client_{user_id}"
        
        if not force_refresh and cache_key in self._client_cache:
            return self._client_cache[cache_key]
        
        try:
            # 获取用户数据源配置
            config = self._get_user_data_source_config(user_id)
            
            # 创建客户端
            client = self._create_client(config)
            
            # 缓存客户端
            self._client_cache[cache_key] = client
            
            return client
            
        except Exception as e:
            print(f"❌ 创建客户端失败: {e}")
            return None
    
    def _get_user_data_source_config(self, user_id: str) -> Dict[str, Any]:
        """获取用户数据源配置"""
        pass
    
    def _create_client(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """创建数据源客户端"""
        pass

🔄 模块交互关系

1. 数据源初始化流程

sequenceDiagram
    participant UCF as UnifiedClientFactory
    participant DSAA as DataSourceAdapter
    participant LP as LongPort API
    participant DB as 数据库
    
    UCF->>UCF: get_client(user_id)
    UCF->>DB: 获取用户配置
    DB->>UCF: 返回配置信息
    
    UCF->>LP: 创建LongPort客户端
    LP->>UCF: 返回客户端实例
    
    UCF->>UCF: 缓存客户端
    UCF->>DSAA: 返回客户端
    
    DSAA->>DSAA: 初始化适配器

2. 数据访问流程

sequenceDiagram
    participant BS as BaseStrategy
    participant QA as QuoteAdapter
    participant QSAA as QuoteDataSourceAdapter
    participant UCF as UnifiedClientFactory
    participant DB as 数据库
    
    BS->>QA: get_historical_data(symbol, start_time, end_time)
    QA->>QSAA: get_historical_data()
    QSAA->>UCF: get_client()
    UCF->>QSAA: 返回客户端
    
    QSAA->>DB: 查询历史数据
    DB->>QSAA: 返回数据
    QSAA->>QA: 返回MarketData列表
    QA->>BS: 返回历史数据

3. 实时数据订阅流程

sequenceDiagram
    participant SE as StrategyEngine
    participant QA as QuoteAdapter
    participant QSAA as QuoteDataSourceAdapter
    participant LP as LongPort API
    participant WS as WebSocket
    
    SE->>QA: subscribe(symbols, sub_types)
    QA->>QSAA: subscribe()
    QSAA->>LP: 订阅实时数据
    LP->>QSAA: 确认订阅
    
    LP->>QSAA: 推送实时数据
    QSAA->>QA: 处理数据
    QA->>SE: 推送市场数据
    SE->>WS: 推送到前端

📊 数据流设计

1. 数据访问模式

graph TD
    subgraph "业务层"
        A[业务逻辑]
    end
    
    subgraph "适配器层"
        B[业务适配器]
        C[数据源适配器]
    end
    
    subgraph "工厂层"
        D[统一客户端工厂]
    end
    
    subgraph "数据源"
        E[LongPort API]
        F[数据库]
        G[Redis缓存]
    end
    
    %% 数据流
    A --> B
    B --> C
    C --> D
    D --> E
    C --> F
    C --> G
    
    %% 数据返回
    E -.-> D
    F -.-> C
    G -.-> C
    D -.-> C
    C -.-> B
    B -.-> A

2. 数据缓存策略

graph LR
    subgraph "数据访问"
        A[业务请求]
    end
    
    subgraph "缓存层"
        B[Redis缓存]
        C[内存缓存]
    end
    
    subgraph "数据源"
        D[数据库]
        E[外部API]
    end
    
    A --> B
    B --> C
    C --> D
    C --> E
    
    D --> C
    E --> C
    C --> B
    B --> A

🛠️ 开发指南

1. 数据适配器开发流程

graph TD
    A[1. 继承DataSourceAdapter] --> B[2. 实现抽象方法]
    B --> C[3. 实现数据访问逻辑]
    C --> D[4. 集成客户端工厂]
    D --> E[5. 注册到系统]
    
    A1[定义数据源类型] --> A
    B1[is_available] --> B
    B2[数据访问方法] --> B
    
    C1[API调用逻辑] --> C
    C2[数据转换逻辑] --> C
    C3[错误处理逻辑] --> C
    
    D1[客户端获取] --> D
    D2[配置管理] --> D
    E1[系统注册] --> E

2. 数据适配器开发模板

class MyCustomDataSourceAdapter(DataSourceAdapter):
    """自定义数据源适配器模板"""
    
    def is_available(self) -> bool:
        """检查适配器是否可用"""
        client = self._get_client()
        return client is not None
    
    def get_custom_data(self, symbol: str) -> Optional[Dict[str, Any]]:
        """获取自定义数据"""
        client = self._get_client()
        if not client:
            return None
        
        try:
            # 实现数据获取逻辑
            return self._fetch_data_from_source(client, symbol)
        except Exception as e:
            print(f"❌ 获取自定义数据失败: {e}")
            return None
    
    def _fetch_data_from_source(self, client: Dict[str, Any], symbol: str) -> Dict[str, Any]:
        """从数据源获取数据"""
        # 实现具体的数据获取逻辑
        pass

3. 客户端工厂扩展

class ExtendedUnifiedClientFactory(UnifiedClientFactory):
    """扩展的统一客户端工厂"""
    
    def _create_client(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """创建扩展客户端"""
        data_source_type = config.get("type")
        
        if data_source_type == "LONGPORT":
            return self._create_longport_client(config)
        elif data_source_type == "CUSTOM":
            return self._create_custom_client(config)
        else:
            raise ValueError(f"不支持的数据源类型: {data_source_type}")
    
    def _create_custom_client(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """创建自定义客户端"""
        # 实现自定义客户端创建逻辑
        pass

🔧 最佳实践

1. 数据访问规范

访问类型 正确用法 错误用法
实时数据 通过适配器订阅 直接调用外部API
历史数据 从数据库获取 调用实时API获取历史数据
缓存数据 使用Redis缓存 每次都查询数据库
错误处理 完善的异常处理 忽略异常

2. 性能优化建议

优化类型 实现方式 适用场景
客户端缓存 缓存客户端实例 频繁创建客户端
数据缓存 Redis缓存热点数据 频繁查询相同数据
连接池 使用连接池管理连接 高并发数据访问
异步处理 异步数据获取 非关键路径数据访问

3. 错误处理模式

def get_data(self, symbol: str) -> Optional[Dict[str, Any]]:
    """标准数据获取模式"""
    try:
        # 1. 检查客户端可用性
        client = self._get_client()
        if not client:
            self.log_message(f"❌ 客户端不可用: {symbol}", "error")
            return None
        
        # 2. 尝试从缓存获取
        cached_data = self._get_from_cache(symbol)
        if cached_data:
            return cached_data
        
        # 3. 从数据源获取
        data = self._fetch_from_source(client, symbol)
        if data:
            # 4. 缓存数据
            self._cache_data(symbol, data)
            return data
        else:
            self.log_message(f"❌ 未获取到数据: {symbol}", "warn")
            return None
            
    except Exception as e:
        self.log_message(f"❌ 获取数据异常: {symbol} - {e}", "error")
        return None

📈 现有实现分析

1. 数据适配器对比

适配器类型 数据源 实现复杂度 性能特点 可靠性
AssetDataSourceAdapter LongPort API + 数据库 中等 依赖网络
QuoteDataSourceAdapter LongPort API + 数据库 实时性好
TradeDataSourceAdapter LongPort API 中等 依赖网络
DataAdapter LongPort API + 数据库 批量处理

2. 数据访问特性

🚀 扩展建议

1. 数据源扩展

graph TB
    subgraph "现有数据源"
        A[LongPort API]
        B[数据库]
        C[Redis缓存]
    end
    
    subgraph "扩展数据源"
        D[其他券商API]
        E[第三方数据源]
        F[实时数据流]
        G[机器学习数据]
    end
    
    A --> D
    B --> E
    C --> F
    E --> G

2. 高级功能扩展

📚 总结

核心架构特性

  1. 统一抽象: 通过适配器模式统一不同数据源
  2. 工厂模式: 统一客户端工厂管理客户端创建
  3. 缓存优化: 多级缓存提高数据访问性能
  4. 错误处理: 完善的异常处理和降级机制
  5. 模式隔离: 不同交易模式使用不同的数据访问策略

开发指导原则

通过遵循本文档的架构设计和开发规范,可以构建出高效、可靠、可扩展的数据源管理系统,为量化交易提供稳定的数据支撑。