Coverage for core/services/broker_service.py: 22.02%

109 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

1""" 

2券商服务 

3""" 

4 

5from decimal import Decimal 

6from typing import List, Optional 

7 

8from core.models.broker import (Broker, BrokerCreate, BrokerResponse, 

9 BrokerUpdate, DataSource, DataSourceCreate, 

10 DataSourceUpdate, FeeCalculateRequest, 

11 FeeCalculateResponse, FeeConfig, TestResponse, 

12 TradingFee) 

13from core.models.user import User 

14from core.repositories.broker_repository import BrokerRepository 

15from core.trading.utils.fee_calculator import FeeCalculator 

16 

17 

class BrokerService:
    """Service layer for brokers, data sources and broker fee settings.

    Every public method receives the calling user and applies an access
    rule before delegating to ``BrokerRepository``:

    * a broker is only returned to the user whose ``id`` equals the
      broker's ``user_id``;
    * data sources may only be created, updated or deleted by users whose
      ``user_type`` is ``"admin"``; other users only see data sources whose
      ``status`` is ``"active"``.

    Denied or missing resources are reported through ``None`` / ``False``
    return values (or an unsuccessful ``TestResponse``), not exceptions.
    """

    def __init__(self):
        """Create the service together with its own ``BrokerRepository``."""
        self.broker_repository = BrokerRepository()

    # ------------------------------------------------------------------
    # Broker management
    # ------------------------------------------------------------------
    def create_broker(
        self, broker_data: BrokerCreate, current_user: User
    ) -> Optional[Broker]:
        """Create a broker owned by the calling user.

        Args:
            broker_data: Creation payload. It is mutated in place: its
                ``user_id`` is overwritten with ``current_user.id``.
            current_user: User on whose behalf the broker is created.

        Returns:
            The result of the repository's ``create_broker`` call, or
            ``None`` when the broker found for ``broker_data.code`` already
            belongs to the calling user.
        """
        owner_id = current_user.id
        broker_data.user_id = owner_id

        # NOTE(review): the lookup is keyed by code only and ownership is
        # checked afterwards. If the repository can hold the same code for
        # several users, confirm which record it returns here.
        same_code = self.broker_repository.get_broker_by_code(broker_data.code)
        if same_code and same_code.user_id == owner_id:
            return None

        return self.broker_repository.create_broker(broker_data)

    def get_broker_by_id(self, broker_id: str, current_user: User) -> Optional[Broker]:
        """Fetch one broker, restricted to brokers owned by the caller.

        Args:
            broker_id: Identifier passed to the repository lookup.
            current_user: User requesting the broker.

        Returns:
            The broker, or ``None`` when the repository returns nothing or
            the broker's ``user_id`` differs from ``current_user.id``.
        """
        found = self.broker_repository.get_broker_by_id(broker_id)
        # A missing broker and a broker owned by someone else look the same
        # to the caller.
        if not found or found.user_id != current_user.id:
            return None
        return found

    def get_all_brokers(self, current_user: User) -> List[Broker]:
        """Return the brokers the repository lists for ``current_user.id``."""
        owner_id = current_user.id
        return self.broker_repository.get_brokers_by_user(owner_id)

    def update_broker(
        self, broker_id: str, broker_data: BrokerUpdate, current_user: User
    ) -> Optional[Broker]:
        """Update a broker owned by the caller.

        Args:
            broker_id: Identifier of the broker to update.
            broker_data: Update payload forwarded to the repository.
            current_user: User requesting the update.

        Returns:
            The result of the repository's ``update_broker`` call, or
            ``None`` when ``get_broker_by_id`` yields no broker for this
            user.
        """
        if not self.get_broker_by_id(broker_id, current_user):
            return None
        return self.broker_repository.update_broker(broker_id, broker_data)

    def delete_broker(self, broker_id: str, current_user: User) -> bool:
        """Delete a broker owned by the caller.

        Args:
            broker_id: Identifier of the broker to delete.
            current_user: User requesting the deletion.

        Returns:
            ``False`` when ``get_broker_by_id`` yields no broker for this
            user; otherwise the result of the repository's
            ``delete_broker`` call.
        """
        if not self.get_broker_by_id(broker_id, current_user):
            return False
        return self.broker_repository.delete_broker(broker_id)

    def test_broker_connection(
        self, broker_id: str, current_user: User
    ) -> TestResponse:
        """Run the (currently simulated) connectivity test for a broker.

        Args:
            broker_id: Identifier of the broker to test.
            current_user: User requesting the test.

        Returns:
            An unsuccessful ``TestResponse`` when the broker is not
            available to this user or when the simulated check raises;
            otherwise a successful response with ``response_time=100.0``.
        """
        if not self.get_broker_by_id(broker_id, current_user):
            return TestResponse(success=False, message="券商不存在")

        # Placeholder: a real connectivity probe belongs here. For now a
        # short sleep stands in for network latency and a fixed result is
        # returned.
        try:
            import time

            time.sleep(0.1)

            return TestResponse(success=True, message="连接成功", response_time=100.0)
        except Exception as exc:
            return TestResponse(success=False, message=f"连接失败: {str(exc)}")

    # ------------------------------------------------------------------
    # Data source management
    # ------------------------------------------------------------------
    def create_data_source(
        self, data_source_data: DataSourceCreate, current_user: User
    ) -> Optional[DataSource]:
        """Create a data source (admin users only).

        Args:
            data_source_data: Creation payload; its ``broker_id`` must
                resolve to a broker in the repository.
            current_user: User requesting the creation.

        Returns:
            The result of the repository's ``create_data_source`` call, or
            ``None`` when the caller's ``user_type`` is not ``"admin"`` or
            the referenced broker is not found.
        """
        if current_user.user_type != "admin":
            return None

        # The broker is looked up directly in the repository, so no
        # ownership check is applied to it here.
        linked_broker = self.broker_repository.get_broker_by_id(
            data_source_data.broker_id
        )
        if not linked_broker:
            return None

        return self.broker_repository.create_data_source(data_source_data)

    def get_data_source_by_id(
        self, data_source_id: str, current_user: User
    ) -> Optional[DataSource]:
        """Fetch one data source, applying the visibility rule.

        Args:
            data_source_id: Identifier passed to the repository lookup.
            current_user: User requesting the data source.

        Returns:
            The data source, or ``None`` when it is not found or when a
            non-admin caller asks for one whose ``status`` is not
            ``"active"``.
        """
        source = self.broker_repository.get_data_source_by_id(data_source_id)
        if not source:
            return None

        hidden_from_caller = (
            current_user.user_type != "admin" and source.status != "active"
        )
        return None if hidden_from_caller else source

    def get_all_data_sources(self, current_user: User) -> List[DataSource]:
        """List data sources visible to the caller.

        Returns:
            The repository's full list for admin users; for everyone else
            a new list holding only entries whose ``status`` is
            ``"active"``.
        """
        sources = self.broker_repository.get_all_data_sources()

        if current_user.user_type != "admin":
            return [entry for entry in sources if entry.status == "active"]

        return sources

    def update_data_source(
        self,
        data_source_id: str,
        data_source_data: DataSourceUpdate,
        current_user: User,
    ) -> Optional[DataSource]:
        """Update a data source (admin users only).

        Args:
            data_source_id: Identifier of the data source to update.
            data_source_data: Update payload forwarded to the repository.
            current_user: User requesting the update.

        Returns:
            The result of the repository's ``update_data_source`` call, or
            ``None`` when the caller's ``user_type`` is not ``"admin"``.
        """
        if current_user.user_type != "admin":
            return None

        return self.broker_repository.update_data_source(
            data_source_id, data_source_data
        )

    def delete_data_source(self, data_source_id: str, current_user: User) -> bool:
        """Delete a data source (admin users only).

        Returns:
            ``False`` when the caller's ``user_type`` is not ``"admin"``;
            otherwise the result of the repository's ``delete_data_source``
            call.
        """
        if current_user.user_type != "admin":
            return False

        return self.broker_repository.delete_data_source(data_source_id)

    def test_data_source_connection(
        self, data_source_id: str, current_user: User
    ) -> TestResponse:
        """Run the (currently simulated) connectivity test for a data source.

        Args:
            data_source_id: Identifier of the data source to test.
            current_user: User requesting the test.

        Returns:
            An unsuccessful ``TestResponse`` when the data source is not
            visible to the caller, when its broker cannot be found, or when
            the simulated check raises; otherwise a successful response
            with ``response_time=100.0``.
        """
        source = self.get_data_source_by_id(data_source_id, current_user)
        if not source:
            return TestResponse(success=False, message="数据源不存在")

        # The owning broker is fetched straight from the repository (no
        # per-user ownership check).
        if not self.broker_repository.get_broker_by_id(source.broker_id):
            return TestResponse(success=False, message="关联券商不存在")

        # Placeholder: a real data source probe belongs here. For now a
        # short sleep stands in for network latency and a fixed result is
        # returned.
        try:
            import time

            time.sleep(0.1)

            return TestResponse(
                success=True, message="数据源连接成功", response_time=100.0
            )
        except Exception as exc:
            return TestResponse(success=False, message=f"数据源连接失败: {str(exc)}")

    # ------------------------------------------------------------------
    # Fee configuration management
    # ------------------------------------------------------------------
    def get_fee_config(self, broker_id: str, current_user: User) -> Optional[FeeConfig]:
        """Return the fee configuration of a broker owned by the caller.

        Args:
            broker_id: Broker identifier.
            current_user: User requesting the configuration.

        Returns:
            The result of the repository's ``get_fee_config`` call, or
            ``None`` when ``get_broker_by_id`` yields no broker for this
            user.
        """
        if not self.get_broker_by_id(broker_id, current_user):
            return None
        return self.broker_repository.get_fee_config(broker_id)

    def update_fee_config(
        self, broker_id: str, fee_config: FeeConfig, current_user: User
    ) -> Optional[FeeConfig]:
        """Replace the fee configuration of a broker owned by the caller.

        Args:
            broker_id: Broker identifier.
            fee_config: New fee configuration forwarded to the repository.
            current_user: User requesting the update.

        Returns:
            ``config.fee_config`` of the broker returned by the repository,
            or ``None`` when the broker is not available to this user or
            the repository returns nothing.
        """
        if not self.get_broker_by_id(broker_id, current_user):
            return None

        refreshed = self.broker_repository.update_fee_config(broker_id, fee_config)
        # NOTE(review): assumes the returned broker always carries a
        # ``config`` object; confirm against the Broker model.
        return refreshed.config.fee_config if refreshed else None

    def calculate_fee(
        self, broker_id: str, request: FeeCalculateRequest, current_user: User
    ) -> Optional[FeeCalculateResponse]:
        """Estimate the fees and total cost of a prospective trade.

        Args:
            broker_id: Broker identifier.
            request: Fee calculation request; ``quantity``, ``price``,
                ``side`` and ``market`` are read from it.
            current_user: User requesting the estimate.

        Returns:
            A ``FeeCalculateResponse`` holding the calculator's fee
            details, the trade amount (``quantity * price``) and the total
            cost (trade amount plus ``total_fee``), or ``None`` when the
            broker is not available to this user.
        """
        # Ownership check first: no estimate for brokers the caller
        # cannot see.
        if not self.get_broker_by_id(broker_id, current_user):
            return None

        # Fall back to a default-constructed FeeConfig when the repository
        # has nothing stored for this broker.
        effective_config = self.broker_repository.get_fee_config(broker_id) or FeeConfig()

        breakdown = FeeCalculator(effective_config).calculate_fee(
            quantity=request.quantity,
            price=request.price,
            side=request.side,
            market=request.market,
        )

        notional = request.quantity * request.price
        return FeeCalculateResponse(
            fee_details=breakdown,
            trade_amount=notional,
            total_cost=notional + breakdown.total_fee,
        )