Files
SupervisorAI/api/routes/video_urls.py
2026-01-23 09:38:43 +08:00

112 lines
3.0 KiB
Python

"""
视频地址API路由
"""
from typing import List
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
import requests
import urllib3
from hikvision_openapi_signer import HikvisionOpenAPISigner
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 初始化签名器
signer = HikvisionOpenAPISigner(
"https://29.2.19.19:443",
"28031410",
"Y79rv6LqBahxoEH6adLv",
headers={'tagId': '0'} # 根据平台要求设置
)
# 创建路由器
router = APIRouter(
prefix="/video-urls",
tags=["视频地址管理"],
responses={
404: {"description": "资源不存在"},
400: {"description": "请求参数错误"},
500: {"description": "服务器内部错误"}
}
)
class VideoUrlRequest(BaseModel):
"""获取视频地址请求模型"""
videoId: List[str]
class VideoUrlResponse(BaseModel):
"""获取视频地址响应模型"""
videoUrl: List[str]
def get_video_preview_url(video_id: str):
"""
获取单个视频的预览地址
参数:
video_id: 视频ID
返回:
视频预览地址字符串
"""
try:
# 签名一个请求
request = signer.sign(
'POST',
'/api/video/v1/cameras/previewURLs',
jsons={'cameraIndexCode': video_id, 'protocol': 'hls', 'expand': 'transcode=1'},
accept='application/json'
)
method, url, headers, body = request
# 发送请求
response = requests.request(method, url, headers=headers, data=body, verify=False)
result = response.json()
# 提取url
if 'data' in result and 'url' in result['data']:
return result['data']['url']
else:
raise ValueError(f"未找到视频地址: {result}")
except Exception as e:
raise ValueError(f"获取视频地址失败: {str(e)}")
@router.post(
"/",
response_model=VideoUrlResponse,
status_code=status.HTTP_200_OK,
summary="获取视频地址",
description="根据视频ID数组获取对应的视频地址数组"
)
async def get_video_urls(request: VideoUrlRequest):
"""
获取视频地址
- **videoId**: 视频ID数组 (必须,字符串数组)
返回:
- **videoUrl**: 视频地址数组 (字符串数组)
"""
try:
video_urls = []
# 批量获取视频地址
for video_id in request.videoId:
try:
video_url = get_video_preview_url(video_id)
video_urls.append(video_url)
except Exception as e:
# 单个视频获取失败时记录错误,但继续处理其他视频
video_urls.append(f"net_error_url")
return VideoUrlResponse(videoUrl=video_urls)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"批量获取视频地址失败: {str(e)}"
)