# 智能算法展示平台 API 使用示例

本文档提供了使用智能算法展示平台API的各种示例,涵盖从基础到高级的用法。

## 准备工作

在开始之前,请确保您已获得API访问令牌。

### Python 示例

```python
import requests
import json

# Base URL and authentication token shared by every example below
BASE_URL = "http://localhost:8001/api/v1"
headers = {
    "Authorization": "Bearer YOUR_ACCESS_TOKEN",
    "Content-Type": "application/json"
}
```

### JavaScript 示例

```javascript
const BASE_URL = "http://localhost:8001/api/v1";
const headers = {
  "Authorization": "Bearer YOUR_ACCESS_TOKEN",
  "Content-Type": "application/json"
};
```

## 算法管理示例

### 获取算法列表

**Python**

```python
def get_algorithms():
    response = requests.get(f"{BASE_URL}/algorithms", headers=headers)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return None

# Usage example
algorithms = get_algorithms()
if algorithms:
    print(f"找到 {len(algorithms['algorithms'])} 个算法")
    for alg in algorithms['algorithms']:
        print(f"- {alg['name']} ({alg['type']})")
```

**JavaScript**

```javascript
async function getAlgorithms() {
  try {
    const response = await fetch(`${BASE_URL}/algorithms`, {
      method: 'GET',
      headers: headers
    });
    if (response.ok) {
      const data = await response.json();
      console.log(`找到 ${data.algorithms.length} 个算法`);
      data.algorithms.forEach(alg => {
        console.log(`- ${alg.name} (${alg.type})`);
      });
      return data;
    } else {
      console.error(`Error: ${response.status} - ${await response.text()}`);
      return null;
    }
  } catch (error) {
    console.error('请求失败:', error);
    return null;
  }
}
```

### 获取特定算法详情

**Python**

```python
def get_algorithm_details(algorithm_id):
    response = requests.get(f"{BASE_URL}/algorithms/{algorithm_id}", headers=headers)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return None

# Usage example
algorithm = get_algorithm_details("alg-001")
if algorithm:
    print(f"算法名称: {algorithm['name']}")
    print(f"算法类型: {algorithm['type']}")
    print(f"版本数量: {len(algorithm['versions'])}")
```

**JavaScript**

```javascript
async function getAlgorithmDetails(algorithmId) {
  try {
    const response = await fetch(`${BASE_URL}/algorithms/${algorithmId}`, {
      method: 'GET',
      headers: headers
    });
    if (response.ok) {
      return await response.json();
    } else {
      console.error(`Error: ${response.status} - ${await response.text()}`);
      return null;
    }
  } catch (error) {
    console.error('请求失败:', error);
    return null;
  }
}

// Usage example
getAlgorithmDetails("alg-001").then(algorithm => {
  if (algorithm) {
    console.log(`算法名称: ${algorithm.name}`);
    console.log(`算法类型: ${algorithm.type}`);
    console.log(`版本数量: ${algorithm.versions.length}`);
  }
});
```

## 算法调用示例

### 调用算法

**Python**

```python
def call_algorithm(algorithm_id, version_id, input_data, params=None):
    payload = {
        "algorithm_id": algorithm_id,
        "version_id": version_id,
        "input_data": input_data
    }
    # "params" is only sent when the caller supplies it
    if params:
        payload["params"] = params
    response = requests.post(f"{BASE_URL}/algorithms/call", headers=headers, json=payload)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return None

# Usage example - image classification algorithm
input_data = {
    "image_url": "https://example.com/sample-image.jpg"
}
params = {
    "confidence_threshold": 0.7
}
result = call_algorithm("alg-001", "ver-001", input_data, params)
if result:
    print(f"调用ID: {result['id']}")
    print(f"状态: {result['status']}")
    print(f"响应时间: {result['response_time']}秒")
    if result['status'] == 'success':
        predictions = result['output_data']['predictions']
        for pred in predictions:
            print(f"类别: {pred['class']}, 置信度: {pred['confidence']}")
```

**JavaScript**

