Coverage for api/v1/endpoints/api_test.py: 66.89%

586 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

1""" 

2API测试端点 

3提供长桥OpenAPI的测试接口 

4""" 

5 

6from datetime import date, datetime 

7from decimal import Decimal 

8from typing import Any, Dict, List, Optional, Union 

9 

10from fastapi import APIRouter, Depends, HTTPException 

11from pydantic import BaseModel, Field, field_validator 

12 

13from core.data_source.adapters.quote_adapter import QuoteAdapter 

14from core.middleware.auth_middleware import get_current_user 

15from core.models.user import User 

16 

# Router that collects every endpoint of this module under the "/api-test" prefix.
router = APIRouter(
    tags=["API测试"],
    prefix="/api-test",
)

18 

19 

20# 请求模型定义 

class StaticInfoRequest(BaseModel):
    """Request body for fetching static (basic) information of securities."""

    # Either a list of security codes or a single comma-separated string.
    symbols: Union[str, List[str]] = Field(
        ...,
        description="股票代码列表或逗号分隔的字符串",
        example=["YINN.US", "700.HK"],
    )

    @field_validator("symbols")
    @classmethod
    def validate_symbols(cls, v):
        """Split a comma-separated string into trimmed, non-empty codes.

        Values that are not strings (for example a list) are returned unchanged.
        """
        if not isinstance(v, str):
            return v
        trimmed = (piece.strip() for piece in v.split(","))
        return [piece for piece in trimmed if piece]

34 

35 

class QuoteRequest(BaseModel):
    """Request body for fetching real-time quotes of securities."""

    # Either a list of security codes or a single comma-separated string.
    symbols: Union[str, List[str]] = Field(
        ...,
        description="股票代码列表或逗号分隔的字符串",
        example=["YINN.US", "700.HK"],
    )

    @field_validator("symbols")
    @classmethod
    def validate_symbols(cls, v):
        """Split a comma-separated string into trimmed, non-empty codes.

        Values that are not strings (for example a list) are returned unchanged.
        """
        if not isinstance(v, str):
            return v
        trimmed = (piece.strip() for piece in v.split(","))
        return [piece for piece in trimmed if piece]

49 

50 

class DepthRequest(BaseModel):
    """Request body for fetching the order book (depth) of one security."""

    # Code of the security whose order book is requested.
    symbol: str = Field(
        ...,
        description="股票代码",
        example="YINN.US",
    )

55 

56 

class TradesRequest(BaseModel):
    """Request body for fetching the trade details of one security."""

    # Code of the security whose trades are requested.
    symbol: str = Field(
        ...,
        description="股票代码",
        example="YINN.US",
    )
    # Number of trades to fetch; defaults to 10 and must lie in 1..1000.
    count: int = Field(
        10,
        description="获取数量",
        ge=1,
        le=1000,
    )

62 

63 

class IntradayRequest(BaseModel):
    """Request body for fetching the intraday line of one security."""

    # Code of the security whose intraday data is requested.
    symbol: str = Field(
        ...,
        description="股票代码",
        example="YINN.US",
    )
    # Trading-session selector; defaults to "Intraday".
    trade_sessions: str = Field(
        "Intraday",
        description="交易时段",
        example="Intraday",
    )

69 

70 

class CandlesticksRequest(BaseModel):
    """Request body for fetching candlesticks (K-lines) of one security."""

    # Code of the security whose candlesticks are requested.
    symbol: str = Field(
        ...,
        description="股票代码",
        example="YINN.US",
    )
    # Candlestick period; defaults to "Day".
    period: str = Field(
        "Day",
        description="K线周期",
        example="Day",
    )
    # Number of candlesticks to fetch; defaults to 10 and must lie in 1..1000.
    count: int = Field(
        10,
        description="获取数量",
        ge=1,
        le=1000,
    )
    # Price-adjustment type; defaults to "NoAdjust".
    adjust_type: str = Field(
        "NoAdjust",
        description="复权类型",
        example="NoAdjust",
    )
    # Trading-session selector; defaults to "Intraday".
    trade_sessions: str = Field(
        "Intraday",
        description="交易时段",
        example="Intraday",
    )

79 

80 

class TradingDaysRequest(BaseModel):
    """Request body for fetching the trading days of a market."""

    # Market identifier, e.g. "US".
    market: str = Field(
        ...,
        description="市场类型",
        example="US",
    )
    # First day of the requested range.
    begin: Union[date, datetime] = Field(
        ...,
        description="开始日期",
    )
    # Last day of the requested range.
    end: Union[date, datetime] = Field(
        ...,
        description="结束日期",
    )

    @field_validator("begin", "end")
    @classmethod
    def validate_date(cls, v):
        """Reduce a ``datetime`` to its ``date`` part; pass other values through."""
        return v.date() if isinstance(v, datetime) else v

94 

95 

class CalcIndexesRequest(BaseModel):
    """Request body for fetching calculated indexes of securities."""

    # Either a list of security codes or a single comma-separated string.
    symbols: Union[str, List[str]] = Field(
        ...,
        description="股票代码列表或逗号分隔的字符串",
        example=["YINN.US", "700.HK"],
    )
    # Either a list of index names or a single comma-separated string.
    indexes: Union[str, List[str]] = Field(
        ...,
        description="计算指标列表或逗号分隔的字符串",
        example=["LastDone", "ChangeRate"],
    )

    @field_validator("symbols", "indexes")
    @classmethod
    def validate_list_fields(cls, v):
        """Split a comma-separated string into trimmed, non-empty entries.

        Values that are not strings (for example a list) are returned unchanged.
        """
        if not isinstance(v, str):
            return v
        trimmed = (piece.strip() for piece in v.split(","))
        return [piece for piece in trimmed if piece]

114 

115 

