完成获取视频url接口
This commit is contained in:
@@ -5,6 +5,19 @@
|
||||
from typing import List
|
||||
from fastapi import APIRouter, HTTPException, status
|
||||
from pydantic import BaseModel
|
||||
import requests
|
||||
import urllib3
|
||||
from hikvision_openapi_signer import HikvisionOpenAPISigner
|
||||
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
|
||||
# 初始化签名器
|
||||
signer = HikvisionOpenAPISigner(
|
||||
"https://29.2.19.19:443",
|
||||
"28031410",
|
||||
"Y79rv6LqBahxoEH6adLv",
|
||||
headers={'tagId': '0'} # 根据平台要求设置
|
||||
)
|
||||
|
||||
# 创建路由器
|
||||
router = APIRouter(
|
||||
@@ -28,6 +41,40 @@ class VideoUrlResponse(BaseModel):
|
||||
videoUrl: List[str]
|
||||
|
||||
|
||||
def get_video_preview_url(video_id: str):
|
||||
"""
|
||||
获取单个视频的预览地址
|
||||
|
||||
参数:
|
||||
video_id: 视频ID
|
||||
|
||||
返回:
|
||||
视频预览地址字符串
|
||||
"""
|
||||
try:
|
||||
# 签名一个请求
|
||||
request = signer.sign(
|
||||
'POST',
|
||||
'/api/video/v1/cameras/previewURLs',
|
||||
jsons={'cameraIndexCode': video_id, 'protocol': 'hls', 'expand': 'transcode=1'},
|
||||
accept='application/json'
|
||||
)
|
||||
method, url, headers, body = request
|
||||
|
||||
# 发送请求
|
||||
response = requests.request(method, url, headers=headers, data=body, verify=False)
|
||||
result = response.json()
|
||||
|
||||
# 提取url
|
||||
if 'data' in result and 'url' in result['data']:
|
||||
return result['data']['url']
|
||||
else:
|
||||
raise ValueError(f"未找到视频地址: {result}")
|
||||
|
||||
except Exception as e:
|
||||
raise ValueError(f"获取视频地址失败: {str(e)}")
|
||||
|
||||
|
||||
@router.post(
|
||||
"/",
|
||||
response_model=VideoUrlResponse,
|
||||
@@ -45,18 +92,21 @@ async def get_video_urls(request: VideoUrlRequest):
|
||||
- **videoUrl**: 视频地址数组 (字符串数组)
|
||||
"""
|
||||
try:
|
||||
# TODO: 后续实现根据videoId查询视频地址的逻辑
|
||||
# 目前先返回模拟数据
|
||||
video_urls = []
|
||||
|
||||
# 批量获取视频地址
|
||||
for video_id in request.videoId:
|
||||
# 模拟根据videoId生成视频地址
|
||||
video_url = f"http://example.com/videos/{video_id}.mp4"
|
||||
video_urls.append(video_url)
|
||||
try:
|
||||
video_url = get_video_preview_url(video_id)
|
||||
video_urls.append(video_url)
|
||||
except Exception as e:
|
||||
# 单个视频获取失败时记录错误,但继续处理其他视频
|
||||
video_urls.append(f"net_error_url")
|
||||
|
||||
return VideoUrlResponse(videoUrl=video_urls)
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"获取视频地址失败: {str(e)}"
|
||||
detail=f"批量获取视频地址失败: {str(e)}"
|
||||
)
|
||||
Reference in New Issue
Block a user