Coverage for core/services/risk_service.py: 48.28%

116 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

1""" 

2风险管理服务 

3""" 

4 

5import logging 

6from datetime import datetime 

7from decimal import Decimal 

8from typing import Any, Dict, List, Optional 

9 

10from core.models.risk_config import (RiskConfigModel, RiskConfigPreset, 

11 RiskConfigTemplate) 

12from core.repositories.trading_repository import TradingRepository 

13from core.trading.risk import RiskEngine 

14 

15logger = logging.getLogger(__name__) 

16 

17 

18class RiskService: 

19 """风险管理服务""" 

20 

21 def __init__( 

22 self, user_id: str, trading_repository: Optional[TradingRepository] = None 

23 ): 

24 self.user_id = user_id 

25 self.trading_repo = trading_repository or TradingRepository() 

26 self.risk_engines: Dict[str, RiskEngine] = {} 

27 

28 def create_risk_engine( 

29 self, session_id: str, risk_config: Dict[str, Any] 

30 ) -> RiskEngine: 

31 """创建风险管理引擎""" 

32 try: 

33 # 验证风险配置 

34 validated_config = RiskConfigModel.from_dict(risk_config) 

35 

36 # 创建风险管理引擎 

37 risk_engine = RiskEngine(self.user_id, validated_config.to_dict()) 

38 

39 # 设置回调函数 

40 def on_risk_alert(message: str): 

41 logger.warning(f"风险告警: {message}") 

42 # 这里可以添加告警通知逻辑 

43 

44 def on_risk_violation(message: str, errors: List[str]): 

45 logger.error(f"风险违规: {message}") 

46 for error in errors: 

47 logger.error(f" - {error}") 

48 # 这里可以添加风险违规处理逻辑 

49 

50 risk_engine.set_callbacks( 

51 on_risk_alert=on_risk_alert, on_risk_violation=on_risk_violation 

52 ) 

53 

54 # 存储风险管理引擎 

55 self.risk_engines[session_id] = risk_engine 

56 

57 logger.info(f"风险管理引擎已创建: {session_id}") 

58 return risk_engine 

59 

60 except Exception as e: 

61 logger.error(f"创建风险管理引擎失败: {e}") 

62 raise 

63 

64 def get_risk_engine(self, session_id: str) -> Optional[RiskEngine]: 

65 """获取风险管理引擎""" 

66 return self.risk_engines.get(session_id) 

67 

68 def remove_risk_engine(self, session_id: str) -> bool: 

69 """移除风险管理引擎""" 

70 if session_id in self.risk_engines: 

71 del self.risk_engines[session_id] 

72 logger.info(f"风险管理引擎已移除: {session_id}") 

73 return True 

74 return False 

75 

76 def validate_risk_config(self, risk_config: Dict[str, Any]) -> Dict[str, Any]: 

77 """验证风险配置""" 

78 try: 

79 validated_config = RiskConfigModel.from_dict(risk_config) 

80 return { 

81 "valid": True, 

82 "config": validated_config.to_dict(), 

83 "errors": [], 

84 "warnings": [], 

85 } 

86 except Exception as e: 

87 return {"valid": False, "config": {}, "errors": [str(e)], "warnings": []} 

88 

89 def get_risk_presets(self) -> List[Dict[str, Any]]: 

90 """获取风险配置预设""" 

91 presets = RiskConfigPreset.get_presets() 

92 return [ 

93 { 

94 "name": preset.name, 

95 "description": preset.description, 

96 "config": preset.config.to_dict(), 

97 "is_default": preset.is_default, 

98 } 

99 for preset in presets 

100 ] 

101 

102 def get_risk_preset_by_name(self, name: str) -> Optional[Dict[str, Any]]: 

103 """根据名称获取风险配置预设""" 

104 preset = RiskConfigPreset.get_preset_by_name(name) 

105 if preset: 

106 return { 

107 "name": preset.name, 

108 "description": preset.description, 

109 "config": preset.config.to_dict(), 

110 "is_default": preset.is_default, 

111 } 

112 return None 

113 

114 def get_default_risk_config(self) -> Dict[str, Any]: 

115 """获取默认风险配置""" 

116 return RiskConfigModel.get_default_config().to_dict() 

117 

118 def get_conservative_risk_config(self) -> Dict[str, Any]: 

119 """获取保守风险配置""" 

