"""Helpers for obtaining camera stream URLs from a Hikvision OpenAPI platform.

The real preview-URL request is currently disabled (kept below as commented-out
code) and both public functions are test stubs that return fixed payloads.
"""

import requests
import urllib3
import yaml
from hikvision_openapi_signer import HikvisionOpenAPISigner

# Silence urllib3's InsecureRequestWarning; the disabled request code below
# calls the platform with verify=False.
# NOTE(review): this suppresses the warning process-wide — confirm that
# skipping TLS verification is acceptable for this deployment.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Initialise the request signer.
# NOTE(review): the host and the two credential-like strings are hard-coded in
# source; consider loading them from configuration or the environment.
signer = HikvisionOpenAPISigner(
    "https://29.2.19.19:443",
    "28031410",
    "Y79rv6LqBahxoEH6adLv",
    headers={'tagId': '0'},  # set according to the platform's requirements
)


# Disabled real implementation, kept for reference. Uncomment it (and remove
# the stub of the same name below) to query the platform.
#
# def get_camera_preview_url(camera_index_code):
#     # Sign a request
#     request = signer.sign(
#         'POST',
#         '/api/video/v1/cameras/previewURLs',
#         jsons={'cameraIndexCode': camera_index_code, 'protocol': 'rtsp', 'expand': 'streamform=rtp'},
#         accept='application/json'
#     )
#     method, url, headers, body = request
#     # Send the request
#     response = requests.request(method, url, headers=headers, data=body, verify=False)
#     return response.json()


# Test stub standing in for the disabled implementation above.
def get_camera_preview_url(camera_index_code) -> dict:
    """Return a fixed, successful preview-URL response (test stub).

    Args:
        camera_index_code: Accepted for interface compatibility with the
            disabled real implementation; ignored by this stub.

    Returns:
        A dict with ``"code"``, ``"msg"`` and ``"data"`` keys, where
        ``data["url"]`` is a hard-coded RTSP address.
    """
    return {
        "code": "0",
        "msg": "SUCCESS",
        "data": {
            # Alternative test endpoints; swap in as needed.
            # "url": "rtsp://192.168.110.139:8554/test"
            "url": "rtsp://192.168.43.33:8554/test"
            # "url": "rtsp://localhost:8554/test"
        },
    }


def get_camera_hls_url(camera_index_code, stream_type=0) -> dict:
    """Return a fixed, successful HLS-URL response (test stub).

    Args:
        camera_index_code: Ignored by this stub.
        stream_type: Ignored by this stub; defaults to 0.

    Returns:
        A dict with ``"code"``, ``"msg"`` and ``"data"`` keys, where
        ``data["url"]`` is a hard-coded ``.m3u8`` address.
    """
    return {
        "code": "0",
        "msg": "SUCCESS",
        "data": {
            "url": "https://demo.unified-streaming.com/k8s/live/scte35.isml/.m3u8"
        },
    }


# Disabled one-off script, kept for reference: it fills in ``rtsp_url`` for
# every camera in config.yaml that has an ``index`` entry. The string literals
# inside the disabled code are left verbatim so re-enabling it does not change
# its output.
#
# # Read config.yaml
# with open('config.yaml', 'r', encoding='utf-8') as f:
#     config = yaml.safe_load(f)
#
# # Iterate over all cameras
# for camera in config['cameras']:
#     if 'index' in camera:
#         index = camera['index']
#         print(f"正在获取摄像头 {camera['name']} (index: {index}) 的预览地址...")
#         result = get_camera_preview_url(index)
#         print(f"API返回结果: {result}")
#
#         # Extract the url and store it in config
#         if 'data' in result and 'url' in result['data']:
#             rtsp_url = result['data']['url']
#             camera['rtsp_url'] = rtsp_url
#             print(f"更新 rtsp_url: {rtsp_url}")
#         else:
#             print(f"未找到 url 在返回结果中")
#
# # Save the updated config.yaml
# with open('config.yaml', 'w', encoding='utf-8') as f:
#     yaml.dump(config, f, default_flow_style=False, allow_unicode=True)
#
# print("config.yaml 已更新")