class HistoryCandlesticksRequest(BaseModel):
    """Request body for fetching historical candlesticks of one security."""

    # Code of the security whose history is requested.
    symbol: str = Field(
        ...,
        description="股票代码",
        example="YINN.US",
    )
    # Candlestick period; defaults to "Day".
    period: str = Field(
        "Day",
        description="K线周期",
        example="Day",
    )
    # Price-adjustment type; defaults to "NoAdjust".
    adjust_type: str = Field(
        "NoAdjust",
        description="复权类型",
        example="NoAdjust",
    )
    # First day of the requested range.
    start: Union[date, datetime] = Field(
        ...,
        description="开始日期",
    )
    # Last day of the requested range.
    end: Union[date, datetime] = Field(
        ...,
        description="结束日期",
    )
    # Trading-session selector; defaults to "Intraday".
    trade_sessions: str = Field(
        "Intraday",
        description="交易时段",
        example="Intraday",
    )

    @field_validator("start", "end")
    @classmethod
    def validate_date(cls, v):
        """Reduce a ``datetime`` to its ``date`` part; pass other values through."""
        return v.date() if isinstance(v, datetime) else v

132 

133 

class SubscribeRequest(BaseModel):
    """Request body for subscribing to quote data."""

    # Either a list of security codes or a single comma-separated string.
    symbols: Union[str, List[str]] = Field(
        ...,
        description="股票代码列表或逗号分隔的字符串",
        example=["YINN.US", "700.HK"],
    )
    # Either a list of subscription types or a single comma-separated string.
    sub_types: Union[str, List[str]] = Field(
        ...,
        description="订阅类型列表或逗号分隔的字符串",
        example=["Quote", "Depth"],
    )
    # Whether data should be pushed immediately; defaults to False.
    is_first_push: bool = Field(
        False,
        description="是否立即推送",
    )

    @field_validator("symbols", "sub_types")
    @classmethod
    def validate_list_fields(cls, v):
        """Split a comma-separated string into trimmed, non-empty entries.

        Values that are not strings (for example a list) are returned unchanged.
        """
        if not isinstance(v, str):
            return v
        trimmed = (piece.strip() for piece in v.split(","))
        return [piece for piece in trimmed if piece]

151 

152 

class UnsubscribeRequest(BaseModel):
    """Request body for cancelling quote-data subscriptions."""

    # Either a list of security codes or a single comma-separated string.
    symbols: Union[str, List[str]] = Field(
        ...,
        description="股票代码列表或逗号分隔的字符串",
        example=["YINN.US", "700.HK"],
    )
    # Either a list of subscription types or a single comma-separated string.
    sub_types: Union[str, List[str]] = Field(
        ...,
        description="订阅类型列表或逗号分隔的字符串",
        example=["Quote", "Depth"],
    )

    @field_validator("symbols", "sub_types")
    @classmethod
    def validate_list_fields(cls, v):
        """Split a comma-separated string into trimmed, non-empty entries.

        Values that are not strings (for example a list) are returned unchanged.
        """
        if not isinstance(v, str):
            return v
        trimmed = (piece.strip() for piece in v.split(","))
        return [piece for piece in trimmed if piece]

169 

170 

class RealtimeQuoteRequest(BaseModel):
    """Request body for reading pushed real-time prices."""

    # Either a list of security codes or a single comma-separated string.
    symbols: Union[str, List[str]] = Field(
        ...,
        description="股票代码列表或逗号分隔的字符串",
        example=["YINN.US", "700.HK"],
    )

    @field_validator("symbols")
    @classmethod
    def validate_symbols(cls, v):
        """Split a comma-separated string into trimmed, non-empty codes.

        Values that are not strings (for example a list) are returned unchanged.
        """
        if not isinstance(v, str):
            return v
        trimmed = (piece.strip() for piece in v.split(","))
        return [piece for piece in trimmed if piece]

184 

185 

class RealtimeDepthRequest(BaseModel):
    """Request body for reading the pushed real-time order book of one security."""

    # Code of the security whose pushed order book is requested.
    symbol: str = Field(
        ...,
        description="股票代码",
        example="YINN.US",
    )

190 

191 

class RealtimeTradesRequest(BaseModel):
    """Request body for reading the pushed real-time trades of one security."""

    # Code of the security whose pushed trades are requested.
    symbol: str = Field(
        ...,
        description="股票代码",
        example="YINN.US",
    )
    # Number of trades to fetch; defaults to 10 and must lie in 1..1000.
    count: int = Field(
        10,
        description="获取数量",
        ge=1,
        le=1000,
    )

197 

198 

199# 响应模型 

class ApiTestResponse(BaseModel):
    """Response envelope shared by every API-test endpoint."""

    # Whether the call succeeded.
    success: bool = Field(
        ...,
        description="是否成功",
    )
    # Human-readable description of the outcome.
    message: str = Field(
        ...,
        description="人类可读的结果描述",
    )
    # Raw payload; optional, defaults to None.
    data: Optional[Any] = Field(
        None,
        description="原始数据",
    )
    # Error text; optional, defaults to None.
    error: Optional[str] = Field(
        None,
        description="错误信息",
    )

207 

208 

# Sentinel distinguishing "attribute absent" from "attribute is None".
_MISSING = object()

# Class names handled as date/time values.
_TEMPORAL_TYPE_NAMES = ("time", "datetime", "date")

# Class names of LongPort SDK enum-like types that are rendered from str(obj).
_SDK_ENUM_TYPE_NAMES = frozenset(
    {
        "SecurityBoard",
        "Market",
        "Currency",
        "Exchange",
        "Period",
        "AdjustType",
        "TradeSessions",
        "SubType",
        "CalcIndex",
        "TradeStatus",
        "TradeSession",
        "DerivativeType",
        "TradeDirection",
        "OptionType",
        "OptionDirection",
        "WarrantType",
    }
)

