#!/usr/bin/env python3
"""
MCP Server for JSON file operations.

Provides tools to:
1. List the keys at the top level (or at a given key path) of a JSON file
2. Get the value at a specific key path in a JSON file
3. Get the values at several key paths of a JSON file in a single call

Key paths use dot notation with optional array indices, e.g.
``users[0].name`` or ``matrix[1][2]``.
"""

import json
import os
import sys
import asyncio
from typing import Any, Dict, List
from mcp_common import (
    get_allowed_directory,
    load_tools_from_json,
    resolve_file_path,
    create_error_response,
    create_success_response,
    create_initialize_response,
    create_ping_response,
    create_tools_list_response,
    handle_mcp_streaming
)

# JSON-RPC 2.0 error codes used by this server.
_METHOD_NOT_FOUND = -32601
_INVALID_PARAMS = -32602
_INTERNAL_ERROR = -32603


def _load_json(file_path: str) -> Any:
    """Resolve *file_path* through ``resolve_file_path`` and parse it as JSON."""
    # Resolve the file path; supports both folder/document.txt and
    # document.txt forms.
    resolved_path = resolve_file_path(file_path)
    with open(resolved_path, 'r', encoding='utf-8') as f:
        return json.load(f)


def _descend_segment(current: Any, segment: str) -> Any:
    """Apply one dot-separated path segment to *current* and return the result.

    A segment is either a plain dict key (``name``) or an optional dict key
    followed by one or more array indices (``items[0]``, ``[2]``,
    ``matrix[1][2]``).

    Raises:
        ValueError: if a key is missing, an index is not an integer, or an
            index cannot be applied (out of range, or the value is not a list).
    """
    if '[' in segment and segment.endswith(']'):
        base_key, _, index_part = segment.partition('[')
        if base_key:
            if isinstance(current, dict) and base_key in current:
                current = current[base_key]
            else:
                raise ValueError(f"Key '{base_key}' not found")

        # "0][1]" -> ["0", "1"]; every bracket group is applied in order so
        # chained indices are not silently dropped.
        for index_str in index_part.rstrip(']').split(']['):
            # Only the integer conversion is guarded, so the out-of-bounds
            # error below is not rewritten into "Invalid array index".
            try:
                index = int(index_str)
            except ValueError:
                raise ValueError(f"Invalid array index: {index_str}") from None
            if isinstance(current, list) and 0 <= index < len(current):
                current = current[index]
            else:
                raise ValueError(f"Array index {index} out of bounds")
        return current

    if isinstance(current, dict) and segment in current:
        return current[segment]
    raise ValueError(f"Key '{segment}' not found")


def _resolve_key_path(data: Any, key_path: str) -> Any:
    """Walk *data* along the dot-separated *key_path* and return the target value.

    Raises:
        ValueError: if any segment of the path cannot be resolved.
    """
    current = data
    for segment in key_path.split('.'):
        current = _descend_segment(current, segment)
    return current


def _child_keys(node: Any) -> List[str]:
    """Return dict keys, ``[i]`` labels for list items, or ``[]`` for scalars."""
    if isinstance(node, dict):
        return list(node.keys())
    if isinstance(node, list):
        return [f"[{i}]" for i in range(len(node))]
    return []


def _text_response(request_id: Any, payload: Any) -> Dict[str, Any]:
    """Wrap *payload*, serialized as indented JSON text, in a success response."""
    return create_success_response(request_id, {
        "content": [
            {
                "type": "text",
                "text": json.dumps(payload, indent=2, ensure_ascii=False)
            }
        ]
    })


