Coverage for api/v1/endpoints/stock.py: 80.70%
57 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2股票数据API端点
3"""
5from typing import Optional
7import redis
8from fastapi import APIRouter, Depends, HTTPException, Query
10from core.models.stock import (StockCodeList, StockData, StockDataCreate,
11 StockDataFilter, StockDataResponse,
12 TradeSession)
13from core.repositories.stock_repository import StockRepository
14from infrastructure.database.redis_client import get_redis
16router = APIRouter()
19def get_stock_repository() -> StockRepository:
20 """获取股票数据仓库实例"""
21 redis_client = get_redis()
22 return StockRepository(redis_client)
25@router.post("/stock-data", response_model=dict)
26async def create_stock_data(
27 stock_data: StockDataCreate,
28 stock_repository: StockRepository = Depends(get_stock_repository),
29):
30 """创建股票数据"""
31 success = await stock_repository.create_stock_data(stock_data)
32 if success:
33 return {"message": "股票数据创建成功", "success": True}
34 else:
35 raise HTTPException(status_code=500, detail="股票数据创建失败")
38@router.get("/stock-data", response_model=StockDataResponse)
39async def get_stock_data_list(
40 code: Optional[str] = Query(None, description="股票代码筛选"),
41 start_date: Optional[str] = Query(None, description="开始日期(ISO格式)"),
42 end_date: Optional[str] = Query(None, description="结束日期(ISO格式)"),
43 trade_session: Optional[TradeSession] = Query(None, description="交易时段筛选"),
44 sort_by: str = Query("timestamp", description="排序字段"),
45 sort_order: str = Query("asc", description="排序方式: asc/desc"),
46 page: int = Query(1, description="页码", ge=1),
47 page_size: int = Query(120, description="每页数量", ge=1, le=1440),
48 timezone: str = Query("Asia/Shanghai", description="时区"),
49 stock_repository: StockRepository = Depends(get_stock_repository),
50):
51 """获取股票数据列表"""
52 # 处理ISO格式的日期字符串
53 start_timestamp = None
54 end_timestamp = None
56 if start_date:
57 from datetime import datetime
59 start_dt = datetime.fromisoformat(start_date.replace("Z", "+00:00"))
60 start_timestamp = int(start_dt.timestamp())
62 if end_date:
63 from datetime import datetime
65 end_dt = datetime.fromisoformat(end_date.replace("Z", "+00:00"))
66 end_timestamp = int(end_dt.timestamp())
68 filter_params = StockDataFilter(
69 code=code,
70 start_timestamp=start_timestamp,
71 end_timestamp=end_timestamp,
72 trade_session=trade_session,
73 sort_by=sort_by,
74 sort_order=sort_order,
75 page=page,
76 page_size=page_size,
77 timezone=timezone,
78 )
80 return await stock_repository.get_stock_data_list(filter_params)
83@router.get("/stock-data/{code}/{timestamp}", response_model=StockData)
84async def get_stock_data(
85 code: str,
86 timestamp: int,
87 stock_repository: StockRepository = Depends(get_stock_repository),
88):
89 """获取单条股票数据"""
90 stock_data = await stock_repository.get_stock_data(code, timestamp)
91 if not stock_data:
92 raise HTTPException(status_code=404, detail="股票数据不存在")
93 return stock_data
96@router.get("/stock-codes", response_model=StockCodeList)
97async def get_stock_codes(
98 stock_repository: StockRepository = Depends(get_stock_repository),
99):
100 """获取所有股票代码"""
101 return await stock_repository.get_stock_codes()
104@router.delete("/stock-data/{code}")
105async def delete_stock_data(
106 code: str,
107 start_date: str = Query(..., description="开始日期(ISO格式)"),
108 end_date: str = Query(..., description="结束日期(ISO格式)"),
109 timezone: str = Query("America/New_York", description="时区"),
110 stock_repository: StockRepository = Depends(get_stock_repository),
111):
112 """删除指定时间范围内的股票数据"""
113 try:
114 # 处理ISO格式的日期字符串
115 from datetime import datetime
117 import pytz
119 # 解析时区
120 tz = pytz.timezone(timezone)
122 # 解析日期时间
123 start_dt = datetime.fromisoformat(start_date.replace("Z", "+00:00"))
124 end_dt = datetime.fromisoformat(end_date.replace("Z", "+00:00"))
126 # 转换为指定时区
127 start_dt = start_dt.astimezone(tz)
128 end_dt = end_dt.astimezone(tz)
130 # 转换为时间戳
131 start_timestamp = int(start_dt.timestamp())
132 end_timestamp = int(end_dt.timestamp())
134 success = await stock_repository.delete_stock_data(
135 code, start_timestamp, end_timestamp
136 )
137 if success:
138 return {"message": "股票数据删除成功", "success": True}
139 else:
140 raise HTTPException(status_code=500, detail="股票数据删除失败")
141 except Exception as e:
142 raise HTTPException(status_code=400, detail=f"日期解析失败: {str(e)}")