# Attributes exported for each known result type, keyed by class name.
_KNOWN_TYPE_ATTRS = {
    # Security quote
    "SecurityQuote": (
        "symbol",
        "last_done",
        "prev_close",
        "open",
        "high",
        "low",
        "timestamp",
        "volume",
        "turnover",
        "trade_status",
        "pre_market_quote",
        "post_market_quote",
        "overnight_quote",
    ),
    # Static security information
    "SecurityStaticInfo": (
        "symbol",
        "name_cn",
        "name_en",
        "name_hk",
        "exchange",
        "currency",
        "lot_size",
        "total_shares",
        "circulating_shares",
        "hk_shares",
        "eps",
        "eps_ttm",
        "bps",
        "dividend_yield",
        "stock_derivatives",
        "board",
    ),
    # Trading sessions of a market
    "MarketTradingSession": ("market", "trade_sessions"),
    # One trading session
    "TradingSessionInfo": ("begin_time", "end_time", "trade_session"),
    # Order book of a security
    "SecurityDepth": ("asks", "bids"),
    # Single trade record
    "Trade": (
        "price",
        "volume",
        "timestamp",
        "trade_type",
        "direction",
        "trade_session",
    ),
    # Intraday line point
    "IntradayLine": ("price", "timestamp", "volume", "turnover", "avg_price"),
    # Candlestick (K-line)
    "Candlestick": (
        "close",
        "open",
        "low",
        "high",
        "volume",
        "turnover",
        "timestamp",
        "trade_session",
    ),
    # Calculated indexes
    "SecurityCalcIndex": (
        "symbol",
        "last_done",
        "change_value",
        "change_rate",
        "volume",
        "turnover",
        "ytd_change_rate",
        "turnover_rate",
        "total_market_value",
        "capital_flow",
        "amplitude",
        "volume_ratio",
        "pe_ttm_ratio",
        "pb_ratio",
        "dividend_ratio_ttm",
    ),
    # Subscription information
    "Subscription": ("symbol", "sub_types", "candlesticks"),
    # Real-time quote
    "RealtimeQuote": (
        "symbol",
        "last_done",
        "open",
        "high",
        "low",
        "timestamp",
        "volume",
        "turnover",
        "trade_status",
    ),
    # Pre-/post-market quote
    "PrePostQuote": (
        "last_done",
        "timestamp",
        "volume",
        "turnover",
        "high",
        "low",
        "prev_close",
    ),
    # One order-book level
    "Depth": ("position", "price", "volume", "order_num"),
    # Trading days of a market
    "MarketTradingDays": ("trading_days", "half_trading_days"),
}

# Attributes probed on objects whose class name is not in _KNOWN_TYPE_ATTRS.
_FALLBACK_ATTRS = (
    "symbol",
    "name",
    "name_cn",
    "name_en",
    "name_hk",
    "exchange",
    "currency",
    "lot_size",
    "total_shares",
    "last_done",
    "timestamp",
    "open",
    "high",
    "low",
    "close",
    "volume",
    "turnover",
    "market",
    "board",
    "value",
    "count",
    "price",
    "begin_time",
    "end_time",
    "trade_session",
    "trade_sessions",
    "asks",
    "bids",
    "direction",
    "trade_type",
    "avg_price",
    "change_value",
    "change_rate",
)


def _serialize_temporal(value, type_name):
    """Return the ISO-8601 text of a time/datetime/date, or ``str(value)`` on failure."""
    try:
        if type_name == "datetime" and value.tzinfo is None:
            # Naive datetimes are assumed to be US Eastern time and converted
            # to UTC. pytz is imported lazily so that aware datetimes, dates
            # and times can be serialized even when pytz is unavailable.
            import pytz

            eastern = pytz.timezone("US/Eastern")
            value = eastern.localize(value).astimezone(pytz.UTC)
        return value.isoformat()
    except Exception:
        return str(value)


def _serialize_decimal(value):
    """Return a Decimal as float; ``None`` for "NaN"/empty text; text if float() fails."""
    try:
        text = str(value)
        if text in ("NaN", ""):
            return None
        return float(value)
    except Exception:
        text = str(value)
        return text or None


def _describe_opaque(obj, type_name):
    """Build the ``<Type: key=value, ...>`` text used when nothing could be extracted."""
    text = f"<{type_name}: "
    for attr in ("symbol", "name_cn", "last_done", "market", "begin_time", "end_time"):
        if not hasattr(obj, attr):
            continue
        val = getattr(obj, attr, "N/A")
        try:
            if attr == "last_done" and val.__class__.__name__ == "Decimal":
                val = float(val)
            elif attr in ("begin_time", "end_time") and val.__class__.__name__ == "time":
                val = val.isoformat()
        except Exception:
            # Keep the raw value when the conversion is not possible.
            pass
        text += f"{attr}={val}, "
    return text.rstrip(", ") + ">"


