交易数据模型
概述
交易数据模型定义了量化交易系统中所有与交易相关的数据结构,包括资产、持仓、订单等核心业务实体。
资产模型 (Assets)
用户资产 (UserAsset)
class UserAsset(BaseModel):
id: str # 资产ID
user_id: str # 用户ID
currency: str # 货币类型 (USD|HKD|CNY)
amount: float # 资产金额
asset_type: str # 资产类型 (cash|position)
created_at: datetime # 创建时间
updated_at: datetime # 更新时间
用户持仓 (UserPosition)
class UserPosition(BaseModel):
user_id: str # 用户ID
symbol: str # 证券代码 (股票: YINN.US, 期权: BILI251017C30000.US)
symbol_name: str # 证券名称
asset_type: AssetType # 资产类型 (stock|fund|option|bond|warrant)
quantity: Decimal # 持仓数量
available_quantity: Decimal # 可用数量
cost_price: Decimal # 成本价
current_price: Optional[Decimal] # 当前价格
market: MarketType # 市场类型 (US|HK|CN|SG|CRYPTO)
currency: CurrencyType # 货币类型 (USD|HKD|CNY)
# 期权相关字段 (仅当 asset_type=option 时有效)
contract_multiplier: Optional[int] # 合约乘数(期权通常为100,股票为1)
option_type: Optional[OptionType] # 期权类型 (call|put)
strike_price: Optional[Decimal] # 行权价
expiry_date: Optional[date] # 到期日
updated_at: datetime # 更新时间
# 相关枚举定义
class AssetType(str, Enum):
STOCK = "stock" # 股票
FUND = "fund" # 基金
CASH = "cash" # 现金
BOND = "bond" # 债券
OPTION = "option" # 期权
WARRANT = "warrant" # 权证
class OptionType(str, Enum):
CALL = "call" # 看涨期权
PUT = "put" # 看跌期权
期权代码格式说明:
- 格式:
{标的股票}{YYMMDD}{C/P}{行权价}.{市场} - 示例:
BILI251017C30000.USBILI: 标的股票 (Bilibili)251017: 到期日 2025年10月17日C: Call 看涨期权(P为Put看跌期权)30000: 行权价 $30.00(需除以1000)US: 美国市场
市值计算:
- 股票/基金:市值 = 数量 × 当前价格
- 期权:市值 = 数量 × 当前价格 × 合约乘数(通常为100)
- 示例:持有15张BILI期权,价格$2.50
- 市值 = 15 × $2.50 × 100 = $3,750.00
模拟资产 (SimulatedAsset)
class SimulatedAsset(BaseModel):
id: str # 资产ID
user_id: str # 用户ID
currency: str # 货币类型 (USD|HKD|CNY)
amount: float # 资产金额
asset_type: str # 资产类型 (cash|position)
created_at: datetime # 创建时间
updated_at: datetime # 更新时间
模拟持仓 (SimulatedPosition)
class SimulatedPosition(BaseModel):
id: str # 持仓ID
user_id: str # 用户ID
symbol: str # 证券代码 (股票: YINN.US, 期权: BILI251017C30000.US)
symbol_name: str # 证券名称
asset_type: AssetType # 资产类型 (stock|fund|option|bond|warrant)
quantity: Decimal # 持仓数量
cost_price: Decimal # 成本价
current_price: Optional[Decimal] # 当前价格
market: MarketType # 市场类型 (US|HK|CN|SG|CRYPTO)
currency: CurrencyType # 货币类型 (USD|HKD|CNY)
# 期权相关字段 (仅当 asset_type=option 时有效)
contract_multiplier: Optional[int] # 合约乘数(期权通常为100,股票为1)
option_type: Optional[OptionType] # 期权类型 (call|put)
strike_price: Optional[Decimal] # 行权价
expiry_date: Optional[date] # 到期日
created_at: datetime # 创建时间
updated_at: datetime # 更新时间
股票信息 (Stock)
class Stock(BaseModel):
symbol: str # 股票代码 (如: YINN.US)
name: str # 股票名称
market: str # 市场 (US|HK|CN)
sector: Optional[str] # 行业
market_cap: Optional[float] # 市值
price: float # 当前价格
change: float # 涨跌幅
volume: int # 成交量
last_update: datetime # 最后更新时间
交易订单 (Order)
class Order(BaseModel):
id: str # 订单ID
user_id: str # 用户ID
account_id: str # 账户ID
symbol: str # 股票代码 (如: YINN.US)
side: OrderSide # 买卖方向 (buy|sell)
order_type: OrderType # 订单类型 (market|limit|stop)
quantity: int # 数量
price: Optional[float] # 价格 (限价单)
stop_price: Optional[float] # 止损价格
status: OrderStatus # 订单状态
filled_quantity: int # 已成交数量
average_price: float # 平均成交价
created_at: datetime # 创建时间
updated_at: datetime # 更新时间
订单相关枚举
class OrderSide(str, Enum):
BUY = "buy"
SELL = "sell"
class OrderType(str, Enum):
MARKET = "market"
LIMIT = "limit"
STOP = "stop"
class OrderStatus(str, Enum):
PENDING = "pending"
FILLED = "filled"
CANCELLED = "cancelled"
REJECTED = "rejected"
持仓记录 (Position)
class Position(BaseModel):
id: str # 持仓ID
user_id: str # 用户ID
account_id: str # 账户ID
symbol: str # 股票代码 (如: YINN.US)
quantity: int # 持仓数量
average_price: float # 平均成本价
current_price: float # 当前价格
market_value: float # 市值
unrealized_pnl: float # 未实现盈亏
realized_pnl: float # 已实现盈亏
currency: str # 货币类型
created_at: datetime # 创建时间
updated_at: datetime # 更新时间
交易历史 (Trade)
class Trade(BaseModel):
id: str # 交易ID
order_id: str # 订单ID
user_id: str # 用户ID
account_id: str # 账户ID
symbol: str # 股票代码 (如: YINN.US)
side: OrderSide # 买卖方向 (buy|sell)
quantity: int # 数量
price: float # 成交价格
amount: float # 成交金额
commission: float # 手续费
currency: str # 货币类型
timestamp: datetime # 成交时间
资产概览模型
资产概览 (AssetOverview)
class AssetOverview(BaseModel):
# 多货币资产数据
total_assets_by_currency: Optional[Dict[CurrencyType, Dict[str, Decimal]]]
cash_assets_by_currency: Optional[Dict[CurrencyType, Decimal]]
position_assets_by_currency: Optional[Dict[CurrencyType, Decimal]]
positions_by_currency: Optional[Dict[CurrencyType, List[UserPositionResponse]]]
# 兼容性字段(用于单货币显示)
total_assets: Optional[Decimal] # 总资产(主要货币)
cash_assets: Optional[Decimal] # 现金资产(主要货币)
position_assets: Optional[Decimal] # 持仓资产(主要货币)
today_pnl: Optional[Decimal] # 今日盈亏
currency: Optional[CurrencyType] # 主要货币类型
positions: List[UserPositionResponse] # 所有持仓列表
# 数据更新时间
updated_at: Optional[datetime] # 资产数据最后更新时间
模拟资产概览 (SimulatedAssetOverview)
class SimulatedAssetOverview(BaseModel):
# 多货币资产数据
total_assets_by_currency: Optional[Dict[CurrencyType, Dict[str, Decimal]]]
positions_by_currency: Optional[Dict[CurrencyType, List[SimulatedPositionResponse]]]
# 兼容性字段(用于单货币显示)
total_assets: Optional[Decimal] # 总资产(主要货币)
cash_assets: Optional[Decimal] # 现金资产(主要货币)
position_assets: Optional[Decimal] # 持仓资产(主要货币)
today_pnl: Optional[Decimal] # 今日盈亏
currency: Optional[CurrencyType] # 主要货币类型
positions: List[SimulatedPositionResponse] # 所有模拟持仓列表
# 数据更新时间
updated_at: Optional[datetime] # 资产数据最后更新时间
数据存储
Redis存储结构
user_asset:{user_id}:{currency} # 用户资产 (Hash)
user_position:{user_id}:{symbol} # 用户持仓 (Hash)
simulated_asset:{user_id}:{currency} # 模拟资产 (Hash)
simulated_position:{user_id}:{symbol} # 模拟持仓 (Hash)
持仓 Hash 字段(适用于 user_position 和 simulated_position):
id # 持仓ID
user_id # 用户ID
symbol # 证券代码
symbol_name # 证券名称
asset_type # 资产类型 (stock|fund|option|bond|warrant)
quantity # 持仓数量
available_quantity # 可用数量
cost_price # 成本价
current_price # 当前价格
market # 市场类型
currency # 货币类型
contract_multiplier # 合约乘数 (期权字段)
option_type # 期权类型 (期权字段)
strike_price # 行权价 (期权字段)
expiry_date # 到期日 (期权字段)
created_at # 创建时间
updated_at # 更新时间
API接口
资产管理
GET /api/v1/assets/overview- 获取资产概览POST /api/v1/assets/- 创建用户资产GET /api/v1/assets/- 获取用户资产PUT /api/v1/assets/- 更新用户资产DELETE /api/v1/assets/- 删除用户资产
持仓管理
GET /api/v1/assets/positions- 获取持仓列表POST /api/v1/assets/positions- 创建持仓PUT /api/v1/assets/positions/{id}- 更新持仓DELETE /api/v1/assets/positions/{id}- 删除持仓
模拟资产管理
GET /api/v1/assets/simulated/overview- 获取模拟资产概览POST /api/v1/assets/simulated/- 创建模拟资产GET /api/v1/assets/simulated/- 获取模拟资产PUT /api/v1/assets/simulated/- 更新模拟资产
资产同步
POST /api/v1/assets/sync-from-longport- 从LongPort同步资产POST /api/v1/assets/sync-to-simulated- 同步到模拟资产POST /api/v1/assets/sync- 通用同步接口
业务规则
资产管理
- 支持多币种资产 (USD, HKD, CNY)
- 资产金额必须 ≥ 0
- 资产类型分为现金和持仓
- 支持真实资产和模拟资产
持仓管理
- 持仓数量必须 ≥ 0
- 成本价和当前价格必须 ≥ 0
- 支持多种资产类型(股票、基金、期权、债券、权证)
- 支持多币种持仓
市值计算规则:
- 股票/基金/债券/权证:市值 = 数量 × 当前价格 × 1
- 期权:市值 = 数量 × 当前价格 × 合约乘数(默认100)
- 合约乘数字段 (
contract_multiplier) 支持自定义,如不存在则根据资产类型自动推断
期权特有属性:
option_type: 区分 Call(看涨)和 Put(看跌)strike_price: 行权价格expiry_date: 到期日期contract_multiplier: 合约乘数(通常为100)
数据同步
- 支持从LongPort API同步真实资产
- 支持真实资产同步到模拟资产
- 同步过程支持WebSocket实时日志
- 同步失败时保持原有数据不变
历史数据管理模型
策略状态 (StrategyState)
class StrategyState(Enum):
"""策略生命周期状态"""
INITIALIZING = "initializing" # 初始化中
WARMING_UP = "warming_up" # 预热中(加载历史数据)
READY = "ready" # 就绪(预热完成,等待交易)
TRADING = "trading" # 交易中
STOPPED = "stopped" # 已停止
ERROR = "error" # 错误状态
状态转换规则:
- 允许相同状态转换(幂等性)
- 任何状态都可转换到 STOPPED 或 ERROR
- 正常流程: INITIALIZING → WARMING_UP → READY → TRADING
- 不允许逆向转换
策略数据需求 (StrategyDataRequirement)
@dataclass
class StrategyDataRequirement:
"""策略声明的历史数据需求"""
required_bar_count: int # 需要的K线数量
bar_interval: timedelta = timedelta(minutes=1) # K线间隔(默认1分钟)
warmup_mode: Literal["strict", "best_effort"] = "best_effort" # 预热模式
def calculate_time_range(self, reference_time: datetime) -> tuple[datetime, datetime]:
"""计算需要加载的时间范围"""
total_duration = self.required_bar_count * self.bar_interval
start_time = reference_time - total_duration
end_time = reference_time - self.bar_interval # 不包含参考时间点
return (start_time, end_time)
使用示例:
# MACD策略需要 slow_period(26) + signal_period(17) = 43 条数据
requirement = StrategyDataRequirement(
required_bar_count=43,
bar_interval=timedelta(minutes=1),
warmup_mode="best_effort"
)
市场数据批次 (MarketDataBatch)
@dataclass
class MarketDataBatch:
"""历史数据批次容器"""
symbol: str # 股票代码
data: List[MarketData] # 市场数据列表
time_range: tuple[datetime, datetime] # 时间范围 (start, end)
count: int = field(init=False) # 数据条数(自动计算)
loaded_at: datetime = field(default_factory=datetime.now) # 加载时间
cache_key: Optional[str] = None # 缓存键
source: Literal["database", "cache"] = "database" # 数据来源
def is_empty(self) -> bool:
"""检查批次是否为空"""
return self.count == 0
def get_first_timestamp(self) -> Optional[datetime]:
"""获取第一条数据的时间戳"""
return self.data[0].timestamp if self.data else None
def get_last_timestamp(self) -> Optional[datetime]:
"""获取最后一条数据的时间戳"""
return self.data[-1].timestamp if self.data else None
数据加载上下文 (DataLoadingContext)
@dataclass
class DataLoadingContext:
"""数据加载进度跟踪"""
mode: Literal["backtest", "realtime"] # 运行模式
symbol: str # 股票代码
start_time: datetime # 开始时间
end_time: datetime # 结束时间
bar_count: int # 预期数据条数
status: Literal["pending", "loading", "complete", "error"] # 加载状态
progress: float = 0.0 # 加载进度 (0.0 to 1.0)
error_message: Optional[str] = None # 错误信息
def update_progress(self, loaded: int, total: int) -> None:
"""更新加载进度"""
self.status = "loading"
self.progress = min(loaded / total, 1.0) if total > 0 else 0.0
def mark_complete(self) -> None:
"""标记加载完成"""
self.status = "complete"
self.progress = 1.0
def mark_error(self, error_message: str) -> None:
"""标记加载失败"""
self.status = "error"
self.error_message = error_message
时间戳注册表 (TimestampRegistry)
class TimestampRegistry:
"""防止重复处理相同时间戳的数据(LRU淘汰)"""
def __init__(self, max_per_symbol: int = 10000):
"""
初始化时间戳注册表
Args:
max_per_symbol: 每个股票最多保留的时间戳数量(LRU淘汰)
"""
self.max_per_symbol = max_per_symbol
self._timestamps: Dict[str, deque] = defaultdict(
lambda: deque(maxlen=self.max_per_symbol)
)
self._timestamp_sets: Dict[str, Set[datetime]] = defaultdict(set)
def register(self, symbol: str, timestamp: datetime) -> bool:
"""
注册时间戳
Returns:
True: 新时间戳,注册成功
False: 重复时间戳,已存在
"""
if timestamp in self._timestamp_sets[symbol]:
return False # 重复
# 自动LRU淘汰
evicted = None
if len(self._timestamps[symbol]) == self.max_per_symbol:
evicted = self._timestamps[symbol][0]
self._timestamps[symbol].append(timestamp)
self._timestamp_sets[symbol].add(timestamp)
# 从set中移除被淘汰的时间戳
if evicted and evicted in self._timestamp_sets[symbol]:
self._timestamp_sets[symbol].remove(evicted)
return True
def is_registered(self, symbol: str, timestamp: datetime) -> bool:
"""检查时间戳是否已注册"""
return timestamp in self._timestamp_sets[symbol]
def get_count(self, symbol: str) -> int:
"""获取股票已注册的时间戳数量"""
return len(self._timestamps[symbol])
def clear(self, symbol: str) -> None:
"""清除指定股票的所有时间戳"""
self._timestamps[symbol].clear()
self._timestamp_sets[symbol].clear()
def clear_all(self) -> None:
"""清除所有股票的时间戳"""
self._timestamps.clear()
self._timestamp_sets.clear()
自定义异常
InsufficientDataError
class InsufficientDataError(Exception):
"""数据库历史数据不足异常"""
def __init__(self, symbol: str, required: int, available: int):
self.symbol = symbol # 股票代码
self.required = required # 需要的数据条数
self.available = available # 实际可用的数据条数
self.deficit = required - available # 缺口
super().__init__(
f"Insufficient data for {symbol}: required {required} bars, "
f"only {available} available"
)
使用场景: 当数据库中的历史数据不足以满足策略预热需求时抛出
DataLoadingError
class DataLoadingError(Exception):
"""历史数据加载失败异常"""
def __init__(self, symbol: str, message: str, cause: Optional[Exception] = None):
self.symbol = symbol # 股票代码
self.message = message # 错误消息
self.cause = cause # 原始异常(如果有)
super().__init__(f"Failed to load data for {symbol}: {message}")
使用场景: 数据库查询失败、网络错误、缓存错误等
StrategyError
class StrategyError(Exception):
"""策略执行异常"""
def __init__(self, strategy_name: str, message: str, cause: Optional[Exception] = None):
self.strategy_name = strategy_name # 策略名称
self.message = message # 错误消息
self.cause = cause # 原始异常(如果有)
super().__init__(f"Strategy {strategy_name} error: {message}")
使用场景: 策略在预热或交易过程中抛出异常
历史数据管理业务规则
预热模式
- strict 模式: 必须加载足够的历史数据,否则抛出 InsufficientDataError
- best_effort 模式: 尽力加载数据,即使不足也继续(记录警告日志)
时间戳去重
- 每个股票维护独立的时间戳集合
- 使用 LRU 策略自动淘汰旧时间戳(默认保留最近 10000 条)
- O(1) 时间复杂度的重复检测
数据加载策略
- 缓存优先: 先尝试从 Redis 缓存加载
- 数据库回退: 缓存未命中时从 PostgreSQL 数据库加载
- 缓存回写: 数据库数据加载后写入缓存
- 统计跟踪: 记录缓存命中率、加载时间等指标
状态转换
- INITIALIZING → WARMING_UP: 开始加载历史数据
- WARMING_UP → READY: 历史数据加载完成,指标已初始化
- READY → TRADING: 收到第一条实时数据,开始生成交易信号
- 任何状态 → STOPPED: 用户停止策略
- 任何状态 → ERROR: 发生错误
性能优化
- 异步加载: 所有数据加载操作都是异步的,不阻塞主线程
- 批量加载: 一次性加载所有需要的历史数据
- 缓存命中率监控: 跟踪缓存效果,优化缓存策略
多通道交易数据模型
通道配置 (ChannelConfig)
@dataclass
class ChannelConfig:
"""通道配置数据模型"""
channel_id: str # 通道唯一标识
channel_type: str # 通道类型 (aggressive|balanced|conservative)
enabled: bool # 是否启用
capital_allocation: Decimal # 资金分配比例 (0.0-1.0)
# 策略配置
strategies: List[StrategyConfig] # 策略配置列表
# 风险限制
risk_limits: RiskLimits # 风险限制配置
# 投票权重
voting_weight: Decimal # 在净额计算中的投票权重
# 元数据
created_at: datetime # 创建时间
updated_at: datetime # 更新时间
通道状态 (Channel)
@dataclass
class Channel:
"""交易通道实体"""
channel_id: str # 通道ID
channel_type: str # 通道类型
status: ChannelStatus # 通道状态
config: ChannelConfig # 通道配置
# 策略引擎
strategy_engine: Optional[StrategyEngine] # 策略引擎实例
# 虚拟持仓
virtual_positions: Dict[str, VirtualPosition] # 虚拟持仓字典 {symbol: position}
# 性能指标
performance_metrics: PerformanceMetrics # 通道性能指标
# 时间戳
created_at: datetime # 创建时间
started_at: Optional[datetime] # 启动时间
stopped_at: Optional[datetime] # 停止时间
class ChannelStatus(str, Enum):
"""通道状态枚举"""
CREATED = "created" # 已创建
STARTING = "starting" # 启动中
RUNNING = "running" # 运行中
PAUSED = "paused" # 已暂停
STOPPING = "stopping" # 停止中
STOPPED = "stopped" # 已停止
ERROR = "error" # 错误状态
虚拟持仓 (VirtualPosition)
@dataclass
class VirtualPosition:
"""通道虚拟持仓"""
channel_id: str # 所属通道ID
symbol: str # 股票代码
side: PositionSide # 持仓方向 (long|short)
quantity: Decimal # 持仓数量
entry_price: Decimal # 入场价格
current_price: Decimal # 当前价格
# 盈亏计算
unrealized_pnl: Decimal # 未实现盈亏
unrealized_pnl_pct: Decimal # 未实现盈亏百分比
# 时间信息
entry_time: datetime # 入场时间
holding_period: int # 持仓时长(分钟)
# 权重信息
contribution_weight: Decimal # 在净额计算中的贡献权重
# 元数据
updated_at: datetime # 最后更新时间
class PositionSide(str, Enum):
"""持仓方向"""
LONG = "long" # 多头
SHORT = "short" # 空头
净额持仓 (NetPosition)
@dataclass
class NetPosition:
"""净额持仓(多通道合并后)"""
symbol: str # 股票代码
net_quantity: Decimal # 净数量(正数为多头,负数为空头)
net_side: PositionSide # 净方向 (long|short|flat)
avg_entry_price: Decimal # 平均入场价(加权平均)
current_price: Decimal # 当前价格
# 盈亏信息
total_unrealized_pnl: Decimal # 总未实现盈亏
# 来源信息
channel_positions: Dict[str, VirtualPosition] # 各通道的虚拟持仓
# 计算信息
calculated_at: datetime # 计算时间
风险限制 (RiskLimits)
@dataclass
class RiskLimits:
"""通道级风险限制"""
# 亏损限制
max_loss_per_trade: Decimal # 单笔最大亏损比例 (0.01 = 1%)
max_daily_loss: Decimal # 日内最大亏损比例 (0.05 = 5%)
max_drawdown: Decimal # 最大回撤限制 (0.15 = 15%)
# 杠杆限制
leverage_limit: Decimal # 杠杆倍数限制 (1.0 = 无杠杆)
# 持仓限制
max_position_per_stock: Decimal # 单个股票最大持仓比例
max_stocks_count: int # 最大持股数量
# 交易限制
max_order_size: Optional[Decimal] # 单笔最大订单金额
min_order_size: Optional[Decimal] # 单笔最小订单金额
投票决策 (ChannelDecision)
@dataclass
class ChannelDecision:
"""通道级交易决策(投票结果)"""
channel_id: str # 通道ID
symbol: str # 股票代码
action: TradeAction # 交易动作 (buy|sell|hold)
quantity: Decimal # 建议数量
confidence: Decimal # 置信度 (0.0-1.0)
# 投票详情
voting_details: Dict[str, Decimal] # 各动作的投票权重
# 策略信号
strategy_signals: List[StrategySignal] # 参与投票的策略信号
# 时间戳
created_at: datetime # 决策时间
class TradeAction(str, Enum):
"""交易动作"""
BUY = "buy" # 买入
SELL = "sell" # 卖出
HOLD = "hold" # 持有
通道性能指标 (ChannelPerformanceMetrics)
@dataclass
class ChannelPerformanceMetrics:
"""通道性能指标"""
channel_id: str # 通道ID
channel_type: str # 通道类型
# 收益指标
total_return: Decimal # 总收益率
annualized_return: Decimal # 年化收益率
daily_return: Decimal # 日收益率
# 风险指标
max_drawdown: Decimal # 最大回撤
volatility: Decimal # 波动率
sharpe_ratio: Decimal # 夏普比率
# 交易统计
total_trades: int # 总交易次数
winning_trades: int # 盈利交易数
losing_trades: int # 亏损交易数
win_rate: Decimal # 胜率
# 费用统计
total_fees: Decimal # 总费用
# 持仓信息
current_positions_count: int # 当前持仓数量
total_position_value: Decimal # 持仓总市值
# 时间戳
updated_at: datetime # 更新时间
Redis存储结构(多通道)
# 通道配置
channel:config:{session_id}:{channel_id} # 通道配置 (Hash)
# 通道状态
channel:status:{session_id}:{channel_id} # 通道状态 (String)
# 虚拟持仓
channel:position:{session_id}:{channel_id}:{symbol} # 虚拟持仓 (Hash)
# 净额持仓
channel:net_position:{session_id}:{symbol} # 净额持仓 (Hash)
# 通道性能
channel:performance:{session_id}:{channel_id} # 性能指标 (Hash)
# 投票决策
channel:decision:{session_id}:{symbol} # 最新决策 (Hash)
API接口(多通道)
通道管理
GET /api/v1/channels/sessions/{session_id}/channels- 获取所有通道GET /api/v1/channels/sessions/{session_id}/channels/{channel_id}- 获取通道详情POST /api/v1/channels/sessions/{session_id}/channels/{channel_id}/start- 启动通道POST /api/v1/channels/sessions/{session_id}/channels/{channel_id}/stop- 停止通道PUT /api/v1/channels/sessions/{session_id}/channels/{channel_id}/config- 更新通道配置
虚拟持仓
GET /api/v1/channels/sessions/{session_id}/positions- 获取所有虚拟持仓GET /api/v1/channels/sessions/{session_id}/positions/net- 获取净额持仓
性能对比
GET /api/v1/channels/sessions/{session_id}/performance/comparison- 获取通道性能对比
多通道业务规则
通道配置规则
- 通道ID在会话内唯一
- 资金分配比例总和应≤ 1.0
- 每个通道至少配置1个策略
- 风险限制必须合理(如:max_loss_per_trade < max_daily_loss)
虚拟持仓规则
- 虚拟持仓不执行真实交易,仅用于决策
- 每个通道独立维护自己的虚拟持仓
- 虚拟持仓支持多空双向
净额计算规则
- 根据通道权重和置信度计算加权平均
- 多空持仓相互抵消
- 最终净额持仓用于生成真实交易订单
- 算法:
net_quantity = Σ(channel_quantity × channel_weight × confidence)
投票规则
- 策略组内投票:根据策略权重和置信度加权
- 通道间协调:根据通道权重和决策置信度加权
- 冲突解决:选择加权投票最高的动作
风险控制规则
- 通道级风险限制独立生效
- 全局风险限制优先级最高
- 超过风险限制时自动停止通道
热更新规则
- 配置更新不影响已有持仓
- 新配置立即生效
- 支持回滚到上一次配置
回测优化数据模型
优化任务 (OptimizationTask)
@dataclass
class OptimizationTask:
"""参数优化任务"""
task_id: str # 任务唯一标识
session_id: str # 关联的交易会话ID
parameter_space: Dict[str, ParameterRange] # 参数搜索空间
optimization_metric: str # 优化目标指标
optimization_method: str # 优化方法 (grid_search|bayesian|random)
status: TaskStatus # 任务状态
total_combinations: int # 总参数组合数
completed_combinations: int # 已完成组合数
best_result_id: Optional[str] # 最佳结果ID
best_metric_value: Optional[float] # 最佳指标值
created_at: datetime # 创建时间
started_at: Optional[datetime] # 开始时间
completed_at: Optional[datetime] # 完成时间
class TaskStatus(str, Enum):
"""任务状态"""
CREATED = "created" # 已创建
QUEUED = "queued" # 已排队
RUNNING = "running" # 运行中
COMPLETED = "completed" # 已完成
FAILED = "failed" # 失败
CANCELLED = "cancelled" # 已取消
参数范围 (ParameterRange)
@dataclass
class ParameterRange:
"""参数搜索范围定义"""
parameter_name: str # 参数名称
parameter_type: str # 参数类型 (float|int|categorical)
min_value: Optional[float] # 最小值(数值型)
max_value: Optional[float] # 最大值(数值型)
step: Optional[float] # 步长(数值型)
choices: Optional[List[Any]] # 可选值(分类型)
default_value: Any # 默认值
description: str # 参数描述
回测结果 (BacktestResult)
@dataclass
class BacktestResult:
"""回测结果"""
result_id: str # 结果唯一标识
task_id: str # 关联的优化任务ID
session_id: str # 关联的会话ID
parameters: Dict[str, Any] # 使用的参数组合
performance_metrics: PerformanceMetrics # 性能指标
total_trades: int # 总交易次数
winning_trades: int # 盈利交易数
losing_trades: int # 亏损交易数
execution_time: float # 执行时间(秒)
completed_at: datetime # 完成时间
API接口(回测优化)
优化任务管理(待实现)
POST /api/v1/optimization/tasks- 创建优化任务GET /api/v1/optimization/tasks/{task_id}- 获取任务详情GET /api/v1/optimization/tasks/{task_id}/results- 获取所有结果DELETE /api/v1/optimization/tasks/{task_id}- 取消任务
安全考虑
- 用户只能访问自己的资产数据
- 敏感操作需要管理员权限
- 资产变更需要审计日志
- 支持数据备份和恢复
- 通道配置变更需要记录日志
- 虚拟持仓与真实持仓严格隔离