```javascript
async function callAlgorithm(algorithmId, versionId, inputData, params = null) {
  const payload = {
    algorithm_id: algorithmId,
    version_id: versionId,
    input_data: inputData
  };
  // "params" is only sent when the caller supplies it
  if (params) {
    payload.params = params;
  }
  try {
    const response = await fetch(`${BASE_URL}/algorithms/call`, {
      method: 'POST',
      headers: headers,
      body: JSON.stringify(payload)
    });
    if (response.ok) {
      return await response.json();
    } else {
      console.error(`Error: ${response.status} - ${await response.text()}`);
      return null;
    }
  } catch (error) {
    console.error('请求失败:', error);
    return null;
  }
}

// Usage example
const inputData = {
  image_url: "https://example.com/sample-image.jpg"
};
const params = {
  confidence_threshold: 0.7
};
callAlgorithm("alg-001", "ver-001", inputData, params)
  .then(result => {
    if (result) {
      console.log(`调用ID: ${result.id}`);
      console.log(`状态: ${result.status}`);
      console.log(`响应时间: ${result.response_time}秒`);
      if (result.status === 'success') {
        const predictions = result.output_data.predictions;
        predictions.forEach(pred => {
          console.log(`类别: ${pred.class}, 置信度: ${pred.confidence}`);
        });
      }
    }
  });
```

### 获取调用结果

**Python**

```python
def get_call_result(call_id):
    response = requests.get(f"{BASE_URL}/algorithms/calls/{call_id}", headers=headers)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return None

# Usage example
result = get_call_result("call-001")
if result:
    print(f"调用状态: {result['status']}")
    if result['status'] == 'success':
        print(f"输出数据: {json.dumps(result['output_data'], indent=2)}")
```

**JavaScript**

```javascript
async function getCallResult(callId) {
  try {
    const response = await fetch(`${BASE_URL}/algorithms/calls/${callId}`, {
      method: 'GET',
      headers: headers
    });
    if (response.ok) {
      return await response.json();
    } else {
      console.error(`Error: ${response.status} - ${await response.text()}`);
      return null;
    }
  } catch (error) {
    console.error('请求失败:', error);
    return null;
  }
}

// Usage example
getCallResult("call-001").then(result => {
  if (result) {
    console.log(`调用状态: ${result.status}`);
    if (result.status === 'success') {
      console.log(`输出数据:`, JSON.stringify(result.output_data, null, 2));
    }
  }
});
```

## 通过API网关调用算法

### 网关调用示例

**Python**

```python
def call_algorithm_via_gateway(algorithm_id, version_id, input_data, params=None):
    payload = {
        "algorithm_id": algorithm_id,
        "version_id": version_id,
        "input_data": input_data
    }
    if params:
        payload["params"] = params
    response = requests.post(f"{BASE_URL}/gateway/call", headers=headers, json=payload)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return None

# Usage example (reuses input_data and params from the call example above)
gateway_result = call_algorithm_via_gateway("alg-001", "ver-001", input_data, params)
if gateway_result:
    print(f"网关调用成功,响应时间: {gateway_result['response_time']}秒")
```

**JavaScript**

```javascript
async function callAlgorithmViaGateway(algorithmId, versionId, inputData, params = null) {
  const payload = {
    algorithm_id: algorithmId,
    version_id: versionId,
    input_data: inputData
  };
  if (params) {
    payload.params = params;
  }
  try {
    const response = await fetch(`${BASE_URL}/gateway/call`, {
      method: 'POST',
      headers: headers,
      body: JSON.stringify(payload)
    });
    if (response.ok) {
      return await response.json();
    } else {
      console.error(`Error: ${response.status} - ${await response.text()}`);
      return null;
    }
  } catch (error) {
    console.error('请求失败:', error);
    return null;
  }
}

// Usage example (reuses inputData and params from the call example above)
callAlgorithmViaGateway("alg-001", "ver-001", inputData, params)
  .then(gatewayResult => {
    if (gatewayResult) {
      console.log(`网关调用成功,响应时间: ${gatewayResult.response_time}秒`);
    }
  });
```

## 历史记录示例

### 获取调用历史

**Python**

```python
def get_call_history(skip=0, limit=100, algorithm_id=None, status=None):
    # Optional filters are only added to the query string when provided
    params = {"skip": skip, "limit": limit}
    if algorithm_id:
        params["algorithm_id"] = algorithm_id
    if status:
        params["status"] = status
    response = requests.get(f"{BASE_URL}/history/user-calls", headers=headers, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return None

# Usage example
history = get_call_history(limit=10, status="success")
if history:
    print(f"找到 {history['count']} 条历史记录")
    for record in history['history']:
        print(f"- {record['algorithm_id']}: {record['status']} ({record['response_time']}s)")
```

