265 lines
12 KiB
Python
265 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
MCP Server for JSON file operations.

Provides tools to:
1. Get the keys at the top level, or under a given key path, of a JSON file
2. Get the value at a specific key path in a JSON file
3. Get the values at several key paths of a JSON file in one call
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import asyncio
|
|
from typing import Any, Dict, List
|
|
from mcp_common import (
|
|
get_allowed_directory,
|
|
load_tools_from_json,
|
|
resolve_file_path,
|
|
create_error_response,
|
|
create_success_response,
|
|
create_initialize_response,
|
|
create_ping_response,
|
|
create_tools_list_response,
|
|
handle_mcp_streaming
|
|
)
|
|
|
|
|
|
def _descend_segment(current: Any, segment: str) -> Any:
    """Apply one dot-separated path segment to *current* and return the result.

    A segment is either a plain dict key (``name``), a key followed by one or
    more array indices (``name[0]``, ``name[0][2]``), or indices alone
    (``[0]``) when *current* is itself a list.

    Raises:
        ValueError: if the key is missing, an index is not an integer, the
            indexed value is not a list, or the index is out of range.
    """
    if "[" in segment and segment.endswith("]"):
        base_key, _, index_part = segment.partition("[")
        if base_key:
            if isinstance(current, dict) and base_key in current:
                current = current[base_key]
            else:
                raise ValueError(f"Key '{base_key}' not found")

        # index_part looks like "0]" or "0][2]": drop the closing bracket and
        # split on "][" so that every chained index is applied in order.
        for index_str in index_part[:-1].split("]["):
            # Only the int() conversion is guarded, so the bounds error raised
            # below is not rewritten into an "Invalid array index" message.
            try:
                index = int(index_str)
            except ValueError:
                raise ValueError(f"Invalid array index: {index_str}") from None
            if not isinstance(current, list):
                raise ValueError(
                    f"Cannot apply array index {index} to a non-array value"
                )
            # Negative indices are rejected rather than wrapping around.
            if not 0 <= index < len(current):
                raise ValueError(f"Array index {index} out of bounds")
            current = current[index]
        return current

    if isinstance(current, dict) and segment in current:
        return current[segment]
    raise ValueError(f"Key '{segment}' not found")


def _resolve_key_path(data: Any, key_path: str) -> Any:
    """Walk *data* along a dot-separated *key_path* and return the value found.

    Raises:
        ValueError: if any segment of the path cannot be resolved.
    """
    current = data
    for segment in key_path.split("."):
        current = _descend_segment(current, segment)
    return current


def _list_keys(value: Any) -> List[str]:
    """Return dict keys, ``[i]`` labels for list items, or ``[]`` for scalars."""
    if isinstance(value, dict):
        return list(value)
    if isinstance(value, list):
        return [f"[{i}]" for i in range(len(value))]
    return []


def _load_json_file(file_path: str) -> Any:
    """Resolve *file_path* with ``resolve_file_path`` and parse it as JSON."""
    # Resolve the file path; both "folder/document.txt" and "document.txt"
    # forms are supported.
    resolved_path = resolve_file_path(file_path)
    with open(resolved_path, 'r', encoding='utf-8') as f:
        return json.load(f)


def _text_content(payload: Any) -> Dict[str, Any]:
    """Wrap *payload*, serialized as pretty-printed JSON, in a text content item."""
    return {
        "content": [
            {
                "type": "text",
                "text": json.dumps(payload, indent=2, ensure_ascii=False)
            }
        ]
    }


