Coverage for core/services/risk_service.py: 48.28%
116 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2风险管理服务
3"""
5import logging
6from datetime import datetime
7from decimal import Decimal
8from typing import Any, Dict, List, Optional
10from core.models.risk_config import (RiskConfigModel, RiskConfigPreset,
11 RiskConfigTemplate)
12from core.repositories.trading_repository import TradingRepository
13from core.trading.risk import RiskEngine
15logger = logging.getLogger(__name__)
class RiskService:
    """Per-user service that manages risk engines and risk configuration.

    One ``RiskEngine`` is kept in memory per trading session id. The service
    also exposes helpers for validating risk configs and for listing the
    presets and default configs offered by ``RiskConfigModel`` /
    ``RiskConfigPreset``.
    """

    # Fraction of the engine's max drawdown above which a warning is issued.
    _DRAWDOWN_WARNING_FRACTION = Decimal("0.8")

    # More recorded risk events than this triggers a "review config" hint.
    _RISK_EVENT_WARNING_COUNT = 10

    # Engine attributes refreshed by ``update_risk_config`` together with the
    # fallback used when the config does not carry the key.
    _SCALAR_CONFIG_DEFAULTS = (
        ("max_position_ratio", 0.1),
        ("stop_loss_ratio", 0.05),
        ("max_drawdown", 0.15),
        ("max_single_order_ratio", 0.05),
        ("max_daily_loss_ratio", 0.02),
        ("min_cash_ratio", 0.1),
        ("max_leverage", 1.0),
    )

    def __init__(
        self, user_id: str, trading_repository: Optional[TradingRepository] = None
    ):
        """Create the service for one user.

        Args:
            user_id: Identifier of the user owning the risk engines.
            trading_repository: Repository to use; a new ``TradingRepository``
                is created only when this is ``None``.
        """
        self.user_id = user_id
        # Compare against None explicitly so that an injected repository that
        # happens to be falsy (e.g. defines __len__/__bool__) is still used.
        self.trading_repo = (
            trading_repository
            if trading_repository is not None
            else TradingRepository()
        )
        self.risk_engines: Dict[str, RiskEngine] = {}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _to_decimal(value: Any) -> Decimal:
        """Return *value* as a ``Decimal`` (via ``str`` to avoid float noise)."""
        return value if isinstance(value, Decimal) else Decimal(str(value))

    @staticmethod
    def _preset_to_dict(preset: Any) -> Dict[str, Any]:
        """Serialize a risk-config preset into a plain dictionary."""
        return {
            "name": preset.name,
            "description": preset.description,
            "config": preset.config.to_dict(),
            "is_default": preset.is_default,
        }

    @staticmethod
    def _on_risk_alert(message: str) -> None:
        """Engine callback: log a risk alert."""
        logger.warning("风险告警: %s", message)
        # Alert notification logic can be added here.

    @staticmethod
    def _on_risk_violation(message: str, errors: List[str]) -> None:
        """Engine callback: log a risk violation and each of its errors."""
        logger.error("风险违规: %s", message)
        for error in errors:
            logger.error(" - %s", error)
        # Risk-violation handling logic can be added here.

    # ------------------------------------------------------------------
    # Engine lifecycle
    # ------------------------------------------------------------------

    def create_risk_engine(
        self, session_id: str, risk_config: Dict[str, Any]
    ) -> RiskEngine:
        """Create, register and return a risk engine for *session_id*.

        The config is validated through ``RiskConfigModel.from_dict`` and the
        engine is built from the validated ``to_dict()`` output. An engine
        already registered under the same session id is replaced.

        Raises:
            Exception: Whatever validation or engine construction raises; the
                error is logged and re-raised unchanged.
        """
        try:
            validated_config = RiskConfigModel.from_dict(risk_config)
            risk_engine = RiskEngine(self.user_id, validated_config.to_dict())
            risk_engine.set_callbacks(
                on_risk_alert=self._on_risk_alert,
                on_risk_violation=self._on_risk_violation,
            )
            self.risk_engines[session_id] = risk_engine
        except Exception as e:
            logger.exception("创建风险管理引擎失败: %s", e)
            raise

        logger.info("风险管理引擎已创建: %s", session_id)
        return risk_engine

    def get_risk_engine(self, session_id: str) -> Optional[RiskEngine]:
        """Return the engine registered for *session_id*, or ``None``."""
        return self.risk_engines.get(session_id)

    def remove_risk_engine(self, session_id: str) -> bool:
        """Unregister the engine for *session_id*.

        Returns:
            ``True`` if an engine was removed, ``False`` if none was registered.
        """
        if session_id not in self.risk_engines:
            return False
        del self.risk_engines[session_id]
        logger.info("风险管理引擎已移除: %s", session_id)
        return True

    # ------------------------------------------------------------------
    # Configuration helpers
    # ------------------------------------------------------------------

    def validate_risk_config(self, risk_config: Dict[str, Any]) -> Dict[str, Any]:
        """Validate *risk_config* without raising.

        Returns:
            A dict with keys ``valid``, ``config`` (the validated config as a
            dict, or ``{}`` on failure), ``errors`` (list of messages) and
            ``warnings`` (always empty here).
        """
        try:
            validated_config = RiskConfigModel.from_dict(risk_config)
        except Exception as e:
            # Deliberately broad: any validation failure is reported to the
            # caller as data instead of propagating.
            return {"valid": False, "config": {}, "errors": [str(e)], "warnings": []}
        return {
            "valid": True,
            "config": validated_config.to_dict(),
            "errors": [],
            "warnings": [],
        }

    def get_risk_presets(self) -> List[Dict[str, Any]]:
        """Return every available risk-config preset as a dictionary."""
        return [
            self._preset_to_dict(preset) for preset in RiskConfigPreset.get_presets()
        ]

    def get_risk_preset_by_name(self, name: str) -> Optional[Dict[str, Any]]:
        """Return the preset called *name* as a dictionary, or ``None``."""
        preset = RiskConfigPreset.get_preset_by_name(name)
        if preset:
            return self._preset_to_dict(preset)
        return None

    def get_default_risk_config(self) -> Dict[str, Any]:
        """Return the default risk config as a dictionary."""
        return RiskConfigModel.get_default_config().to_dict()

    def get_conservative_risk_config(self) -> Dict[str, Any]:
        """Return the conservative risk config as a dictionary."""
        return RiskConfigModel.get_conservative_config().to_dict()

    def get_aggressive_risk_config(self) -> Dict[str, Any]:
        """Return the aggressive risk config as a dictionary."""
        return RiskConfigModel.get_aggressive_config().to_dict()

    def update_risk_config(self, session_id: str, new_config: Dict[str, Any]) -> bool:
        """Validate *new_config* and apply it to the session's engine.

        Keys missing from the config fall back to the built-in defaults in
        ``_SCALAR_CONFIG_DEFAULTS`` (and an empty ``allowed_symbols`` list).

        Returns:
            ``True`` when the engine was updated; ``False`` when validation
            fails, no engine exists for the session, or applying the config
            raises (the failure is logged, never propagated).
        """
        try:
            validation_result = self.validate_risk_config(new_config)
            if not validation_result["valid"]:
                logger.error("风险配置验证失败: %s", validation_result["errors"])
                return False

            risk_engine = self.get_risk_engine(session_id)
            if risk_engine is None:
                logger.warning("风险管理引擎不存在: %s", session_id)
                return False

            # Apply the *validated* config, which is the same shape the engine
            # receives in create_risk_engine, instead of the raw caller input.
            config = validation_result["config"]
            risk_engine.risk_config = config
            for key, default in self._SCALAR_CONFIG_DEFAULTS:
                setattr(risk_engine, key, config.get(key, default))
            # A fresh list per call so engines never share a mutable default.
            risk_engine.allowed_symbols = config.get("allowed_symbols", [])

            logger.info("✅ 风险配置已更新: %s", session_id)
            return True

        except Exception as e:
            # Best-effort contract: report failure through the return value.
            logger.exception("❌ 更新风险配置失败: %s", e)
            return False

    # ------------------------------------------------------------------
    # Risk state queries
    # ------------------------------------------------------------------

    def get_risk_summary(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Return the engine's risk summary, or ``None`` if no engine exists."""
        risk_engine = self.get_risk_engine(session_id)
        if risk_engine is None:
            return None
        return risk_engine.get_risk_summary()

    def get_risk_events(
        self, session_id: str, limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Return up to *limit* of the most recent risk events.

        Each event is returned with its ``timestamp`` rendered through
        ``isoformat()`` plus its ``event_type`` and ``data``. An unknown
        session or a non-positive *limit* yields an empty list.
        """
        risk_engine = self.get_risk_engine(session_id)
        # Guard limit <= 0 explicitly: events[-0:] would return *all* events
        # and a negative limit would drop the oldest ones instead.
        if risk_engine is None or limit <= 0:
            return []

        recent_events = (risk_engine.risk_events or [])[-limit:]
        return [
            {
                "timestamp": event["timestamp"].isoformat(),
                "event_type": event["event_type"],
                "data": event["data"],
            }
            for event in recent_events
        ]

    def reset_daily_risk_metrics(self, session_id: str) -> bool:
        """Reset the engine's daily metrics; ``False`` if no engine exists."""
        risk_engine = self.get_risk_engine(session_id)
        if risk_engine is None:
            return False
        risk_engine.reset_daily_metrics()
        return True

    def check_session_risk(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Check the risk state of a session.

        Simplified implementation: portfolio information is not consulted yet,
        so this currently returns the same data as ``get_risk_summary``.
        """
        return self.get_risk_summary(session_id)

    def get_risk_recommendations(self, session_id: str) -> List[str]:
        """Return advisory messages derived from the engine's current state.

        A message is added when the current drawdown exceeds 80% of the max
        drawdown, when the daily PnL is negative, and when more than ten risk
        events have been recorded. An unknown session yields an empty list.
        """
        risk_engine = self.get_risk_engine(session_id)
        if risk_engine is None:
            return []

        recommendations: List[str] = []

        # Coerce to Decimal first: update_risk_config may have stored a float
        # limit, and float * Decimal raises TypeError.
        drawdown_threshold = (
            self._to_decimal(risk_engine.max_drawdown)
            * self._DRAWDOWN_WARNING_FRACTION
        )
        if self._to_decimal(risk_engine.current_drawdown) > drawdown_threshold:
            recommendations.append("当前回撤较大,建议减少仓位或暂停交易")

        if risk_engine.daily_pnl < 0:
            recommendations.append("当日亏损,建议检查交易策略")

        if len(risk_engine.risk_events) > self._RISK_EVENT_WARNING_COUNT:
            recommendations.append("风险事件较多,建议检查风险配置")

        return recommendations