def safe_serialize_object(obj):
    """Convert ``obj`` into a structure built only from JSON-friendly values.

    Types are recognised by class name, so this module does not need to import
    the SDK types it serializes. Rules, applied in this order:

    * ``None`` and ``str``/``int``/``float``/``bool`` are returned unchanged.
    * ``time``/``date`` become ISO-8601 text. ``datetime`` does too; a naive
      ``datetime`` is first treated as US Eastern time and converted to UTC
      (this step needs ``pytz``; on any failure ``str(obj)`` is returned).
    * ``Decimal`` becomes ``float`` (``None`` for "NaN").
    * Lists and tuples become lists; dict values are converted recursively.
    * Objects exposing both ``name`` and ``value`` become
      ``{"type", "name", "value"}``.
    * Known SDK enum class names are rendered from ``str(obj)``.
    * Other objects become ``{"_type": <class name>, ...}`` holding the
      non-``None``, non-callable attributes listed for their class name (or a
      generic attribute list). If none is found, the public entries of
      ``__dict__`` are used, and finally a ``<Type: ...>`` text.

    Args:
        obj: Any value.

    Returns:
        The converted value as described above.
    """
    if obj is None:
        return None

    # Primitive values need no conversion.
    if isinstance(obj, (str, int, float, bool)):
        return obj

    type_name = obj.__class__.__name__

    if type_name in _TEMPORAL_TYPE_NAMES:
        return _serialize_temporal(obj, type_name)

    if type_name == "Decimal":
        return _serialize_decimal(obj)

    if isinstance(obj, (list, tuple)):
        return [safe_serialize_object(item) for item in obj]

    if isinstance(obj, dict):
        return {k: safe_serialize_object(v) for k, v in obj.items()}

    # Enum-like objects expose both a name and a value.
    if hasattr(obj, "name") and hasattr(obj, "value"):
        try:
            return {"type": type_name, "name": obj.name, "value": obj.value}
        except Exception:
            return {"type": type_name, "value": str(obj)}

    # SDK enum-like types without name/value are rendered from their text form.
    if type_name in _SDK_ENUM_TYPE_NAMES:
        try:
            enum_str = str(obj)
            if "." in enum_str:
                enum_name = enum_str.split(".")[-1]
                return {"type": type_name, "name": enum_name, "value": enum_str}
            return {"type": type_name, "value": enum_str}
        except Exception:
            return f"<{type_name}>"

    result = {"_type": type_name}
    extracted_count = 0
    for attr in _KNOWN_TYPE_ATTRS.get(type_name, _FALLBACK_ATTRS):
        try:
            value = getattr(obj, attr, _MISSING)
            # Skip absent attributes, methods/callables and empty values.
            if value is _MISSING or callable(value) or value is None:
                continue
            result[attr] = safe_serialize_object(value)
            extracted_count += 1
        except Exception:
            # Best effort: an attribute that cannot be read or converted is
            # left out instead of failing the whole object.
            continue

    if extracted_count > 0:
        return result

    # Nothing matched: fall back to the public entries of __dict__.
    try:
        if hasattr(obj, "__dict__"):
            for key, value in obj.__dict__.items():
                if key.startswith("_"):  # skip private attributes
                    continue
                try:
                    result[key] = safe_serialize_object(value)
                except Exception:
                    result[key] = str(value) if value is not None else None
            return result
    except Exception:
        # Best effort: continue with the textual description below.
        pass

    # Last resort: a short textual description of the object.
    try:
        return _describe_opaque(obj, type_name)
    except Exception:
        return str(obj)

632 

633 

def format_response(
    success: bool, message: str, data: Any = None, error: str = None
) -> ApiTestResponse:
    """Build an ``ApiTestResponse`` after passing ``data`` through ``safe_serialize_object``."""
    return ApiTestResponse(
        success=success,
        message=message,
        # Convert the payload first so the response holds only serializable values.
        data=safe_serialize_object(data),
        error=error,
    )

644 

645 

def process_symbols_parameter(symbols):
    """Normalize a symbols argument given as a string or as a list.

    A string is split on commas. Every list entry is converted with ``str()``
    and split on commas as well. All resulting pieces are stripped of
    surrounding whitespace and empty pieces are dropped, so string input and
    list input are cleaned by the same rule.

    Args:
        symbols: A comma-separated string, a list of entries, or anything else.

    Returns:
        A list of cleaned codes for string or list input; any other value is
        returned unchanged.
    """
    if isinstance(symbols, str):
        entries = [symbols]
    elif isinstance(symbols, list):
        entries = symbols
    else:
        return symbols

    cleaned = []
    for entry in entries:
        # One entry may itself carry several comma-separated codes.
        for piece in str(entry).split(","):
            code = piece.strip()
            if code:
                cleaned.append(code)
    return cleaned

662 

663 

664# 注意:枚举值转换现在在BaseDataSourceClient的具体实现中处理 

665# 这样API测试模块就与具体的券商SDK解耦了 

666 

667 

668# 拉取页面API端点 

669 

670 

@router.post("/pull/static-info", response_model=ApiTestResponse)
async def test_static_info(
    request: StaticInfoRequest, current_user: User = Depends(get_current_user)
):
    """Fetch static security information and summarise it.

    Calls ``QuoteAdapter(current_user.id).get_static_info(request.symbols)``
    and returns an ``ApiTestResponse`` whose message lists at most the first
    three securities. Any exception is reported in the response's ``error``
    field instead of being raised.
    """

    def _field(item, key, default="N/A"):
        """Read ``key`` as an attribute or, for dicts, as a mapping entry."""
        fallback = item.get(key, default) if isinstance(item, dict) else default
        return getattr(item, key, fallback)

    try:
        adapter = QuoteAdapter(current_user.id)
        result = adapter.get_static_info(request.symbols)

        if result:
            lines = [f"成功获取 {len(result)} 个标的的基础信息"]
            for info in result[:3]:  # show the first three only
                symbol = _field(info, "symbol")
                # Each name is read from its own key (the dict fallback used
                # to read "symbol"); prefer the Chinese name, then the
                # English one, then "N/A".
                name = (
                    _field(info, "name_cn", None)
                    or _field(info, "name_en", None)
                    or "N/A"
                )
                lines.append(f"• {symbol}: {name}")
            if len(result) > 3:
                lines.append(f"... 还有 {len(result) - 3} 个标的")
            message = "\n".join(lines)
        else:
            # NOTE(review): this text is replaced by "请求失败" below whenever
            # the result is empty; confirm which message is intended.
            message = "未获取到任何标的基础信息"

        return format_response(
            result is not None and len(result) > 0,
            message if result else "请求失败",
            result,
        )

    except Exception as e:
        return format_response(False, "获取标的基础信息失败", error=str(e))

715 

716 

