🇨🇳 简体中文
🇺🇸 English
🇯🇵 日本語
Skip to the content.

市场数据缺失处理规范

重要文档 - 策略开发必读

本文档定义了 Fire 量化交易系统中市场数据缺失的处理规则。所有策略开发者和数据处理模块都必须遵循这些规范,以确保在真实市场环境中正确处理数据缺失情况。


目录


背景说明

为什么会出现数据缺失?

股票市场不是时时刻刻都开启的,以下情况都会导致数据缺失:

  1. 定期休市
    • 周末(周六、周日)
    • 每日收盘后(盘后时段)
  2. 节假日休市
    • 国家法定节假日
    • 交易所特殊假期
    • 国际市场时差导致的非同步休市
  3. 临时停市
    • 券商系统维护
    • 交易所技术故障
    • 个股停牌(公司公告、重组等)
    • 市场熔断机制触发
  4. 数据采集问题
    • 上游数据源临时故障
    • 网络连接中断
    • 数据同步延迟

对策略的影响

数据缺失是真实市场环境的常态,而不是异常情况。策略必须能够:


核心原则

原则 1:数据完整性优先

当策略需要 N 个数据点时,必须凑齐 N 个有效数据点,而不是简单地向前查找 N 个时间单位。

错误做法

# 需要10小时的数据,直接向前推10小时
start_time = current_time - timedelta(hours=10)
data = get_data(symbol, start_time, current_time)
# 问题:如果中间有周末,可能只有2小时的数据!

正确做法

# 需要10小时的数据,持续向前找直到凑齐10个小时K线
collected_hours = []
lookback_time = current_time
while len(collected_hours) < 10:
    data_point = get_nearest_data(symbol, lookback_time)
    if data_point:
        collected_hours.append(data_point)
    lookback_time = data_point.timestamp - timedelta(hours=1)

原则 2:向前查找,直到凑齐

遇到数据缺失时,继续往前推,直到找到足够的有效数据点。

原则 3:区分粒度处理

不同时间粒度的数据有不同的处理规则:

时间粒度 数据要求 缺失判定
分钟级 凑齐N个分钟K线 继续向前找,直到凑齐
小时级 凑齐N个小时K线 继续向前找,直到凑齐
天级 凑齐N个交易日 如全天无数据,推进到前一交易日

数据缺失场景

场景 1:周末数据缺失

时间线

周五 15:00 - 市场收盘,最后一个数据点
         ↓
    [周末空白 - 无交易数据]
         ↓
周一 09:30 - 市场开盘,下一个数据点

策略需求:周一 10:00 需要获取过去 10 小时的数据

错误行为

正确行为

场景 2:节假日数据缺失

时间线

5月1日 - 劳动节假期(无数据)
5月2日 - 劳动节假期(无数据)
5月3日 - 劳动节假期(无数据)
         ↓
5月4日 09:30 - 市场恢复交易

策略需求:5月4日需要获取过去 3 天的天级 K 线

正确行为

场景 3:个股停牌

时间线

3月10日 - 正常交易,最后数据
3月11日 - 停牌公告(无数据)
3月12日 - 停牌中(无数据)
3月13日 - 停牌中(无数据)
3月14日 - 复牌交易

策略行为


处理规则

分钟级/小时级数据

规则定义

当策略需要获取往前 N 小时的 K 线数据时:

  1. 从当前时间点向前查找
  2. 遇到缺失数据,继续向前推进
  3. 直到凑齐 N 个有效的小时级 K 线

查找窗口

代码实现

参考 backend/core/repositories/stock_repository.pyget_stock_data() 方法:

async def get_stock_data(self, code: str, timestamp: int) -> Optional[StockData]:
    """获取单条股票数据(支持最近时间戳查找)"""

    # 1. 首先尝试精确匹配(性能优化)
    data = self.redis.hgetall(stock_key)

    if not data:
        # 2. 短窗口查找(±5分钟)- 处理时间戳微小偏差
        nearest_timestamps = self.redis.zrangebyscore(
            time_index_key, timestamp, timestamp + 300, start=0, num=1
        )

        if not nearest_timestamps:
            # 3. 长窗口查找(±1天)- 处理周末、假期缺失
            nearest_timestamps = self.redis.zrangebyscore(
                time_index_key, timestamp, timestamp + 86400, start=0, num=1
            )

    return data

示例:获取过去 10 小时数据

