import os
import urllib.parse
import socket
import json
from http.server import ThreadingHTTPServer, SimpleHTTPRequestHandler
import sys

# Put the parent of this file's directory on sys.path before the `utils`
# import below (presumably that is where the `utils` package lives).
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.hikvision_cam_utils import get_organization_list, get_final_list, get_camera_preview_url

# ========== Hikvision API configuration ==========
# Index code used as the parent when the caller asks for the root nodes.
ROOT_PARENT_INDEX_CODE = "4fa15af07b6b400f94af1e35d8235c30"


def transform_org_node(item, parent_id=None):
    """Convert a Hikvision organization node into the format the frontend expects.

    Args:
        item: Mapping with at least ``indexCode`` and ``name``.
        parent_id: Parent node id; when falsy, ``item["parentIndexCode"]``
            is used instead (``None`` if that key is missing).

    Returns:
        A dict with ``id``, ``name``, ``parent_id``, ``is_leaf`` and
        ``stream_url`` keys.
    """
    return {
        "id": item["indexCode"],
        "name": item["name"],
        "parent_id": parent_id or item.get("parentIndexCode"),
        "is_leaf": False,  # organization nodes are never leaves
        "stream_url": None
    }


def transform_camera_node(item, parent_id=None):
    """Convert a Hikvision camera node into the frontend format (leaf node).

    Args:
        item: Mapping with at least ``cameraIndexCode`` and ``name``.
        parent_id: Id of the organization the camera was listed under.

    Returns:
        A dict with ``id``, ``name``, ``parent_id``, ``is_leaf`` and
        ``stream_url`` keys.
    """
    return {
        "id": item["cameraIndexCode"],
        "name": item["name"],
        "parent_id": parent_id,
        "is_leaf": True,  # cameras are leaves
        "stream_url": None
    }


def _extract_list(result):
    """Return ``result["data"]["list"]``, treating a missing or null value as empty."""
    data = result.get("data") or {}
    return data.get("list") or []


def get_children(parent_id):
    """Return the direct children of a node, fetched from the Hikvision API.

    Logic:
    1. Call ``get_organization_list`` to fetch child organizations.
    2. If that list is empty, call ``get_final_list`` to fetch cameras
       (leaf nodes) instead.

    Args:
        parent_id: Index code of the parent node; ``None`` selects
            ``ROOT_PARENT_INDEX_CODE``.

    Returns:
        A list of node dicts. An empty list is returned when the API reports
        a non-"0" code or when any exception is raised (the error is printed).
    """
    if parent_id is None:
        parent_id = ROOT_PARENT_INDEX_CODE
    try:
        # Try child organizations first.
        result = get_organization_list(parent_id)
        if result.get("code") != "0":
            print(f"海康威视 API 返回错误: {result.get('msg')}")
            return []
        items = _extract_list(result)
        if items:
            return [transform_org_node(item, parent_id) for item in items]

        # No child organizations: fall back to the camera list (leaf nodes).
        print(f"组织 {parent_id} 无下级组织,尝试获取摄像头列表...")
        final_result = get_final_list(parent_id)
        if final_result.get("code") != "0":
            print(f"获取摄像头列表失败: {final_result.get('msg')}")
            return []
        camera_items = _extract_list(final_result)
        print(f"获取到 {len(camera_items)} 个摄像头")
        return [transform_camera_node(item, parent_id) for item in camera_items]
    except Exception as e:
        # Best-effort boundary: any failure is reported as "no children".
        print(f"调用海康威视 API 失败: {e}")
        return []


def get_stream_url(node_id, stream_type=0):
    """Fetch the video stream URL of a camera.

    Args:
        node_id: The camera's ``cameraIndexCode``.
        stream_type: Stream type, 0 = main stream, 1 = sub stream.

    Returns:
        The URL from the API response, or ``None`` when the API reports a
        non-"0" code, the response carries no URL, or an exception is raised.
    """
    try:
        result = get_camera_preview_url(node_id, stream_type)
        if result.get("code") != "0":
            print(f"获取视频流地址失败: {result.get('msg')}")
            return None
        # A null "data" field is treated like a missing one.
        return (result.get("data") or {}).get("url")
    except Exception as e:
        print(f"调用 get_camera_preview_url 失败: {e}")
        return None


