MCP AI应用通信的底层机制
技术小馆专注AI与Java领域的前沿技术知识库 技术小馆官网
MCP在AI应用快速发展的今天,不同AI系统之间的高效通信成为技术架构的关键挑战。MCP(Model Context Protocol)作为新一代AI应用通信协议,正在重新定义AI工具生态的构建方式。
想象一下,当你需要让ChatGPT调用外部API、访问数据库或执行复杂计算时,传统的插件机制往往面临兼容性差、扩展性有限的困境。MCP协议的出现,为这个问题提供了优雅的解决方案。通过标准化的通信接口和灵活的架构设计,MCP让AI应用能够无缝集成各种外部服务和工具,就像给AI装上了"万能工具箱"。无论是数据查询、文件处理还是复杂业务逻辑,MCP都能让AI应用以统一的方式调用和协作。
一、MCP协议概述
什么是MCP协议
MCP(Model Context Protocol)是一个开放标准协议,专门设计用于AI模型与外部工具和服务之间的通信。它采用JSON-RPC 2.0规范,通过标准化的消息格式实现AI应用与各种外部资源的无缝集成。
{ "jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {} }
MCP的核心思想是提供一个统一的接口层,让AI模型能够像调用本地函数一样调用远程服务,同时保持协议的简洁性和扩展性。
MCP的设计目标与核心价值
MCP的设计遵循几个核心原则:
- 标准化:统一的协议规范,确保不同实现之间的互操作性
- 安全性:内置认证和授权机制,保护敏感数据
- 可扩展性:支持动态工具发现和注册
- 性能优化:高效的序列化和传输机制
这些设计目标使得MCP成为构建AI工具生态的理想选择。
MCP在AI生态中的定位
在当前的AI技术栈中,MCP扮演着"连接器"的角色。它位于AI模型层和应用服务层之间,为两者提供标准化的通信桥梁。
AI模型层 (ChatGPT, Claude等) ↓ MCP协议层 (标准化通信接口) ↓ 应用服务层 (数据库、API、文件系统等)
这种分层架构使得AI应用能够灵活地集成各种外部资源,而不需要为每种服务开发专门的适配器。
二、MCP的核心架构
协议层设计原理
MCP采用分层设计,每一层都有明确的职责:
- 传输层:处理底层网络通信,支持TCP、WebSocket等多种传输方式
- 协议层:实现JSON-RPC 2.0规范,处理消息的序列化和反序列化
- 应用层:定义具体的工具接口和业务逻辑
# MCP服务器基础实现示例 import json import asyncio class MCPServer: def __init__(self): self.tools = {} async def handle_request(self, request): """处理MCP请求""" try: method = request.get("method") params = request.get("params", {}) if method == "tools/list": return {"result": list(self.tools.keys())} elif method == "tools/call": tool_name = params.get("name") tool_params = params.get("arguments", {}) return await self.call_tool(tool_name, tool_params) except Exception as e: return {"error": {"code": -1, "message": str(e)}}
客户端-服务器模型
MCP采用经典的客户端-服务器架构,其中:
- 客户端:通常是AI模型或AI应用,发起工具调用请求
- 服务器:提供各种工具和服务,响应客户端的请求
# MCP客户端示例 class MCPClient: def __init__(self, server_url): self.server_url = server_url async def list_tools(self): """获取可用工具列表""" request = { "jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {} } return await self.send_request(request) async def call_tool(self, tool_name, arguments): """调用指定工具""" request = { "jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": { "name": tool_name, "arguments": arguments } } return await self.send_request(request)
消息传递机制
MCP使用JSON-RPC 2.0作为消息传递协议,支持三种消息类型:
- 请求消息:客户端向服务器发送的调用请求
- 响应消息:服务器返回的处理结果
- 通知消息:单向消息,不需要响应
// 请求消息示例 { "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "database/query", "arguments": { "sql": "SELECT * FROM users WHERE age > 18" } } } // 响应消息示例 { "jsonrpc": "2.0", "id": 1, "result": { "data": [ {"id": 1, "name": "张三", "age": 25}, {"id": 2, "name": "李四", "age": 30} ] } }
错误处理与容错机制
MCP定义了标准的错误码和错误处理机制:
# 错误处理示例 class MCPError(Exception): def __init__(self, code, message, data=None): self.code = code self.message = message self.data = data # 标准错误码 ERROR_CODES = { -32700: "解析错误", -32600: "无效请求", -32601: "方法未找到", -32602: "无效参数", -32603: "内部错误" } def handle_error(error_code, message): """统一错误处理""" return { "jsonrpc": "2.0", "error": { "code": error_code, "message": message } }
三、MCP协议详解
请求-响应模式
MCP采用同步的请求-响应模式,每个请求都有唯一的ID,确保响应的正确匹配:
class RequestManager: def __init__(self): self.pending_requests = {} self.request_id = 0 def create_request(self, method, params): """创建新请求""" self.request_id += 1 request = { "jsonrpc": "2.0", "id": self.request_id, "method": method, "params": params } self.pending_requests[self.request_id] = request return request def handle_response(self, response): """处理响应""" request_id = response.get("id") if request_id in self.pending_requests: del self.pending_requests[request_id] return response return None
数据格式与序列化
MCP使用JSON作为数据交换格式,支持复杂的数据结构:
# 数据类型示例 class MCPDataTypes: @staticmethod def serialize_tool_result(data): """序列化工具调用结果""" if isinstance(data, (dict, list)): return json.dumps(data, ensure_ascii=False) return str(data) @staticmethod def deserialize_tool_params(params_str): """反序列化工具参数""" try: return json.loads(params_str) except json.JSONDecodeError: raise MCPError(-32700, "参数格式错误")
认证与安全机制
MCP支持多种认证方式,确保通信安全:
# 认证机制示例 class MCPAuth: def __init__(self): self.api_keys = {} def authenticate(self, headers): """验证请求身份""" api_key = headers.get("Authorization") if not api_key: raise MCPError(-32001, "缺少认证信息") if api_key not in self.api_keys: raise MCPError(-32002, "无效的API密钥") return True def add_api_key(self, key, permissions): """添加API密钥""" self.api_keys[key] = permissions
版本兼容性策略
MCP采用语义化版本控制,确保向后兼容:
# 版本兼容性处理 class MCPVersionManager: def __init__(self): self.supported_versions = ["1.0", "1.1", "1.2"] def check_compatibility(self, client_version): """检查版本兼容性""" major, minor = client_version.split(".") for supported in self.supported_versions: s_major, s_minor = supported.split(".") if major == s_major and int(minor) <= int(s_minor): return True return False def negotiate_version(self, client_versions): """协商最佳版本""" for version in client_versions: if version in self.supported_versions: return version return self.supported_versions[0]
四、MCP的实际应用场景
AI应用集成外部API
MCP最常见的应用场景是让AI模型调用外部API服务:
# 天气API集成示例 class WeatherTool: def __init__(self, api_key): self.api_key = api_key async def get_weather(self, city): """获取城市天气信息""" url = f"https://api.weatherapi.com/v1/current.json" params = { "key": self.api_key, "q": city, "aqi": "no" } async with aiohttp.ClientSession() as session: async with session.get(url, params=params) as response: data = await response.json() return { "city": city, "temperature": data["current"]["temp_c"], "condition": data["current"]["condition"]["text"] } # 注册到MCP服务器 mcp_server = MCPServer() weather_tool = WeatherTool("your_api_key") mcp_server.tools["weather/get"] = weather_tool.get_weather
数据库连接与查询
MCP可以安全地让AI模型访问数据库:
# 数据库工具示例 class DatabaseTool: def __init__(self, connection_string): self.connection_string = connection_string async def query_database(self, sql, params=None): """执行数据库查询""" try: async with asyncpg.connect(self.connection_string) as conn: result = await conn.fetch(sql, *params or []) return [dict(row) for row in result] except Exception as e: raise MCPError(-32603, f"数据库查询错误: {str(e)}") async def execute_sql(self, sql, params=None): """执行SQL语句""" try: async with asyncpg.connect(self.connection_string) as conn: result = await conn.execute(sql, *params or []) return {"affected_rows": result} except Exception as e: raise MCPError(-32603, f"SQL执行错误: {str(e)}") # 注册数据库工具 db_tool = DatabaseTool("postgresql://user:pass@localhost/db") mcp_server.tools["database/query"] = db_tool.query_database mcp_server.tools["database/execute"] = db_tool.execute_sql
文件系统操作
MCP支持安全的文件系统操作:
# 文件系统工具示例 class FileSystemTool: def __init__(self, base_path): self.base_path = Path(base_path) async def read_file(self, file_path): """读取文件内容""" full_path = self.base_path / file_path if not full_path.exists(): raise MCPError(-32003, "文件不存在") try: with open(full_path, 'r', encoding='utf-8') as f: return {"content": f.read(), "path": str(full_path)} except Exception as e: raise MCPError(-32603, f"文件读取错误: {str(e)}") async def write_file(self, file_path, content): """写入文件内容""" full_path = self.base_path / file_path try: full_path.parent.mkdir(parents=True, exist_ok=True) with open(full_path, 'w', encoding='utf-8') as f: f.write(content) return {"success": True, "path": str(full_path)} except Exception as e: raise MCPError(-32603, f"文件写入错误: {str(e)}") # 注册文件系统工具 fs_tool = FileSystemTool("/safe/path") mcp_server.tools["filesystem/read"] = fs_tool.read_file mcp_server.tools["filesystem/write"] = fs_tool.write_file
自定义工具开发
开发者可以创建自定义工具来扩展AI的能力:
# 自定义计算工具示例 class CalculatorTool: def __init__(self): self.supported_operations = ['+', '-', '*', '/', '**'] async def calculate(self, expression): """安全计算数学表达式""" try: # 安全检查,只允许基本数学运算 allowed_chars = set('0123456789+-*/.() ') if not all(c in allowed_chars for c in expression): raise ValueError("表达式包含不允许的字符") result = eval(expression) return {"expression": expression, "result": result} except Exception as e: raise MCPError(-32602, f"计算错误: {str(e)}") async def get_statistics(self, numbers): """计算统计信息""" try: nums = [float(n) for n in numbers] return { "mean": sum(nums) / len(nums), "max": max(nums), "min": min(nums), "count": len(nums) } except Exception as e: raise MCPError(-32602, f"统计计算错误: {str(e)}") # 注册计算工具 calc_tool = CalculatorTool() mcp_server.tools["calculator/compute"] = calc_tool.calculate mcp_server.tools["calculator/stats"] = calc_tool.get_statistics
五、MCP与其他协议的对比
与OpenAI插件协议的差异
MCP相比OpenAI插件协议具有以下优势:
- 标准化程度更高:MCP是开放标准,不依赖特定厂商
- 扩展性更强:支持动态工具发现和注册
- 安全性更好:内置认证和授权机制
# OpenAI插件 vs MCP对比示例 # OpenAI插件方式 class OpenAIPlugin: def __init__(self): self.manifest = { "schema_version": "v1", "name_for_model": "weather_plugin", "name_for_human": "天气查询插件", "description_for_model": "获取城市天气信息", "description_for_human": "查询指定城市的天气情况", "auth": {"type": "none"}, "api": {"type": "openapi", "url": "https://api.example.com/openapi.yaml"}, "logo_url": "https://example.com/logo.png", "contact_email": "support@example.com", "legal_info_url": "https://example.com/legal" } # MCP方式 class MCPTool: def __init__(self): self.tool_info = { "name": "weather/get", "description": "获取城市天气信息", "parameters": { "type": "object", "properties": { "city": {"type": "string", "description": "城市名称"} }, "required": ["city"] } }
与REST API的对比分析
MCP相比传统REST API的优势:
- 统一接口:所有工具通过相同的协议调用
- 类型安全:支持强类型参数定义
- 错误处理:标准化的错误码和错误信息
# REST API vs MCP对比 # 传统REST API调用 async def rest_api_call(): async with aiohttp.ClientSession() as session: # 需要为每个API维护不同的调用方式 weather_response = await session.get("https://api.weather.com/current?city=北京") db_response = await session.post("https://api.database.com/query", json={"sql": "SELECT * FROM users"}) file_response = await session.get("https://api.files.com/read?path=/data.txt") # MCP统一调用 async def mcp_call(): client = MCPClient("mcp://localhost:3000") # 所有工具使用相同的调用方式 weather = await client.call_tool("weather/get", {"city": "北京"}) users = await client.call_tool("database/query", {"sql": "SELECT * FROM users"}) content = await client.call_tool("filesystem/read", {"path": "/data.txt"})
性能与扩展性优势
MCP在性能和扩展性方面的优势:
# 性能对比示例 import time import asyncio class PerformanceTest: def __init__(self): self.mcp_client = MCPClient("mcp://localhost:3000") async def test_mcp_performance(self): """测试MCP性能""" start_time = time.time() # 并发调用多个工具 tasks = [ self.mcp_client.call_tool("weather/get", {"city": "北京"}), self.mcp_client.call_tool("database/query", {"sql": "SELECT * FROM users"}), self.mcp_client.call_tool("calculator/compute", {"expression": "2+2*3"}) ] results = await asyncio.gather(*tasks) end_time = time.time() return { "total_time": end_time - start_time, "results": results } async def test_scalability(self): """测试扩展性""" # 动态添加新工具 new_tool = { "name": "custom/process", "description": "自定义数据处理工具", "handler": lambda data: {"processed": data.upper()} } # 无需重启服务器即可使用新工具 result = await self.mcp_client.call_tool("custom/process", {"data": "hello world"}) return result
六、MCP开发实践指南
环境搭建与配置
开始MCP开发需要搭建基础环境:
# 环境配置示例 # requirements.txt mcp==1.0.0 aiohttp==3.8.5 asyncpg==0.28.0 pydantic==2.0.0 # 基础配置文件 import yaml class MCPConfig: def __init__(self, config_path="mcp_config.yaml"): with open(config_path, 'r', encoding='utf-8') as f: self.config = yaml.safe_load(f) @property def server_host(self): return self.config.get("server", {}).get("host", "localhost") @property def server_port(self): return self.config.get("server", {}).get("port", 3000) @property def auth_token(self): return self.config.get("auth", {}).get("token") @property def tools_config(self): return self.config.get("tools", {}) # mcp_config.yaml server: host: localhost port: 3000 debug: true auth: token: "your_secret_token" enabled: true tools: weather: api_key: "your_weather_api_key" enabled: true database: connection_string: "postgresql://user:pass@localhost/db" enabled: true filesystem: base_path: "/safe/path" enabled: true
基础客户端开发
开发MCP客户端的基本步骤:
# 基础MCP客户端实现 import asyncio import aiohttp import json from typing import Dict, Any class MCPClient: def __init__(self, server_url: str, auth_token: str = None): self.server_url = server_url self.auth_token = auth_token self.session = None self.request_id = 0 async def __aenter__(self): """异步上下文管理器入口""" self.session = aiohttp.ClientSession() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """异步上下文管理器出口""" if self.session: await self.session.close() async def send_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]: """发送MCP请求""" self.request_id += 1 request = { "jsonrpc": "2.0", "id": self.request_id, "method": method, "params": params or {} } headers = {"Content-Type": "application/json"} if self.auth_token: headers["Authorization"] = f"Bearer {self.auth_token}" async with self.session.post( self.server_url, json=request, headers=headers ) as response: result = await response.json() if "error" in result: raise MCPError( result["error"]["code"], result["error"]["message"] ) return result["result"] async def list_tools(self) -> list: """获取可用工具列表""" return await self.send_request("tools/list") async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """调用指定工具""" return await self.send_request("tools/call", { "name": tool_name, "arguments": arguments }) # 使用示例 async def main(): async with MCPClient("http://localhost:3000", "your_token") as client: # 获取可用工具 tools = await client.list_tools() print(f"可用工具: {tools}") # 调用天气工具 weather = await client.call_tool("weather/get", {"city": "北京"}) print(f"天气信息: {weather}") # 调用数据库工具 users = await client.call_tool("database/query", { "sql": "SELECT * FROM users LIMIT 5" }) print(f"用户数据: {users}") if __name__ == "__main__": asyncio.run(main())
自定义工具实现
创建自定义MCP工具的完整示例:
# 自定义工具实现 from typing import Dict, Any, Optional import asyncio import json class CustomTool: """自定义MCP工具基类""" def __init__(self, name: str, description: str): self.name = name self.description = description self.parameters = {} def add_parameter(self, name: str, param_type: str, description: str, required: bool = False): """添加工具参数""" self.parameters[name] = { "type": param_type, "description": description, "required": required } async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """执行工具逻辑(子类需要重写)""" raise NotImplementedError("子类必须实现execute方法") def get_info(self) -> Dict[str, Any]: """获取工具信息""" return { "name": self.name, "description": self.description, "parameters": { "type": "object", "properties": self.parameters, "required": [name for name, param in self.parameters.items() if param.get("required", False)] } } # 图像处理工具示例 class ImageProcessingTool(CustomTool): def __init__(self): super().__init__("image/process", "图像处理工具") self.add_parameter("image_url", "string", "图像URL", required=True) self.add_parameter("operation", "string", "处理操作(resize/rotate/filter)", required=True) self.add_parameter("parameters", "object", "处理参数") async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """执行图像处理""" image_url = arguments["image_url"] operation = arguments["operation"] params = arguments.get("parameters", {}) # 模拟图像处理 if operation == "resize": width = params.get("width", 800) height = params.get("height", 600) return { "status": "success", "operation": "resize", "new_size": f"{width}x{height}", "processed_url": f"{image_url}_resized" } elif operation == "rotate": angle = params.get("angle", 90) return { "status": "success", "operation": "rotate", "angle": angle, "processed_url": f"{image_url}_rotated" } else: raise MCPError(-32602, f"不支持的操作: {operation}") # 数据分析工具示例 class DataAnalysisTool(CustomTool): def __init__(self): super().__init__("data/analyze", "数据分析工具") self.add_parameter("data", "array", "要分析的数据", required=True) self.add_parameter("analysis_type", "string", "分析类型(statistics/trend/correlation)", required=True) async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """执行数据分析""" data = arguments["data"] analysis_type = arguments["analysis_type"] if not isinstance(data, list): raise MCPError(-32602, "数据必须是数组格式") if analysis_type == "statistics": return self._calculate_statistics(data) elif analysis_type == "trend": return self._analyze_trend(data) elif analysis_type == "correlation": return self._calculate_correlation(data) else: raise MCPError(-32602, f"不支持的分析类型: {analysis_type}") def _calculate_statistics(self, data: list) -> Dict[str, Any]: """计算统计信息""" if not data: return {"error": "数据为空"} numeric_data = [float(x) for x in data if isinstance(x, (int, float))] if not numeric_data: return {"error": "没有有效的数值数据"} return { "count": len(numeric_data), "sum": sum(numeric_data), "mean": sum(numeric_data) / len(numeric_data), "min": min(numeric_data), "max": max(numeric_data), "median": sorted(numeric_data)[len(numeric_data) // 2] } def _analyze_trend(self, data: list) -> Dict[str, Any]: """分析趋势""" if len(data) < 2: return {"error": "数据点不足,无法分析趋势"} numeric_data = [float(x) for x in data if isinstance(x, (int, float))] if len(numeric_data) < 2: return {"error": "数值数据点不足"} # 简单趋势分析 first_half = numeric_data[:len(numeric_data)//2] second_half = numeric_data[len(numeric_data)//2:] first_avg = sum(first_half) / len(first_half) second_avg = sum(second_half) / len(second_half) trend = "上升" if second_avg > first_avg else "下降" if second_avg < first_avg else "平稳" return { "trend": trend, "change_rate": (second_avg - first_avg) / first_avg * 100, "first_half_avg": first_avg, "second_half_avg": second_avg } def _calculate_correlation(self, data: list) -> Dict[str, Any]: """计算相关性(简化版)""" if len(data) < 4: return {"error": "数据点不足,无法计算相关性"} # 假设数据是成对的 if len(data) % 2 != 0: return {"error": "数据必须是成对的"} pairs = [(data[i], data[i+1]) for i in range(0, len(data), 2)] x_values = [pair[0] for pair in pairs if isinstance(pair[0], (int, float))] y_values = [pair[1] for pair in pairs if isinstance(pair[1], (int, float))] if len(x_values) != len(y_values) or len(x_values) < 2: return {"error": "有效的数据对不足"} # 简化的相关系数计算 n = len(x_values) sum_x = sum(x_values) sum_y = sum(y_values) sum_xy = sum(x * y for x, y in zip(x_values, y_values)) sum_x2 = sum(x * x for x in x_values) sum_y2 = sum(y * y for y in y_values) numerator = n * sum_xy - sum_x * sum_y denominator = ((n * sum_x2 - sum_x * sum_x) * (n * sum_y2 - sum_y * sum_y)) ** 0.5 if denominator == 0: correlation = 0 else: correlation = numerator / denominator return { "correlation_coefficient": correlation, "data_points": n, "interpretation": "强正相关" if correlation > 0.7 else "中等正相关" if correlation > 0.3 else "弱正相关" if correlation > 0 else "强负相关" if correlation < -0.7 else "中等负相关" if correlation < -0.3 else "弱负相关" if correlation < 0 else "无相关" } # 注册自定义工具 def register_custom_tools(mcp_server: MCPServer): """注册自定义工具到MCP服务器""" # 注册图像处理工具 image_tool = ImageProcessingTool() mcp_server.tools[image_tool.name] = image_tool.execute # 注册数据分析工具 data_tool = DataAnalysisTool() mcp_server.tools[data_tool.name] = data_tool.execute return mcp_server
调试与测试技巧
MCP开发的调试和测试策略:
# 调试和测试工具 import logging import unittest from unittest.mock import Mock, patch class MCPDebugger: def __init__(self): self.logger = logging.getLogger("mcp_debugger") self.logger.setLevel(logging.DEBUG) # 添加控制台处理器 handler = logging.StreamHandler() formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) handler.setFormatter(formatter) self.logger.addHandler(handler) def log_request(self, request: Dict[str, Any]): """记录请求日志""" self.logger.info(f"发送请求: {json.dumps(request, ensure_ascii=False, indent=2)}") def log_response(self, response: Dict[str, Any]): """记录响应日志""" self.logger.info(f"收到响应: {json.dumps(response, ensure_ascii=False, indent=2)}") def log_error(self, error: Exception): """记录错误日志""" self.logger.error(f"发生错误: {str(error)}", exc_info=True) # MCP测试框架 class MCPTestCase(unittest.TestCase): def setUp(self): """测试设置""" self.mcp_server = MCPServer() self.mcp_client = MCPClient("http://localhost:3000") self.debugger = MCPDebugger() async def test_tool_registration(self): """测试工具注册""" # 注册测试工具 async def test_tool(arguments): return {"result": "test_success", "input": arguments} self.mcp_server.tools["test/tool"] = test_tool # 验证工具列表 tools = await self.mcp_client.list_tools() self.assertIn("test/tool", tools) async def test_tool_execution(self): """测试工具执行""" # 注册测试工具 async def calculator(arguments): a = arguments.get("a", 0) b = arguments.get("b", 0) operation = arguments.get("operation", "+") if operation == "+": return {"result": a + b} elif operation == "-": return {"result": a - b} elif operation == "*": return {"result": a * b} elif operation == "/": if b == 0: raise MCPError(-32602, "除数不能为零") return {"result": a / b} else: raise MCPError(-32602, f"不支持的操作: {operation}") self.mcp_server.tools["calculator/compute"] = calculator # 测试各种运算 test_cases = [ ({"a": 10, "b": 5, "operation": "+"}, 15), ({"a": 10, "b": 5, "operation": "-"}, 5), ({"a": 10, "b": 5, "operation": "*"}, 50), ({"a": 10, "b": 5, "operation": "/"}, 2), ] for args, expected in test_cases: result = await self.mcp_client.call_tool("calculator/compute", args) self.assertEqual(result["result"], expected) async def test_error_handling(self): """测试错误处理""" # 注册会抛出错误的工具 async def error_tool(arguments): raise MCPError(-32602, "测试错误") self.mcp_server.tools["test/error"] = error_tool # 测试错误处理 with self.assertRaises(MCPError) as context: await self.mcp_client.call_tool("test/error", {}) self.assertEqual(context.exception.code, -32602) self.assertEqual(context.exception.message, "测试错误") # 性能测试 class MCPPerformanceTest: def __init__(self): self.mcp_client = MCPClient("http://localhost:3000") async def benchmark_tool_calls(self, tool_name: str, arguments: Dict[str, Any], iterations: int = 100): """基准测试工具调用性能""" import time start_time = time.time() # 并发执行多次调用 tasks = [ self.mcp_client.call_tool(tool_name, arguments) for _ in range(iterations) ] results = await asyncio.gather(*tasks) end_time = time.time() total_time = end_time - start_time avg_time = total_time / iterations return { "total_time": total_time, "iterations": iterations, "average_time": avg_time, "throughput": iterations / total_time, "success_rate": len([r for r in results if "error" not in r]) / len(results) } async def stress_test(self, concurrent_requests: int = 50): """压力测试""" async def make_request(request_id: int): try: result = await self.mcp_client.call_tool("test/stress", {"id": request_id}) return {"id": request_id, "status": "success", "result": result} except Exception as e: return {"id": request_id, "status": "error", "error": str(e)} tasks = [make_request(i) for i in range(concurrent_requests)] results = await asyncio.gather(*tasks) success_count = len([r for r in results if r["status"] == "success"]) error_count = len([r for r in results if r["status"] == "error"]) return { "total_requests": concurrent_requests, "successful_requests": success_count, "failed_requests": error_count, "success_rate": success_count / concurrent_requests } # 使用示例 async def run_tests(): """运行所有测试""" # 单元测试 test_case = MCPTestCase() await test_case.test_tool_registration() await test_case.test_tool_execution() await test_case.test_error_handling() # 性能测试 perf_test = MCPPerformanceTest() benchmark_result = await perf_test.benchmark_tool_calls( "calculator/compute", {"a": 10, "b": 5, "operation": "+"}, 100 ) print(f"性能测试结果: {benchmark_result}") stress_result = await perf_test.stress_test(50) print(f"压力测试结果: {stress_result}") if __name__ == "__main__": asyncio.run(run_tests())
通过以上完整的MCP开发指南,你可以看到MCP协议不仅是一个技术标准,更是一个完整的生态系统。它通过标准化的接口、灵活的架构设计和丰富的工具生态,为AI应用开发提供了强大的基础设施支持。
无论是简单的API调用,还是复杂的业务逻辑处理,MCP都能提供统一、安全、高效的解决方案。随着AI技术的不断发展,MCP协议将继续演进,为构建更智能、更强大的AI应用生态系统贡献力量。
#MCP#