@router.post("/pull/quote", response_model=ApiTestResponse)
async def test_quote(
    request: QuoteRequest, current_user: User = Depends(get_current_user)
):
    """Fetch real-time quotes and summarise them.

    Calls ``QuoteAdapter(current_user.id).get_quote(request.symbols)`` and
    returns an ``ApiTestResponse`` whose message lists at most the first three
    quotes. Any exception is reported in the response's ``error`` field.
    """

    def _read(item, key):
        # Attribute first; dicts fall back to their entry, everything else to "N/A".
        fallback = item.get(key, "N/A") if isinstance(item, dict) else "N/A"
        return getattr(item, key, fallback)

    try:
        quotes = QuoteAdapter(current_user.id).get_quote(request.symbols)

        if not quotes:
            message = "未获取到任何实时行情"
        else:
            total = len(quotes)
            lines = [f"成功获取 {total} 个标的的实时行情"]
            for quote in quotes[:3]:  # show the first three only
                symbol = _read(quote, "symbol")
                last_done = _read(quote, "last_done")
                timestamp = _read(quote, "timestamp")
                lines.append(f"• {symbol}: ${last_done} ({timestamp})")
            if total > 3:
                lines.append(f"... 还有 {total - 3} 个标的")
            message = "\n".join(lines)

        succeeded = quotes is not None and len(quotes) > 0
        return format_response(
            succeeded,
            message if quotes else "请求失败",
            quotes,
        )

    except Exception as e:
        return format_response(False, "获取实时行情失败", error=str(e))

759 

760 

@router.post("/pull/depth", response_model=ApiTestResponse)
async def test_depth(
    request: DepthRequest, current_user: User = Depends(get_current_user)
):
    """Fetch the order book of one security and summarise it.

    Calls ``QuoteAdapter(current_user.id).get_depth(request.symbol)``, reads
    the ``"asks"`` and ``"bids"`` entries of the result through ``.get`` and
    reports how many levels each side has. Any exception is reported in the
    response's ``error`` field.
    """
    try:
        depth = QuoteAdapter(current_user.id).get_depth(request.symbol)

        if not depth:
            message = f"未获取到 {request.symbol} 的盘口数据"
        else:
            asks = depth.get("asks")
            bids = depth.get("bids")
            ask_count = len(asks) if asks else 0
            bid_count = len(bids) if bids else 0
            message = f"成功获取 {request.symbol} 的盘口数据\n• 卖盘档位: {ask_count}\n• 买盘档位: {bid_count}"

        return format_response(
            depth is not None,
            message if depth else "请求失败",
            depth,
        )

    except Exception as e:
        return format_response(False, "获取盘口数据失败", error=str(e))

784 

785 

@router.post("/pull/trades", response_model=ApiTestResponse)
async def test_trades(
    request: TradesRequest, current_user: User = Depends(get_current_user)
):
    """Fetch trade details of one security and summarise them.

    Calls ``QuoteAdapter(current_user.id).get_trades(request.symbol,
    request.count)`` and returns an ``ApiTestResponse`` whose message shows the
    number of trades and the first entry of the result. Any exception is
    reported in the response's ``error`` field.
    """

    def _field(item, key):
        """Read ``key`` as an attribute or, for dicts, as a mapping entry."""
        fallback = item.get(key, "N/A") if isinstance(item, dict) else "N/A"
        return getattr(item, key, fallback)

    try:
        adapter = QuoteAdapter(current_user.id)
        result = adapter.get_trades(request.symbol, request.count)

        if result:
            latest = result[0]
            price = _field(latest, "price")
            volume = _field(latest, "volume")
            timestamp = _field(latest, "timestamp")
            # The symbol and the count are separated so they no longer run
            # together (previously rendered like "YINN.US10").
            message = (
                f"成功获取 {request.symbol} 的 {len(result)} 条成交明细"
                f"\n• 最新成交: ${price} x {volume} ({timestamp})"
            )
        else:
            # NOTE(review): this text is replaced by "请求失败" below whenever
            # the result is empty; confirm which message is intended.
            message = f"未获取到 {request.symbol} 的成交明细"

        return format_response(
            result is not None and len(result) > 0,
            message if result else "请求失败",
            result,
        )

    except Exception as e:
        return format_response(False, "获取成交明细失败", error=str(e))

831 

832 

@router.post("/pull/intraday", response_model=ApiTestResponse)
async def test_intraday(
    request: IntradayRequest, current_user: User = Depends(get_current_user)
):
    """Fetch intraday data of one security and summarise it.

    The data is obtained with ``QuoteAdapter(current_user.id)
    .get_candlesticks(request.symbol, "Min_1", 100, "NoAdjust",
    request.trade_sessions)``. The message shows the number of points and the
    last entry of the result. Any exception is reported in the response's
    ``error`` field.
    """

    def _field(item, key, default="N/A"):
        """Read ``key`` as an attribute or, for dicts, as a mapping entry."""
        fallback = item.get(key, default) if isinstance(item, dict) else default
        return getattr(item, key, fallback)

    try:
        adapter = QuoteAdapter(current_user.id)
        result = adapter.get_candlesticks(
            request.symbol, "Min_1", 100, "NoAdjust", request.trade_sessions
        )

        if result:
            latest = result[-1]
            # The data comes from the candlestick call; when an entry has no
            # "price", use its "close" instead of always printing "N/A".
            # NOTE(review): assumes the adapter returns candlestick-shaped
            # entries here - confirm against QuoteAdapter.get_candlesticks.
            price = _field(latest, "price", None)
            if price is None:
                price = _field(latest, "close")
            timestamp = _field(latest, "timestamp")
            # The symbol and the count are separated so they no longer run
            # together (previously rendered like "YINN.US100").
            message = (
                f"成功获取 {request.symbol} 的 {len(result)} 个分时数据点"
                f"\n• 最新价格: ${price} ({timestamp})"
            )
        else:
            # NOTE(review): this text is replaced by "请求失败" below whenever
            # the result is empty; confirm which message is intended.
            message = f"未获取到 {request.symbol} 的分时数据"

        return format_response(
            result is not None and len(result) > 0,
            message if result else "请求失败",
            result,
        )

    except Exception as e:
        return format_response(False, "获取分时数据失败", error=str(e))

875 

876 

