Coverage for core/services/broker_service.py: 22.02%
109 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2券商服务
3"""
5from decimal import Decimal
6from typing import List, Optional
8from core.models.broker import (Broker, BrokerCreate, BrokerResponse,
9 BrokerUpdate, DataSource, DataSourceCreate,
10 DataSourceUpdate, FeeCalculateRequest,
11 FeeCalculateResponse, FeeConfig, TestResponse,
12 TradingFee)
13from core.models.user import User
14from core.repositories.broker_repository import BrokerRepository
15from core.trading.utils.fee_calculator import FeeCalculator
class BrokerService:
    """Service layer for brokers, data sources and broker fee configuration.

    Every public method receives the calling user and applies the access
    rules before delegating to ``BrokerRepository``:

    * a broker is only visible to the user whose ``id`` equals
      ``broker.user_id``;
    * creating, updating and deleting data sources requires
      ``current_user.user_type == "admin"``;
    * non-admin users only see data sources whose ``status`` is ``"active"``.

    Missing or inaccessible resources are reported through the return value
    (``None``, ``False`` or an unsuccessful ``TestResponse``); this class
    does not raise for them itself.
    """

    def __init__(self):
        self.broker_repository = BrokerRepository()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _is_admin(user: User) -> bool:
        """Return True when ``user.user_type`` is exactly ``"admin"``."""
        return user.user_type == "admin"

    @staticmethod
    def _simulate_connection_test(
        success_message: str, failure_prefix: str
    ) -> TestResponse:
        """Return the placeholder result shared by both connection tests.

        TODO: replace with a real connectivity probe.  For now the method
        sleeps for 0.1 s and reports a fixed ``response_time`` of 100.0; the
        value is not measured.

        Args:
            success_message: Message placed in the successful response.
            failure_prefix: Text placed before ``": <error>"`` when anything
                inside the probe raises.
        """
        try:
            import time

            time.sleep(0.1)  # stand-in for network latency
            return TestResponse(
                success=True, message=success_message, response_time=100.0
            )
        except Exception as e:
            return TestResponse(success=False, message=f"{failure_prefix}: {e}")

    # ------------------------------------------------------------------
    # Broker management
    # ------------------------------------------------------------------
    def create_broker(
        self, broker_data: BrokerCreate, current_user: User
    ) -> Optional[Broker]:
        """Create a broker owned by ``current_user``.

        ``broker_data.user_id`` is overwritten with ``current_user.id``
        before anything else happens.

        Args:
            broker_data: Creation payload; its ``code`` must not already be
                used by one of the caller's brokers.
            current_user: The user who will own the broker.

        Returns:
            Whatever ``BrokerRepository.create_broker`` returns, or ``None``
            when the caller already owns a broker with the same code.
        """
        broker_data.user_id = current_user.id
        code = broker_data.code

        existing = self.broker_repository.get_broker_by_code(code)
        if existing and existing.user_id == current_user.id:
            return None

        # get_broker_by_code() is not scoped to a user, so the broker it
        # returned may belong to somebody else while the caller still owns
        # another broker with the same code.  Check the caller's own brokers
        # as well so the per-user uniqueness rule actually holds.
        owned = self.broker_repository.get_brokers_by_user(current_user.id) or []
        if any(getattr(item, "code", None) == code for item in owned):
            return None

        return self.broker_repository.create_broker(broker_data)

    def get_broker_by_id(self, broker_id: str, current_user: User) -> Optional[Broker]:
        """Return the broker with ``broker_id`` if it belongs to the caller.

        Returns:
            The broker, or ``None`` when the repository finds nothing or the
            broker's ``user_id`` differs from ``current_user.id``.
        """
        broker = self.broker_repository.get_broker_by_id(broker_id)
        if not broker:
            return None

        # Users may only see their own brokers.
        if broker.user_id != current_user.id:
            return None

        return broker

    def get_all_brokers(self, current_user: User) -> List[Broker]:
        """Return the brokers the repository lists for ``current_user.id``."""
        return self.broker_repository.get_brokers_by_user(current_user.id)

    def update_broker(
        self, broker_id: str, broker_data: BrokerUpdate, current_user: User
    ) -> Optional[Broker]:
        """Update a broker owned by the caller.

        Returns:
            The repository's update result, or ``None`` when the broker is
            missing or not owned by ``current_user``.
        """
        if not self.get_broker_by_id(broker_id, current_user):
            return None

        return self.broker_repository.update_broker(broker_id, broker_data)

    def delete_broker(self, broker_id: str, current_user: User) -> bool:
        """Delete a broker owned by the caller.

        Returns:
            The repository's delete result, or ``False`` when the broker is
            missing or not owned by ``current_user``.
        """
        if not self.get_broker_by_id(broker_id, current_user):
            return False

        return self.broker_repository.delete_broker(broker_id)

    def test_broker_connection(
        self, broker_id: str, current_user: User
    ) -> TestResponse:
        """Run the (currently simulated) connection test for a broker.

        Returns:
            An unsuccessful ``TestResponse`` when the broker is missing or
            not owned by the caller; otherwise the simulated test result.
        """
        if not self.get_broker_by_id(broker_id, current_user):
            return TestResponse(success=False, message="券商不存在")

        return self._simulate_connection_test("连接成功", "连接失败")

    # ------------------------------------------------------------------
    # Data source management
    # ------------------------------------------------------------------
    def create_data_source(
        self, data_source_data: DataSourceCreate, current_user: User
    ) -> Optional[DataSource]:
        """Create a data source (admin only).

        Returns:
            The repository's create result, or ``None`` when the caller is
            not an admin or ``data_source_data.broker_id`` does not resolve
            to a broker.
        """
        if not self._is_admin(current_user):
            return None

        # The referenced broker must exist; ownership is not checked here.
        linked_broker = self.broker_repository.get_broker_by_id(
            data_source_data.broker_id
        )
        if not linked_broker:
            return None

        return self.broker_repository.create_data_source(data_source_data)

    def get_data_source_by_id(
        self, data_source_id: str, current_user: User
    ) -> Optional[DataSource]:
        """Return a data source the caller is allowed to see.

        Returns:
            The data source, or ``None`` when it does not exist or when a
            non-admin caller asks for one whose status is not ``"active"``.
        """
        data_source = self.broker_repository.get_data_source_by_id(data_source_id)
        if not data_source:
            return None

        # Non-admin users may only see active data sources.
        if not self._is_admin(current_user) and data_source.status != "active":
            return None

        return data_source

    def get_all_data_sources(self, current_user: User) -> List[DataSource]:
        """List data sources; non-admin callers only get ``"active"`` ones."""
        data_sources = self.broker_repository.get_all_data_sources()

        if self._is_admin(current_user):
            return data_sources

        return [ds for ds in data_sources if ds.status == "active"]

    def update_data_source(
        self,
        data_source_id: str,
        data_source_data: DataSourceUpdate,
        current_user: User,
    ) -> Optional[DataSource]:
        """Update a data source (admin only).

        Returns:
            The repository's update result, or ``None`` for non-admin callers.
        """
        if not self._is_admin(current_user):
            return None

        return self.broker_repository.update_data_source(
            data_source_id, data_source_data
        )

    def delete_data_source(self, data_source_id: str, current_user: User) -> bool:
        """Delete a data source (admin only).

        Returns:
            The repository's delete result, or ``False`` for non-admin callers.
        """
        if not self._is_admin(current_user):
            return False

        return self.broker_repository.delete_data_source(data_source_id)

    def test_data_source_connection(
        self, data_source_id: str, current_user: User
    ) -> TestResponse:
        """Run the (currently simulated) connection test for a data source.

        Returns:
            An unsuccessful ``TestResponse`` when the data source is not
            visible to the caller or its linked broker cannot be found;
            otherwise the simulated test result.
        """
        data_source = self.get_data_source_by_id(data_source_id, current_user)
        if not data_source:
            return TestResponse(success=False, message="数据源不存在")

        # The linked broker is fetched straight from the repository, so only
        # its existence is verified here, not its ownership.
        linked_broker = self.broker_repository.get_broker_by_id(data_source.broker_id)
        if not linked_broker:
            return TestResponse(success=False, message="关联券商不存在")

        return self._simulate_connection_test("数据源连接成功", "数据源连接失败")

    # ------------------------------------------------------------------
    # Fee configuration management
    # ------------------------------------------------------------------
    def get_fee_config(self, broker_id: str, current_user: User) -> Optional[FeeConfig]:
        """Return the fee configuration of one of the caller's brokers.

        Args:
            broker_id: Broker ID.
            current_user: The calling user.

        Returns:
            The repository's fee configuration for the broker, or ``None``
            when the broker is missing or not owned by the caller.
        """
        if not self.get_broker_by_id(broker_id, current_user):
            return None

        return self.broker_repository.get_fee_config(broker_id)

    def update_fee_config(
        self, broker_id: str, fee_config: FeeConfig, current_user: User
    ) -> Optional[FeeConfig]:
        """Replace the fee configuration of one of the caller's brokers.

        Args:
            broker_id: Broker ID.
            fee_config: The new fee configuration.
            current_user: The calling user.

        Returns:
            ``config.fee_config`` of the broker object returned by the
            repository, or ``None`` when the broker is missing, not owned by
            the caller, or the repository update returns nothing.
        """
        if not self.get_broker_by_id(broker_id, current_user):
            return None

        updated_broker = self.broker_repository.update_fee_config(broker_id, fee_config)
        if not updated_broker:
            return None

        return updated_broker.config.fee_config

    def calculate_fee(
        self, broker_id: str, request: FeeCalculateRequest, current_user: User
    ) -> Optional[FeeCalculateResponse]:
        """Estimate the fees and total cost of a trade for a broker.

        Args:
            broker_id: Broker ID.
            request: Trade parameters (``quantity``, ``price``, ``side``,
                ``market``).
            current_user: The calling user.

        Returns:
            A ``FeeCalculateResponse`` holding the fee breakdown, the trade
            amount (``quantity * price``) and the total cost (trade amount
            plus ``fee_details.total_fee``), or ``None`` when the broker is
            missing or not owned by the caller.
        """
        # Ownership check first; nothing is computed for foreign brokers.
        if not self.get_broker_by_id(broker_id, current_user):
            return None

        # Fall back to a default-constructed FeeConfig when none is stored.
        fee_config = self.broker_repository.get_fee_config(broker_id) or FeeConfig()

        fee_details = FeeCalculator(fee_config).calculate_fee(
            quantity=request.quantity,
            price=request.price,
            side=request.side,
            market=request.market,
        )

        trade_amount = request.quantity * request.price
        total_cost = trade_amount + fee_details.total_fee

        return FeeCalculateResponse(
            fee_details=fee_details, trade_amount=trade_amount, total_cost=total_cost
        )