Coverage for core/trading/risk/risk_engine.py: 82.44%
205 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2风险管理引擎
3"""
5import math
6from datetime import datetime, timedelta
7from decimal import Decimal
8from typing import Any, Callable, Dict, List, Optional
10from core.models.trading import (MarketData, Order, OrderSide, OrderType,
11 Portfolio, Position, RiskLimits, RiskMetrics,
12 RiskResult)
15class RiskEngine:
16 """风险管理引擎"""
18 def __init__(self, user_id: str, risk_config: Dict[str, Any]):
19 self.user_id = user_id
20 self.risk_config = risk_config
22 # 风险参数
23 self.max_position_ratio = Decimal(
24 str(risk_config.get("max_position_ratio", 0.1))
25 )
26 self.stop_loss_ratio = Decimal(str(risk_config.get("stop_loss_ratio", 0.05)))
27 self.max_drawdown = Decimal(str(risk_config.get("max_drawdown", 0.15)))
28 self.allowed_symbols = risk_config.get("allowed_symbols", [])
30 # 高级风险参数
31 self.max_single_order_ratio = Decimal(
32 str(risk_config.get("max_single_order_ratio", 0.05))
33 )
34 self.max_daily_loss_ratio = Decimal(
35 str(risk_config.get("max_daily_loss_ratio", 0.02))
36 )
37 self.min_cash_ratio = Decimal(str(risk_config.get("min_cash_ratio", 0.1)))
38 self.max_leverage = Decimal(str(risk_config.get("max_leverage", 1.0)))
40 # 风险监控
41 self.daily_pnl = Decimal("0")
42 self.max_portfolio_value = Decimal("0")
43 self.current_drawdown = Decimal("0")
45 # 风险事件记录
46 self.risk_events: List[Dict[str, Any]] = []
48 # 回调函数
49 self.on_risk_alert: Optional[Callable] = None
50 self.on_risk_violation: Optional[Callable] = None
52 def set_callbacks(
53 self,
54 on_risk_alert: Optional[Callable] = None,
55 on_risk_violation: Optional[Callable] = None,
56 ):
57 """设置风险回调函数"""
58 self.on_risk_alert = on_risk_alert
59 self.on_risk_violation = on_risk_violation
61 def validate_order(self, order: Order, portfolio: Portfolio) -> RiskResult:
62 """验证订单风险"""
63 errors = []
64 warnings = []
66 # 1. 基础风险检查
67 basic_check = self._check_basic_risks(order, portfolio)
68 errors.extend(basic_check["errors"])
69 warnings.extend(basic_check["warnings"])
71 # 2. 资金充足性检查
72 capital_check = self._check_capital_adequacy(order, portfolio)
73 errors.extend(capital_check["errors"])
74 warnings.extend(capital_check["warnings"])
76 # 3. 持仓比例检查
77 position_check = self._check_position_ratio(order, portfolio)
78 errors.extend(position_check["errors"])
79 warnings.extend(position_check["warnings"])
81 # 4. 单笔订单限制检查
82 order_size_check = self._check_order_size(order, portfolio)
83 errors.extend(order_size_check["errors"])
84 warnings.extend(order_size_check["warnings"])
86 # 5. 日交易量限制检查已移除
88 # 6. 杠杆限制检查
89 leverage_check = self._check_leverage(order, portfolio)
90 errors.extend(leverage_check["errors"])
91 warnings.extend(leverage_check["warnings"])
93 # 记录风险事件
94 if errors:
95 self._record_risk_event(
96 "order_validation_failed",
97 {"order": order.model_dump(), "errors": errors, "warnings": warnings},
98 )
100 if self.on_risk_violation:
101 self.on_risk_violation("订单风险检查失败", errors)
103 return RiskResult(is_valid=len(errors) == 0, errors=errors, warnings=warnings)
105 def calculate_position_size(
106 self, symbol: str, price: Decimal, risk_ratio: Optional[Decimal] = None
107 ) -> Decimal:
108 """计算建议持仓数量"""
109 if risk_ratio is None:
110 risk_ratio = self.max_position_ratio
112 # 基于风险比例计算最大可投入资金
113 # 这里需要获取当前投资组合信息
114 # 简化处理,假设有100万资金
115 max_investment = Decimal("1000000") * risk_ratio
117 # 计算持仓数量
118 position_size = max_investment / price
120 return position_size
122 def get_risk_metrics(self, portfolio: Portfolio) -> RiskMetrics:
123 """计算风险指标"""
124 try:
125 # 计算VaR (简化版本)
126 var_95 = self._calculate_var(portfolio, 0.95)
127 var_99 = self._calculate_var(portfolio, 0.99)
129 # 计算最大回撤
130 max_drawdown = self._calculate_max_drawdown(portfolio)
132 # 计算波动率
133 volatility = self._calculate_volatility(portfolio)
135 # 计算贝塔系数 (简化版本)
136 beta = self._calculate_beta(portfolio)
138 return RiskMetrics(
139 var_95=var_95,
140 var_99=var_99,
141 max_drawdown=max_drawdown,
142 volatility=volatility,
143 beta=beta,
144 )
146 except Exception as e:
147 # 风险引擎暂时使用print,因为它没有直接的WebSocket接口
148 print(f"❌ 计算风险指标失败: {e}")
149 return RiskMetrics(
150 var_95=Decimal("0"),
151 var_99=Decimal("0"),
152 max_drawdown=Decimal("0"),
153 volatility=Decimal("0"),
154 beta=Decimal("1"),
155 )
157 def check_portfolio_risk(self, portfolio: Portfolio) -> RiskResult:
158 """检查投资组合风险"""
159 errors = []
160 warnings = []
162 # 1. 检查最大回撤
163 current_drawdown = self._calculate_current_drawdown(portfolio)
164 if current_drawdown > self.max_drawdown:
165 errors.append(
166 f"当前回撤 {current_drawdown:.2%} 超过最大回撤限制 {self.max_drawdown:.2%}"
167 )
169 # 2. 检查现金比例
170 cash_ratio = (
171 portfolio.cash / portfolio.total_value
172 if portfolio.total_value > 0
173 else Decimal("0")
174 )
175 if cash_ratio < self.min_cash_ratio:
176 warnings.append(
177 f"现金比例 {cash_ratio:.2%} 低于建议比例 {self.min_cash_ratio:.2%}"
178 )
180 # 3. 检查日亏损
181 if self.daily_pnl < -self.max_daily_loss_ratio * portfolio.total_value:
182 errors.append(f"日亏损 {self.daily_pnl} 超过日最大亏损限制")
184 # 4. 检查持仓集中度
185 concentration_risk = self._check_concentration_risk(portfolio)
186 if concentration_risk:
187 warnings.append(concentration_risk)
189 return RiskResult(is_valid=len(errors) == 0, errors=errors, warnings=warnings)
191 def update_daily_metrics(self, portfolio: Portfolio, daily_pnl: Decimal):
192 """更新日度风险指标"""
193 self.daily_pnl = daily_pnl
195 # 更新最大投资组合价值
196 if portfolio.total_value > self.max_portfolio_value:
197 self.max_portfolio_value = portfolio.total_value
199 # 计算当前回撤
200 self.current_drawdown = self._calculate_current_drawdown(portfolio)
202 # 检查风险告警
203 self._check_risk_alerts(portfolio)
205 def reset_daily_metrics(self):
206 """重置日度指标"""
207 self.daily_pnl = Decimal("0")
208 # 风险引擎暂时使用print,因为它没有直接的WebSocket接口
209 print("🔄 日度风险指标已重置")
211 def get_risk_summary(self) -> Dict[str, Any]:
212 """获取风险摘要"""
213 return {
214 "user_id": self.user_id,
215 "risk_config": self.risk_config,
216 "daily_pnl": self.daily_pnl,
217 "current_drawdown": self.current_drawdown,
218 "max_portfolio_value": self.max_portfolio_value,
219 "risk_events_count": len(self.risk_events),
220 "last_risk_event": self.risk_events[-1] if self.risk_events else None,
221 }
223 # ===== 私有方法 =====
225 def _check_basic_risks(self, order: Order, portfolio: Portfolio) -> Dict[str, Any]:
226 """检查基础风险"""
227 errors = []
228 warnings = []
230 # 检查允许交易的股票代码
231 if self.allowed_symbols and order.symbol not in self.allowed_symbols:
232 errors.append(f"股票 {order.symbol} 不在允许交易列表中")
234 return {"errors": errors, "warnings": warnings}
236 def _check_capital_adequacy(
237 self, order: Order, portfolio: Portfolio
238 ) -> Dict[str, Any]:
239 """检查资金充足性"""
240 errors = []
241 warnings = []
243 if order.side == OrderSide.BUY:
244 # 买入订单需要检查资金
245 if order.order_type == OrderType.MARKET:
246 # 市价单需要估算价格
247 estimated_cost = order.quantity * Decimal("100") # 简化估算
248 if estimated_cost > portfolio.cash:
249 errors.append("资金不足,无法执行买入订单")
250 elif order.order_type == OrderType.LIMIT and order.price:
251 required_capital = order.quantity * order.price
252 if required_capital > portfolio.cash:
253 errors.append("资金不足,无法执行买入订单")
255 return {"errors": errors, "warnings": warnings}
257 def _check_position_ratio(
258 self, order: Order, portfolio: Portfolio
259 ) -> Dict[str, Any]:
260 """检查持仓比例"""
261 errors = []
262 warnings = []
264 # 计算当前持仓价值
265 current_position_value = Decimal("0")
266 for position in portfolio.positions:
267 if position.symbol == order.symbol:
268 current_position_value = position.market_value
269 break
271 # 计算订单价值
272 if order.order_type == OrderType.MARKET:
273 order_value = order.quantity * Decimal("100") # 简化估算
274 elif order.order_type == OrderType.LIMIT and order.price:
275 order_value = order.quantity * order.price
276 else:
277 order_value = Decimal("0")
279 # 计算新的持仓比例
280 if order.side == OrderSide.BUY:
281 new_position_value = current_position_value + order_value
282 else:
283 new_position_value = current_position_value - order_value
285 position_ratio = (
286 new_position_value / portfolio.total_value
287 if portfolio.total_value > 0
288 else Decimal("0")
289 )
291 if position_ratio > self.max_position_ratio:
292 errors.append(
293 f"持仓比例 {position_ratio:.2%} 超过最大限制 {self.max_position_ratio:.2%}"
294 )
296 return {"errors": errors, "warnings": warnings}
298 def _check_order_size(self, order: Order, portfolio: Portfolio) -> Dict[str, Any]:
299 """检查单笔订单大小"""
300 errors = []
301 warnings = []
303 # 计算订单价值
304 if order.order_type == OrderType.MARKET:
305 order_value = order.quantity * Decimal("100") # 简化估算
306 elif order.order_type == OrderType.LIMIT and order.price:
307 order_value = order.quantity * order.price
308 else:
309 order_value = Decimal("0")
311 # 检查单笔订单比例
312 order_ratio = (
313 order_value / portfolio.total_value
314 if portfolio.total_value > 0
315 else Decimal("0")
316 )
318 if order_ratio > self.max_single_order_ratio:
319 errors.append(
320 f"单笔订单比例 {order_ratio:.2%} 超过最大限制 {self.max_single_order_ratio:.2%}"
321 )
323 return {"errors": errors, "warnings": warnings}
325 def _check_leverage(self, order: Order, portfolio: Portfolio) -> Dict[str, Any]:
326 """检查杠杆限制"""
327 errors = []
328 warnings = []
330 # 简化版本:检查总持仓价值与现金的比例
331 total_position_value = sum(pos.market_value for pos in portfolio.positions)
332 leverage = (
333 total_position_value / portfolio.cash
334 if portfolio.cash > 0
335 else Decimal("0")
336 )
338 if leverage > self.max_leverage:
339 errors.append(f"杠杆比例 {leverage:.2f} 超过最大限制 {self.max_leverage}")
341 return {"errors": errors, "warnings": warnings}
343 def _check_concentration_risk(self, portfolio: Portfolio) -> Optional[str]:
344 """检查持仓集中度风险"""
345 if not portfolio.positions:
346 return None
348 # 计算最大单只股票持仓比例
349 max_position_ratio = Decimal("0")
350 for position in portfolio.positions:
351 position_ratio = position.market_value / portfolio.total_value
352 if position_ratio > max_position_ratio:
353 max_position_ratio = position_ratio
355 if max_position_ratio > self.max_position_ratio * Decimal(
356 "2"
357 ): # 允许单只股票超过一般限制的2倍
358 return f"持仓集中度风险:最大单只股票持仓比例 {max_position_ratio:.2%}"
360 return None
362 def _calculate_var(self, portfolio: Portfolio, confidence_level: float) -> Decimal:
363 """计算VaR (简化版本)"""
364 # 简化计算:基于历史波动率
365 # 实际应该使用更复杂的模型
366 volatility = self._calculate_volatility(portfolio)
367 z_score = 1.96 if confidence_level == 0.95 else 2.58 # 95%和99%置信水平
369 var = portfolio.total_value * volatility * Decimal(str(z_score))
370 return var
372 def _calculate_volatility(self, portfolio: Portfolio) -> Decimal:
373 """计算波动率 (简化版本)"""
374 # 简化计算:基于持仓权重
375 if not portfolio.positions:
376 return Decimal("0.02") # 默认2%波动率
378 # 计算加权平均波动率
379 total_value = sum(pos.market_value for pos in portfolio.positions)
380 if total_value == 0:
381 return Decimal("0.02")
383 weighted_volatility = Decimal("0")
384 for position in portfolio.positions:
385 weight = position.market_value / total_value
386 # 简化:假设每只股票波动率为20%
387 stock_volatility = Decimal("0.2")
388 weighted_volatility += weight * stock_volatility
390 return weighted_volatility
392 def _calculate_beta(self, portfolio: Portfolio) -> Decimal:
393 """计算贝塔系数 (简化版本)"""
394 # 简化计算:假设所有股票贝塔为1
395 return Decimal("1.0")
397 def _calculate_max_drawdown(self, portfolio: Portfolio) -> Decimal:
398 """计算最大回撤"""
399 return self.current_drawdown
401 def _calculate_current_drawdown(self, portfolio: Portfolio) -> Decimal:
402 """计算当前回撤"""
403 if self.max_portfolio_value == 0:
404 return Decimal("0")
406 current_drawdown = (
407 self.max_portfolio_value - portfolio.total_value
408 ) / self.max_portfolio_value
409 return max(current_drawdown, Decimal("0"))
411 def _check_risk_alerts(self, portfolio: Portfolio):
412 """检查风险告警"""
413 alerts = []
415 # 回撤告警
416 if self.current_drawdown > self.max_drawdown * Decimal("0.8"): # 80%阈值
417 alerts.append(f"回撤告警:当前回撤 {self.current_drawdown:.2%}")
419 # 日亏损告警
420 if (
421 self.daily_pnl
422 < -self.max_daily_loss_ratio * portfolio.total_value * Decimal("0.5")
423 ): # 50%阈值
424 alerts.append(f"日亏损告警:当日亏损 {self.daily_pnl}")
426 # 现金比例告警
427 cash_ratio = (
428 portfolio.cash / portfolio.total_value
429 if portfolio.total_value > 0
430 else Decimal("0")
431 )
432 if cash_ratio < self.min_cash_ratio * Decimal("1.2"): # 120%阈值
433 alerts.append(f"现金比例告警:当前现金比例 {cash_ratio:.2%}")
435 # 触发告警回调
436 if alerts and self.on_risk_alert:
437 for alert in alerts:
438 self.on_risk_alert(alert)
440 def _record_risk_event(self, event_type: str, event_data: Dict[str, Any]):
441 """记录风险事件"""
442 event = {
443 "timestamp": datetime.now(),
444 "event_type": event_type,
445 "data": event_data,
446 }
448 self.risk_events.append(event)
450 # 保持最近1000个事件
451 if len(self.risk_events) > 1000:
452 self.risk_events = self.risk_events[-1000:]