@router.post("/pull/candlesticks", response_model=ApiTestResponse)
async def test_candlesticks(
    request: CandlesticksRequest, current_user: User = Depends(get_current_user)
):
    """Fetch candlesticks (K-lines) of one security and summarise them.

    Calls ``QuoteAdapter(current_user.id).get_candlesticks`` with the symbol,
    period, count, adjust type and trade sessions of the request. The message
    shows the number of candlesticks and the open/high/low/close of the last
    entry. Any exception is reported in the response's ``error`` field.
    """

    def _field(item, key):
        """Read ``key`` as an attribute or, for dicts, as a mapping entry."""
        fallback = item.get(key, "N/A") if isinstance(item, dict) else "N/A"
        return getattr(item, key, fallback)

    try:
        adapter = QuoteAdapter(current_user.id)
        result = adapter.get_candlesticks(
            request.symbol,
            request.period,
            request.count,
            request.adjust_type,
            request.trade_sessions,
        )

        if result:
            latest = result[-1]
            open_price = _field(latest, "open")
            high_price = _field(latest, "high")
            low_price = _field(latest, "low")
            close_price = _field(latest, "close")
            # The symbol and the count are separated so they no longer run
            # together (previously rendered like "YINN.US10").
            message = (
                f"成功获取 {request.symbol} 的 {len(result)} 根K线数据"
                f"\n• 最新K线: 开${open_price} 高${high_price} 低${low_price} 收${close_price}"
            )
        else:
            # NOTE(review): this text is replaced by "请求失败" below whenever
            # the result is empty; confirm which message is intended.
            message = f"未获取到 {request.symbol} 的K线数据"

        return format_response(
            result is not None and len(result) > 0,
            message if result else "请求失败",
            result,
        )

    except Exception as e:
        return format_response(False, "获取K线数据失败", error=str(e))

929 

930 

@router.post("/pull/trading-days", response_model=ApiTestResponse)
async def test_trading_days(
    request: TradingDaysRequest, current_user: User = Depends(get_current_user)
):
    """Fetch the trading days of a market and summarise them.

    Calls ``QuoteAdapter(current_user.id).get_trading_days(request.market,
    request.begin, request.end)`` and reports how many trading days and
    half trading days the result holds. Any exception is reported in the
    response's ``error`` field.
    """

    def _days(source, key):
        # Attribute first; dicts fall back to their entry, everything else to [].
        fallback = source.get(key, []) if isinstance(source, dict) else []
        return getattr(source, key, fallback)

    try:
        calendar = QuoteAdapter(current_user.id).get_trading_days(
            request.market, request.begin, request.end
        )

        if not calendar:
            message = f"未获取到 {request.market} 市场的交易日信息"
        else:
            trading_count = len(_days(calendar, "trading_days"))
            half_trading_count = len(_days(calendar, "half_trading_days"))
            message = f"成功获取 {request.market} 市场交易日信息\n• 交易日: {trading_count} 天\n• 半日交易: {half_trading_count}"

        return format_response(
            calendar is not None,
            message if calendar else "请求失败",
            calendar,
        )

    except Exception as e:
        return format_response(False, "获取市场交易日失败", error=str(e))

964 

965 

@router.post("/pull/trading-session", response_model=ApiTestResponse)
async def test_trading_session(current_user: User = Depends(get_current_user)):
    """Fetch the trading sessions of every market and summarise them.

    Calls ``QuoteAdapter(current_user.id).get_trading_session()`` and returns
    an ``ApiTestResponse`` whose message lists at most the first three markets
    with their number of sessions. Any exception is reported in the response's
    ``error`` field.
    """

    def _read(item, key, default):
        # Attribute first; dicts fall back to their entry, everything else to default.
        fallback = item.get(key, default) if isinstance(item, dict) else default
        return getattr(item, key, fallback)

    try:
        sessions = QuoteAdapter(current_user.id).get_trading_session()

        if not sessions:
            message = "未获取到交易时段信息"
        else:
            total = len(sessions)
            lines = [f"成功获取 {total} 个市场的交易时段信息"]
            for session in sessions[:3]:  # show the first three only
                trade_sessions = _read(session, "trade_sessions", [])
                market = _read(session, "market", "N/A")
                session_count = len(trade_sessions) if trade_sessions else 0
                lines.append(f"• {market}: {session_count} 个交易时段")
            if total > 3:
                lines.append(f"... 还有 {total - 3} 个市场")
            message = "\n".join(lines)

        succeeded = sessions is not None and len(sessions) > 0
        return format_response(
            succeeded,
            message if sessions else "请求失败",
            sessions,
        )

    except Exception as e:
        return format_response(False, "获取交易时段失败", error=str(e))

1010 

1011 

@router.post("/pull/calc-indexes", response_model=ApiTestResponse)
async def test_calc_indexes(
    request: CalcIndexesRequest, current_user: User = Depends(get_current_user)
):
    """Fetch calculated indexes of securities and summarise them.

    Calls ``QuoteAdapter(current_user.id).get_calc_indexes(request.symbols,
    request.indexes)`` and returns an ``ApiTestResponse`` whose message lists
    at most the first three securities with their last price. Any exception is
    reported in the response's ``error`` field.
    """

    def _field(item, key):
        """Read ``key`` as an attribute or, for dicts, as a mapping entry."""
        fallback = item.get(key, "N/A") if isinstance(item, dict) else "N/A"
        return getattr(item, key, fallback)

    try:
        adapter = QuoteAdapter(current_user.id)
        result = adapter.get_calc_indexes(request.symbols, request.indexes)

        if result:
            lines = [f"成功获取 {len(result)} 个标的的计算指标"]
            for calc in result[:3]:  # show the first three only
                symbol = _field(calc, "symbol")
                # Read the price from "last_done"; the dict fallback used to
                # read "symbol", which printed the code as the price.
                last_done = _field(calc, "last_done")
                lines.append(f"• {symbol}: 最新价 ${last_done or 'N/A'}")
            if len(result) > 3:
                lines.append(f"... 还有 {len(result) - 3} 个标的")
            message = "\n".join(lines)
        else:
            # NOTE(review): this text is replaced by "请求失败" below whenever
            # the result is empty; confirm which message is intended.
            message = "未获取到计算指标"

        return format_response(
            result is not None and len(result) > 0,
            message if result else "请求失败",
            result,
        )

    except Exception as e:
        return format_response(False, "获取计算指标失败", error=str(e))

