#!/usr/bin/env python3
"""
MCP server for read-only JSON file operations over stdio (JSON-RPC 2.0).

Exposed tools:
1. get_all_keys        - list the keys (or array slots) at the top level of a
                         JSON file, or at an optional key path inside it
2. get_value           - return the value found at a key path
3. get_multiple_values - return the values found at several key paths

Key paths use dot notation with optional array indices, e.g.
``users[0].address.city`` or ``matrix[1][2]``.
"""

import asyncio
import json
import os
import sys
from typing import Any, Dict, List, Tuple


def validate_file_path(file_path: str, allowed_dir: str) -> str:
    """Resolve *file_path* and ensure it lies inside *allowed_dir*.

    Both paths are canonicalised with ``os.path.realpath`` so that ``..``
    components and symlinks are resolved before the containment check.
    Containment is decided on whole path components (``os.path.commonpath``),
    not on a string prefix, so a sibling such as ``/data/projects_evil`` is
    not accepted for ``/data/projects``.

    Args:
        file_path: Path requested by the client; relative paths are resolved
            against the current working directory.
        allowed_dir: Directory the server is allowed to read from.

    Returns:
        The canonical absolute path of the file.

    Raises:
        ValueError: If the resolved path is outside *allowed_dir*.
    """
    resolved_dir = os.path.realpath(allowed_dir)
    resolved_path = os.path.realpath(file_path)

    # normcase keeps the comparison correct on case-insensitive platforms.
    base = os.path.normcase(resolved_dir)
    target = os.path.normcase(resolved_path)
    try:
        inside = os.path.commonpath([base, target]) == base
    except ValueError:
        # commonpath raises when the paths cannot share a root
        # (e.g. different drives on Windows).
        inside = False

    if not inside:
        raise ValueError(
            f"访问被拒绝: 路径 {resolved_path} 不在允许的目录 {resolved_dir} 内"
        )
    return resolved_path


def get_allowed_directory():
    """Return the absolute path of the directory the server may read from.

    The directory comes from the ``PROJECT_DATA_DIR`` environment variable
    and defaults to ``./projects``.
    """
    project_dir = os.getenv("PROJECT_DATA_DIR", "./projects")
    return os.path.abspath(project_dir)