120 return RiskConfigModel.get_conservative_config().to_dict() 

121 

122 def get_aggressive_risk_config(self) -> Dict[str, Any]: 

123 """获取激进风险配置""" 

124 return RiskConfigModel.get_aggressive_config().to_dict() 

125 

126 def update_risk_config(self, session_id: str, new_config: Dict[str, Any]) -> bool: 

127 """更新风险配置""" 

128 try: 

129 # 验证新配置 

130 validation_result = self.validate_risk_config(new_config) 

131 if not validation_result["valid"]: 

132 logger.error(f"风险配置验证失败: {validation_result['errors']}") 

133 return False 

134 

135 # 获取现有的风险管理引擎 

136 risk_engine = self.get_risk_engine(session_id) 

137 if not risk_engine: 

138 logger.warning(f"风险管理引擎不存在: {session_id}") 

139 return False 

140 

141 # 更新配置 

142 risk_engine.risk_config = new_config 

143 risk_engine.max_position_ratio = new_config.get("max_position_ratio", 0.1) 

144 risk_engine.stop_loss_ratio = new_config.get("stop_loss_ratio", 0.05) 

145 risk_engine.max_drawdown = new_config.get("max_drawdown", 0.15) 

146 risk_engine.allowed_symbols = new_config.get("allowed_symbols", []) 

147 risk_engine.max_single_order_ratio = new_config.get( 

148 "max_single_order_ratio", 0.05 

149 ) 

150 risk_engine.max_daily_loss_ratio = new_config.get( 

151 "max_daily_loss_ratio", 0.02 

152 ) 

153 risk_engine.min_cash_ratio = new_config.get("min_cash_ratio", 0.1) 

154 risk_engine.max_leverage = new_config.get("max_leverage", 1.0) 

155 

156 print(f"✅ 风险配置已更新: {session_id}") 

157 return True 

158 

159 except Exception as e: 

160 print(f"❌ 更新风险配置失败: {e}") 

161 return False 

162 

163 def get_risk_summary(self, session_id: str) -> Optional[Dict[str, Any]]: 

164 """获取风险摘要""" 

165 risk_engine = self.get_risk_engine(session_id) 

166 if not risk_engine: 

167 return None 

168 

169 return risk_engine.get_risk_summary() 

170 

171 def get_risk_events( 

172 self, session_id: str, limit: int = 100 

173 ) -> List[Dict[str, Any]]: 

174 """获取风险事件""" 

175 risk_engine = self.get_risk_engine(session_id) 

176 if not risk_engine: 

177 return [] 

178 

179 events = risk_engine.risk_events[-limit:] if risk_engine.risk_events else [] 

180 return [ 

181 { 

182 "timestamp": event["timestamp"].isoformat(), 

183 "event_type": event["event_type"], 

184 "data": event["data"], 

185 } 

186 for event in events 

187 ] 

188 

189 def reset_daily_risk_metrics(self, session_id: str) -> bool: 

190 """重置日度风险指标""" 

191 risk_engine = self.get_risk_engine(session_id) 

192 if not risk_engine: 

193 return False 

194 

195 risk_engine.reset_daily_metrics() 

196 return True 

197 

198 def check_session_risk(self, session_id: str) -> Optional[Dict[str, Any]]: 

199 """检查会话风险""" 

200 risk_engine = self.get_risk_engine(session_id) 

201 if not risk_engine: 

202 return None 

203 

204 # 这里需要获取投资组合信息 

205 # 简化处理,返回风险摘要 

206 return risk_engine.get_risk_summary() 

207 

208 def get_risk_recommendations(self, session_id: str) -> List[str]: 

209 """获取风险建议""" 

210 risk_engine = self.get_risk_engine(session_id) 

211 if not risk_engine: 

212 return [] 

213 

214 recommendations = [] 

215 

216 # 基于当前风险状态提供建议 

217 if risk_engine.current_drawdown > risk_engine.max_drawdown * Decimal("0.8"): 

218 recommendations.append("当前回撤较大,建议减少仓位或暂停交易") 

219 

220 if risk_engine.daily_pnl < 0: 

221 recommendations.append("当日亏损,建议检查交易策略") 

222 

223 if len(risk_engine.risk_events) > 10: 

224 recommendations.append("风险事件较多,建议检查风险配置") 

225 

226 return recommendations