1049 

1050 

@router.post("/pull/history-candlesticks", response_model=ApiTestResponse)
async def test_history_candlesticks(
    request: HistoryCandlesticksRequest, current_user: User = Depends(get_current_user)
):
    """Fetch historical candlesticks of one security and summarise them.

    Calls ``QuoteAdapter(current_user.id).get_candlesticks`` with the symbol,
    period, a fixed count of 1000, adjust type and trade sessions of the
    request. The message shows the number of candlesticks and the timestamps
    of the first and last entries. Any exception is reported in the response's
    ``error`` field.
    """

    def _field(item, key):
        """Read ``key`` as an attribute or, for dicts, as a mapping entry."""
        fallback = item.get(key, "N/A") if isinstance(item, dict) else "N/A"
        return getattr(item, key, fallback)

    try:
        adapter = QuoteAdapter(current_user.id)
        # NOTE(review): request.start and request.end are never passed to the
        # adapter; the latest 1000 candlesticks are requested instead. Confirm
        # whether a date-range call should be used here.
        result = adapter.get_candlesticks(
            request.symbol,
            request.period,
            1000,
            request.adjust_type,
            request.trade_sessions,
        )

        if result:
            first_time = _field(result[0], "timestamp")
            last_time = _field(result[-1], "timestamp")
            # Separators are added so the symbol/count pair and the two
            # timestamps no longer run together.
            message = (
                f"成功获取 {request.symbol} 的 {len(result)} 根历史K线数据"
                f"\n• 时间范围: {first_time} ~ {last_time}"
            )
        else:
            # NOTE(review): this text is replaced by "请求失败" below whenever
            # the result is empty; confirm which message is intended.
            message = f"未获取到 {request.symbol} 的历史K线数据"

        return format_response(
            result is not None and len(result) > 0,
            message if result else "请求失败",
            result,
        )

    except Exception as e:
        return format_response(False, "获取历史K线失败", error=str(e))

1094 

1095 

1096# 订阅推送页面API端点 

1097 

1098 

@router.post("/subscription/subscriptions", response_model=ApiTestResponse)
async def test_subscriptions(current_user: User = Depends(get_current_user)):
    """Report the current subscriptions.

    Calls ``QuoteAdapter(current_user.id).get_subscription_summary()`` and
    reads its ``"subscriptions"`` and ``"count"`` entries. The response is
    successful only when the count is positive. Any exception is reported in
    the response's ``error`` field.
    """
    try:
        summary = QuoteAdapter(current_user.id).get_subscription_summary()
        subscriptions = summary.get("subscriptions", [])
        count = summary.get("count", 0)

        if count <= 0:
            message = "当前没有订阅任何标的"
        else:
            lines = [f"当前已订阅 {count} 个标的"]
            if count > 3:
                lines.append(f"... 还有 {count - 3} 个标的")
            message = "\n".join(lines)

        return format_response(count > 0, message, subscriptions)

    except Exception as e:
        return format_response(False, "获取订阅信息失败", error=str(e))

1121 

1122 

@router.post("/subscription/subscribe", response_model=ApiTestResponse)
async def test_subscribe(
    request: SubscribeRequest, current_user: User = Depends(get_current_user)
):
    """Subscribe to quote data and report the outcome.

    Calls ``QuoteAdapter(current_user.id).subscribe(request.symbols,
    request.sub_types, request.is_first_push)``. The response echoes the
    requested symbols, types and push flag as its data. Any exception is
    reported in the response's ``error`` field.
    """
    try:
        success = QuoteAdapter(current_user.id).subscribe(
            request.symbols, request.sub_types, request.is_first_push
        )

        symbols_str = ", ".join(request.symbols)
        sub_types_str = ", ".join(request.sub_types)
        if success:
            push_label = "是" if request.is_first_push else "否"
            message = "\n".join(
                [
                    "成功订阅行情数据",
                    f"• 标的: {symbols_str}",
                    f"• 类型: {sub_types_str}",
                    f"• 立即推送: {push_label}",
                ]
            )
        else:
            message = "订阅失败"

        payload = {
            "symbols": request.symbols,
            "sub_types": request.sub_types,
            "is_first_push": request.is_first_push,
        }
        return format_response(success, message, payload)

    except Exception as e:
        return format_response(False, "订阅行情数据失败", error=str(e))

1155 

1156 

@router.post("/subscription/unsubscribe", response_model=ApiTestResponse)
async def test_unsubscribe(
    request: UnsubscribeRequest, current_user: User = Depends(get_current_user)
):
    """Cancel the current user's quote subscriptions for the given symbols.

    Args:
        request: Carries ``symbols`` and ``sub_types``, which are forwarded
            to the adapter's ``unsubscribe`` call.
        current_user: Authenticated user; its ``id`` selects the adapter.

    Returns:
        The value built by ``format_response``: the adapter's result as the
        success flag, a summary message, and an echo of the request fields.
        Any exception is reported as a failure carrying ``str(exc)``.
    """
    try:
        ok = QuoteAdapter(current_user.id).unsubscribe(
            request.symbols, request.sub_types
        )

        # Both joins run before the success check, so a non-string entry is
        # reported through the except branch regardless of the outcome.
        joined_symbols = ", ".join(request.symbols)
        joined_types = ", ".join(request.sub_types)

        if ok:
            text = f"成功取消订阅行情数据\n• 标的: {joined_symbols}\n• 类型: {joined_types}"
        else:
            text = "取消订阅失败"

        echoed = {"symbols": request.symbols, "sub_types": request.sub_types}
        return format_response(ok, text, echoed)

    except Exception as exc:
        return format_response(False, "取消订阅失败", error=str(exc))

