Files
SupervisorAI/live_catalog/live_catalog_backend.py
2026-04-02 13:17:30 +08:00

270 lines
9.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import urllib.parse
import socket
import json
from http.server import ThreadingHTTPServer, SimpleHTTPRequestHandler
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.hikvision_cam_utils import get_organization_list, get_final_list, get_camera_preview_url
# ========== Hikvision API configuration ==========
# Index code used as the parent when a caller does not supply one:
# get_children() substitutes it whenever it is called with parent_id=None.
ROOT_PARENT_INDEX_CODE = "4fa15af07b6b400f94af1e35d8235c30"
def transform_org_node(item, parent_id=None):
    """Convert a Hikvision organization record into the front-end node format.

    Args:
        item: Mapping that must contain ``indexCode`` and ``name``; its
            optional ``parentIndexCode`` is used as a fallback parent.
        parent_id: Parent identifier to report. When falsy, the record's own
            ``parentIndexCode`` (or ``None`` if absent) is used instead.

    Returns:
        A dict with keys ``id``, ``name``, ``parent_id``, ``is_leaf`` and
        ``stream_url``. Organizations are never leaves and carry no stream.

    Raises:
        KeyError: If ``indexCode`` or ``name`` is missing from ``item``.
    """
    node_id = item["indexCode"]
    label = item["name"]
    if parent_id:
        owner = parent_id
    else:
        owner = item.get("parentIndexCode")
    return dict(
        id=node_id,
        name=label,
        parent_id=owner,
        is_leaf=False,  # organization nodes are containers, not leaves
        stream_url=None,
    )
def transform_camera_node(item, parent_id=None):
    """Convert a Hikvision camera record into the front-end leaf-node format.

    Args:
        item: Mapping that must contain ``cameraIndexCode`` and ``name``.
        parent_id: Identifier of the owning organization; passed through
            unchanged (``None`` when not supplied).

    Returns:
        A dict with keys ``id``, ``name``, ``parent_id``, ``is_leaf`` and
        ``stream_url``. Cameras are always leaves; the stream URL is left
        as ``None`` here.

    Raises:
        KeyError: If ``cameraIndexCode`` or ``name`` is missing from ``item``.
    """
    camera_code = item["cameraIndexCode"]
    label = item["name"]
    return dict(
        id=camera_code,
        name=label,
        parent_id=parent_id,
        is_leaf=True,  # cameras terminate the tree
        stream_url=None,
    )
def get_children(parent_id):
    """Return the direct children of ``parent_id`` as front-end node dicts.

    Lookup order:
      1. Ask ``get_organization_list`` for sub-organizations.
      2. If that list is empty, ask ``get_final_list`` for cameras (leaves).

    Args:
        parent_id: Index code of the parent node, or ``None`` to start from
            ``ROOT_PARENT_INDEX_CODE``.

    Returns:
        A list of node dicts (organizations or cameras). An empty list is
        returned when either API call reports a ``code`` other than ``"0"``
        or when any exception is raised along the way.
    """
    org_code = ROOT_PARENT_INDEX_CODE if parent_id is None else parent_id

    try:
        # Step 1: sub-organizations.
        org_reply = get_organization_list(org_code)
        if org_reply.get("code") != "0":
            print(f"海康威视 API 返回错误: {org_reply.get('msg')}")
            return []

        sub_orgs = org_reply.get("data", {}).get("list", [])
        if sub_orgs:
            org_nodes = []
            for org in sub_orgs:
                org_nodes.append(transform_org_node(org, org_code))
            return org_nodes

        # Step 2: no sub-organizations, so fall back to the camera list.
        print(f"组织 {org_code} 无下级组织,尝试获取摄像头列表...")
        camera_reply = get_final_list(org_code)
        if camera_reply.get("code") != "0":
            print(f"获取摄像头列表失败: {camera_reply.get('msg')}")
            return []

        cameras = camera_reply.get("data", {}).get("list", [])
        print(f"获取到 {len(cameras)} 个摄像头")
        camera_nodes = []
        for camera in cameras:
            camera_nodes.append(transform_camera_node(camera, org_code))
        return camera_nodes
    except Exception as exc:
        # Best effort: any failure is logged and reported as "no children".
        print(f"调用海康威视 API 失败: {exc}")
        return []
