市场数据缺失处理规范
重要文档 - 策略开发必读
本文档定义了 Fire 量化交易系统中市场数据缺失的处理规则。所有策略开发者和数据处理模块都必须遵循这些规范,以确保在真实市场环境中正确处理数据缺失情况。
目录
背景说明
为什么会出现数据缺失?
股票市场不是时时刻刻都开启的,以下情况都会导致数据缺失:
- 定期休市
- 周末(周六、周日)
- 每日收盘后(盘后时段)
- 节假日休市
- 国家法定节假日
- 交易所特殊假期
- 国际市场时差导致的非同步休市
- 临时停市
- 券商系统维护
- 交易所技术故障
- 个股停牌(公司公告、重组等)
- 市场熔断机制触发
- 数据采集问题
- 上游数据源临时故障
- 网络连接中断
- 数据同步延迟
对策略的影响
数据缺失是真实市场环境的常态,而不是异常情况。策略必须能够:
- 正确处理不连续的时间序列数据
- 在数据缺失时找到最近可用的历史数据
- 区分”真正无数据”和”临时缺失”
- 在回测和实盘中保持一致的行为
核心原则
原则 1:数据完整性优先
当策略需要 N 个数据点时,必须凑齐 N 个有效数据点,而不是简单地向前查找 N 个时间单位。
错误做法:
# 需要10小时的数据,直接向前推10小时
start_time = current_time - timedelta(hours=10)
data = get_data(symbol, start_time, current_time)
# 问题:如果中间有周末,可能只有2小时的数据!
正确做法:
# 需要10小时的数据,持续向前找直到凑齐10个小时K线
collected_hours = []
lookback_time = current_time
while len(collected_hours) < 10:
data_point = get_nearest_data(symbol, lookback_time)
if data_point:
collected_hours.append(data_point)
lookback_time = data_point.timestamp - timedelta(hours=1)
原则 2:向前查找,直到凑齐
遇到数据缺失时,继续往前推,直到找到足够的有效数据点。
- 不要因为单个数据点缺失就放弃
- 不要假设数据是连续的
- 不要对缺失数据进行插值(除非策略明确要求)
原则 3:区分粒度处理
不同时间粒度的数据有不同的处理规则:
| 时间粒度 | 数据要求 | 缺失判定 |
|---|---|---|
| 分钟级 | 凑齐N个分钟K线 | 继续向前找,直到凑齐 |
| 小时级 | 凑齐N个小时K线 | 继续向前找,直到凑齐 |
| 天级 | 凑齐N个交易日 | 如全天无数据,推进到前一交易日 |
数据缺失场景
场景 1:周末数据缺失
时间线:
周五 15:00 - 市场收盘,最后一个数据点
↓
[周末空白 - 无交易数据]
↓
周一 09:30 - 市场开盘,下一个数据点
策略需求:周一 10:00 需要获取过去 10 小时的数据
错误行为:
- 向前推 10 小时 → 周一 00:00(周末,无数据)
- 只获取到 0.5 小时数据(周一 09:30-10:00)
正确行为:
- 继续向前找 → 跨越周末 → 获取周五收盘前的数据
- 最终凑齐 10 个小时级 K 线
场景 2:节假日数据缺失
时间线:
5月1日 - 劳动节假期(无数据)
5月2日 - 劳动节假期(无数据)
5月3日 - 劳动节假期(无数据)
↓
5月4日 09:30 - 市场恢复交易
策略需求:5月4日需要获取过去 3 天的天级 K 线
正确行为:
- 4月30日(有数据)
- 4月29日(有数据)
- 4月28日(有数据)
- 跳过 5月1日-3日(无数据)
场景 3:个股停牌
时间线:
3月10日 - 正常交易,最后数据
3月11日 - 停牌公告(无数据)
3月12日 - 停牌中(无数据)
3月13日 - 停牌中(无数据)
3月14日 - 复牌交易
策略行为:
- 3月14日获取该股票数据时,应跳过停牌日期
- 使用 3月10日的数据作为最近可用数据
处理规则
分钟级/小时级数据
规则定义
当策略需要获取往前 N 小时的 K 线数据时:
- 从当前时间点向前查找
- 遇到缺失数据,继续向前推进
- 直到凑齐 N 个有效的小时级 K 线
查找窗口
- 初始窗口:±5 分钟(处理时间戳微小偏差)
- 扩展窗口:±1 天(处理周末、假期缺失)
- 最大回溯:理论上无限制,直到凑齐或到达数据起始点
代码实现
参考 backend/core/repositories/stock_repository.py 的 get_stock_data() 方法:
async def get_stock_data(self, code: str, timestamp: int) -> Optional[StockData]:
"""获取单条股票数据(支持最近时间戳查找)"""
# 1. 首先尝试精确匹配(性能优化)
data = self.redis.hgetall(stock_key)
if not data:
# 2. 短窗口查找(±5分钟)- 处理时间戳微小偏差
nearest_timestamps = self.redis.zrangebyscore(
time_index_key, timestamp, timestamp + 300, start=0, num=1
)
if not nearest_timestamps:
# 3. 长窗口查找(±1天)- 处理周末、假期缺失
nearest_timestamps = self.redis.zrangebyscore(
time_index_key, timestamp, timestamp + 86400, start=0, num=1
)
return data
示例:获取过去 10 小时数据
def get_last_n_hours_data(symbol: str, current_time: datetime, n_hours: int) -> List[StockData]:
"""
获取过去 N 小时的数据(处理缺失)
Args:
symbol: 股票代码
current_time: 当前时间
n_hours: 需要的小时数
Returns:
包含 N 个小时级 K 线的列表(按时间倒序)
"""
collected_data = []
lookback_time = current_time
max_iterations = n_hours * 100 # 防止无限循环(最多回溯100倍时间)
iteration = 0
while len(collected_data) < n_hours and iteration < max_iterations:
# 查找最近的数据点
data_point = await get_nearest_data(symbol, lookback_time)
if data_point:
# 避免重复添加同一数据点
if not collected_data or data_point.timestamp != collected_data[-1].timestamp:
collected_data.append(data_point)
# 继续向前查找(推进1小时 + 1分钟缓冲)
lookback_time = datetime.fromtimestamp(data_point.timestamp) - timedelta(hours=1, minutes=1)
else:
# 如果实在找不到数据,推进更大的时间窗口
lookback_time -= timedelta(days=1)
iteration += 1
if len(collected_data) < n_hours:
logger.warning(f"只找到 {len(collected_data)}/{n_hours} 小时的数据")
return collected_data
天级K线数据
规则定义
天级 K 线的处理逻辑与分钟级/小时级有所不同:
- 如果某日开盘了 → 算作有数据
- 即使只有部分时段数据(如开盘后停牌)
- 即使成交量很小
- 如果全天没有数据 → 视为无数据
- 继续向前推一个交易日
- 不尝试插值或估算
- 凑齐 N 个交易日的数据
- 跳过周末
- 跳过节假日
- 跳过停牌日
判定逻辑
def is_valid_daily_data(data: StockData) -> bool:
"""
判断天级数据是否有效
有效条件:
- 有开盘价
- 有收盘价
- 成交量 > 0(可选,取决于市场规则)
"""
return (
data.open > 0 and
data.close > 0 and
data.volume > 0
)
示例:获取过去 N 个交易日数据
def get_last_n_trading_days(symbol: str, current_date: date, n_days: int) -> List[StockData]:
"""
获取过去 N 个交易日的数据
Args:
symbol: 股票代码
current_date: 当前日期
n_days: 需要的交易日数量
Returns:
包含 N 个交易日数据的列表(按日期倒序)
"""
collected_days = []
lookback_date = current_date
max_iterations = n_days * 10 # 防止无限循环(考虑长假期)
iteration = 0
while len(collected_days) < n_days and iteration < max_iterations:
# 查找该日期的数据
daily_data = await get_daily_data(symbol, lookback_date)
if daily_data and is_valid_daily_data(daily_data):
collected_days.append(daily_data)
# 向前推进一天
lookback_date -= timedelta(days=1)
iteration += 1
if len(collected_days) < n_days:
logger.warning(f"只找到 {len(collected_days)}/{n_days} 个交易日的数据")
return collected_days
实现示例
示例 1:策略初始化时获取历史数据
class MomentumStrategy(BaseStrategy):
"""动量策略 - 需要过去20个小时的数据"""
def __init__(self):
self.lookback_hours = 20
async def analyze(self, symbol: str, market_data: pd.DataFrame) -> Signal:
"""分析市场数据生成信号"""
# 确保有足够的历史数据
if len(market_data) < self.lookback_hours:
logger.warning(
f"数据不足:需要 {self.lookback_hours} 小时,"
f"实际只有 {len(market_data)} 小时"
)
return Signal.HOLD
# 使用最近的 N 小时数据
recent_data = market_data.tail(self.lookback_hours)
# ... 策略逻辑
示例 2:回测引擎中的数据获取
class BacktestEngine:
"""回测引擎"""
async def get_historical_data(
self,
symbol: str,
current_time: datetime,
lookback_hours: int
) -> pd.DataFrame:
"""
获取历史数据(处理缺失)
重要:必须凑齐 lookback_hours 个小时的数据
"""
collected_data = []
search_time = current_time
iterations = 0
max_iterations = lookback_hours * 200 # 防止无限循环
while len(collected_data) < lookback_hours and iterations < max_iterations:
# 使用 StockRepository 的智能查找
data_point = await self.stock_repo.get_stock_data(
symbol,
int(search_time.timestamp())
)
if data_point:
# 避免重复
if not collected_data or data_point.timestamp != collected_data[-1]['timestamp']:
collected_data.append({
'timestamp': data_point.timestamp,
'open': data_point.open,
'high': data_point.high,
'low': data_point.low,
'close': data_point.close,
'volume': data_point.volume,
})
# 向前推进(data_point 的时间戳往前1小时)
search_time = datetime.fromtimestamp(data_point.timestamp) - timedelta(hours=1)
else:
# 没找到数据,跳过更大的时间窗口
search_time -= timedelta(days=1)
iterations += 1
# 检查数据完整性
if len(collected_data) < lookback_hours:
logger.warning(
f"数据不足:目标 {lookback_hours} 小时,"
f"实际 {len(collected_data)} 小时({symbol} @ {current_time})"
)
# 转换为 DataFrame(按时间正序)
df = pd.DataFrame(collected_data[::-1]) # 反转为正序
return df
测试要求
单元测试
所有数据获取相关的代码必须通过以下测试场景:
测试 1:正常连续数据
def test_continuous_data():
"""测试连续数据获取"""
# 假设有连续的分钟数据
result = get_last_n_hours_data("AAPL.US", datetime(2025, 1, 15, 10, 0), 10)
assert len(result) == 10
assert all(data.close > 0 for data in result)
测试 2:周末数据缺失
def test_weekend_gap():
"""测试周末数据缺失场景"""
# 周一 10:00 请求 10 小时数据
monday_10am = datetime(2025, 1, 13, 10, 0) # 周一
result = get_last_n_hours_data("AAPL.US", monday_10am, 10)
# 应该跨越周末,获取周五的数据
assert len(result) == 10
# 验证数据包含周五的时间戳
friday_data = [d for d in result if d.timestamp < monday_10am - timedelta(days=2)]
assert len(friday_data) > 0
测试 3:节假日数据缺失
def test_holiday_gap():
"""测试节假日数据缺失场景"""
# 5月4日请求过去3天的日线数据(5月1-3日为假期)
result = get_last_n_trading_days("AAPL.US", date(2025, 5, 4), 3)
# 应该跳过假期,获取 4/30, 4/29, 4/28 的数据
assert len(result) == 3
assert all(d.timestamp < datetime(2025, 5, 1).timestamp() for d in result)
测试 4:数据完全不存在
def test_no_data_available():
"""测试数据完全不存在的场景"""
# 请求一个不存在的股票代码
result = get_last_n_hours_data("INVALID.US", datetime.now(), 10)
# 应该返回空列表或部分数据,并记录警告
assert isinstance(result, list)
assert len(result) < 10
集成测试
在回测引擎中进行端到端测试:
def test_backtest_with_data_gaps():
"""测试包含数据缺口的回测"""
# 创建跨周末的回测时间范围
backtest_config = {
"start_date": "2025-01-10", # 周五
"end_date": "2025-01-15", # 周三
"symbol": "AAPL.US",
"strategy": "MomentumStrategy"
}
# 运行回测
result = run_backtest(backtest_config)
# 验证:
# 1. 回测成功完成
assert result.status == "completed"
# 2. 策略在周一能正确获取数据(跨越周末)
monday_signals = [s for s in result.signals if s.date.weekday() == 0]
assert len(monday_signals) > 0
# 3. 没有因数据缺失而报错
assert result.errors == []
常见问题
Q1: 为什么不能简单地向前推 N 个时间单位?
A: 因为市场不是连续的。简单向前推 10 小时可能跨越周末,导致只有 2 小时的有效数据。策略需要的是 10 个有效的小时级 K 线,而不是 10 小时的时间跨度。
Q2: 如果凑不齐数据怎么办?
A: 根据策略需求:
- 严格策略:数据不足时返回 HOLD 信号,不进行交易
- 宽松策略:使用可用的部分数据,但记录警告
- 回测场景:跳过该时间点,继续推进
Q3: 跨越多长时间去查找数据?
A: 理论上没有上限,但实践中:
- 分钟级/小时级:最多回溯 100 倍时间(如需要 10 小时,最多回溯 1000 小时)
- 天级:最多回溯 10 倍时间(如需要 30 天,最多回溯 300 天)
- 设置合理的迭代上限,防止无限循环
Q4: 停牌的股票如何处理?
A: 停牌期间:
- 无新数据生成
- 策略使用停牌前的最后一个数据点
- 建议在策略中检测长期无数据更新,并避免交易
Q5: 不同市场的休市时间不同怎么办?
A:
- 数据层应该已经过滤掉休市时段
- 策略只需关注”是否有数据”,不需要关心”为什么没数据”
- 使用统一的数据获取接口,底层自动处理
Q6: 插值(Interpolation)可以使用吗?
A: 不推荐用于价格数据:
- 不要对价格进行线性插值(会产生虚假数据)
- 不要对成交量进行估算
- 可以对某些技术指标进行平滑处理(如果策略明确需要)
- 可以使用前值填充(forward-fill)作为临时占位
Q7: 回测和实盘的数据处理有区别吗?
A: 原则上应该一致:
- 回测和实盘使用相同的数据获取逻辑
- 回测不应该有”未来数据泄露”
- 实盘中的数据缺失应该与回测中的处理方式一致
- 实盘可能有额外的延迟和数据更新问题
文档版本
- 创建日期: 2025-01-16
- 最后更新: 2025-01-16
- 维护者: Fire 开发团队
- 相关代码:
backend/core/repositories/stock_repository.py- 数据获取实现backend/core/trading/engines/trading_session_engine.py- 回测引擎backend/core/trading/strategies/base_strategy.py- 策略基类
相关文档
重要提醒:
此文档定义的规则是 Fire 量化交易系统的核心约定。违反这些规则可能导致:
- 回测结果不准确
- 实盘交易异常
- 策略行为不可预测
所有策略开发者和数据处理模块开发者都必须熟悉并遵循这些规范。