Coverage for core/trading/risk/risk_engine.py: 82.44%

205 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

1""" 

2风险管理引擎 

3""" 

4 

5import math 

6from datetime import datetime, timedelta 

7from decimal import Decimal 

8from typing import Any, Callable, Dict, List, Optional 

9 

10from core.models.trading import (MarketData, Order, OrderSide, OrderType, 

11 Portfolio, Position, RiskLimits, RiskMetrics, 

12 RiskResult) 

13 

14 

class RiskEngine:
    """Per-user risk management engine.

    Validates individual orders against the limits found in ``risk_config``,
    evaluates portfolio-level risk, tracks daily P&L / drawdown state, and
    keeps a bounded in-memory log of risk events. Optional callbacks are
    invoked on alerts and on order-validation failures.
    """

    # Capital assumed by ``calculate_position_size`` when the caller does not
    # pass a portfolio value (the engine has no portfolio handle of its own).
    DEFAULT_PORTFOLIO_VALUE = Decimal("1000000")

    # Placeholder per-unit price used to value MARKET orders, because no
    # market quote is available inside the engine.
    MARKET_ORDER_PRICE_ESTIMATE = Decimal("100")

    # Maximum number of entries retained in ``risk_events``.
    MAX_RISK_EVENTS = 1000

    def __init__(self, user_id: str, risk_config: Dict[str, Any]):
        """Create an engine for ``user_id``.

        Args:
            user_id: Identifier of the user the engine belongs to.
            risk_config: Mapping of limit names to numeric values. Missing
                keys fall back to the defaults shown below. Numbers are
                converted through ``str`` so floats become exact Decimals.
        """
        self.user_id = user_id
        self.risk_config = risk_config

        # Core risk parameters.
        self.max_position_ratio = Decimal(
            str(risk_config.get("max_position_ratio", 0.1))
        )
        self.stop_loss_ratio = Decimal(str(risk_config.get("stop_loss_ratio", 0.05)))
        self.max_drawdown = Decimal(str(risk_config.get("max_drawdown", 0.15)))
        # An empty list means "no symbol restriction" (see _check_basic_risks).
        self.allowed_symbols = risk_config.get("allowed_symbols", [])

        # Advanced risk parameters.
        self.max_single_order_ratio = Decimal(
            str(risk_config.get("max_single_order_ratio", 0.05))
        )
        self.max_daily_loss_ratio = Decimal(
            str(risk_config.get("max_daily_loss_ratio", 0.02))
        )
        self.min_cash_ratio = Decimal(str(risk_config.get("min_cash_ratio", 0.1)))
        self.max_leverage = Decimal(str(risk_config.get("max_leverage", 1.0)))

        # Running monitoring state.
        self.daily_pnl = Decimal("0")
        self.max_portfolio_value = Decimal("0")
        self.current_drawdown = Decimal("0")

        # Log of recorded risk events (most recent last).
        self.risk_events: List[Dict[str, Any]] = []

        # Optional notification callbacks.
        self.on_risk_alert: Optional[Callable] = None
        self.on_risk_violation: Optional[Callable] = None

    def set_callbacks(
        self,
        on_risk_alert: Optional[Callable] = None,
        on_risk_violation: Optional[Callable] = None,
    ):
        """Install (or clear, when ``None``) the alert and violation callbacks."""
        self.on_risk_alert = on_risk_alert
        self.on_risk_violation = on_risk_violation

    def validate_order(self, order: Order, portfolio: Portfolio) -> RiskResult:
        """Run every order-level check and aggregate the findings.

        When at least one error is found, an ``order_validation_failed``
        event is recorded and ``on_risk_violation`` (if set) is called with
        a message and the error list.

        Returns:
            A ``RiskResult`` whose ``is_valid`` is True only when no check
            produced an error.
        """
        errors = []
        warnings = []

        # Checks run in a fixed order so messages come out deterministically:
        # basic rules, capital adequacy, position ratio, single-order size,
        # leverage. (The daily-volume check was removed from the pipeline.)
        checks = (
            self._check_basic_risks,
            self._check_capital_adequacy,
            self._check_position_ratio,
            self._check_order_size,
            self._check_leverage,
        )
        for check in checks:
            outcome = check(order, portfolio)
            errors.extend(outcome["errors"])
            warnings.extend(outcome["warnings"])

        if errors:
            self._record_risk_event(
                "order_validation_failed",
                {"order": order.model_dump(), "errors": errors, "warnings": warnings},
            )

            if self.on_risk_violation:
                self.on_risk_violation("订单风险检查失败", errors)

        return RiskResult(is_valid=len(errors) == 0, errors=errors, warnings=warnings)

    def calculate_position_size(
        self,
        symbol: str,
        price: Decimal,
        risk_ratio: Optional[Decimal] = None,
        portfolio_value: Optional[Decimal] = None,
    ) -> Decimal:
        """Suggest how many units of ``symbol`` to hold.

        Args:
            symbol: Instrument identifier (currently not used in the math).
            price: Per-unit price used to convert capital into a quantity.
            risk_ratio: Fraction of capital to commit; defaults to
                ``max_position_ratio``.
            portfolio_value: Capital base; defaults to
                ``DEFAULT_PORTFOLIO_VALUE`` because the engine holds no
                portfolio reference.

        Returns:
            ``portfolio_value * risk_ratio / price``, or ``Decimal("0")``
            when ``price`` is not positive (no meaningful size exists and
            dividing would raise or produce a negative quantity).
        """
        if risk_ratio is None:
            risk_ratio = self.max_position_ratio
        if portfolio_value is None:
            portfolio_value = self.DEFAULT_PORTFOLIO_VALUE

        if price <= 0:
            return Decimal("0")

        max_investment = portfolio_value * risk_ratio
        return max_investment / price

    def get_risk_metrics(self, portfolio: Portfolio) -> RiskMetrics:
        """Compute VaR (95%/99%), max drawdown, volatility and beta.

        This is best-effort: any exception raised while computing is
        printed and a neutral ``RiskMetrics`` (zeros, beta 1) is returned
        instead of propagating.
        """
        try:
            var_95 = self._calculate_var(portfolio, 0.95)
            var_99 = self._calculate_var(portfolio, 0.99)
            max_drawdown = self._calculate_max_drawdown(portfolio)
            volatility = self._calculate_volatility(portfolio)
            beta = self._calculate_beta(portfolio)

            return RiskMetrics(
                var_95=var_95,
                var_99=var_99,
                max_drawdown=max_drawdown,
                volatility=volatility,
                beta=beta,
            )

        except Exception as e:
            # The engine reports via print for now because it has no direct
            # WebSocket interface.
            print(f"❌ 计算风险指标失败: {e}")
            return RiskMetrics(
                var_95=Decimal("0"),
                var_99=Decimal("0"),
                max_drawdown=Decimal("0"),
                volatility=Decimal("0"),
                beta=Decimal("1"),
            )

    def check_portfolio_risk(self, portfolio: Portfolio) -> RiskResult:
        """Evaluate portfolio-level limits.

        Errors: drawdown above ``max_drawdown``; daily loss beyond
        ``max_daily_loss_ratio`` of total value.
        Warnings: cash ratio below ``min_cash_ratio``; concentration risk.
        """
        errors = []
        warnings = []

        # 1. Maximum drawdown.
        current_drawdown = self._calculate_current_drawdown(portfolio)
        if current_drawdown > self.max_drawdown:
            errors.append(
                f"当前回撤 {current_drawdown:.2%} 超过最大回撤限制 {self.max_drawdown:.2%}"
            )

        # 2. Cash ratio.
        cash_ratio = self._share_of_total(portfolio.cash, portfolio)
        if cash_ratio < self.min_cash_ratio:
            warnings.append(
                f"现金比例 {cash_ratio:.2%} 低于建议比例 {self.min_cash_ratio:.2%}"
            )

        # 3. Daily loss.
        if self.daily_pnl < -self.max_daily_loss_ratio * portfolio.total_value:
            errors.append(f"日亏损 {self.daily_pnl} 超过日最大亏损限制")

        # 4. Position concentration.
        concentration_risk = self._check_concentration_risk(portfolio)
        if concentration_risk:
            warnings.append(concentration_risk)

        return RiskResult(is_valid=len(errors) == 0, errors=errors, warnings=warnings)

    def update_daily_metrics(self, portfolio: Portfolio, daily_pnl: Decimal):
        """Store the day's P&L, refresh the high-water mark and drawdown,
        then fire any alerts through ``on_risk_alert``."""
        self.daily_pnl = daily_pnl

        # Track the highest portfolio value seen so far (high-water mark).
        if portfolio.total_value > self.max_portfolio_value:
            self.max_portfolio_value = portfolio.total_value

        self.current_drawdown = self._calculate_current_drawdown(portfolio)
        self._check_risk_alerts(portfolio)

    def reset_daily_metrics(self):
        """Reset the daily P&L to zero and print a confirmation."""
        self.daily_pnl = Decimal("0")
        # The engine reports via print for now because it has no direct
        # WebSocket interface.
        print("🔄 日度风险指标已重置")

    def get_risk_summary(self) -> Dict[str, Any]:
        """Return a snapshot of configuration, monitoring state and the
        most recent risk event (``None`` when no event was recorded)."""
        return {
            "user_id": self.user_id,
            "risk_config": self.risk_config,
            "daily_pnl": self.daily_pnl,
            "current_drawdown": self.current_drawdown,
            "max_portfolio_value": self.max_portfolio_value,
            "risk_events_count": len(self.risk_events),
            "last_risk_event": self.risk_events[-1] if self.risk_events else None,
        }

    # ===== Private helpers =====

    @staticmethod
    def _share_of_total(value: Decimal, portfolio: Portfolio) -> Decimal:
        """Return ``value`` as a fraction of the portfolio's total value,
        or ``Decimal("0")`` when the total value is not positive."""
        if portfolio.total_value > 0:
            return value / portfolio.total_value
        return Decimal("0")

    def _estimate_order_value(self, order: Order) -> Optional[Decimal]:
        """Estimate the notional value of ``order``.

        MARKET orders are valued at ``MARKET_ORDER_PRICE_ESTIMATE`` per
        unit; LIMIT orders with a truthy price at ``quantity * price``.
        Returns ``None`` for anything else (the order cannot be valued).
        """
        if order.order_type == OrderType.MARKET:
            return order.quantity * self.MARKET_ORDER_PRICE_ESTIMATE
        if order.order_type == OrderType.LIMIT and order.price:
            return order.quantity * order.price
        return None

    def _check_basic_risks(self, order: Order, portfolio: Portfolio) -> Dict[str, Any]:
        """Reject symbols outside ``allowed_symbols`` (when it is non-empty)."""
        errors = []
        warnings = []

        if self.allowed_symbols and order.symbol not in self.allowed_symbols:
            errors.append(f"股票 {order.symbol} 不在允许交易列表中")

        return {"errors": errors, "warnings": warnings}

    def _check_capital_adequacy(
        self, order: Order, portfolio: Portfolio
    ) -> Dict[str, Any]:
        """For BUY orders, require the estimated cost to fit within cash.

        Orders that cannot be valued (see ``_estimate_order_value``) and
        non-BUY orders are not checked.
        """
        errors = []
        warnings = []

        if order.side == OrderSide.BUY:
            required_capital = self._estimate_order_value(order)
            if required_capital is not None and required_capital > portfolio.cash:
                errors.append("资金不足,无法执行买入订单")

        return {"errors": errors, "warnings": warnings}

    def _check_position_ratio(
        self, order: Order, portfolio: Portfolio
    ) -> Dict[str, Any]:
        """Check the post-trade weight of ``order.symbol`` against
        ``max_position_ratio``.

        NOTE(review): SELL orders are checked too, so a sale that leaves the
        position above the limit is rejected even though it reduces risk --
        confirm this is intended.
        """
        errors = []
        warnings = []

        # Market value of the first existing position in this symbol, if any.
        current_position_value = next(
            (
                position.market_value
                for position in portfolio.positions
                if position.symbol == order.symbol
            ),
            Decimal("0"),
        )

        # Unvaluable orders contribute nothing to the projected position.
        order_value = self._estimate_order_value(order)
        if order_value is None:
            order_value = Decimal("0")

        if order.side == OrderSide.BUY:
            new_position_value = current_position_value + order_value
        else:
            new_position_value = current_position_value - order_value

        position_ratio = self._share_of_total(new_position_value, portfolio)

        if position_ratio > self.max_position_ratio:
            errors.append(
                f"持仓比例 {position_ratio:.2%} 超过最大限制 {self.max_position_ratio:.2%}"
            )

        return {"errors": errors, "warnings": warnings}

    def _check_order_size(self, order: Order, portfolio: Portfolio) -> Dict[str, Any]:
        """Check the order's value relative to total portfolio value against
        ``max_single_order_ratio``."""
        errors = []
        warnings = []

        order_value = self._estimate_order_value(order)
        if order_value is None:
            order_value = Decimal("0")

        order_ratio = self._share_of_total(order_value, portfolio)

        if order_ratio > self.max_single_order_ratio:
            errors.append(
                f"单笔订单比例 {order_ratio:.2%} 超过最大限制 {self.max_single_order_ratio:.2%}"
            )

        return {"errors": errors, "warnings": warnings}

    def _check_leverage(self, order: Order, portfolio: Portfolio) -> Dict[str, Any]:
        """Simplified leverage check: total position value divided by cash.

        The order itself does not enter the calculation.

        NOTE(review): when cash is zero or negative the ratio is reported as
        0, so the check passes for a fully invested portfolio -- confirm
        whether that case should be rejected instead.
        """
        errors = []
        warnings = []

        total_position_value = sum(
            (pos.market_value for pos in portfolio.positions), Decimal("0")
        )
        if portfolio.cash > 0:
            leverage = total_position_value / portfolio.cash
        else:
            leverage = Decimal("0")

        if leverage > self.max_leverage:
            errors.append(f"杠杆比例 {leverage:.2f} 超过最大限制 {self.max_leverage}")

        return {"errors": errors, "warnings": warnings}

    def _check_concentration_risk(self, portfolio: Portfolio) -> Optional[str]:
        """Return a warning when the largest single position exceeds twice
        ``max_position_ratio`` of total value, otherwise ``None``.

        Portfolios without positions, or with a non-positive total value
        (where a weight is undefined and dividing would raise), yield
        ``None``.
        """
        if not portfolio.positions or portfolio.total_value <= 0:
            return None

        # Floor at zero so negative weights never count as the maximum.
        max_position_ratio = max(
            [Decimal("0")]
            + [
                position.market_value / portfolio.total_value
                for position in portfolio.positions
            ]
        )

        # A single holding may reach up to 2x the general per-position limit
        # before it is reported.
        if max_position_ratio > self.max_position_ratio * Decimal("2"):
            return f"持仓集中度风险:最大单只股票持仓比例 {max_position_ratio:.2%}"

        return None

    def _calculate_var(self, portfolio: Portfolio, confidence_level: float) -> Decimal:
        """Simplified parametric VaR: total value * volatility * z-score.

        NOTE(review): 1.96 / 2.58 are the two-sided normal quantiles; a
        one-sided VaR conventionally uses about 1.645 / 2.326. Any
        ``confidence_level`` other than exactly 0.95 uses 2.58. Values are
        kept as-is to preserve existing outputs -- confirm intent.
        """
        volatility = self._calculate_volatility(portfolio)
        z_score = 1.96 if confidence_level == 0.95 else 2.58

        return portfolio.total_value * volatility * Decimal(str(z_score))

    def _calculate_volatility(self, portfolio: Portfolio) -> Decimal:
        """Simplified volatility: value-weighted average of a flat 20%
        per-position volatility; 2% when there are no positions or their
        combined market value is zero."""
        if not portfolio.positions:
            return Decimal("0.02")

        total_value = sum(pos.market_value for pos in portfolio.positions)
        if total_value == 0:
            return Decimal("0.02")

        # Every position is assumed to have the same 20% volatility.
        stock_volatility = Decimal("0.2")
        weighted_volatility = Decimal("0")
        for position in portfolio.positions:
            weight = position.market_value / total_value
            weighted_volatility += weight * stock_volatility

        return weighted_volatility

    def _calculate_beta(self, portfolio: Portfolio) -> Decimal:
        """Simplified beta: every holding is assumed to have beta 1."""
        return Decimal("1.0")

    def _calculate_max_drawdown(self, portfolio: Portfolio) -> Decimal:
        """Return the engine's stored ``current_drawdown``."""
        return self.current_drawdown

    def _calculate_current_drawdown(self, portfolio: Portfolio) -> Decimal:
        """Return the fractional drop from the high-water mark, floored at
        zero; zero when no high-water mark has been recorded yet."""
        if self.max_portfolio_value == 0:
            return Decimal("0")

        current_drawdown = (
            self.max_portfolio_value - portfolio.total_value
        ) / self.max_portfolio_value
        return max(current_drawdown, Decimal("0"))

    def _check_risk_alerts(self, portfolio: Portfolio):
        """Collect early-warning alerts and pass each to ``on_risk_alert``.

        Thresholds: drawdown above 80% of its limit, daily loss beyond 50%
        of its limit, cash ratio below 120% of the minimum.
        """
        alerts = []

        # Drawdown alert (80% of the limit).
        if self.current_drawdown > self.max_drawdown * Decimal("0.8"):
            alerts.append(f"回撤告警:当前回撤 {self.current_drawdown:.2%}")

        # Daily-loss alert (50% of the limit).
        daily_loss_threshold = (
            -self.max_daily_loss_ratio * portfolio.total_value * Decimal("0.5")
        )
        if self.daily_pnl < daily_loss_threshold:
            alerts.append(f"日亏损告警:当日亏损 {self.daily_pnl}")

        # Cash-ratio alert (120% of the minimum).
        cash_ratio = self._share_of_total(portfolio.cash, portfolio)
        if cash_ratio < self.min_cash_ratio * Decimal("1.2"):
            alerts.append(f"现金比例告警:当前现金比例 {cash_ratio:.2%}")

        if alerts and self.on_risk_alert:
            for alert in alerts:
                self.on_risk_alert(alert)

    def _record_risk_event(self, event_type: str, event_data: Dict[str, Any]):
        """Append a timestamped event and keep only the most recent
        ``MAX_RISK_EVENTS`` entries."""
        event = {
            "timestamp": datetime.now(),
            "event_type": event_type,
            "data": event_data,
        }

        self.risk_events.append(event)

        if len(self.risk_events) > self.MAX_RISK_EVENTS:
            self.risk_events = self.risk_events[-self.MAX_RISK_EVENTS:]