def get_last_n_hours_data(symbol: str, current_time: datetime, n_hours: int) -> List[StockData]:
    """
    获取过去 N 小时的数据(处理缺失)

    Args:
        symbol: 股票代码
        current_time: 当前时间
        n_hours: 需要的小时数

    Returns:
        包含 N 个小时级 K 线的列表(按时间倒序)
    """
    collected_data = []
    lookback_time = current_time
    max_iterations = n_hours * 100  # 防止无限循环(最多回溯100倍时间)
    iteration = 0

    while len(collected_data) < n_hours and iteration < max_iterations:
        # 查找最近的数据点
        data_point = await get_nearest_data(symbol, lookback_time)

        if data_point:
            # 避免重复添加同一数据点
            if not collected_data or data_point.timestamp != collected_data[-1].timestamp:
                collected_data.append(data_point)

            # 继续向前查找(推进1小时 + 1分钟缓冲)
            lookback_time = datetime.fromtimestamp(data_point.timestamp) - timedelta(hours=1, minutes=1)
        else:
            # 如果实在找不到数据,推进更大的时间窗口
            lookback_time -= timedelta(days=1)

        iteration += 1

    if len(collected_data) < n_hours:
        logger.warning(f"只找到 {len(collected_data)}/{n_hours} 小时的数据")

    return collected_data

天级K线数据

规则定义

天级 K 线的处理逻辑与分钟级/小时级有所不同:

  1. 如果某日开盘了 → 算作有数据
    • 即使只有部分时段数据(如开盘后停牌)
    • 即使成交量很小
  2. 如果全天没有数据 → 视为无数据
    • 继续向前推一个交易日
    • 不尝试插值或估算
  3. 凑齐 N 个交易日的数据
    • 跳过周末
    • 跳过节假日
    • 跳过停牌日

判定逻辑

def is_valid_daily_data(data: StockData) -> bool:
    """
    判断天级数据是否有效

    有效条件:
    - 有开盘价
    - 有收盘价
    - 成交量 > 0(可选,取决于市场规则)
    """
    return (
        data.open > 0 and
        data.close > 0 and
        data.volume > 0
    )

示例:获取过去 N 个交易日数据

def get_last_n_trading_days(symbol: str, current_date: date, n_days: int) -> List[StockData]:
    """
    获取过去 N 个交易日的数据

    Args:
        symbol: 股票代码
        current_date: 当前日期
        n_days: 需要的交易日数量

    Returns:
        包含 N 个交易日数据的列表(按日期倒序)
    """
    collected_days = []
    lookback_date = current_date
    max_iterations = n_days * 10  # 防止无限循环(考虑长假期)
    iteration = 0

    while len(collected_days) < n_days and iteration < max_iterations:
        # 查找该日期的数据
        daily_data = await get_daily_data(symbol, lookback_date)

        if daily_data and is_valid_daily_data(daily_data):
            collected_days.append(daily_data)

        # 向前推进一天
        lookback_date -= timedelta(days=1)
        iteration += 1

    if len(collected_days) < n_days:
        logger.warning(f"只找到 {len(collected_days)}/{n_days} 个交易日的数据")

    return collected_days

实现示例

示例 1:策略初始化时获取历史数据

class MomentumStrategy(BaseStrategy):
    """动量策略 - 需要过去20个小时的数据"""

    def __init__(self):
        self.lookback_hours = 20

    async def analyze(self, symbol: str, market_data: pd.DataFrame) -> Signal:
        """分析市场数据生成信号"""

        # 确保有足够的历史数据
        if len(market_data) < self.lookback_hours:
            logger.warning(
                f"数据不足:需要 {self.lookback_hours} 小时,"
                f"实际只有 {len(market_data)} 小时"
            )
            return Signal.HOLD

        # 使用最近的 N 小时数据
        recent_data = market_data.tail(self.lookback_hours)

        # ... 策略逻辑

示例 2:回测引擎中的数据获取