1183 

1184 

@router.post("/subscription/realtime-quote", response_model=ApiTestResponse)
async def test_realtime_quote(
    request: RealtimeQuoteRequest, current_user: User = Depends(get_current_user)
):
    """Fetch the latest pushed quotes for the requested symbols.

    The message lists up to three quotes (symbol, last price, timestamp) and
    notes how many more were returned.

    Args:
        request: Carries ``symbols``, forwarded to ``get_realtime_quote``.
        current_user: Authenticated user; its ``id`` selects the adapter.

    Returns:
        The value built by ``format_response``. The success flag is true only
        for a non-empty result; an empty result is reported with the
        "may need to subscribe first" hint. Any exception is reported as a
        failure carrying ``str(exc)`` in ``error``.
    """

    def _field(quote, name):
        # Each quote is read by attribute first, falling back to a dict
        # lookup and finally to "N/A" when the field is missing.
        fallback = quote.get(name, "N/A") if isinstance(quote, dict) else "N/A"
        return getattr(quote, name, fallback)

    try:
        adapter = QuoteAdapter(current_user.id)
        result = adapter.get_realtime_quote(request.symbols)

        # Build the human-readable result description.
        if result:
            message = f"成功获取 {len(result)} 个标的的实时价格"
            for quote in result[:3]:  # show the first three only
                symbol = _field(quote, "symbol")
                last_done = _field(quote, "last_done")
                timestamp = _field(quote, "timestamp")
                message += f"\n• {symbol}: ${last_done} ({timestamp})"
            if len(result) > 3:
                message += f"\n... 还有 {len(result) - 3} 个标的"
        else:
            message = "未获取到实时价格数据(可能需要先订阅)"

        # Pass the message straight through: the previous
        # `message if result else "请求失败"` made the hint above unreachable.
        return format_response(
            result is not None and len(result) > 0,
            message,
            result,
        )

    except Exception as e:
        return format_response(False, "获取实时价格失败", error=str(e))

1227 

1228 

@router.post("/subscription/realtime-depth", response_model=ApiTestResponse)
async def test_realtime_depth(
    request: RealtimeDepthRequest, current_user: User = Depends(get_current_user)
):
    """Fetch the latest pushed order-book depth for one symbol.

    The message reports how many ask and bid levels were returned.

    Args:
        request: Carries ``symbol``, forwarded to ``get_realtime_depth``.
        current_user: Authenticated user; its ``id`` selects the adapter.

    Returns:
        The value built by ``format_response``. The success flag is true only
        when the adapter returned a truthy result; otherwise the message
        carries the "may need to subscribe first" hint. Any exception is
        reported as a failure carrying ``str(exc)`` in ``error``.
    """

    def _side(book, name):
        # The depth result is read by attribute first, falling back to a
        # dict lookup and finally to an empty list when the side is missing.
        fallback = book.get(name, []) if isinstance(book, dict) else []
        return getattr(book, name, fallback)

    try:
        adapter = QuoteAdapter(current_user.id)
        result = adapter.get_realtime_depth(request.symbol)

        # Build the human-readable result description.
        if result:
            asks = _side(result, "asks")
            bids = _side(result, "bids")
            ask_count = len(asks) if asks else 0
            bid_count = len(bids) if bids else 0
            message = f"成功获取 {request.symbol} 的实时盘口数据\n• 卖盘档位: {ask_count}\n• 买盘档位: {bid_count}"
        else:
            message = f"未获取到 {request.symbol} 的实时盘口数据(可能需要先订阅)"

        # The success flag uses the same truthiness test that chose the
        # message. Previously an empty but non-None result produced
        # success=True together with "请求失败", and the hint above was
        # never returned.
        return format_response(bool(result), message, result)

    except Exception as e:
        return format_response(False, "获取实时盘口失败", error=str(e))

1262 

1263 

@router.post("/subscription/realtime-trades", response_model=ApiTestResponse)
async def test_realtime_trades(
    request: RealtimeTradesRequest, current_user: User = Depends(get_current_user)
):
    """Fetch the latest pushed trade ticks for one symbol.

    The message reports how many trades were returned and describes the
    first one (price, volume, timestamp).

    Args:
        request: Carries ``symbol`` and ``count``, forwarded to
            ``get_realtime_trades``.
        current_user: Authenticated user; its ``id`` selects the adapter.

    Returns:
        The value built by ``format_response``. The success flag is true only
        for a non-empty result; an empty result is reported with the
        "may need to subscribe first" hint. Any exception is reported as a
        failure carrying ``str(exc)`` in ``error``.
    """

    def _field(trade, name):
        # Each trade is read by attribute first, falling back to a dict
        # lookup and finally to "N/A" when the field is missing.
        fallback = trade.get(name, "N/A") if isinstance(trade, dict) else "N/A"
        return getattr(trade, name, fallback)

    try:
        adapter = QuoteAdapter(current_user.id)
        result = adapter.get_realtime_trades(request.symbol, request.count)

        # Build the human-readable result description.
        if result:
            # Separate symbol and count; the original format string fused
            # them (e.g. "YINN.US10 条").
            message = f"成功获取 {request.symbol} 的 {len(result)} 条实时成交明细"
            latest = result[0]
            price = _field(latest, "price")
            volume = _field(latest, "volume")
            timestamp = _field(latest, "timestamp")
            message += f"\n• 最新成交: ${price} x {volume} ({timestamp})"
        else:
            message = f"未获取到 {request.symbol} 的实时成交明细(可能需要先订阅)"

        # Pass the message straight through: the previous
        # `message if result else "请求失败"` made the hint above unreachable.
        return format_response(
            result is not None and len(result) > 0,
            message,
            result,
        )

    except Exception as e:
        return format_response(False, "获取实时成交明细失败", error=str(e))