def _resolve_static_path(url_path):
    """Map a request path to a regular file inside the current working directory.

    The path is URL-decoded and normalised; anything that would escape the
    working directory (for example via ``..`` components) is rejected.

    Returns:
        The absolute file path, or ``None`` if the path is outside the
        working directory, is not a regular file, or is malformed.
    """
    relative = urllib.parse.unquote(url_path).lstrip('/')
    root = os.path.abspath(os.getcwd())
    try:
        candidate = os.path.abspath(os.path.join(root, relative))
        inside_root = os.path.commonpath([root, candidate]) == root
    except ValueError:
        # Raised for embedded NUL bytes or paths on a different drive.
        return None
    if not inside_root or not os.path.isfile(candidate):
        return None
    return candidate


# ========== HTTP handler ==========
class APIHandler(SimpleHTTPRequestHandler):
    """Request handler serving the JSON tree/stream API and static files."""

    # Socket timeout in seconds (applied by the base request handler).
    timeout = 30

    # File extension -> Content-Type mapping for static files.
    MIME_TYPES = {
        '.html': 'text/html; charset=utf-8',
        '.css': 'text/css',
        '.js': 'application/javascript',
        '.json': 'application/json',
        '.png': 'image/png',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.gif': 'image/gif',
        '.svg': 'image/svg+xml',
        '.ico': 'image/x-icon',
        '.mp3': 'audio/mpeg',
        '.wav': 'audio/wav',
        '.mp4': 'video/mp4',
        '.txt': 'text/plain; charset=utf-8',
    }

    def log_message(self, format, *args):
        """Print the access-log line to stdout with a timestamp and client address."""
        print(f"[{self.log_date_time_string()}] {self.address_string()} - {format % args}")

    def send_json_response(self, data, status=200):
        """Send ``data`` as a UTF-8 encoded JSON response with a CORS header.

        Args:
            data: JSON-serialisable object.
            status: HTTP status code.
        """
        body = json.dumps(data, ensure_ascii=False).encode('utf-8')
        self.send_response(status)
        self.send_header('Content-type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def send_error_json(self, message, status=400):
        """Send an error as ``{"error": message}`` JSON with the given status."""
        self.send_json_response({"error": message}, status)

    def do_GET(self):
        """Route GET requests to the API endpoints or the static file server.

        Routes:
            /api/roots            -> children of the root node
            /api/children/<id>    -> children of node <id>
            /api/stream/<id>      -> stream URL of camera <id>
            / or /index.html      -> index.html next to this script
            anything else         -> static file under the working directory
        """
        try:
            parsed_path = urllib.parse.urlparse(self.path)
            path = parsed_path.path
            query = parsed_path.query

            if path.startswith('/api/roots'):
                self.send_json_response(get_children(None))
            elif path.startswith('/api/children/'):
                self._handle_children(path)
            elif path.startswith('/api/stream/'):
                self._handle_stream(path, query)
            elif path in ('/', '/index.html'):
                self.serve_file('index.html', query='api=1')
            else:
                self._handle_static(path)
        except Exception as e:
            # Top-level boundary: report and answer with a generic 500.
            print(f"Error handling request: {e}")
            self.send_error(500, 'Internal Server Error')

    def _handle_children(self, path):
        """Answer GET /api/children/<id> with the node's children as JSON."""
        node_id = path.split('/')[-1]
        if not node_id:
            self.send_error_json("Invalid node id", 400)
            return
        self.send_json_response(get_children(node_id))

    def _handle_stream(self, path, query):
        """Answer GET /api/stream/<id>?stream_type=N with the stream URL as JSON."""
        node_id = path.split('/')[-1]
        if not node_id:
            self.send_error_json("Invalid node id", 400)
            return

        params = urllib.parse.parse_qs(query)
        raw_stream_type = params.get('stream_type', ['0'])[0]
        try:
            stream_type = int(raw_stream_type)
        except ValueError:
            # A malformed query value is a client error, not a server error.
            self.send_error_json("Invalid stream_type", 400)
            return

        url = get_stream_url(node_id, stream_type)
        if url is None:
            self.send_error_json("Stream not found or node is not a leaf", 404)
            return
        self.send_json_response({
            "cameraIndexCode": node_id,
            "url": url,
            "stream_type": stream_type
        })

    def _handle_static(self, path):
        """Serve a static file for ``path`` or answer 404 if it does not resolve."""
        file_path = _resolve_static_path(path)
        if file_path is None:
            self.send_error(404, 'Not Found')
        else:
            self.serve_static_file(file_path)

    def serve_file(self, filename, query=None):
        """Serve an HTML file located next to this script.

        Args:
            filename: File name relative to this script's directory.
            query: Optional ``key=value`` string. When given, the value is
                substituted into the page's ``apiParam`` assignment
                (compatibility with the old frontend).

        Raises:
            Exception: Any error is printed and re-raised to the caller.
        """
        try:
            file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), filename)
            if not os.path.isfile(file_path):
                self.send_error(404, f'{filename} not found')
                return

            # Read before sending headers so a read failure cannot follow a 200.
            with open(file_path, 'rb') as f:
                content = f.read()

            if query:
                try:
                    api_value = query.split('=')[1]
                    text = content.decode('utf-8')
                    # Replace the apiParam assignment in the page's JS.
                    text = text.replace(
                        "const apiParam = urlParams.get('api') || '1';",
                        f"const apiParam = '{api_value}';"
                    )
                    content = text.encode('utf-8')
                except (UnicodeDecodeError, IndexError) as e:
                    # Serve the unmodified page if the rewrite is not possible.
                    print(f"Error modifying HTML content: {e}")

            self.send_response(200)
            self.send_header('Content-type', 'text/html; charset=utf-8')
            self.send_header('Content-Length', str(len(content)))
            self.end_headers()
            self.wfile.write(content)
        except Exception as e:
            print(f"Error serving file: {e}")
            raise

    def serve_static_file(self, filename):
        """Send the file at ``filename`` with a Content-Type chosen by extension.

        Unknown extensions are sent as ``application/octet-stream``. A file
        that cannot be read results in a 500 response.
        """
        ext = os.path.splitext(filename)[1].lower()
        content_type = self.MIME_TYPES.get(ext, 'application/octet-stream')
        try:
            # Read before sending headers so a read failure cannot follow a 200.
            with open(filename, 'rb') as f:
                content = f.read()
        except OSError as e:
            print(f"Error serving static file: {e}")
            self.send_error(500, 'Internal Server Error')
            return

        self.send_response(200)
        self.send_header('Content-type', content_type)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Content-Length', str(len(content)))
        self.end_headers()
        self.wfile.write(content)


def check_port_available(port):
    """Return True if a TCP socket can be bound to ``port`` on all interfaces."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind(('', port))
            return True
        except OSError:
            return False


def run():
    """Start the threaded HTTP server on port 18369 and serve until Ctrl+C."""
    port = 18369
    if not check_port_available(port):
        print(f"错误: 端口 {port} 已被占用")
        return

    server_address = ('', port)
    # The context manager closes the listening socket on every exit path.
    with ThreadingHTTPServer(server_address, APIHandler) as httpd:
        print(f'Server running on http://localhost:{port}')
        print('API endpoints:')
        print(' GET /api/roots - 获取所有根节点')
        print(' GET /api/children/ - 获取指定节点的子节点(自动判断组织/摄像头)')
        print(' GET /api/stream/ - 获取视频流地址')
        print('静态文件服务: 访问 / 或 /index.html')
        print('按 Ctrl+C 停止服务器')
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print('\n服务器已停止')


if __name__ == '__main__':
    run()