qwen_agent/mcp/json_reader_server.py
2025-10-22 23:04:49 +08:00

265 lines
12 KiB
Python

#!/usr/bin/env python3
"""
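MCP Server for JSON file operations.
Provides tools to:
1. List the keys at the top level of a JSON file, or at a given key path (get_all_keys)
2. Get the value at a specific key path from a JSON file (get_value)
3. Get the values at several key paths from a JSON file in one call (get_multiple_values)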
"""
import json
import os
import sys
import asyncio
from typing import Any, Dict, List
from mcp_common import (
get_allowed_directory,
load_tools_from_json,
resolve_file_path,
create_error_response,
create_success_response,
create_initialize_response,
create_ping_response,
create_tools_list_response,
handle_mcp_streaming
)
def _load_json_file(file_path: str) -> Any:
    """Resolve *file_path* with ``resolve_file_path`` and return the parsed JSON."""
    # Resolve the path first; supports both folder/document.txt and document.txt forms.
    resolved_path = resolve_file_path(file_path)
    with open(resolved_path, 'r', encoding='utf-8') as f:
        return json.load(f)


def _get_dict_member(node: Any, key: str) -> Any:
    """Return ``node[key]``, raising ``ValueError`` if *node* is not a dict holding *key*."""
    if isinstance(node, dict) and key in node:
        return node[key]
    raise ValueError(f"Key '{key}' not found")


def _resolve_key_path(data: Any, key_path: str) -> Any:
    """Walk *data* along a dotted key path and return the value found there.

    Segments are separated by ``.``. A segment may carry one or more array
    indices, e.g. ``items[0]``, ``[2]`` or ``matrix[0][1]``.

    Raises:
        ValueError: if *key_path* is not a string, a key is missing, an index
            is not an integer, or an index is out of bounds (negative indices
            are rejected, as is indexing into a non-list value).
    """
    if not isinstance(key_path, str):
        raise ValueError("key_path must be a string")

    current = data
    for segment in key_path.split('.'):
        if '[' in segment and segment.endswith(']'):
            base_key, _, index_part = segment.partition('[')
            if base_key:
                current = _get_dict_member(current, base_key)
            # index_part looks like "0]" or "0][1]"; drop the final ']' and
            # split on "][" so every chained index is applied in order.
            for index_str in index_part[:-1].split(']['):
                # Parse separately from the bounds check so an out-of-bounds
                # error is not re-labelled as an invalid index.
                try:
                    index = int(index_str)
                except ValueError:
                    raise ValueError(f"Invalid array index: {index_str}") from None
                if isinstance(current, list) and 0 <= index < len(current):
                    current = current[index]
                else:
                    raise ValueError(f"Array index {index} out of bounds")
        else:
            current = _get_dict_member(current, segment)
    return current


def _list_child_keys(node: Any) -> List[str]:
    """Return dict keys, ``[i]`` placeholders for list items, or ``[]`` for scalars."""
    if isinstance(node, dict):
        return list(node)
    if isinstance(node, list):
        return [f"[{i}]" for i in range(len(node))]
    return []


def _text_content(payload: Any) -> Dict[str, Any]:
    """Wrap *payload*, serialized as pretty-printed JSON, in a single text content item."""
    return {
        "content": [
            {
                "type": "text",
                "text": json.dumps(payload, indent=2, ensure_ascii=False)
            }
        ]
    }


async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Handle one MCP request and return the response dict.

    Supported methods: ``initialize``, ``ping``, ``tools/list`` and
    ``tools/call``. The callable tools are:

    * ``get_all_keys`` -- ``file_path`` (required), ``key_path`` (optional);
      lists the keys at the top level or at ``key_path``.
    * ``get_value`` -- ``file_path`` and ``key_path`` (both required).
    * ``get_multiple_values`` -- ``file_path`` and ``key_paths`` (a list);
      returns ``{"results": ..., "errors": ...}`` keyed by key path, so one
      bad path does not fail the whole call.

    Error codes used: -32601 for an unknown method or tool, -32602 for
    missing/invalid arguments, -32603 for any failure while serving the call.
    Exceptions are never propagated to the caller; they are converted into
    error responses.
    """
    try:
        method = request.get("method")
        # "params" may be present but null; treat that like an empty object.
        params = request.get("params") or {}
        request_id = request.get("id")

        if method == "initialize":
            return create_initialize_response(request_id, "json-reader")

        if method == "ping":
            return create_ping_response(request_id)

        if method == "tools/list":
            # Load the tool definitions from the JSON file.
            tools = load_tools_from_json("json_reader_tools.json")
            return create_tools_list_response(request_id, tools)

        if method != "tools/call":
            return create_error_response(request_id, -32601, f"Unknown method: {method}")

        tool_name = params.get("name")
        arguments = params.get("arguments") or {}

        if tool_name == "get_all_keys":
            file_path = arguments.get("file_path")
            key_path = arguments.get("key_path")
            if not file_path:
                return create_error_response(request_id, -32602, "file_path is required")
            # Broad catch is deliberate: this is the server boundary, and any
            # failure (I/O, JSON parsing, bad path) becomes an error response.
            try:
                data = _load_json_file(file_path)
                # Without a key_path the keys of the document root are listed.
                target = _resolve_key_path(data, key_path) if key_path else data
                return create_success_response(
                    request_id, _text_content(_list_child_keys(target)))
            except Exception as e:
                return create_error_response(request_id, -32603, str(e))

        if tool_name == "get_value":
            file_path = arguments.get("file_path")
            key_path = arguments.get("key_path")
            if not file_path or not key_path:
                return create_error_response(
                    request_id, -32602, "file_path and key_path are required")
            try:
                data = _load_json_file(file_path)
                value = _resolve_key_path(data, key_path)
                return create_success_response(request_id, _text_content(value))
            except Exception as e:
                return create_error_response(request_id, -32603, str(e))

        if tool_name == "get_multiple_values":
            file_path = arguments.get("file_path")
            key_paths = arguments.get("key_paths")
            if not file_path or not key_paths:
                return create_error_response(
                    request_id, -32602, "file_path and key_paths are required")
            if not isinstance(key_paths, list):
                return create_error_response(request_id, -32602, "key_paths must be an array")
            try:
                data = _load_json_file(file_path)
                results = {}
                errors = {}
                # Resolve each path independently and collect per-path errors.
                for key_path in key_paths:
                    try:
                        results[key_path] = _resolve_key_path(data, key_path)
                    except Exception as e:
                        errors[key_path] = str(e)
                return create_success_response(request_id, _text_content({
                    "results": results,
                    "errors": errors
                }))
            except Exception as e:
                return create_error_response(request_id, -32603, str(e))

        return create_error_response(request_id, -32601, f"Unknown tool: {tool_name}")
    except Exception as e:
        return create_error_response(request.get("id"), -32603, f"Internal error: {str(e)}")
async def main():
    """Start the server: hand ``handle_request`` to ``handle_mcp_streaming`` and await it."""
    request_handler = handle_request
    await handle_mcp_streaming(request_handler)
# Script entry point: drive the async main() to completion when run directly.
if __name__ == "__main__":
    server_coroutine = main()
    asyncio.run(server_coroutine)