def _tool_get_all_keys(request_id: Any, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """List the keys at the top level of the file, or at ``key_path`` if given."""
    file_path = arguments.get("file_path")
    key_path = arguments.get("key_path")

    if not file_path:
        return create_error_response(request_id, _INVALID_PARAMS, "file_path is required")

    try:
        data = _load_json(file_path)
        # If key_path is provided, navigate to that location first.
        target = _resolve_key_path(data, key_path) if key_path else data
        return _text_response(request_id, _child_keys(target))
    except Exception as e:
        return create_error_response(request_id, _INTERNAL_ERROR, str(e))


def _tool_get_value(request_id: Any, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Return the value found at ``key_path`` inside the file."""
    file_path = arguments.get("file_path")
    key_path = arguments.get("key_path")

    if not file_path or not key_path:
        return create_error_response(
            request_id, _INVALID_PARAMS, "file_path and key_path are required"
        )

    try:
        data = _load_json(file_path)
        return _text_response(request_id, _resolve_key_path(data, key_path))
    except Exception as e:
        return create_error_response(request_id, _INTERNAL_ERROR, str(e))


def _tool_get_multiple_values(request_id: Any, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Return the values for every entry of ``key_paths``.

    Paths that resolve are reported under ``results``; paths that fail are
    reported under ``errors`` with the failure message, so one bad path does
    not fail the whole call.
    """
    file_path = arguments.get("file_path")
    key_paths = arguments.get("key_paths")

    if not file_path or not key_paths:
        return create_error_response(
            request_id, _INVALID_PARAMS, "file_path and key_paths are required"
        )
    if not isinstance(key_paths, list):
        return create_error_response(request_id, _INVALID_PARAMS, "key_paths must be an array")

    try:
        data = _load_json(file_path)
        results = {}
        errors = {}

        for key_path in key_paths:
            if not isinstance(key_path, str):
                # Report under the string form: the raw entry may be
                # unhashable (e.g. a list) and could not serve as a dict key.
                errors[str(key_path)] = "key_path must be a string"
                continue
            try:
                results[key_path] = _resolve_key_path(data, key_path)
            except Exception as e:
                errors[key_path] = str(e)

        return _text_response(request_id, {"results": results, "errors": errors})
    except Exception as e:
        return create_error_response(request_id, _INTERNAL_ERROR, str(e))


# Maps the tool name from a tools/call request to its handler.
_TOOL_HANDLERS = {
    "get_all_keys": _tool_get_all_keys,
    "get_value": _tool_get_value,
    "get_multiple_values": _tool_get_multiple_values,
}


async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Handle one MCP request and return the response built by ``mcp_common``.

    Supported methods are ``initialize``, ``ping``, ``tools/list`` and
    ``tools/call`` (tools: ``get_all_keys``, ``get_value``,
    ``get_multiple_values``). Unknown methods and tools produce a -32601
    error response, missing or invalid arguments -32602, and any failure
    while reading or traversing the file -32603.
    """
    try:
        method = request.get("method")
        # Treat an explicit null the same as an absent params object.
        params = request.get("params") or {}
        request_id = request.get("id")

        if method == "initialize":
            return create_initialize_response(request_id, "json-reader")

        if method == "ping":
            return create_ping_response(request_id)

        if method == "tools/list":
            # Load the tool definitions from the JSON file.
            tools = load_tools_from_json("json_reader_tools.json")
            return create_tools_list_response(request_id, tools)

        if method == "tools/call":
            tool_name = params.get("name")
            arguments = params.get("arguments") or {}
            tool_handler = _TOOL_HANDLERS.get(tool_name)
            if tool_handler is None:
                return create_error_response(
                    request_id, _METHOD_NOT_FOUND, f"Unknown tool: {tool_name}"
                )
            return tool_handler(request_id, arguments)

        return create_error_response(request_id, _METHOD_NOT_FOUND, f"Unknown method: {method}")

    except Exception as e:
        return create_error_response(request.get("id"), _INTERNAL_ERROR, f"Internal error: {str(e)}")


async def main():
    """Main entry point: pass ``handle_request`` to ``handle_mcp_streaming``."""
    await handle_mcp_streaming(handle_request)


if __name__ == "__main__":
    asyncio.run(main())