Coverage for api/v1/endpoints/api_test.py: 66.89%
586 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2API测试端点
3提供长桥OpenAPI的测试接口
4"""
6from datetime import date, datetime
7from decimal import Decimal
8from typing import Any, Dict, List, Optional, Union
10from fastapi import APIRouter, Depends, HTTPException
11from pydantic import BaseModel, Field, field_validator
13from core.data_source.adapters.quote_adapter import QuoteAdapter
14from core.middleware.auth_middleware import get_current_user
15from core.models.user import User
17router = APIRouter(prefix="/api-test", tags=["API测试"])
20# 请求模型定义
class StaticInfoRequest(BaseModel):
    """Request body for fetching static (reference) information of securities.

    ``symbols`` may be a list of symbol strings or one comma-separated
    string; the string form is split into a list by the validator.
    """

    symbols: Union[str, List[str]] = Field(
        ...,
        description="股票代码列表或逗号分隔的字符串",
        # Bare extra keywords such as ``example=`` are deprecated on Pydantic v2
        # ``Field``; json_schema_extra emits the same ``example`` schema key.
        json_schema_extra={"example": ["YINN.US", "700.HK"]},
    )

    @field_validator("symbols")
    @classmethod
    def validate_symbols(cls, v):
        """Split a comma-separated string into stripped, non-empty symbols.

        Non-string values are returned unchanged.
        """
        if isinstance(v, str):
            return [s.strip() for s in v.split(",") if s.strip()]
        return v
class QuoteRequest(BaseModel):
    """Request body for fetching real-time quotes of securities.

    ``symbols`` may be a list of symbol strings or one comma-separated
    string; the string form is split into a list by the validator.
    """

    symbols: Union[str, List[str]] = Field(
        ...,
        description="股票代码列表或逗号分隔的字符串",
        # ``example=`` as an extra Field keyword is deprecated in Pydantic v2;
        # json_schema_extra produces the identical ``example`` schema entry.
        json_schema_extra={"example": ["YINN.US", "700.HK"]},
    )

    @field_validator("symbols")
    @classmethod
    def validate_symbols(cls, v):
        """Turn a comma-separated string into a list; pass other values through."""
        if isinstance(v, str):
            return [s.strip() for s in v.split(",") if s.strip()]
        return v
class DepthRequest(BaseModel):
    """Request body for fetching the order book (depth) of one security."""

    symbol: str = Field(
        ...,
        description="股票代码",
        # Pydantic v2 deprecates extra Field keywords like ``example=``;
        # json_schema_extra keeps the same ``example`` key in the schema.
        json_schema_extra={"example": "YINN.US"},
    )
class TradesRequest(BaseModel):
    """Request body for fetching recent trades (ticks) of one security.

    ``count`` defaults to 10 and is constrained to the range 1..1000.
    """

    symbol: str = Field(
        ...,
        description="股票代码",
        # Extra Field keywords (``example=``) are deprecated in Pydantic v2;
        # json_schema_extra yields the same ``example`` schema key.
        json_schema_extra={"example": "YINN.US"},
    )
    count: int = Field(10, description="获取数量", ge=1, le=1000)
class IntradayRequest(BaseModel):
    """Request body for fetching intraday (minute-level) data of one security.

    ``trade_sessions`` defaults to ``"Intraday"``.
    """

    # ``example=`` passed as an extra Field keyword is deprecated in
    # Pydantic v2; json_schema_extra emits the same ``example`` schema key.
    symbol: str = Field(
        ..., description="股票代码", json_schema_extra={"example": "YINN.US"}
    )
    trade_sessions: str = Field(
        "Intraday", description="交易时段", json_schema_extra={"example": "Intraday"}
    )
class CandlesticksRequest(BaseModel):
    """Request body for fetching candlesticks (K-lines) of one security.

    Defaults: ``period="Day"``, ``count=10`` (allowed range 1..1000),
    ``adjust_type="NoAdjust"``, ``trade_sessions="Intraday"``.
    """

    # ``example=`` passed as an extra Field keyword is deprecated in
    # Pydantic v2; json_schema_extra emits the same ``example`` schema key.
    symbol: str = Field(
        ..., description="股票代码", json_schema_extra={"example": "YINN.US"}
    )
    period: str = Field(
        "Day", description="K线周期", json_schema_extra={"example": "Day"}
    )
    count: int = Field(10, description="获取数量", ge=1, le=1000)
    adjust_type: str = Field(
        "NoAdjust", description="复权类型", json_schema_extra={"example": "NoAdjust"}
    )
    trade_sessions: str = Field(
        "Intraday", description="交易时段", json_schema_extra={"example": "Intraday"}
    )
class TradingDaysRequest(BaseModel):
    """Request body for fetching the trading days of a market.

    ``begin`` and ``end`` accept a date or a datetime; datetime values are
    reduced to their date part by the validator.
    """

    market: str = Field(
        ...,
        description="市场类型",
        # Extra Field keywords (``example=``) are deprecated in Pydantic v2;
        # json_schema_extra yields the same ``example`` schema key.
        json_schema_extra={"example": "US"},
    )
    begin: Union[date, datetime] = Field(..., description="开始日期")
    end: Union[date, datetime] = Field(..., description="结束日期")

    @field_validator("begin", "end")
    @classmethod
    def validate_date(cls, v):
        """Return the date part of a datetime; pass other values through."""
        if isinstance(v, datetime):
            return v.date()
        return v
class CalcIndexesRequest(BaseModel):
    """Request body for fetching calculated indexes of securities.

    Both ``symbols`` and ``indexes`` may be a list of strings or one
    comma-separated string; the string form is split into a list.
    """

    # ``example=`` passed as an extra Field keyword is deprecated in
    # Pydantic v2; json_schema_extra emits the same ``example`` schema key.
    symbols: Union[str, List[str]] = Field(
        ...,
        description="股票代码列表或逗号分隔的字符串",
        json_schema_extra={"example": ["YINN.US", "700.HK"]},
    )
    indexes: Union[str, List[str]] = Field(
        ...,
        description="计算指标列表或逗号分隔的字符串",
        json_schema_extra={"example": ["LastDone", "ChangeRate"]},
    )

    @field_validator("symbols", "indexes")
    @classmethod
    def validate_list_fields(cls, v):
        """Split a comma-separated string into stripped, non-empty items.

        Non-string values are returned unchanged.
        """
        if isinstance(v, str):
            return [s.strip() for s in v.split(",") if s.strip()]
        return v
class HistoryCandlesticksRequest(BaseModel):
    """Request body for fetching historical candlesticks of one security.

    ``start`` and ``end`` accept a date or a datetime; datetime values are
    reduced to their date part by the validator. Defaults: ``period="Day"``,
    ``adjust_type="NoAdjust"``, ``trade_sessions="Intraday"``.
    """

    # ``example=`` passed as an extra Field keyword is deprecated in
    # Pydantic v2; json_schema_extra emits the same ``example`` schema key.
    symbol: str = Field(
        ..., description="股票代码", json_schema_extra={"example": "YINN.US"}
    )
    period: str = Field(
        "Day", description="K线周期", json_schema_extra={"example": "Day"}
    )
    adjust_type: str = Field(
        "NoAdjust", description="复权类型", json_schema_extra={"example": "NoAdjust"}
    )
    start: Union[date, datetime] = Field(..., description="开始日期")
    end: Union[date, datetime] = Field(..., description="结束日期")
    trade_sessions: str = Field(
        "Intraday", description="交易时段", json_schema_extra={"example": "Intraday"}
    )

    @field_validator("start", "end")
    @classmethod
    def validate_date(cls, v):
        """Return the date part of a datetime; pass other values through."""
        if isinstance(v, datetime):
            return v.date()
        return v
class SubscribeRequest(BaseModel):
    """Request body for subscribing to quote pushes.

    ``symbols`` and ``sub_types`` may each be a list of strings or one
    comma-separated string; the string form is split into a list.
    ``is_first_push`` defaults to ``False``.
    """

    # ``example=`` passed as an extra Field keyword is deprecated in
    # Pydantic v2; json_schema_extra emits the same ``example`` schema key.
    symbols: Union[str, List[str]] = Field(
        ...,
        description="股票代码列表或逗号分隔的字符串",
        json_schema_extra={"example": ["YINN.US", "700.HK"]},
    )
    sub_types: Union[str, List[str]] = Field(
        ...,
        description="订阅类型列表或逗号分隔的字符串",
        json_schema_extra={"example": ["Quote", "Depth"]},
    )
    is_first_push: bool = Field(False, description="是否立即推送")

    @field_validator("symbols", "sub_types")
    @classmethod
    def validate_list_fields(cls, v):
        """Split a comma-separated string into stripped, non-empty items.

        Non-string values are returned unchanged.
        """
        if isinstance(v, str):
            return [s.strip() for s in v.split(",") if s.strip()]
        return v
class UnsubscribeRequest(BaseModel):
    """Request body for cancelling quote subscriptions.

    ``symbols`` and ``sub_types`` may each be a list of strings or one
    comma-separated string; the string form is split into a list.
    """

    # ``example=`` passed as an extra Field keyword is deprecated in
    # Pydantic v2; json_schema_extra emits the same ``example`` schema key.
    symbols: Union[str, List[str]] = Field(
        ...,
        description="股票代码列表或逗号分隔的字符串",
        json_schema_extra={"example": ["YINN.US", "700.HK"]},
    )
    sub_types: Union[str, List[str]] = Field(
        ...,
        description="订阅类型列表或逗号分隔的字符串",
        json_schema_extra={"example": ["Quote", "Depth"]},
    )

    @field_validator("symbols", "sub_types")
    @classmethod
    def validate_list_fields(cls, v):
        """Split a comma-separated string into stripped, non-empty items.

        Non-string values are returned unchanged.
        """
        if isinstance(v, str):
            return [s.strip() for s in v.split(",") if s.strip()]
        return v
class RealtimeQuoteRequest(BaseModel):
    """Request body for reading pushed real-time quotes.

    ``symbols`` may be a list of symbol strings or one comma-separated
    string; the string form is split into a list by the validator.
    """

    symbols: Union[str, List[str]] = Field(
        ...,
        description="股票代码列表或逗号分隔的字符串",
        # Extra Field keywords (``example=``) are deprecated in Pydantic v2;
        # json_schema_extra yields the same ``example`` schema key.
        json_schema_extra={"example": ["YINN.US", "700.HK"]},
    )

    @field_validator("symbols")
    @classmethod
    def validate_symbols(cls, v):
        """Turn a comma-separated string into a list; pass other values through."""
        if isinstance(v, str):
            return [s.strip() for s in v.split(",") if s.strip()]
        return v
class RealtimeDepthRequest(BaseModel):
    """Request body for reading the pushed real-time order book of one security."""

    symbol: str = Field(
        ...,
        description="股票代码",
        # Extra Field keywords (``example=``) are deprecated in Pydantic v2;
        # json_schema_extra yields the same ``example`` schema key.
        json_schema_extra={"example": "YINN.US"},
    )
class RealtimeTradesRequest(BaseModel):
    """Request body for reading pushed real-time trades of one security.

    ``count`` defaults to 10 and is constrained to the range 1..1000.
    """

    symbol: str = Field(
        ...,
        description="股票代码",
        # Extra Field keywords (``example=``) are deprecated in Pydantic v2;
        # json_schema_extra yields the same ``example`` schema key.
        json_schema_extra={"example": "YINN.US"},
    )
    count: int = Field(10, description="获取数量", ge=1, le=1000)
199# 响应模型
class ApiTestResponse(BaseModel):
    """Response envelope shared by all API-test endpoints.

    ``success`` and ``message`` are required; ``data`` and ``error`` are
    optional and default to ``None``.
    """

    # Outcome flag of the call.
    success: bool = Field(default=..., description="是否成功")
    # Human-readable summary of the outcome.
    message: str = Field(default=..., description="人类可读的结果描述")
    # Payload of the call, if any.
    data: Optional[Any] = Field(default=None, description="原始数据")
    # Error text when the call failed.
    error: Optional[str] = Field(default=None, description="错误信息")
# Class names of LongPort SDK enum-like types that are rendered via ``str()``
# when they do not expose a ``name``/``value`` pair.
_SDK_ENUM_TYPE_NAMES = frozenset(
    {
        "SecurityBoard",
        "Market",
        "Currency",
        "Exchange",
        "Period",
        "AdjustType",
        "TradeSessions",
        "SubType",
        "CalcIndex",
        "TradeStatus",
        "TradeSession",
        "DerivativeType",
        "TradeDirection",
        "OptionType",
        "OptionDirection",
        "WarrantType",
    }
)

# Attributes extracted for each known SDK class name.
_ATTRS_BY_TYPE_NAME = {
    "SecurityQuote": (
        "symbol",
        "last_done",
        "prev_close",
        "open",
        "high",
        "low",
        "timestamp",
        "volume",
        "turnover",
        "trade_status",
        "pre_market_quote",
        "post_market_quote",
        "overnight_quote",
    ),
    "SecurityStaticInfo": (
        "symbol",
        "name_cn",
        "name_en",
        "name_hk",
        "exchange",
        "currency",
        "lot_size",
        "total_shares",
        "circulating_shares",
        "hk_shares",
        "eps",
        "eps_ttm",
        "bps",
        "dividend_yield",
        "stock_derivatives",
        "board",
    ),
    "MarketTradingSession": ("market", "trade_sessions"),
    "TradingSessionInfo": ("begin_time", "end_time", "trade_session"),
    "SecurityDepth": ("asks", "bids"),
    "Trade": (
        "price",
        "volume",
        "timestamp",
        "trade_type",
        "direction",
        "trade_session",
    ),
    "IntradayLine": ("price", "timestamp", "volume", "turnover", "avg_price"),
    "Candlestick": (
        "close",
        "open",
        "low",
        "high",
        "volume",
        "turnover",
        "timestamp",
        "trade_session",
    ),
    "SecurityCalcIndex": (
        "symbol",
        "last_done",
        "change_value",
        "change_rate",
        "volume",
        "turnover",
        "ytd_change_rate",
        "turnover_rate",
        "total_market_value",
        "capital_flow",
        "amplitude",
        "volume_ratio",
        "pe_ttm_ratio",
        "pb_ratio",
        "dividend_ratio_ttm",
    ),
    "Subscription": ("symbol", "sub_types", "candlesticks"),
    "RealtimeQuote": (
        "symbol",
        "last_done",
        "open",
        "high",
        "low",
        "timestamp",
        "volume",
        "turnover",
        "trade_status",
    ),
    "PrePostQuote": (
        "last_done",
        "timestamp",
        "volume",
        "turnover",
        "high",
        "low",
        "prev_close",
    ),
    "Depth": ("position", "price", "volume", "order_num"),
    "MarketTradingDays": ("trading_days", "half_trading_days"),
}

# Attributes probed on objects whose class name has no entry above.
_COMMON_ATTRS = (
    "symbol",
    "name",
    "name_cn",
    "name_en",
    "name_hk",
    "exchange",
    "currency",
    "lot_size",
    "total_shares",
    "last_done",
    "timestamp",
    "open",
    "high",
    "low",
    "close",
    "volume",
    "turnover",
    "market",
    "board",
    "value",
    "count",
    "price",
    "begin_time",
    "end_time",
    "trade_session",
    "trade_sessions",
    "asks",
    "bids",
    "direction",
    "trade_type",
    "avg_price",
    "change_value",
    "change_rate",
)

# Attributes shown in the last-resort "<ClassName: ...>" description.
_DEBUG_ATTRS = ("symbol", "name_cn", "last_done", "market", "begin_time", "end_time")


def _serialize_temporal(value, type_name):
    """Return ISO-8601 text for a time/datetime/date, or ``str(value)`` on failure."""
    try:
        if type_name == "datetime" and value.tzinfo is None:
            # Naive datetimes are treated as US/Eastern wall-clock time and
            # converted to UTC. pytz is imported only here so that aware
            # datetimes, dates and times never depend on it.
            import pytz

            value = pytz.timezone("US/Eastern").localize(value).astimezone(pytz.UTC)
        return value.isoformat()
    except Exception:
        return str(value)


def _serialize_decimal(value):
    """Return a finite float for a Decimal, ``None`` for NaN/Infinity/empty."""
    import math

    text = str(value)
    if text in ("NaN", ""):
        return None
    try:
        number = float(value)
    except Exception:
        # Not convertible (e.g. signalling NaN): keep the textual form.
        return text or None
    # Non-finite floats have no JSON representation; treat them like NaN.
    return number if math.isfinite(number) else None


def _describe_object(obj, class_name):
    """Build the ``<ClassName: attr=value, ...>`` last-resort description."""
    text = f"<{class_name}: "
    for attr in _DEBUG_ATTRS:
        if not hasattr(obj, attr):
            continue
        val = getattr(obj, attr, "N/A")
        val_type = val.__class__.__name__
        try:
            if attr == "last_done" and val_type == "Decimal":
                val = float(val)
            elif attr in ("begin_time", "end_time") and val_type == "time":
                val = val.isoformat()
        except Exception:
            # Keep the raw value when the conversion is not possible.
            pass
        text += f"{attr}={val}, "
    return text.rstrip(", ") + ">"


def safe_serialize_object(obj):
    """Convert ``obj`` into plain, JSON-friendly Python data.

    Handling, in order:

    * ``None`` and ``str``/``int``/``float``/``bool`` are returned as-is.
    * Objects whose class is named ``time``/``datetime``/``date`` become
      ISO-8601 strings; naive datetimes are assumed to be US/Eastern and are
      converted to UTC first.
    * Objects whose class is named ``Decimal`` become floats; NaN, infinite
      and empty values become ``None``.
    * Lists/tuples become lists and dict values are converted, recursively.
    * Objects with both ``name`` and ``value`` attributes are rendered as
      ``{"type", "name", "value"}``; known SDK enum class names are rendered
      from ``str(obj)``.
    * Any other object becomes ``{"_type": <class name>, ...}`` built from a
      per-class attribute list, then from its public ``__dict__`` entries,
      and finally a ``"<ClassName: ...>"`` string.

    Args:
        obj: Any value, typically data returned by the quote adapter.

    Returns:
        ``None``, a scalar, a string, a list or a dict as described above.
    """
    if obj is None:
        return None
    if isinstance(obj, (str, int, float, bool)):
        return obj

    # Types are matched by class name, as the original code did, so objects
    # are recognised without importing the modules that define them.
    class_name = obj.__class__.__name__
    if class_name in ("time", "datetime", "date"):
        return _serialize_temporal(obj, class_name)
    if class_name == "Decimal":
        return _serialize_decimal(obj)

    if isinstance(obj, (list, tuple)):
        return [safe_serialize_object(item) for item in obj]
    if isinstance(obj, dict):
        return {key: safe_serialize_object(value) for key, value in obj.items()}

    # Enum-like objects expose both a name and a value.
    if hasattr(obj, "name") and hasattr(obj, "value"):
        try:
            return {"type": class_name, "name": obj.name, "value": obj.value}
        except Exception:
            return {"type": class_name, "value": str(obj)}

    if class_name in _SDK_ENUM_TYPE_NAMES:
        try:
            enum_str = str(obj)
        except Exception:
            return f"<{class_name}>"
        if "." in enum_str:
            return {
                "type": class_name,
                "name": enum_str.split(".")[-1],
                "value": enum_str,
            }
        return {"type": class_name, "value": enum_str}

    result = {"_type": class_name}
    extracted = False
    for attr in _ATTRS_BY_TYPE_NAME.get(class_name, _COMMON_ATTRS):
        try:
            value = getattr(obj, attr, None)
            # Missing/None attributes and methods are not part of the data.
            if value is None or callable(value):
                continue
            result[attr] = safe_serialize_object(value)
            extracted = True
        except Exception:
            # Best effort: one unreadable attribute must not lose the rest.
            continue
    if extracted:
        return result

    # Nothing matched the attribute lists: fall back to public instance state.
    if hasattr(obj, "__dict__"):
        try:
            for key, value in obj.__dict__.items():
                if key.startswith("_"):
                    continue
                try:
                    result[key] = safe_serialize_object(value)
                except Exception:
                    result[key] = str(value) if value is not None else None
            return result
        except Exception:
            # Fall through to the string description below.
            pass

    try:
        return _describe_object(obj, class_name)
    except Exception:
        return str(obj)
def format_response(
    success: bool, message: str, data: Any = None, error: Optional[str] = None
) -> ApiTestResponse:
    """Build the uniform ``ApiTestResponse`` envelope.

    Args:
        success: Whether the call succeeded.
        message: Human-readable result summary.
        data: Raw payload; it is passed through ``safe_serialize_object``
            before being placed in the response.
        error: Optional error text.

    Returns:
        An ``ApiTestResponse`` carrying the given values.
    """
    return ApiTestResponse(
        success=success,
        message=message,
        data=safe_serialize_object(data),
        error=error,
    )
def process_symbols_parameter(symbols):
    """Normalize a ``symbols`` argument into a flat list of symbol strings.

    A string is split on commas. In a list, every entry is converted with
    ``str()`` and split on commas as well, so entries such as
    ``"700.HK,YINN.US"`` are expanded. In both forms the pieces are stripped
    of surrounding whitespace and empty pieces are dropped.

    Args:
        symbols: A comma-separated string, a list of entries, or any other
            value.

    Returns:
        A list of cleaned symbol strings for string/list input; any other
        value is returned unchanged.
    """
    if isinstance(symbols, str):
        entries = [symbols]
    elif isinstance(symbols, list):
        entries = symbols
    else:
        return symbols

    cleaned = []
    for entry in entries:
        pieces = (piece.strip() for piece in str(entry).split(","))
        cleaned.extend(piece for piece in pieces if piece)
    return cleaned
664# 注意:枚举值转换现在在BaseDataSourceClient的具体实现中处理
665# 这样API测试模块就与具体的券商SDK解耦了
668# 拉取页面API端点
@router.post("/pull/static-info", response_model=ApiTestResponse)
async def test_static_info(
    request: StaticInfoRequest, current_user: User = Depends(get_current_user)
):
    """Fetch static information for the requested symbols and summarize it.

    The message lists symbol and name (Chinese name, else English name) for
    at most the first three records. A falsy result is reported as a failure
    with the generic message "请求失败". Any exception is converted into a
    failure response whose ``error`` is ``str(e)``.
    """

    def _field(record, key, default):
        # Records may be objects (attribute access) or plain dicts.
        fallback = record.get(key, default) if isinstance(record, dict) else default
        return getattr(record, key, fallback)

    try:
        adapter = QuoteAdapter(current_user.id)
        result = adapter.get_static_info(request.symbols)

        if result:
            message = f"成功获取 {len(result)} 个标的的基础信息"
            for info in result[:3]:  # show the first three only
                symbol = _field(info, "symbol", "N/A")
                # Each name is looked up under its own key (the dict fallback
                # previously read "symbol" for both names).
                name = (
                    _field(info, "name_cn", None)
                    or _field(info, "name_en", None)
                    or "N/A"
                )
                message += f"\n• {symbol}: {name}"
            if len(result) > 3:
                message += f"\n... 还有 {len(result) - 3} 个标的"
        else:
            message = "请求失败"

        return format_response(
            result is not None and len(result) > 0,
            message,
            result,
        )

    except Exception as e:
        return format_response(False, "获取标的基础信息失败", error=str(e))
@router.post("/pull/quote", response_model=ApiTestResponse)
async def test_quote(
    request: QuoteRequest, current_user: User = Depends(get_current_user)
):
    """Fetch real-time quotes for the requested symbols and summarize them.

    The message lists symbol, last price and timestamp for at most the first
    three quotes. A falsy result yields the generic message "请求失败"; any
    exception becomes a failure response whose ``error`` is ``str(e)``.
    """

    def _read(record, key):
        # Dict records are read with ``.get``; other records by attribute.
        default = record.get(key, "N/A") if isinstance(record, dict) else "N/A"
        return getattr(record, key, default)

    try:
        result = QuoteAdapter(current_user.id).get_quote(request.symbols)

        summary = "请求失败"
        if result:
            lines = [f"成功获取 {len(result)} 个标的的实时行情"]
            for quote in result[:3]:
                symbol = _read(quote, "symbol")
                last_done = _read(quote, "last_done")
                timestamp = _read(quote, "timestamp")
                lines.append(f"• {symbol}: ${last_done} ({timestamp})")
            remaining = len(result) - 3
            if remaining > 0:
                lines.append(f"... 还有 {remaining} 个标的")
            summary = "\n".join(lines)

        has_data = result is not None and len(result) > 0
        return format_response(has_data, summary, result)

    except Exception as e:
        return format_response(False, "获取实时行情失败", error=str(e))
@router.post("/pull/depth", response_model=ApiTestResponse)
async def test_depth(
    request: DepthRequest, current_user: User = Depends(get_current_user)
):
    """Fetch the order book of one symbol and report ask/bid level counts.

    The result may be a dict or an object exposing ``asks``/``bids``
    attributes; both shapes are read the same way as in
    ``test_realtime_depth``. A falsy result yields the generic message
    "请求失败"; success is reported whenever the result is not ``None``. Any
    exception becomes a failure response whose ``error`` is ``str(e)``.
    """

    def _levels(book, side):
        # Previously only ``book.get(side)`` was used, which raised
        # AttributeError for non-dict results.
        fallback = book.get(side, []) if isinstance(book, dict) else []
        return getattr(book, side, fallback)

    try:
        adapter = QuoteAdapter(current_user.id)
        result = adapter.get_depth(request.symbol)

        if result:
            asks = _levels(result, "asks")
            bids = _levels(result, "bids")
            ask_count = len(asks) if asks else 0
            bid_count = len(bids) if bids else 0
            message = f"成功获取 {request.symbol} 的盘口数据\n• 卖盘档位: {ask_count}\n• 买盘档位: {bid_count}"
        else:
            message = "请求失败"

        return format_response(result is not None, message, result)

    except Exception as e:
        return format_response(False, "获取盘口数据失败", error=str(e))
@router.post("/pull/trades", response_model=ApiTestResponse)
async def test_trades(
    request: TradesRequest, current_user: User = Depends(get_current_user)
):
    """Fetch recent trades of one symbol and summarize the first record.

    The message reports the number of trades plus price, volume and
    timestamp of ``result[0]``. A falsy result yields the generic message
    "请求失败"; any exception becomes a failure response whose ``error`` is
    ``str(e)``.
    """

    def _read(record, key):
        # Dict records are read with ``.get``; other records by attribute.
        default = record.get(key, "N/A") if isinstance(record, dict) else "N/A"
        return getattr(record, key, default)

    try:
        result = QuoteAdapter(current_user.id).get_trades(
            request.symbol, request.count
        )

        summary = "请求失败"
        if result:
            headline = f"成功获取 {request.symbol} 的 {len(result)} 条成交明细"
            latest = result[0]
            price = _read(latest, "price")
            volume = _read(latest, "volume")
            timestamp = _read(latest, "timestamp")
            summary = headline + f"\n• 最新成交: ${price} x {volume} ({timestamp})"

        has_data = result is not None and len(result) > 0
        return format_response(has_data, summary, result)

    except Exception as e:
        return format_response(False, "获取成交明细失败", error=str(e))
@router.post("/pull/intraday", response_model=ApiTestResponse)
async def test_intraday(
    request: IntradayRequest, current_user: User = Depends(get_current_user)
):
    """Fetch intraday data of one symbol and summarize the latest point.

    The data is requested as 100 one-minute candlesticks ("Min_1",
    "NoAdjust") for the requested trade sessions. The message reports the
    number of points plus price and timestamp of the last one. A falsy
    result yields the generic message "请求失败"; any exception becomes a
    failure response whose ``error`` is ``str(e)``.
    """

    def _field(record, key, default="N/A"):
        # Records may be objects (attribute access) or plain dicts.
        fallback = record.get(key, default) if isinstance(record, dict) else default
        return getattr(record, key, fallback)

    try:
        adapter = QuoteAdapter(current_user.id)
        result = adapter.get_candlesticks(
            request.symbol, "Min_1", 100, "NoAdjust", request.trade_sessions
        )

        if result:
            message = f"成功获取 {request.symbol} 的 {len(result)} 个分时数据点"
            latest = result[-1]
            # The rows come from get_candlesticks; fall back to ``close``
            # when a row carries no ``price``.
            # NOTE(review): assumes candlestick rows expose ``close`` — confirm
            # against the adapter's return type.
            price = _field(latest, "price", None)
            if price is None:
                price = _field(latest, "close")
            timestamp = _field(latest, "timestamp")
            message += f"\n• 最新价格: ${price} ({timestamp})"
        else:
            message = "请求失败"

        return format_response(
            result is not None and len(result) > 0,
            message,
            result,
        )

    except Exception as e:
        return format_response(False, "获取分时数据失败", error=str(e))
@router.post("/pull/candlesticks", response_model=ApiTestResponse)
async def test_candlesticks(
    request: CandlesticksRequest, current_user: User = Depends(get_current_user)
):
    """Fetch candlesticks of one symbol and summarize the last bar.

    The message reports the number of bars plus open/high/low/close of
    ``result[-1]``. A falsy result yields the generic message "请求失败"; any
    exception becomes a failure response whose ``error`` is ``str(e)``.
    """

    def _read(record, key):
        # Dict records are read with ``.get``; other records by attribute.
        default = record.get(key, "N/A") if isinstance(record, dict) else "N/A"
        return getattr(record, key, default)

    try:
        result = QuoteAdapter(current_user.id).get_candlesticks(
            request.symbol,
            request.period,
            request.count,
            request.adjust_type,
            request.trade_sessions,
        )

        summary = "请求失败"
        if result:
            headline = f"成功获取 {request.symbol} 的 {len(result)} 根K线数据"
            bar = result[-1]
            open_price = _read(bar, "open")
            high_price = _read(bar, "high")
            low_price = _read(bar, "low")
            close_price = _read(bar, "close")
            summary = (
                headline
                + f"\n• 最新K线: 开${open_price} 高${high_price} 低${low_price} 收${close_price}"
            )

        has_data = result is not None and len(result) > 0
        return format_response(has_data, summary, result)

    except Exception as e:
        return format_response(False, "获取K线数据失败", error=str(e))
@router.post("/pull/trading-days", response_model=ApiTestResponse)
async def test_trading_days(
    request: TradingDaysRequest, current_user: User = Depends(get_current_user)
):
    """Fetch the trading days of a market and report how many there are.

    The message reports the counts of full and half trading days. A falsy
    result yields the generic message "请求失败"; success is reported whenever
    the result is not ``None``. Any exception becomes a failure response
    whose ``error`` is ``str(e)``.
    """

    def _days(container, key):
        # Dict results are read with ``.get``; other results by attribute.
        default = container.get(key, []) if isinstance(container, dict) else []
        return getattr(container, key, default)

    try:
        result = QuoteAdapter(current_user.id).get_trading_days(
            request.market, request.begin, request.end
        )

        summary = "请求失败"
        if result:
            trading_count = len(_days(result, "trading_days"))
            half_trading_count = len(_days(result, "half_trading_days"))
            summary = "\n".join(
                [
                    f"成功获取 {request.market} 市场交易日信息",
                    f"• 交易日: {trading_count} 天",
                    f"• 半日交易: {half_trading_count} 天",
                ]
            )

        return format_response(result is not None, summary, result)

    except Exception as e:
        return format_response(False, "获取市场交易日失败", error=str(e))
@router.post("/pull/trading-session", response_model=ApiTestResponse)
async def test_trading_session(current_user: User = Depends(get_current_user)):
    """Fetch today's trading sessions of each market and summarize them.

    The message lists the market and its number of sessions for at most the
    first three entries. A falsy result yields the generic message
    "请求失败"; any exception becomes a failure response whose ``error`` is
    ``str(e)``.
    """

    def _read(record, key, default):
        # Dict records are read with ``.get``; other records by attribute.
        fallback = record.get(key, default) if isinstance(record, dict) else default
        return getattr(record, key, fallback)

    try:
        result = QuoteAdapter(current_user.id).get_trading_session()

        summary = "请求失败"
        if result:
            lines = [f"成功获取 {len(result)} 个市场的交易时段信息"]
            for session in result[:3]:
                trade_sessions = _read(session, "trade_sessions", [])
                market = _read(session, "market", "N/A")
                session_count = len(trade_sessions) if trade_sessions else 0
                lines.append(f"• {market}: {session_count} 个交易时段")
            remaining = len(result) - 3
            if remaining > 0:
                lines.append(f"... 还有 {remaining} 个市场")
            summary = "\n".join(lines)

        has_data = result is not None and len(result) > 0
        return format_response(has_data, summary, result)

    except Exception as e:
        return format_response(False, "获取交易时段失败", error=str(e))
@router.post("/pull/calc-indexes", response_model=ApiTestResponse)
async def test_calc_indexes(
    request: CalcIndexesRequest, current_user: User = Depends(get_current_user)
):
    """Fetch calculated indexes for the requested symbols and summarize them.

    The message lists symbol and last price for at most the first three
    records. A falsy result yields the generic message "请求失败"; any
    exception becomes a failure response whose ``error`` is ``str(e)``.
    """

    def _field(record, key):
        # Records may be objects (attribute access) or plain dicts.
        fallback = record.get(key, "N/A") if isinstance(record, dict) else "N/A"
        return getattr(record, key, fallback)

    try:
        adapter = QuoteAdapter(current_user.id)
        result = adapter.get_calc_indexes(request.symbols, request.indexes)

        if result:
            message = f"成功获取 {len(result)} 个标的的计算指标"
            for calc in result[:3]:  # show the first three only
                symbol = _field(calc, "symbol")
                # Read the price under its own key (the dict fallback
                # previously read "symbol" here).
                last_done = _field(calc, "last_done")
                message += f"\n• {symbol}: 最新价 ${last_done or 'N/A'}"
            if len(result) > 3:
                message += f"\n... 还有 {len(result) - 3} 个标的"
        else:
            message = "请求失败"

        return format_response(
            result is not None and len(result) > 0,
            message,
            result,
        )

    except Exception as e:
        return format_response(False, "获取计算指标失败", error=str(e))
@router.post("/pull/history-candlesticks", response_model=ApiTestResponse)
async def test_history_candlesticks(
    request: HistoryCandlesticksRequest, current_user: User = Depends(get_current_user)
):
    """Fetch historical candlesticks of one symbol and report their time span.

    Up to 1000 bars are requested through ``get_candlesticks``. The message
    reports the number of bars plus the timestamps of the first and last
    bar. A falsy result yields the generic message "请求失败"; any exception
    becomes a failure response whose ``error`` is ``str(e)``.
    """
    # NOTE(review): ``request.start`` and ``request.end`` are not used by this
    # endpoint — confirm whether the adapter offers a date-range query.

    def _timestamp_of(bar):
        # Dict bars are read with ``.get``; other bars by attribute.
        default = bar.get("timestamp", "N/A") if isinstance(bar, dict) else "N/A"
        return getattr(bar, "timestamp", default)

    try:
        result = QuoteAdapter(current_user.id).get_candlesticks(
            request.symbol,
            request.period,
            1000,
            request.adjust_type,
            request.trade_sessions,
        )

        summary = "请求失败"
        if result:
            headline = f"成功获取 {request.symbol} 的 {len(result)} 根历史K线数据"
            first, last = result[0], result[-1]
            first_time = _timestamp_of(first)
            last_time = _timestamp_of(last)
            summary = headline + f"\n• 时间范围: {first_time} 至 {last_time}"

        has_data = result is not None and len(result) > 0
        return format_response(has_data, summary, result)

    except Exception as e:
        return format_response(False, "获取历史K线失败", error=str(e))
1096# 订阅推送页面API端点
@router.post("/subscription/subscriptions", response_model=ApiTestResponse)
async def test_subscriptions(current_user: User = Depends(get_current_user)):
    """Report how many symbols are currently subscribed.

    Reads ``subscriptions`` and ``count`` from the adapter's subscription
    summary. Success is reported when ``count`` is positive. Any exception
    becomes a failure response whose ``error`` is ``str(e)``.
    """
    try:
        summary = QuoteAdapter(current_user.id).get_subscription_summary()
        subscriptions = summary.get("subscriptions", [])
        count = summary.get("count", 0)

        has_subscriptions = count > 0
        if not has_subscriptions:
            message = "当前没有订阅任何标的"
        elif count > 3:
            message = f"当前已订阅 {count} 个标的" + f"\n... 还有 {count - 3} 个标的"
        else:
            message = f"当前已订阅 {count} 个标的"

        return format_response(has_subscriptions, message, subscriptions)

    except Exception as e:
        return format_response(False, "获取订阅信息失败", error=str(e))
@router.post("/subscription/subscribe", response_model=ApiTestResponse)
async def test_subscribe(
    request: SubscribeRequest, current_user: User = Depends(get_current_user)
):
    """Subscribe to quote pushes and echo the requested subscription.

    The response data repeats the requested symbols, subscription types and
    the first-push flag; the ``success`` value is whatever the adapter's
    ``subscribe`` call returned. Any exception becomes a failure response
    whose ``error`` is ``str(e)``.
    """
    try:
        success = QuoteAdapter(current_user.id).subscribe(
            request.symbols, request.sub_types, request.is_first_push
        )

        # Joined up front, as before, so a join error is reported either way.
        symbols_str = ", ".join(request.symbols)
        sub_types_str = ", ".join(request.sub_types)
        if success:
            push_label = "是" if request.is_first_push else "否"
            message = "\n".join(
                [
                    "成功订阅行情数据",
                    f"• 标的: {symbols_str}",
                    f"• 类型: {sub_types_str}",
                    f"• 立即推送: {push_label}",
                ]
            )
        else:
            message = "订阅失败"

        echoed = {
            "symbols": request.symbols,
            "sub_types": request.sub_types,
            "is_first_push": request.is_first_push,
        }
        return format_response(success, message, echoed)

    except Exception as e:
        return format_response(False, "订阅行情数据失败", error=str(e))
@router.post("/subscription/unsubscribe", response_model=ApiTestResponse)
async def test_unsubscribe(
    request: UnsubscribeRequest, current_user: User = Depends(get_current_user)
):
    """Cancel quote subscriptions and echo what was requested.

    The response data repeats the requested symbols and subscription types;
    the ``success`` value is whatever the adapter's ``unsubscribe`` call
    returned. Any exception becomes a failure response whose ``error`` is
    ``str(e)``.
    """
    try:
        success = QuoteAdapter(current_user.id).unsubscribe(
            request.symbols, request.sub_types
        )

        # Joined up front, as before, so a join error is reported either way.
        symbols_str = ", ".join(request.symbols)
        sub_types_str = ", ".join(request.sub_types)
        if success:
            message = "\n".join(
                [
                    "成功取消订阅行情数据",
                    f"• 标的: {symbols_str}",
                    f"• 类型: {sub_types_str}",
                ]
            )
        else:
            message = "取消订阅失败"

        echoed = {"symbols": request.symbols, "sub_types": request.sub_types}
        return format_response(success, message, echoed)

    except Exception as e:
        return format_response(False, "取消订阅失败", error=str(e))
@router.post("/subscription/realtime-quote", response_model=ApiTestResponse)
async def test_realtime_quote(
    request: RealtimeQuoteRequest, current_user: User = Depends(get_current_user)
):
    """Read pushed real-time quotes for the requested symbols and summarize them.

    The message lists symbol, last price and timestamp for at most the first
    three quotes. A falsy result yields the generic message "请求失败"; any
    exception becomes a failure response whose ``error`` is ``str(e)``.
    """

    def _read(record, key):
        # Dict records are read with ``.get``; other records by attribute.
        default = record.get(key, "N/A") if isinstance(record, dict) else "N/A"
        return getattr(record, key, default)

    try:
        result = QuoteAdapter(current_user.id).get_realtime_quote(request.symbols)

        summary = "请求失败"
        if result:
            lines = [f"成功获取 {len(result)} 个标的的实时价格"]
            for quote in result[:3]:
                symbol = _read(quote, "symbol")
                last_done = _read(quote, "last_done")
                timestamp = _read(quote, "timestamp")
                lines.append(f"• {symbol}: ${last_done} ({timestamp})")
            remaining = len(result) - 3
            if remaining > 0:
                lines.append(f"... 还有 {remaining} 个标的")
            summary = "\n".join(lines)

        has_data = result is not None and len(result) > 0
        return format_response(has_data, summary, result)

    except Exception as e:
        return format_response(False, "获取实时价格失败", error=str(e))
@router.post("/subscription/realtime-depth", response_model=ApiTestResponse)
async def test_realtime_depth(
    request: RealtimeDepthRequest, current_user: User = Depends(get_current_user)
):
    """Read the pushed real-time order book of one symbol and count its levels.

    The message reports the number of ask and bid levels. A falsy result
    yields the generic message "请求失败"; success is reported whenever the
    result is not ``None``. Any exception becomes a failure response whose
    ``error`` is ``str(e)``.
    """

    def _levels(book, side):
        # Dict results are read with ``.get``; other results by attribute.
        default = book.get(side, []) if isinstance(book, dict) else []
        return getattr(book, side, default)

    try:
        result = QuoteAdapter(current_user.id).get_realtime_depth(request.symbol)

        summary = "请求失败"
        if result:
            asks = _levels(result, "asks")
            bids = _levels(result, "bids")
            ask_count = len(asks) if asks else 0
            bid_count = len(bids) if bids else 0
            summary = "\n".join(
                [
                    f"成功获取 {request.symbol} 的实时盘口数据",
                    f"• 卖盘档位: {ask_count}",
                    f"• 买盘档位: {bid_count}",
                ]
            )

        return format_response(result is not None, summary, result)

    except Exception as e:
        return format_response(False, "获取实时盘口失败", error=str(e))
@router.post("/subscription/realtime-trades", response_model=ApiTestResponse)
async def test_realtime_trades(
    request: RealtimeTradesRequest, current_user: User = Depends(get_current_user)
):
    """Read pushed real-time trades of one symbol and summarize the first record.

    The message reports the number of trades plus price, volume and
    timestamp of ``result[0]``. A falsy result yields the generic message
    "请求失败"; any exception becomes a failure response whose ``error`` is
    ``str(e)``.
    """

    def _read(record, key):
        # Dict records are read with ``.get``; other records by attribute.
        default = record.get(key, "N/A") if isinstance(record, dict) else "N/A"
        return getattr(record, key, default)

    try:
        result = QuoteAdapter(current_user.id).get_realtime_trades(
            request.symbol, request.count
        )

        summary = "请求失败"
        if result:
            headline = f"成功获取 {request.symbol} 的 {len(result)} 条实时成交明细"
            latest = result[0]
            price = _read(latest, "price")
            volume = _read(latest, "volume")
            timestamp = _read(latest, "timestamp")
            summary = headline + f"\n• 最新成交: ${price} x {volume} ({timestamp})"

        has_data = result is not None and len(result) > 0
        return format_response(has_data, summary, result)

    except Exception as e:
        return format_response(False, "获取实时成交明细失败", error=str(e))