def load_tools_from_json() -> List[Dict[str, Any]]:
    """Load the tool definitions from ``tools/json_reader_tools.json``.

    The file is looked up next to this module. A missing or unreadable file
    yields an empty list; the failure is reported on stderr because stdout
    carries the JSON-RPC stream and must not receive free-form text.
    """
    try:
        tools_file = os.path.join(
            os.path.dirname(__file__), "tools", "json_reader_tools.json"
        )
        if not os.path.exists(tools_file):
            # No definition file: advertise no tools.
            return []
        with open(tools_file, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        print(f"警告: 无法加载工具定义 JSON 文件: {str(e)}", file=sys.stderr)
        return []


def _success(request_id: Any, result: Any) -> Dict[str, Any]:
    """Build a JSON-RPC success response."""
    return {"jsonrpc": "2.0", "id": request_id, "result": result}


def _failure(request_id: Any, code: int, message: str) -> Dict[str, Any]:
    """Build a JSON-RPC error response."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message},
    }


def _text_content(payload: Any) -> Dict[str, Any]:
    """Wrap *payload* as a single MCP text content item holding pretty JSON."""
    return {
        "content": [
            {
                "type": "text",
                "text": json.dumps(payload, indent=2, ensure_ascii=False),
            }
        ]
    }


def _split_segment(segment: str) -> Tuple[str, List[str]]:
    """Split ``name[0][1]`` into ``("name", ["0", "1"])``.

    A segment without a trailing ``[...]`` is returned unchanged with an
    empty index list.
    """
    if "[" not in segment or not segment.endswith("]"):
        return segment, []
    base, _, rest = segment.partition("[")
    # rest looks like "0]" or "0][1]"; drop the final "]" and split the rest.
    return base, rest[:-1].split("][")


def _resolve_key_path(data: Any, key_path: str) -> Any:
    """Walk *data* along *key_path* and return the value found there.

    Raises:
        ValueError: If a key is missing, an index is not an integer, or an
            index is out of bounds for the list it is applied to.
    """
    current = data
    for segment in key_path.split("."):
        base_key, raw_indices = _split_segment(segment)

        # A bare "[0]" segment has no base key and indexes `current` directly.
        if base_key or not raw_indices:
            if isinstance(current, dict) and base_key in current:
                current = current[base_key]
            else:
                raise ValueError(f"Key '{base_key}' not found")

        for raw_index in raw_indices:
            try:
                index = int(raw_index)
            except ValueError:
                raise ValueError(f"Invalid array index: {raw_index}") from None
            # Checked outside the try so the bounds error is not re-labelled
            # as an invalid index.
            if not isinstance(current, list) or not 0 <= index < len(current):
                raise ValueError(f"Array index {index} out of bounds")
            current = current[index]
    return current


def _list_keys(node: Any) -> List[str]:
    """Return dict keys, ``[i]`` labels for list slots, or ``[]`` for scalars."""
    if isinstance(node, dict):
        return list(node.keys())
    if isinstance(node, list):
        return [f"[{i}]" for i in range(len(node))]
    return []


def _load_json_document(file_path: str) -> Any:
    """Validate *file_path* against the allowed directory and parse the file."""
    safe_path = validate_file_path(file_path, get_allowed_directory())
    with open(safe_path, "r", encoding="utf-8") as f:
        return json.load(f)


def _tool_get_all_keys(request_id: Any, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the ``get_all_keys`` tool."""
    file_path = arguments.get("file_path")
    key_path = arguments.get("key_path")
    if not file_path:
        return _failure(request_id, -32602, "file_path is required")
    try:
        data = _load_json_document(file_path)
        # With a key path, list the keys at that location instead of the root.
        target = _resolve_key_path(data, key_path) if key_path else data
        return _success(request_id, _text_content(_list_keys(target)))
    except Exception as e:
        return _failure(request_id, -32603, str(e))


def _tool_get_value(request_id: Any, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the ``get_value`` tool."""
    file_path = arguments.get("file_path")
    key_path = arguments.get("key_path")
    if not file_path or not key_path:
        return _failure(request_id, -32602, "file_path and key_path are required")
    try:
        data = _load_json_document(file_path)
        value = _resolve_key_path(data, key_path)
        return _success(request_id, _text_content(value))
    except Exception as e:
        return _failure(request_id, -32603, str(e))


def _tool_get_multiple_values(
    request_id: Any, arguments: Dict[str, Any]
) -> Dict[str, Any]:
    """Handle the ``get_multiple_values`` tool.

    Each key path is resolved independently; failures are collected under
    ``errors`` so one bad path does not discard the other results.
    """
    file_path = arguments.get("file_path")
    key_paths = arguments.get("key_paths")
    if not file_path or not key_paths:
        return _failure(request_id, -32602, "file_path and key_paths are required")
    if not isinstance(key_paths, list):
        return _failure(request_id, -32602, "key_paths must be an array")
    try:
        data = _load_json_document(file_path)
        results = {}
        errors = {}
        for key_path in key_paths:
            try:
                results[key_path] = _resolve_key_path(data, key_path)
            except Exception as e:
                errors[key_path] = str(e)
        return _success(
            request_id, _text_content({"results": results, "errors": errors})
        )
    except Exception as e:
        return _failure(request_id, -32603, str(e))


def _call_tool(request_id: Any, params: Dict[str, Any]) -> Dict[str, Any]:
    """Dispatch a ``tools/call`` request to the matching tool handler."""
    tool_name = params.get("name")
    # A client may send "arguments": null; treat it like an empty object.
    arguments = params.get("arguments") or {}

    if tool_name == "get_all_keys":
        return _tool_get_all_keys(request_id, arguments)
    if tool_name == "get_value":
        return _tool_get_value(request_id, arguments)
    if tool_name == "get_multiple_values":
        return _tool_get_multiple_values(request_id, arguments)
    return _failure(request_id, -32601, f"Unknown tool: {tool_name}")


async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Handle one decoded JSON-RPC request and return the response object.

    Supported methods: ``initialize``, ``ping``, ``tools/list`` and
    ``tools/call``. Failures are reported as JSON-RPC error objects rather
    than raised.

    Args:
        request: The decoded JSON-RPC request.

    Returns:
        A JSON-RPC response dict carrying either ``result`` or ``error``.
    """
    if not isinstance(request, dict):
        return _failure(None, -32600, "Invalid Request")

    request_id = request.get("id")
    try:
        method = request.get("method")
        # "params": null is treated like a missing params member.
        params = request.get("params") or {}

        if method == "initialize":
            return _success(
                request_id,
                {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {"tools": {}},
                    "serverInfo": {"name": "json-reader", "version": "1.0.0"},
                },
            )
        if method == "ping":
            return _success(request_id, {"pong": True})
        if method == "tools/list":
            return _success(request_id, {"tools": load_tools_from_json()})
        if method == "tools/call":
            return _call_tool(request_id, params)
        return _failure(request_id, -32601, f"Unknown method: {method}")
    except Exception as e:
        return _failure(request_id, -32603, f"Internal error: {str(e)}")


def _write_response(response: Dict[str, Any]) -> None:
    """Write one response as a single JSON line on stdout and flush it."""
    sys.stdout.write(json.dumps(response) + "\n")
    sys.stdout.flush()


async def main():
    """Main entry point: serve newline-delimited JSON-RPC over stdin/stdout."""
    loop = asyncio.get_running_loop()
    try:
        while True:
            # readline blocks, so run it in the default executor.
            line = await loop.run_in_executor(None, sys.stdin.readline)
            if not line:
                break  # EOF: the client closed stdin.

            line = line.strip()
            if not line:
                continue

            try:
                request = json.loads(line)
                response = await handle_request(request)
                # JSON-RPC notifications (no "id" member) must not be answered.
                if isinstance(request, dict) and "id" not in request:
                    continue
                _write_response(response)
            except json.JSONDecodeError:
                _write_response(_failure(None, -32700, "Parse error"))
            except Exception as e:
                _write_response(
                    _failure(None, -32603, f"Internal error: {str(e)}")
                )
    except KeyboardInterrupt:
        pass


if __name__ == "__main__":
    asyncio.run(main())