def get_stream_url(node_id, stream_type=0):
    """Look up the preview stream URL for a camera.

    Args:
        node_id: The camera's ``cameraIndexCode``.
        stream_type: Stream selector forwarded to ``get_camera_preview_url``
            (per the original note: 0 = main stream, 1 = sub stream).

    Returns:
        The ``url`` field from the API response's ``data`` object, or
        ``None`` when the API reports a ``code`` other than ``"0"``, the
        field is missing, or any exception is raised.
    """
    try:
        reply = get_camera_preview_url(node_id, stream_type)
        if reply.get("code") == "0":
            payload = reply.get("data", {})
            return payload.get("url")
        print(f"获取视频流地址失败: {reply.get('msg')}")
    except Exception as exc:
        print(f"调用 get_camera_preview_url 失败: {exc}")
    return None
# ========== HTTP 处理器 ==========
class APIHandler(SimpleHTTPRequestHandler):
    """HTTP handler exposing the catalog JSON API plus basic static files.

    Routes handled by ``do_GET``:
      * ``/api/roots``            -> children of the root node
      * ``/api/children/<id>``    -> children of node ``<id>``
      * ``/api/stream/<id>``      -> preview URL of camera ``<id>``
      * ``/`` or ``/index.html``  -> ``index.html`` located next to this script
      * anything else             -> a regular file below the working directory
    """

    # Per-connection socket timeout in seconds.
    timeout = 30

    # File extension -> Content-Type used for static files.
    MIME_TYPES = {
        '.html': 'text/html; charset=utf-8',
        '.css': 'text/css',
        '.js': 'application/javascript',
        '.json': 'application/json',
        '.png': 'image/png',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.gif': 'image/gif',
        '.svg': 'image/svg+xml',
        '.ico': 'image/x-icon',
        '.mp3': 'audio/mpeg',
        '.wav': 'audio/wav',
        '.mp4': 'video/mp4',
        '.txt': 'text/plain; charset=utf-8',
    }

    def log_message(self, format, *args):
        """Print one access-log line to stdout.

        The parameter is named ``format`` to match the base-class signature.
        """
        print(f"[{self.log_date_time_string()}] {self.address_string()} - {format % args}")

    def send_json_response(self, data, status=200):
        """Send ``data`` as a UTF-8 JSON body with permissive CORS.

        The payload is serialized before any header is written, so a
        serialization error cannot leave a half-sent response behind.
        """
        body = json.dumps(data, ensure_ascii=False).encode('utf-8')
        self.send_response(status)
        self.send_header('Content-type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(body)

    def send_error_json(self, message, status=400):
        """Send ``{"error": message}`` as JSON with the given status code."""
        self.send_json_response({"error": message}, status)

    def _parse_stream_type(self, query):
        """Return the integer ``stream_type`` from ``query``.

        Defaults to 0 when the parameter is absent; returns ``None`` when it
        is present but not an integer.
        """
        params = urllib.parse.parse_qs(query)
        raw_value = params.get('stream_type', ['0'])[0]
        try:
            return int(raw_value)
        except ValueError:
            return None

    def _resolve_static_path(self, url_path):
        """Map a request path to a regular file inside the working directory.

        Percent-escapes are decoded first so that encoded ``..`` segments are
        caught. Returns the resolved absolute path, or ``None`` when the
        target is missing, is not a regular file, or lies outside the
        working directory (path traversal).
        """
        relative = urllib.parse.unquote(url_path).lstrip('/')
        root = os.path.realpath(os.getcwd())
        try:
            candidate = os.path.realpath(os.path.join(root, relative))
            inside_root = os.path.commonpath([root, candidate]) == root
        except ValueError:
            # Embedded NUL bytes, or paths on different drives on Windows.
            return None
        if inside_root and os.path.isfile(candidate):
            return candidate
        return None

    def _inject_api_param(self, raw, query):
        """Hard-code the ``apiParam`` assignment in the page's script.

        ``query`` is expected to look like ``"api=1"``; the text after the
        first ``=`` is inserted verbatim, so callers must only pass trusted
        values. On any failure the original bytes are returned unchanged.
        """
        try:
            value = query.split('=')[1]
            text = raw.decode('utf-8')
            # Replace the legacy front-end's URL-derived assignment.
            patched = text.replace(
                "const apiParam = urlParams.get('api') || '1';",
                f"const apiParam = '{value}';",
            )
            return patched.encode('utf-8')
        except (IndexError, UnicodeDecodeError) as e:
            print(f"Error modifying HTML content: {e}")
            return raw

    def do_GET(self):
        """Dispatch GET requests to the JSON API or the static file routes."""
        try:
            parsed_path = urllib.parse.urlparse(self.path)
            path = parsed_path.path
            query = parsed_path.query

            # --- JSON API ---
            if path.startswith('/api/roots'):
                # GET /api/roots
                self.send_json_response(get_children(None))
                return

            if path.startswith('/api/children/'):
                # GET /api/children/<id>
                node_id = path.split('/')[-1]
                if not node_id:
                    self.send_error_json("Invalid node id", 400)
                    return
                self.send_json_response(get_children(node_id))
                return

            if path.startswith('/api/stream/'):
                # GET /api/stream/<id>?stream_type=0
                node_id = path.split('/')[-1]
                if not node_id:
                    self.send_error_json("Invalid node id", 400)
                    return
                stream_type = self._parse_stream_type(query)
                if stream_type is None:
                    # A malformed parameter is a client error, not a 500.
                    self.send_error_json("Invalid stream_type", 400)
                    return
                url = get_stream_url(node_id, stream_type)
                if url is None:
                    self.send_error_json("Stream not found or node is not a leaf", 404)
                    return
                self.send_json_response({
                    "cameraIndexCode": node_id,
                    "url": url,
                    "stream_type": stream_type,
                })
                return

            # --- Static files ---
            if path == '/' or path == '/index.html':
                self.serve_file('index.html', query='api=1')
                return

            target = self._resolve_static_path(path)
            if target is None:
                self.send_error(404, 'Not Found')
            else:
                self.serve_static_file(target)
        except Exception as e:
            # Top-level boundary: log and answer with a generic 500.
            print(f"Error handling request: {e}")
            self.send_error(500, 'Internal Server Error')

    def serve_file(self, filename, query=None):
        """Serve an HTML file that lives in the same directory as this script.

        Args:
            filename: File name relative to this script's directory.
            query: Optional ``"api=<value>"`` string; when given, the page's
                ``apiParam`` assignment is rewritten to that value.

        Raises:
            Exception: Any error is logged and re-raised for the caller.
        """
        try:
            base_dir = os.path.dirname(os.path.abspath(__file__))
            file_path = os.path.join(base_dir, filename)
            if not os.path.isfile(file_path):
                self.send_error(404, f'{filename} not found')
                return
            # Read and patch before sending headers so a failure here still
            # produces a clean error response.
            with open(file_path, 'rb') as f:
                content = f.read()
            if query:
                content = self._inject_api_param(content, query)
            self.send_response(200)
            self.send_header('Content-type', 'text/html; charset=utf-8')
            self.send_header('Content-Length', str(len(content)))
            self.end_headers()
            self.wfile.write(content)
        except Exception as e:
            print(f"Error serving file: {e}")
            raise

    def serve_static_file(self, filename):
        """Serve ``filename`` with a Content-Type chosen from ``MIME_TYPES``.

        The file is read before the status line is sent; if it cannot be
        read, a 500 response is returned instead of a truncated 200.
        """
        try:
            with open(filename, 'rb') as f:
                payload = f.read()
        except OSError as e:
            print(f"Error serving static file: {e}")
            self.send_error(500, 'Internal Server Error')
            return
        ext = os.path.splitext(filename)[1].lower()
        content_type = self.MIME_TYPES.get(ext, 'application/octet-stream')
        self.send_response(200)
        self.send_header('Content-type', content_type)
        self.send_header('Content-Length', str(len(payload)))
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(payload)
def check_port_available(port):
    """Report whether a TCP socket can currently be bound to ``port``.

    A throwaway IPv4 socket is bound to the port on all interfaces and then
    closed again.

    Returns:
        ``True`` if the bind succeeded, ``False`` if it raised a socket error.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(('', port))
    except socket.error:
        return False
    else:
        return True
    finally:
        probe.close()
def run(port=18369):
    """Start the threaded HTTP server and block until it is interrupted.

    Args:
        port: TCP port to listen on (all interfaces). Defaults to 18369.

    The function returns early, after printing an error, when the port is
    already in use. The listening socket is always closed on the way out,
    whether the server stops via Ctrl+C or because of another exception.
    """
    if not check_port_available(port):
        print(f"错误: 端口 {port} 已被占用")
        return

    server_address = ('', port)
    httpd = ThreadingHTTPServer(server_address, APIHandler)
    print(f'Server running on http://localhost:{port}')
    print('API endpoints:')
    print(' GET /api/roots - 获取所有根节点')
    print(' GET /api/children/<id> - 获取指定节点的子节点(自动判断组织/摄像头)')
    print(' GET /api/stream/<id> - 获取视频流地址')
    print('静态文件服务: 访问 / 或 /index.html')
    print('按 Ctrl+C 停止服务器')
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print('\n服务器已停止')
    finally:
        # Release the listening socket on every exit path.
        httpd.server_close()
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    run()