def _handle_tool_call(request_id: Any, params: Dict[str, Any]) -> Dict[str, Any]:
    """Dispatch a ``tools/call`` request to the matching JSON reader tool.

    Supported tools are ``get_all_keys``, ``get_value`` and
    ``get_multiple_values``. Missing or wrongly typed arguments produce a
    -32602 error, failures while reading or navigating the file produce a
    -32603 error, and an unrecognized tool name produces a -32601 error.
    """
    tool_name = params.get("name")
    # "arguments" may be absent or explicitly null; treat both as empty.
    arguments = params.get("arguments") or {}
    file_path = arguments.get("file_path")

    if tool_name == "get_all_keys":
        key_path = arguments.get("key_path")

        if not file_path:
            return create_error_response(request_id, -32602, "file_path is required")
        if key_path and not isinstance(key_path, str):
            return create_error_response(request_id, -32602, "key_path must be a string")

        try:
            data = _load_json_file(file_path)
            # Without a key_path the keys of the document root are listed.
            target = _resolve_key_path(data, key_path) if key_path else data
            return create_success_response(request_id, _text_content(_list_keys(target)))
        except Exception as e:
            return create_error_response(request_id, -32603, str(e))

    if tool_name == "get_value":
        key_path = arguments.get("key_path")

        if not file_path or not key_path:
            return create_error_response(request_id, -32602, "file_path and key_path are required")
        if not isinstance(key_path, str):
            return create_error_response(request_id, -32602, "key_path must be a string")

        try:
            data = _load_json_file(file_path)
            value = _resolve_key_path(data, key_path)
            return create_success_response(request_id, _text_content(value))
        except Exception as e:
            return create_error_response(request_id, -32603, str(e))

    if tool_name == "get_multiple_values":
        key_paths = arguments.get("key_paths")

        if not file_path or not key_paths:
            return create_error_response(request_id, -32602, "file_path and key_paths are required")
        if not isinstance(key_paths, list):
            return create_error_response(request_id, -32602, "key_paths must be an array")

        try:
            data = _load_json_file(file_path)

            results = {}
            errors = {}

            # Each path is resolved independently so one bad path does not
            # prevent the others from being returned.
            for key_path in key_paths:
                if not isinstance(key_path, str):
                    # str() keeps the entry usable as a dict key even when it
                    # is unhashable (for example a nested list).
                    errors[str(key_path)] = "key path must be a string"
                    continue
                try:
                    results[key_path] = _resolve_key_path(data, key_path)
                except ValueError as e:
                    errors[key_path] = str(e)

            return create_success_response(
                request_id,
                _text_content({"results": results, "errors": errors}),
            )
        except Exception as e:
            return create_error_response(request_id, -32603, str(e))

    return create_error_response(request_id, -32601, f"Unknown tool: {tool_name}")


async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Handle one MCP request and return the response to send back.

    Args:
        request: The decoded request; its ``method``, ``params`` and ``id``
            entries are read.

    Returns:
        The response built by the matching ``create_*_response`` helper. An
        unknown method yields a -32601 error and any unexpected exception is
        reported as a -32603 "Internal error".
    """
    try:
        method = request.get("method")
        # "params" may be absent or explicitly null; treat both as empty.
        params = request.get("params") or {}
        request_id = request.get("id")

        if method == "initialize":
            return create_initialize_response(request_id, "json-reader")

        if method == "ping":
            return create_ping_response(request_id)

        if method == "tools/list":
            # Load the tool definitions from the JSON file.
            tools = load_tools_from_json("json_reader_tools.json")
            return create_tools_list_response(request_id, tools)

        if method == "tools/call":
            return _handle_tool_call(request_id, params)

        return create_error_response(request_id, -32601, f"Unknown method: {method}")

    except Exception as e:
        return create_error_response(request.get("id"), -32603, f"Internal error: {str(e)}")
|
|
|
|
async def main():
    """Entry point: pass handle_request to handle_mcp_streaming and await it."""
    # handle_mcp_streaming comes from mcp_common; it receives the request
    # handler defined in this module.
    streaming_task = handle_mcp_streaming(handle_request)
    await streaming_task
|
|
|
|
if __name__ == "__main__":
    # Start the asyncio event loop only when the file is executed as a script,
    # so importing the module has no side effects.
    asyncio.run(main())