**JavaScript**

```javascript
async function getCallHistory(skip = 0, limit = 100, algorithmId = null, status = null) {
  // URLSearchParams percent-encodes the values, so IDs or statuses that
  // contain characters such as "&" or "=" cannot corrupt the query string.
  const query = new URLSearchParams({ skip: String(skip), limit: String(limit) });
  if (algorithmId) query.set('algorithm_id', algorithmId);
  if (status) query.set('status', status);
  const url = `${BASE_URL}/history/user-calls?${query.toString()}`;
  try {
    const response = await fetch(url, {
      method: 'GET',
      headers: headers
    });
    if (response.ok) {
      return await response.json();
    } else {
      console.error(`Error: ${response.status} - ${await response.text()}`);
      return null;
    }
  } catch (error) {
    console.error('请求失败:', error);
    return null;
  }
}

// Usage example
getCallHistory(0, 10, null, "success").then(history => {
  if (history) {
    console.log(`找到 ${history.count} 条历史记录`);
    history.history.forEach(record => {
      console.log(`- ${record.algorithm_id}: ${record.status} (${record.response_time}s)`);
    });
  }
});
```

### 获取调用统计

**Python**

```python
def get_call_statistics(user_id=None, algorithm_id=None):
    params = {}
    if user_id:
        params["user_id"] = user_id
    if algorithm_id:
        params["algorithm_id"] = algorithm_id
    response = requests.get(f"{BASE_URL}/history/statistics", headers=headers, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return None

# Usage example
stats = get_call_statistics()
if stats:
    print(f"总调用次数: {stats['total_calls']}")
    print(f"成功率: {stats['success_rate']}%")
    print(f"平均响应时间: {stats['avg_response_time']}秒")
    print(f"今日调用: {stats['today_calls']}")
```

**JavaScript**

```javascript
async function getCallStatistics(userId = null, algorithmId = null) {
  let url = `${BASE_URL}/history/statistics`;
  const params = {};
  if (userId) params.user_id = userId;
  if (algorithmId) params.algorithm_id = algorithmId;
  // Only append "?" when at least one filter is present
  if (Object.keys(params).length > 0) {
    const queryString = new URLSearchParams(params).toString();
    url += `?${queryString}`;
  }
  try {
    const response = await fetch(url, {
      method: 'GET',
      headers: headers
    });
    if (response.ok) {
      return await response.json();
    } else {
      console.error(`Error: ${response.status} - ${await response.text()}`);
      return null;
    }
  } catch (error) {
    console.error('请求失败:', error);
    return null;
  }
}

// Usage example
getCallStatistics().then(stats => {
  if (stats) {
    console.log(`总调用次数: ${stats.total_calls}`);
    console.log(`成功率: ${stats.success_rate}%`);
    console.log(`平均响应时间: ${stats.avg_response_time}秒`);
    console.log(`今日调用: ${stats.today_calls}`);
  }
});
```

## 监控API示例

### 获取系统健康状况

**Python**

```python
def get_health_status():
    response = requests.get(f"{BASE_URL}/monitoring/health", headers=headers)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return None

# Usage example
health = get_health_status()
if health:
    print(f"系统状态: {health['status']}")
    print(f"CPU使用率: {health['system_metrics']['cpu_percent']}%")
    print(f"内存使用率: {health['system_metrics']['memory_percent']}%")
```

**JavaScript**

```javascript
async function getHealthStatus() {
  try {
    const response = await fetch(`${BASE_URL}/monitoring/health`, {
      method: 'GET',
      headers: headers
    });
    if (response.ok) {
      return await response.json();
    } else {
      console.error(`Error: ${response.status} - ${await response.text()}`);
      return null;
    }
  } catch (error) {
    console.error('请求失败:', error);
    return null;
  }
}

// Usage example
getHealthStatus().then(health => {
  if (health) {
    console.log(`系统状态: ${health.status}`);
    console.log(`CPU使用率: ${health.system_metrics.cpu_percent}%`);
    console.log(`内存使用率: ${health.system_metrics.memory_percent}%`);
  }
});
```

### 获取算法性能指标

**Python**

