Coverage for api/v1/endpoints/stock.py: 80.70%

57 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

1""" 

2股票数据API端点 

3""" 

4 

5from typing import Optional 

6 

7import redis 

8from fastapi import APIRouter, Depends, HTTPException, Query 

9 

10from core.models.stock import (StockCodeList, StockData, StockDataCreate, 

11 StockDataFilter, StockDataResponse, 

12 TradeSession) 

13from core.repositories.stock_repository import StockRepository 

14from infrastructure.database.redis_client import get_redis 

15 

16router = APIRouter() 

17 

18 

19def get_stock_repository() -> StockRepository: 

20 """获取股票数据仓库实例""" 

21 redis_client = get_redis() 

22 return StockRepository(redis_client) 

23 

24 

25@router.post("/stock-data", response_model=dict) 

26async def create_stock_data( 

27 stock_data: StockDataCreate, 

28 stock_repository: StockRepository = Depends(get_stock_repository), 

29): 

30 """创建股票数据""" 

31 success = await stock_repository.create_stock_data(stock_data) 

32 if success: 

33 return {"message": "股票数据创建成功", "success": True} 

34 else: 

35 raise HTTPException(status_code=500, detail="股票数据创建失败") 

36 

37 

38@router.get("/stock-data", response_model=StockDataResponse) 

39async def get_stock_data_list( 

40 code: Optional[str] = Query(None, description="股票代码筛选"), 

41 start_date: Optional[str] = Query(None, description="开始日期(ISO格式)"), 

42 end_date: Optional[str] = Query(None, description="结束日期(ISO格式)"), 

43 trade_session: Optional[TradeSession] = Query(None, description="交易时段筛选"), 

44 sort_by: str = Query("timestamp", description="排序字段"), 

45 sort_order: str = Query("asc", description="排序方式: asc/desc"), 

46 page: int = Query(1, description="页码", ge=1), 

47 page_size: int = Query(120, description="每页数量", ge=1, le=1440), 

48 timezone: str = Query("Asia/Shanghai", description="时区"), 

49 stock_repository: StockRepository = Depends(get_stock_repository), 

50): 

51 """获取股票数据列表""" 

52 # 处理ISO格式的日期字符串 

53 start_timestamp = None 

54 end_timestamp = None 

55 

56 if start_date: 

57 from datetime import datetime 

58 

59 start_dt = datetime.fromisoformat(start_date.replace("Z", "+00:00")) 

60 start_timestamp = int(start_dt.timestamp()) 

61 

62 if end_date: 

63 from datetime import datetime 

64 

65 end_dt = datetime.fromisoformat(end_date.replace("Z", "+00:00")) 

66 end_timestamp = int(end_dt.timestamp()) 

67 

68 filter_params = StockDataFilter( 

69 code=code, 

70 start_timestamp=start_timestamp, 

71 end_timestamp=end_timestamp, 

72 trade_session=trade_session, 

73 sort_by=sort_by, 

74 sort_order=sort_order, 

75 page=page, 

76 page_size=page_size, 

77 timezone=timezone, 

78 ) 

79 

80 return await stock_repository.get_stock_data_list(filter_params) 

81 

82 

83@router.get("/stock-data/{code}/{timestamp}", response_model=StockData) 

84async def get_stock_data( 

85 code: str, 

86 timestamp: int, 

87 stock_repository: StockRepository = Depends(get_stock_repository), 

88): 

89 """获取单条股票数据""" 

90 stock_data = await stock_repository.get_stock_data(code, timestamp) 

91 if not stock_data: 

92 raise HTTPException(status_code=404, detail="股票数据不存在") 

93 return stock_data 

94 

95 

96@router.get("/stock-codes", response_model=StockCodeList) 

97async def get_stock_codes( 

98 stock_repository: StockRepository = Depends(get_stock_repository), 

99): 

100 """获取所有股票代码""" 

101 return await stock_repository.get_stock_codes() 

102 

103 

104@router.delete("/stock-data/{code}") 

105async def delete_stock_data( 

106 code: str, 

107 start_date: str = Query(..., description="开始日期(ISO格式)"), 

108 end_date: str = Query(..., description="结束日期(ISO格式)"), 

109 timezone: str = Query("America/New_York", description="时区"), 

110 stock_repository: StockRepository = Depends(get_stock_repository), 

111): 

112 """删除指定时间范围内的股票数据""" 

113 try: 

114 # 处理ISO格式的日期字符串 

115 from datetime import datetime 

116 

117 import pytz 

118 

119 # 解析时区 

120 tz = pytz.timezone(timezone) 

121 

122 # 解析日期时间 

123 start_dt = datetime.fromisoformat(start_date.replace("Z", "+00:00")) 

124 end_dt = datetime.fromisoformat(end_date.replace("Z", "+00:00")) 

125 

126 # 转换为指定时区 

127 start_dt = start_dt.astimezone(tz) 

128 end_dt = end_dt.astimezone(tz) 

129 

130 # 转换为时间戳 

131 start_timestamp = int(start_dt.timestamp()) 

132 end_timestamp = int(end_dt.timestamp()) 

133 

134 success = await stock_repository.delete_stock_data( 

135 code, start_timestamp, end_timestamp 

136 ) 

137 if success: 

138 return {"message": "股票数据删除成功", "success": True} 

139 else: 

140 raise HTTPException(status_code=500, detail="股票数据删除失败") 

141 except Exception as e: 

142 raise HTTPException(status_code=400, detail=f"日期解析失败: {str(e)}")