class BacktestEngine:
    """回测引擎"""

    async def get_historical_data(
        self,
        symbol: str,
        current_time: datetime,
        lookback_hours: int
    ) -> pd.DataFrame:
        """
        获取历史数据(处理缺失)

        重要:必须凑齐 lookback_hours 个小时的数据
        """
        collected_data = []
        search_time = current_time
        iterations = 0
        max_iterations = lookback_hours * 200  # 防止无限循环

        while len(collected_data) < lookback_hours and iterations < max_iterations:
            # 使用 StockRepository 的智能查找
            data_point = await self.stock_repo.get_stock_data(
                symbol,
                int(search_time.timestamp())
            )

            if data_point:
                # 避免重复
                if not collected_data or data_point.timestamp != collected_data[-1]['timestamp']:
                    collected_data.append({
                        'timestamp': data_point.timestamp,
                        'open': data_point.open,
                        'high': data_point.high,
                        'low': data_point.low,
                        'close': data_point.close,
                        'volume': data_point.volume,
                    })

                # 向前推进(data_point 的时间戳往前1小时)
                search_time = datetime.fromtimestamp(data_point.timestamp) - timedelta(hours=1)
            else:
                # 没找到数据,跳过更大的时间窗口
                search_time -= timedelta(days=1)

            iterations += 1

        # 检查数据完整性
        if len(collected_data) < lookback_hours:
            logger.warning(
                f"数据不足:目标 {lookback_hours} 小时,"
                f"实际 {len(collected_data)} 小时({symbol} @ {current_time})"
            )

        # 转换为 DataFrame(按时间正序)
        df = pd.DataFrame(collected_data[::-1])  # 反转为正序
        return df

测试要求

单元测试

所有数据获取相关的代码必须通过以下测试场景:

测试 1:正常连续数据

def test_continuous_data():
    """测试连续数据获取"""
    # 假设有连续的分钟数据
    result = get_last_n_hours_data("AAPL.US", datetime(2025, 1, 15, 10, 0), 10)
    assert len(result) == 10
    assert all(data.close > 0 for data in result)

测试 2:周末数据缺失

def test_weekend_gap():
    """测试周末数据缺失场景"""
    # 周一 10:00 请求 10 小时数据
    monday_10am = datetime(2025, 1, 13, 10, 0)  # 周一
    result = get_last_n_hours_data("AAPL.US", monday_10am, 10)

    # 应该跨越周末,获取周五的数据
    assert len(result) == 10
    # 验证数据包含周五的时间戳
    friday_data = [d for d in result if d.timestamp < monday_10am - timedelta(days=2)]
    assert len(friday_data) > 0

测试 3:节假日数据缺失

def test_holiday_gap():
    """测试节假日数据缺失场景"""
    # 5月4日请求过去3天的日线数据(5月1-3日为假期)
    result = get_last_n_trading_days("AAPL.US", date(2025, 5, 4), 3)

    # 应该跳过假期,获取 4/30, 4/29, 4/28 的数据
    assert len(result) == 3
    assert all(d.timestamp < datetime(2025, 5, 1).timestamp() for d in result)

测试 4:数据完全不存在

def test_no_data_available():
    """测试数据完全不存在的场景"""
    # 请求一个不存在的股票代码
    result = get_last_n_hours_data("INVALID.US", datetime.now(), 10)

    # 应该返回空列表或部分数据,并记录警告
    assert isinstance(result, list)
    assert len(result) < 10

集成测试

在回测引擎中进行端到端测试:

def test_backtest_with_data_gaps():
    """测试包含数据缺口的回测"""
    # 创建跨周末的回测时间范围
    backtest_config = {
        "start_date": "2025-01-10",  # 周五
        "end_date": "2025-01-15",    # 周三
        "symbol": "AAPL.US",
        "strategy": "MomentumStrategy"
    }

    # 运行回测
    result = run_backtest(backtest_config)

    # 验证:
    # 1. 回测成功完成
    assert result.status == "completed"

    # 2. 策略在周一能正确获取数据(跨越周末)
    monday_signals = [s for s in result.signals if s.date.weekday() == 0]
    assert len(monday_signals) > 0

    # 3. 没有因数据缺失而报错
    assert result.errors == []

常见问题

Q1: 为什么不能简单地向前推 N 个时间单位?

A: 因为市场不是连续的。简单向前推 10 小时可能跨越周末,导致只有 2 小时的有效数据。策略需要的是 10 个有效的小时级 K 线,而不是 10 小时的时间跨度

Q2: 如果凑不齐数据怎么办?

A: 根据策略需求:

Q3: 跨越多长时间去查找数据?

A: 理论上没有上限,但实践中:

Q4: 停牌的股票如何处理?

A: 停牌期间:

Q5: 不同市场的休市时间不同怎么办?

A:

Q6: 插值(Interpolation)可以使用吗?

A: 不推荐用于价格数据:

Q7: 回测和实盘的数据处理有区别吗?

A: 原则上应该一致:


文档版本


相关文档


重要提醒

此文档定义的规则是 Fire 量化交易系统的核心约定。违反这些规则可能导致:

所有策略开发者和数据处理模块开发者都必须熟悉并遵循这些规范。