```python
def get_algorithm_performance(algorithm_id, days=7):
    response = requests.get(
        f"{BASE_URL}/monitoring/performance/algorithm/{algorithm_id}",
        headers=headers,
        params={"days": days}
    )
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return None

# Usage example
performance = get_algorithm_performance("alg-001", 30)
if performance:
    print(f"算法 {performance['algorithm_id']} 的30天性能指标:")
    print(f"总调用次数: {performance['total_calls']}")
    print(f"成功率: {performance['success_rate']}%")
    print(f"平均响应时间: {performance['average_response_time']}秒")
```
**JavaScript**

```javascript
async function getAlgorithmPerformance(algorithmId, days = 7) {
  try {
    const response = await fetch(
      `${BASE_URL}/monitoring/performance/algorithm/${algorithmId}?days=${days}`,
      {
        method: 'GET',
        headers: headers
      }
    );
    if (response.ok) {
      return await response.json();
    } else {
      console.error(`Error: ${response.status} - ${await response.text()}`);
      return null;
    }
  } catch (error) {
    console.error('请求失败:', error);
    return null;
  }
}

// Usage example
getAlgorithmPerformance("alg-001", 30).then(performance => {
  if (performance) {
    console.log(`算法 ${performance.algorithm_id} 的30天性能指标:`);
    console.log(`总调用次数: ${performance.total_calls}`);
    console.log(`成功率: ${performance.success_rate}%`);
    console.log(`平均响应时间: ${performance.average_response_time}秒`);
  }
});
```

## OpenAI集成示例

### 生成仿真数据

**Python**

```python
def generate_simulation_data(prompt, data_type="text"):
    payload = {
        "prompt": prompt,
        "data_type": data_type
    }
    response = requests.post(f"{BASE_URL}/openai/generate-data", headers=headers, json=payload)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return None

# Usage example
simulation_data = generate_simulation_data("一张包含猫和狗的照片", "image")
if simulation_data:
    print(f"生成的仿真数据类型: {simulation_data['type']}")
    # Only the first 100 characters are printed
    print(f"生成的数据: {simulation_data['data'][:100]}...")
```

**JavaScript**

```javascript
async function generateSimulationData(prompt, dataType = "text") {
  const payload = {
    prompt: prompt,
    data_type: dataType
  };
  try {
    const response = await fetch(`${BASE_URL}/openai/generate-data`, {
      method: 'POST',
      headers: headers,
      body: JSON.stringify(payload)
    });
    if (response.ok) {
      return await response.json();
    } else {
      console.error(`Error: ${response.status} - ${await response.text()}`);
      return null;
    }
  } catch (error) {
    console.error('请求失败:', error);
    return null;
  }
}

// Usage example
generateSimulationData("一张包含猫和狗的照片", "image")
  .then(simulationData => {
    if (simulationData) {
      console.log(`生成的仿真数据类型: ${simulationData.type}`);
      // Only the first 100 characters are printed
      console.log(`生成的数据: ${simulationData.data.substring(0, 100)}...`);
    }
  });
```

## 错误处理

API调用可能会返回各种错误,以下是常见的错误处理方式:

**Python**

```python
def safe_api_call(url, method="GET", payload=None):
    """Call the API and always return a result dict instead of raising.

    Returns {"success": True, "data": ...} on a 2xx response, otherwise
    {"success": False, "error": ..., "message": ...}.
    """
    method = method.upper()
    # Reject unknown methods up front; otherwise `response` would never be
    # assigned and the failure would surface as a misleading "Unknown Error".
    if method not in ("GET", "POST", "PUT", "DELETE"):
        return {
            "success": False,
            "error": "Unsupported Method",
            "message": f"Unsupported HTTP method: {method}"
        }
    try:
        if method == "GET":
            response = requests.get(url, headers=headers)
        elif method == "POST":
            response = requests.post(url, headers=headers, json=payload)
        elif method == "PUT":
            response = requests.put(url, headers=headers, json=payload)
        else:  # DELETE
            response = requests.delete(url, headers=headers)

        # Check the HTTP status code
        if 200 <= response.status_code < 300:
            # Successful responses may have an empty body (e.g. 204 after
            # DELETE); calling .json() on an empty body would raise.
            data = response.json() if response.content else None
            return {"success": True, "data": data}
        else:
            return {
                "success": False,
                "error": f"HTTP {response.status_code}",
                "message": response.text
            }
    except requests.exceptions.RequestException as e:
        return {"success": False, "error": "Request Error", "message": str(e)}
    except Exception as e:
        return {"success": False, "error": "Unknown Error", "message": str(e)}

# Usage example
result = safe_api_call(f"{BASE_URL}/algorithms", "GET")
if result["success"]:
    print("API调用成功:", result["data"])
else:
    print("API调用失败:", result["error"], "-", result["message"])
```

