#!/usr/bin/env python3 """ 通用MCP工具包装器 为所有MCP工具提供目录访问控制和安全限制 """ import os import sys import json import asyncio import subprocess from typing import Dict, Any, Optional from pathlib import Path class MCPSecurityWrapper: """MCP安全包装器""" def __init__(self, allowed_directory: str): self.allowed_directory = os.path.abspath(allowed_directory) self.project_id = os.getenv("PROJECT_ID", "default") def validate_path(self, path: str) -> str: """验证路径是否在允许的目录内""" if not os.path.isabs(path): path = os.path.abspath(path) # 规范化路径 path = os.path.normpath(path) # 检查路径遍历 if ".." in path.split(os.sep): raise ValueError(f"路径遍历攻击被阻止: {path}") # 检查是否在允许的目录内 if not path.startswith(self.allowed_directory): raise ValueError(f"访问被拒绝: {path} 不在允许的目录 {self.allowed_directory} 内") return path def safe_execute_command(self, command: list, cwd: Optional[str] = None) -> Dict[str, Any]: """安全执行命令,限制工作目录""" if cwd is None: cwd = self.allowed_directory else: cwd = self.validate_path(cwd) # 设置环境变量限制 env = os.environ.copy() env["PWD"] = cwd env["PROJECT_DATA_DIR"] = self.allowed_directory env["PROJECT_ID"] = self.project_id try: result = subprocess.run( command, cwd=cwd, env=env, capture_output=True, text=True, timeout=30 # 30秒超时 ) return { "success": result.returncode == 0, "stdout": result.stdout, "stderr": result.stderr, "returncode": result.returncode } except subprocess.TimeoutExpired: return { "success": False, "stdout": "", "stderr": "命令执行超时", "returncode": -1 } except Exception as e: return { "success": False, "stdout": "", "stderr": str(e), "returncode": -1 } async def handle_wrapped_request(request: Dict[str, Any], tool_name: str) -> Dict[str, Any]: """处理包装后的MCP请求""" try: method = request.get("method") params = request.get("params", {}) request_id = request.get("id") allowed_dir = get_allowed_directory() wrapper = MCPSecurityWrapper(allowed_dir) if method == "initialize": return { "jsonrpc": "2.0", "id": request_id, "result": { "protocolVersion": "2024-11-05", "capabilities": { "tools": {} }, "serverInfo": { "name": f"{tool_name}-wrapper", "version": "1.0.0" } } } elif method == "ping": return { "jsonrpc": "2.0", "id": request_id, "result": { "pong": True } } elif method == "tools/list": return { "jsonrpc": "2.0", "id": request_id, "result": { "tools": get_tool_definitions(tool_name) } } elif method == "tools/call": return await execute_tool_call(wrapper, tool_name, params, request_id) else: return { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32601, "message": f"Unknown method: {method}" } } except Exception as e: return { "jsonrpc": "2.0", "id": request.get("id"), "error": { "code": -32603, "message": f"Internal error: {str(e)}" } } def get_allowed_directory(): """获取允许访问的目录""" project_dir = os.getenv("PROJECT_DATA_DIR", "./data") return os.path.abspath(project_dir) def get_tool_definitions(tool_name: str) -> list: """根据工具名称返回工具定义""" if tool_name == "ripgrep-wrapper": return [ { "name": "ripgrep_search", "description": "在项目目录内搜索文本", "inputSchema": { "type": "object", "properties": { "pattern": {"type": "string", "description": "搜索模式"}, "path": {"type": "string", "description": "搜索路径(相对于项目目录)"}, "maxResults": {"type": "integer", "default": 100} }, "required": ["pattern"] } } ] elif tool_name == "directory-tree-wrapper": return [ { "name": "get_directory_tree", "description": "获取项目目录结构", "inputSchema": { "type": "object", "properties": { "path": {"type": "string", "description": "目录路径(相对于项目目录)"}, "max_depth": {"type": "integer", "default": 3} } } } ] else: return [] async def execute_tool_call(wrapper: MCPSecurityWrapper, tool_name: str, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]: """执行工具调用""" try: if tool_name == "ripgrep-wrapper": return await execute_ripgrep_search(wrapper, params, request_id) elif tool_name == "directory-tree-wrapper": return await execute_directory_tree(wrapper, params, request_id) else: return { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32601, "message": f"Unknown tool: {tool_name}" } } except Exception as e: return { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32603, "message": str(e) } } async def execute_ripgrep_search(wrapper: MCPSecurityWrapper, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]: """执行ripgrep搜索""" pattern = params.get("pattern", "") path = params.get("path", ".") max_results = params.get("maxResults", 100) # 验证和构建搜索路径 search_path = os.path.join(wrapper.allowed_directory, path) search_path = wrapper.validate_path(search_path) # 构建ripgrep命令 command = [ "rg", "--json", "--max-count", str(max_results), pattern, search_path ] result = wrapper.safe_execute_command(command) if result["success"]: return { "jsonrpc": "2.0", "id": request_id, "result": { "content": [ { "type": "text", "text": result["stdout"] } ] } } else: return { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32603, "message": f"搜索失败: {result['stderr']}" } } async def execute_directory_tree(wrapper: MCPSecurityWrapper, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]: """执行目录树获取""" path = params.get("path", ".") max_depth = params.get("max_depth", 3) # 验证和构建目录路径 dir_path = os.path.join(wrapper.allowed_directory, path) dir_path = wrapper.validate_path(dir_path) # 构建目录树命令 command = [ "find", dir_path, "-type", "d", "-maxdepth", str(max_depth) ] result = wrapper.safe_execute_command(command) if result["success"]: return { "jsonrpc": "2.0", "id": request_id, "result": { "content": [ { "type": "text", "text": result["stdout"] } ] } } else: return { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32603, "message": f"获取目录树失败: {result['stderr']}" } }