"""Small threaded HTTP server for the API demo pages.

Routes:
    /api/<value>                 -> index.html with ``apiParam`` forced to <value>
    /, /index.html               -> index.html with ``apiParam`` forced to '1'
    /coords, /coordinate.html    -> coordinate.html
    anything else                -> a static file below the current working directory
"""
from http.server import ThreadingHTTPServer, SimpleHTTPRequestHandler
import json
import os
import shutil
import socket
import urllib.parse


class APIHandler(SimpleHTTPRequestHandler):
    """Request handler serving the HTML pages and static assets."""

    # Socket timeout in seconds, so a stalled client cannot hold a
    # connection (and its worker thread) open for a long time.
    timeout = 30

    # File extension -> Content-Type used for static files.
    MIME_TYPES = {
        '.html': 'text/html; charset=utf-8',
        '.css': 'text/css',
        '.js': 'application/javascript',
        '.json': 'application/json',
        '.png': 'image/png',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.gif': 'image/gif',
        '.svg': 'image/svg+xml',
        '.ico': 'image/x-icon',
        '.mp3': 'audio/mpeg',
        '.wav': 'audio/wav',
        '.mp4': 'video/mp4',
        '.txt': 'text/plain; charset=utf-8',
    }

    # URL prefix that selects an API variant of index.html.
    API_PREFIX = '/api/'

    # Exact JavaScript statement in index.html that serve_file() rewrites.
    API_PARAM_PLACEHOLDER = "const apiParam = urlParams.get('api') || '1';"

    def log_message(self, format, *args):
        """Print one access-log line: ``[timestamp] client - message``."""
        print(f"[{self.log_date_time_string()}] {self.address_string()} - {format % args}")

    @staticmethod
    def _js_string_literal(value):
        """Return *value* as a single-quoted JavaScript string literal.

        The value comes from the request URL, so everything that could end
        the literal or the surrounding ``<script>`` element is escaped.
        Plain values such as ``1`` come out unchanged as ``'1'``.
        """
        # json.dumps escapes backslashes, double quotes, control characters
        # and all non-ASCII characters; strip its surrounding double quotes.
        escaped = json.dumps(str(value))[1:-1]
        for char, replacement in (
            ("'", '\\u0027'),
            ('<', '\\u003c'),
            ('>', '\\u003e'),
            ('&', '\\u0026'),
        ):
            escaped = escaped.replace(char, replacement)
        return f"'{escaped}'"

    @classmethod
    def _inject_api_param(cls, content, value):
        """Return the HTML bytes *content* with ``apiParam`` fixed to *value*.

        Content that is not valid UTF-8 is returned unchanged (best effort).
        """
        try:
            html = content.decode('utf-8')
        except UnicodeDecodeError as e:
            print(f"Error modifying HTML content: {e}")
            return content
        statement = f"const apiParam = {cls._js_string_literal(value)};"
        return html.replace(cls.API_PARAM_PLACEHOLDER, statement).encode('utf-8')

    @staticmethod
    def _resolve_static_path(url_path, root):
        """Map a URL path to a regular file inside *root*.

        Returns the absolute file path, or ``None`` when the path is empty,
        escapes *root* (e.g. via ``..``), or does not name a regular file.
        """
        relative = urllib.parse.unquote(url_path).lstrip('/')
        if not relative or '\x00' in relative:
            return None
        root = os.path.abspath(root)
        candidate = os.path.normpath(os.path.join(root, relative))
        try:
            inside_root = os.path.commonpath([root, candidate]) == root
        except ValueError:
            # Raised e.g. for paths on different drives on Windows.
            inside_root = False
        if not inside_root or not os.path.isfile(candidate):
            return None
        return candidate

    def do_GET(self):
        """Dispatch a GET request to the matching page or static file."""
        try:
            path = urllib.parse.urlparse(self.path).path

            # Everything after "/api/" selects the API variant.
            api_param = None
            if path.startswith(self.API_PREFIX):
                api_param = path[len(self.API_PREFIX):]

            if api_param:
                self.serve_file('index.html', query=f'api={api_param}')
            elif path in ('/', '/index.html'):
                # Default page uses api=1.
                self.serve_file('index.html', query='api=1')
            elif path in ('/coords', '/coordinate.html'):
                self.serve_file('coordinate.html')
            else:
                static_path = self._resolve_static_path(path, os.getcwd())
                if static_path is None:
                    self.send_error(404, 'Not Found')
                else:
                    self.serve_static_file(static_path)
        except ConnectionError as e:
            # The client went away; there is nobody left to send an error to.
            print(f"Client disconnected: {e}")
            self.close_connection = True
        except Exception as e:
            print(f"Error handling request: {e}")
            self.send_error(500, 'Internal Server Error')

    def serve_file(self, filename, query=None):
        """Serve an HTML file located next to this script.

        Args:
            filename: File name relative to this script's directory.
            query: Optional ``'api=<value>'`` string. When given, the
                ``apiParam`` default in the HTML is replaced by ``<value>``.

        Sends 404 when the file does not exist. Other I/O errors are logged
        and re-raised for the caller to handle.
        """
        file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), filename)

        # Read the file before sending any header, so a failure can still be
        # reported with a clean error response.
        try:
            with open(file_path, 'rb') as f:
                content = f.read()
        except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
            self.send_error(404, f'{filename} not found')
            return
        except OSError as e:
            print(f"Error serving file: {e}")
            raise

        if query:
            # partition keeps any further '=' characters inside the value.
            _, separator, value = query.partition('=')
            if separator:
                content = self._inject_api_param(content, value)

        self.send_response(200)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.send_header('Content-Length', str(len(content)))
        self.end_headers()
        self.wfile.write(content)

    def serve_static_file(self, filename):
        """Stream a static file to the client.

        Args:
            filename: Path of the file to send. The caller is responsible
                for validating it (see ``_resolve_static_path``).
        """
        ext = os.path.splitext(filename)[1].lower()
        content_type = self.MIME_TYPES.get(ext, 'application/octet-stream')

        # Open first: once the 200 status line is sent it cannot be undone.
        try:
            f = open(filename, 'rb')
        except FileNotFoundError:
            self.send_error(404, 'Not Found')
            return
        except OSError as e:
            print(f"Error serving static file: {e}")
            self.send_error(500, 'Internal Server Error')
            return

        with f:
            size = os.fstat(f.fileno()).st_size
            self.send_response(200)
            self.send_header('Content-type', content_type)
            self.send_header('Content-Length', str(size))
            self.send_header('Access-Control-Allow-Origin', '*')
            self.end_headers()
            # Copy in chunks so large media files are not loaded into memory.
            shutil.copyfileobj(f, self.wfile)


def check_port_available(port):
    """Return True if a TCP socket can currently be bound to *port*."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        if os.name == 'posix':
            # HTTPServer binds with SO_REUSEADDR; probe the same way so a
            # quick restart is not misreported as "port in use". Not done on
            # Windows, where this option would allow binding to a busy port.
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            s.bind(('', port))
            return True
        except OSError:
            return False


def run(port=8086):
    """Start the server on *port* and block until Ctrl+C is pressed."""
    if not check_port_available(port):
        print(f"错误: 端口 {port} 已被占用")
        return

    try:
        httpd = ThreadingHTTPServer(('', port), APIHandler)
    except OSError:
        # The port was taken between the probe above and the real bind.
        print(f"错误: 端口 {port} 已被占用")
        return

    # The context manager closes the listening socket on every exit path.
    with httpd:
        print(f'Server running on http://localhost:{port}')
        print('支持的接口: /, /api/1, /api/2, /api/3, /api/4, /api/5, /api/6, /api/7, /api/11-16')
        print('坐标提取工具: /coords')
        print('按 Ctrl+C 停止服务器')
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print('\n服务器已停止')


if __name__ == '__main__':
    run()