**JavaScript**

```javascript
async function safeApiCall(url, method = "GET", payload = null) {
  try {
    const options = {
      method: method.toUpperCase(),
      headers: headers
    };
    // Only POST and PUT carry a request body
    if (payload && (method.toUpperCase() === "POST" || method.toUpperCase() === "PUT")) {
      options.body = JSON.stringify(payload);
    }
    const response = await fetch(url, options);

    // Check the HTTP status code
    if (response.ok) {
      // 204 No Content has no body, so response.json() would throw
      const data = response.status === 204 ? null : await response.json();
      return { success: true, data: data };
    } else {
      return {
        success: false,
        error: `HTTP ${response.status}`,
        message: await response.text()
      };
    }
  } catch (error) {
    return {
      success: false,
      error: "Request Error",
      message: error.message
    };
  }
}

// Usage example
safeApiCall(`${BASE_URL}/algorithms`, "GET")
  .then(result => {
    if (result.success) {
      console.log("API调用成功:", result.data);
    } else {
      console.log("API调用失败:", result.error, "-", result.message);
    }
  });
```

## 高级用法

### 批量调用算法

**Python**

```python
import asyncio
import aiohttp

async def batch_call_algorithms(calls_data):
    # One shared session for all requests; the calls run concurrently and
    # results come back in the same order as calls_data.
    async with aiohttp.ClientSession() as session:
        tasks = []
        for call_data in calls_data:
            task = asyncio.create_task(
                async_call_algorithm(session, call_data)
            )
            tasks.append(task)
        results = await asyncio.gather(*tasks)
        return results

async def async_call_algorithm(session, call_data):
    headers = {
        "Authorization": "Bearer YOUR_ACCESS_TOKEN",
        "Content-Type": "application/json"
    }
    async with session.post(
        f"{BASE_URL}/algorithms/call",
        headers=headers,
        json=call_data
    ) as response:
        return await response.json()

# Usage example
calls_data = [
    {
        "algorithm_id": "alg-001",
        "version_id": "ver-001",
        "input_data": {"image_url": "https://example.com/img1.jpg"}
    },
    {
        "algorithm_id": "alg-001",
        "version_id": "ver-001",
        "input_data": {"image_url": "https://example.com/img2.jpg"}
    }
]

# NOTE: asyncio.run() starts its own event loop, so call it from ordinary
# synchronous code (e.g. a script entry point). Inside an already-running
# event loop (such as a Jupyter notebook) use
# `results = await batch_call_algorithms(calls_data)` instead.
# results = asyncio.run(batch_call_algorithms(calls_data))
```

### 使用SDK进行API调用

我们还提供了官方SDK来简化API调用:

**Python SDK示例**

```python
from algorithm_showcase.client import AlgorithmShowcaseClient

# Initialize the client
client = AlgorithmShowcaseClient(
    base_url="http://localhost:8001",
    api_key="YOUR_API_KEY"
)

# Call the algorithm
try:
    result = client.call_algorithm(
        algorithm_id="alg-001",
        version_id="ver-001",
        input_data={"image_url": "https://example.com/sample.jpg"},
        params={"confidence_threshold": 0.7}
    )
    print(f"调用成功: {result}")
except Exception as e:
    print(f"调用失败: {e}")
```

**JavaScript SDK示例**

```javascript
import { AlgorithmShowcaseClient } from '@algorithm-showcase/sdk';

// Initialize the client
const client = new AlgorithmShowcaseClient({
  baseUrl: 'http://localhost:8001',
  apiKey: 'YOUR_API_KEY'
});

// Call the algorithm
try {
  const result = await client.callAlgorithm({
    algorithmId: 'alg-001',
    versionId: 'ver-001',
    inputData: { imageUrl: 'https://example.com/sample.jpg' },
    params: { confidenceThreshold: 0.7 }
  });
  console.log('调用成功:', result);
} catch (error) {
  console.error('调用失败:', error);
}
```

以上是智能算法展示平台API的完整使用示例。这些示例涵盖了从基本操作到高级用法的各个方面,可以帮助开发者快速上手并有效利用平台的功能。