Coverage for core/trading/strategies/bollinger_strategy.py: 18.88%

143 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

1""" 

2布林带策略 

3""" 

4 

5import math 

6from datetime import datetime 

7from decimal import Decimal 

8from typing import Any, Dict, List, Optional 

9 

10from core.models.trading import (MarketData, Order, OrderSide, OrderType, 

11 StrategyContext) 

12 

13from .base_strategy import BaseStrategy 

14 

15 

class BollingerBandsStrategy(BaseStrategy):
    """Bollinger Bands strategy driven purely by market-data callbacks.

    For every symbol the strategy keeps a rolling window of closing prices,
    derives the middle band (simple moving average) and the upper/lower bands
    (middle +/- ``std_dev`` population standard deviations), and then:

    * buys when the close touches or falls below the lower band,
    * sells when the close touches or rises above the upper band,
    * closes the position when the close returns to within
      ``MEAN_REVERSION_TOLERANCE`` of the middle band.

    A stop-loss sweep over all portfolio positions runs on every tick.
    """

    # Maximum number of band values retained per symbol.
    MAX_BAND_HISTORY = 100
    # Relative distance from the middle band that counts as "back at the mean".
    MEAN_REVERSION_TOLERANCE = Decimal("0.02")

    def __init__(self, name: str, config: Dict[str, Any]):
        """Read the strategy parameters from ``config`` and create empty state.

        Args:
            name: Strategy name, forwarded to ``BaseStrategy``.
            config: Strategy configuration. Recognised keys are ``period``
                (default 20), ``std_dev`` (default 2.0) and ``position_size``
                (default 0.04).
        """
        super().__init__(name, config)

        # Bollinger Bands parameters
        self.period = config.get("period", 20)  # moving-average window length
        self.std_dev = config.get("std_dev", 2.0)  # band width in standard deviations
        self.position_size = config.get("position_size", 0.04)  # position size

        # Per-symbol state, created lazily when a symbol is first seen.
        self.price_history: Dict[str, List[Decimal]] = {}
        self.middle_band: Dict[str, List[Decimal]] = {}  # middle band (moving average)
        self.upper_band: Dict[str, List[Decimal]] = {}  # upper band
        self.lower_band: Dict[str, List[Decimal]] = {}  # lower band
        self.last_signals: Dict[str, str] = {}  # "buy", "sell" or "hold"

    def initialize(self, context: StrategyContext):
        """Bind the strategy to its runtime context and log the parameters."""
        self.context = context
        self.trading_engine = context.trading_engine
        self.quote_adapter = context.quote_adapter

        self.log_message(f"✅ 布林带策略已初始化: {self.name}")
        self.log_message(f" 移动平均周期: {self.period}")
        self.log_message(f" 标准差倍数: {self.std_dev}")
        self.log_message(f" 仓位大小: {self.position_size}")

    def on_market_data(self, market_data: MarketData):
        """Handle one market-data tick.

        Records the close, recomputes the bands once enough history exists,
        acts on any resulting signal, and finally runs the stop-loss sweep.

        The stop-loss sweep runs on every tick, including the warm-up phase
        in which there is not yet enough history to compute the bands: the
        strategy has no timer callback, so this is the only place where
        open positions are protected.
        """
        symbol = market_data.symbol

        # Lazily create the per-symbol state.
        if symbol not in self.price_history:
            self.price_history[symbol] = []
            self.middle_band[symbol] = []
            self.upper_band[symbol] = []
            self.lower_band[symbol] = []
            self.last_signals[symbol] = "hold"
            self.log_message(f"🆕 初始化股票数据: {symbol}")

        self._update_price_history(market_data)

        # Returns None while fewer than ``period`` prices are available.
        bollinger_data = self._calculate_bollinger_bands(symbol)
        if bollinger_data is not None:
            signal = self._generate_signal(symbol, market_data.close, bollinger_data)
            if signal != "hold":
                self._execute_signal(symbol, signal, market_data.close)

        # NOTE(review): if a sell signal was just submitted for a position that
        # also breaches the stop-loss, a second sell may be submitted before the
        # portfolio reflects the first - confirm the engine de-duplicates.
        self._check_stop_loss()

    def on_trade_update(self, trade):
        """Record an executed trade and update the win/total counters."""
        self.log_message(
            f"📊 布林带策略交易更新: {trade.symbol} {trade.side} {trade.quantity} @ {trade.price}"
        )
        self.trades.append(trade)
        self.total_trades += 1

        # Win-rate bookkeeping. Simplification: every sell is counted as a win.
        if trade.side == OrderSide.SELL:
            self.winning_trades += 1

    # The strategy is fully passive: there is no timer callback, and the
    # stop-loss check is driven from on_market_data.

    def _update_price_history(self, market_data: MarketData):
        """Append the tick's close to the symbol's history, capped at 3 * period."""
        symbol = market_data.symbol
        history = self.price_history.setdefault(symbol, [])
        history.append(market_data.close)

        # Keep the history bounded.
        max_history = self.period * 3
        if len(history) > max_history:
            self.price_history[symbol] = history[-max_history:]

    def _calculate_bollinger_bands(self, symbol: str) -> Optional[Dict[str, Decimal]]:
        """Compute the current bands for ``symbol`` and append them to the band history.

        Returns:
            A dict with keys ``"middle"``, ``"upper"``, ``"lower"`` and
            ``"std_dev"``, or ``None`` when the symbol is unknown or has fewer
            than ``period`` recorded prices.
        """
        prices = self.price_history.get(symbol)
        if prices is None or len(prices) < self.period:
            return None

        # Middle band: simple moving average over the most recent window.
        recent_prices = prices[-self.period :]
        count = Decimal(len(recent_prices))
        middle_band = sum(recent_prices, Decimal("0")) / count

        # Population variance / standard deviation, kept in Decimal throughout
        # so no precision is lost in a float round-trip.
        squared_deviation = sum(
            ((price - middle_band) * (price - middle_band) for price in recent_prices),
            Decimal("0"),
        )
        variance = squared_deviation / count
        std_deviation = variance.sqrt()

        # Upper and lower bands.
        band_offset = std_deviation * Decimal(str(self.std_dev))
        upper_band = middle_band + band_offset
        lower_band = middle_band - band_offset

        # Record the values; setdefault tolerates a symbol whose price history
        # was seeded without going through on_market_data.
        for store, value in (
            (self.middle_band, middle_band),
            (self.upper_band, upper_band),
            (self.lower_band, lower_band),
        ):
            series = store.setdefault(symbol, [])
            series.append(value)
            if len(series) > self.MAX_BAND_HISTORY:
                store[symbol] = series[-self.MAX_BAND_HISTORY :]

        return {
            "middle": middle_band,
            "upper": upper_band,
            "lower": lower_band,
            "std_dev": std_deviation,
        }

    def _generate_signal(
        self, symbol: str, current_price: Decimal, bollinger_data: Dict[str, Decimal]
    ) -> str:
        """Translate the current price and bands into ``"buy"``, ``"sell"`` or ``"hold"``.

        The last emitted signal per symbol is remembered so the same entry
        signal is not repeated on consecutive ticks. A return to the middle
        band after a buy/sell signal yields ``"sell"`` (close the position)
        and resets the remembered signal to ``"hold"``.
        """
        last_signal = self.last_signals.get(symbol, "hold")

        upper_band = bollinger_data["upper"]
        lower_band = bollinger_data["lower"]
        middle_band = bollinger_data["middle"]

        # With zero variance the bands collapse onto the price, which would make
        # both touch conditions true and flip buy/sell on every tick.
        bands_open = upper_band > lower_band

        # Price touches the lower band: buy.
        if bands_open and current_price <= lower_band and last_signal != "buy":
            self.last_signals[symbol] = "buy"
            return "buy"

        # Price touches the upper band: sell.
        if bands_open and current_price >= upper_band and last_signal != "sell":
            self.last_signals[symbol] = "sell"
            return "sell"

        # Price is back near the middle band: close the position. Written as a
        # product instead of a ratio so a zero middle band cannot raise.
        near_middle = abs(current_price - middle_band) < (
            self.MEAN_REVERSION_TOLERANCE * abs(middle_band)
        )
        if near_middle and last_signal != "hold":
            self.last_signals[symbol] = "hold"
            return "sell"  # close position

        return "hold"

    def _execute_signal(self, symbol: str, signal: str, current_price: Decimal):
        """Submit a market order for ``signal``; does nothing without a trading engine."""
        if not self.trading_engine:
            return

        if signal == "buy":
            # Risk gate first.
            # NOTE(review): the check is made for a quantity of 1 rather than
            # the size actually ordered below - confirm this is intended.
            risk_check = self.check_risk_limits(symbol, Decimal("1"), current_price)
            if not risk_check["valid"]:
                self.log_message(
                    f"❌ 布林带买入信号被风险控制阻止: {risk_check['errors']}", "warn"
                )
                return

            # Size the order.
            position_size = self.calculate_position_size(
                symbol, current_price, self.position_size
            )
            if not position_size > 0:
                return

            order = Order(
                symbol=symbol,
                side=OrderSide.BUY,
                order_type=OrderType.MARKET,
                quantity=position_size,
                price=None,
                session_id=self.context.trading_session_id,
            )
            if self.submit_order(order):
                self.log_message(
                    f"🟢 布林带买入信号执行: {symbol} {position_size} @ {current_price}",
                    "info",
                )
            else:
                self.log_message(f"❌ 布林带买入订单失败: {symbol}", "error")

        elif signal == "sell":
            # Find the current holding for this symbol, if any.
            portfolio = self.get_portfolio()
            current_position = next(
                (held for held in portfolio.positions if held.symbol == symbol),
                None,
            )

            if current_position and current_position.quantity > 0:
                # Sell the whole position.
                order = Order(
                    symbol=symbol,
                    side=OrderSide.SELL,
                    order_type=OrderType.MARKET,
                    quantity=current_position.quantity,
                    price=None,
                    session_id=self.context.trading_session_id,
                )
                if self.submit_order(order):
                    self.log_message(
                        f"🔴 布林带卖出信号执行: {symbol} {current_position.quantity} @ {current_price}",
                        "info",
                    )
                else:
                    self.log_message(f"❌ 布林带卖出订单失败: {symbol}", "error")

    def _check_stop_loss(self):
        """Sell any position whose unrealized loss ratio reaches the stop-loss limit.

        The loss ratio is ``abs(unrealized_pnl) / (avg_price * quantity)``.
        Positions with a non-positive cost basis are skipped, since the ratio
        is undefined (zero) or can never reach the limit (negative).
        """
        # Same guard as _execute_signal: nothing can be submitted without an engine.
        if not getattr(self, "trading_engine", None):
            return

        portfolio = self.get_portfolio()
        risk_limits = self.get_risk_limits()

        for position in portfolio.positions:
            if not position.unrealized_pnl < 0:
                continue

            cost_basis = position.avg_price * position.quantity
            if cost_basis <= 0:
                continue

            loss_ratio = abs(position.unrealized_pnl) / cost_basis
            if loss_ratio < risk_limits.stop_loss_ratio:
                continue

            # Stop-loss triggered: liquidate the whole position.
            order = Order(
                symbol=position.symbol,
                side=OrderSide.SELL,
                order_type=OrderType.MARKET,
                quantity=position.quantity,
                price=None,
                session_id=self.context.trading_session_id,
            )
            if self.submit_order(order):
                self.log_message(
                    f"🛑 布林带止损触发: {position.symbol} {position.quantity}",
                    "warn",
                )
            else:
                # A stop-loss that could not be placed must not fail silently.
                self.log_message(
                    f"❌ 布林带止损订单失败: {position.symbol}", "error"
                )

    def get_bollinger_data(self, symbol: str) -> Optional[Dict[str, Decimal]]:
        """Return the most recent middle/upper/lower band values for ``symbol``.

        Returns ``None`` when any of the three band series is missing or empty.
        """
        middle_series = self.middle_band.get(symbol)
        upper_series = self.upper_band.get(symbol)
        lower_series = self.lower_band.get(symbol)

        if not (middle_series and upper_series and lower_series):
            return None

        return {
            "middle": middle_series[-1],
            "upper": upper_series[-1],
            "lower": lower_series[-1],
        }

    def get_bollinger_history(self, symbol: str) -> Dict[str, List[Decimal]]:
        """Return the recorded band series for ``symbol`` (empty lists if unknown).

        The lists are copies, so callers cannot mutate the strategy's state.
        """
        return {
            "middle": list(self.middle_band.get(symbol, [])),
            "upper": list(self.upper_band.get(symbol, [])),
            "lower": list(self.lower_band.get(symbol, [])),
        }