qwen_agent/mcp/json_reader_server.py
2025-10-20 12:51:36 +08:00

447 lines
17 KiB
Python

#!/usr/bin/env python3
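"""
MCP Server for JSON file operations.
Provides tools to:
1. List the keys at the top level, or at a given key path, of a JSON file (get_all_keys)
2. Get the value at a specific key path of a JSON file (get_value)
3. Get the values at several key paths of a JSON file in one call (get_multiple_values)
"""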
import json
import os
import sys
import asyncio
from typing import Any, Dict, List
def validate_file_path(file_path: str, allowed_dir: str) -> str:
    """Resolve ``file_path`` and verify that it lies inside ``allowed_dir``.

    Both paths are canonicalised with ``os.path.realpath`` before they are
    compared: relative paths are made absolute against the current working
    directory, ``..`` segments are collapsed and symbolic links are followed.
    Containment is then decided on whole path components, so a sibling such
    as ``/data/projects_evil`` is not mistaken for being inside
    ``/data/projects`` (a plain string-prefix test would accept it), and a
    file whose *name* merely contains ``..`` is not rejected.

    Args:
        file_path: Path requested by the client; absolute or relative.
        allowed_dir: Directory the server is permitted to read from.

    Returns:
        The canonical absolute path of ``file_path``.

    Raises:
        ValueError: If the resolved path is not inside ``allowed_dir``.
    """
    resolved_path = os.path.realpath(file_path)
    resolved_dir = os.path.realpath(allowed_dir)

    try:
        common_root = os.path.commonpath([resolved_path, resolved_dir])
    except ValueError:
        # commonpath() raises when the paths cannot share a root
        # (for example different drives on Windows): treat as "outside".
        is_inside = False
    else:
        is_inside = os.path.normcase(common_root) == os.path.normcase(resolved_dir)

    if not is_inside:
        raise ValueError(f"访问被拒绝: 路径 {resolved_path} 不在允许的目录 {resolved_dir}")
    return resolved_path
def get_allowed_directory():
    """Return the absolute path of the directory the server may read from.

    The location comes from the ``PROJECT_DATA_DIR`` environment variable and
    falls back to ``./projects`` when the variable is not set.
    """
    configured_dir = os.environ.get("PROJECT_DATA_DIR", "./projects")
    absolute_dir = os.path.abspath(configured_dir)
    return absolute_dir
def load_tools_from_json() -> List[Dict[str, Any]]:
    """Load the tool definitions advertised by this server.

    The definitions are read from ``tools/json_reader_tools.json`` next to
    this module.

    Returns:
        The parsed content of the definitions file, or an empty list when the
        file does not exist or cannot be read or parsed.
    """
    tools_file = os.path.join(os.path.dirname(__file__), "tools", "json_reader_tools.json")
    try:
        with open(tools_file, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        # No definitions file shipped: advertise no tools.
        return []
    except Exception as e:
        # Best-effort: a broken definitions file must not take the server down.
        # The warning goes to stderr because stdout carries the JSON-RPC
        # responses; anything else written there would corrupt the stream.
        print(f"警告: 无法加载工具定义 JSON 文件: {str(e)}", file=sys.stderr)
        return []
def _jsonrpc_result(request_id: Any, result: Dict[str, Any]) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 success response."""
    return {"jsonrpc": "2.0", "id": request_id, "result": result}


def _jsonrpc_error(request_id: Any, code: int, message: str) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error response."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message},
    }


def _text_result(request_id: Any, payload: Any) -> Dict[str, Any]:
    """Build a tool result whose single text item is ``payload`` dumped as JSON."""
    text = json.dumps(payload, indent=2, ensure_ascii=False)
    return _jsonrpc_result(request_id, {"content": [{"type": "text", "text": text}]})


def _load_json_file(file_path: str) -> Any:
    """Validate ``file_path`` against the allowed directory and parse the file."""
    allowed_dir = get_allowed_directory()
    safe_path = validate_file_path(file_path, allowed_dir)
    with open(safe_path, 'r', encoding='utf-8') as f:
        return json.load(f)


def _resolve_key_path(data: Any, key_path: str) -> Any:
    """Walk ``data`` along a dotted key path such as ``a.items[0][2].name``.

    A segment containing ``[`` and ending in ``]`` is an optional dict key
    followed by one or more array indices; any other segment is a dict key.

    Raises:
        ValueError: If a key is missing, an index is not an integer, or an
            index does not address an existing list element.
    """
    current = data
    for segment in key_path.split('.'):
        if '[' in segment and segment.endswith(']'):
            base_key, _, index_part = segment.partition('[')
            if base_key:
                if isinstance(current, dict) and base_key in current:
                    current = current[base_key]
                else:
                    raise ValueError(f"Key '{base_key}' not found")
            # index_part looks like "0]" or "0][1]"; drop the final "]" and
            # split on "][" so that every chained index is applied.
            for index_str in index_part[:-1].split(']['):
                try:
                    index = int(index_str)
                except ValueError:
                    raise ValueError(f"Invalid array index: {index_str}") from None
                # Checked outside the try block so this error is not
                # re-labelled as an "invalid index" by the handler above.
                if isinstance(current, list) and 0 <= index < len(current):
                    current = current[index]
                else:
                    raise ValueError(f"Array index {index} out of bounds")
        else:
            if isinstance(current, dict) and segment in current:
                current = current[segment]
            else:
                raise ValueError(f"Key '{segment}' not found")
    return current


def _list_keys(node: Any) -> List[str]:
    """Return dict keys, ``[i]`` labels for list items, or ``[]`` for scalars."""
    if isinstance(node, dict):
        return list(node.keys())
    if isinstance(node, list):
        return [f"[{i}]" for i in range(len(node))]
    return []


def _tool_get_all_keys(request_id: Any, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the ``get_all_keys`` tool: list keys at the root or at ``key_path``."""
    file_path = arguments.get("file_path")
    key_path = arguments.get("key_path")
    if not file_path:
        return _jsonrpc_error(request_id, -32602, "file_path is required")
    try:
        data = _load_json_file(file_path)
        target = _resolve_key_path(data, key_path) if key_path else data
        return _text_result(request_id, _list_keys(target))
    except Exception as e:
        return _jsonrpc_error(request_id, -32603, str(e))


def _tool_get_value(request_id: Any, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the ``get_value`` tool: return the value found at ``key_path``."""
    file_path = arguments.get("file_path")
    key_path = arguments.get("key_path")
    if not file_path or not key_path:
        return _jsonrpc_error(request_id, -32602, "file_path and key_path are required")
    try:
        data = _load_json_file(file_path)
        return _text_result(request_id, _resolve_key_path(data, key_path))
    except Exception as e:
        return _jsonrpc_error(request_id, -32603, str(e))


def _tool_get_multiple_values(request_id: Any, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Handle ``get_multiple_values``: resolve every path, collecting per-path errors."""
    file_path = arguments.get("file_path")
    key_paths = arguments.get("key_paths")
    if not file_path or not key_paths:
        return _jsonrpc_error(request_id, -32602, "file_path and key_paths are required")
    if not isinstance(key_paths, list):
        return _jsonrpc_error(request_id, -32602, "key_paths must be an array")
    try:
        data = _load_json_file(file_path)
        results = {}
        errors = {}
        for key_path in key_paths:
            if not isinstance(key_path, str):
                # A non-string entry (possibly unhashable) must not abort the
                # whole call; report it under its string form instead.
                errors[str(key_path)] = "key path must be a string"
                continue
            try:
                results[key_path] = _resolve_key_path(data, key_path)
            except Exception as e:
                errors[key_path] = str(e)
        return _text_result(request_id, {"results": results, "errors": errors})
    except Exception as e:
        return _jsonrpc_error(request_id, -32603, str(e))


async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Handle one MCP (JSON-RPC 2.0) request and build its response.

    Supported methods are ``initialize``, ``ping``, ``tools/list`` and
    ``tools/call`` (tools ``get_all_keys``, ``get_value`` and
    ``get_multiple_values``).

    Args:
        request: The decoded JSON-RPC request object.

    Returns:
        A JSON-RPC response dict carrying either ``result`` or ``error``.
        Error codes used: -32600 (request is not an object), -32601 (unknown
        method or tool), -32602 (missing or invalid arguments) and -32603
        (failure while executing a tool or any other internal error).
    """
    if not isinstance(request, dict):
        return _jsonrpc_error(None, -32600, "Invalid Request")

    request_id = request.get("id")
    try:
        method = request.get("method")
        # An explicit null for "params" is treated like an omitted member.
        params = request.get("params") or {}

        if method == "initialize":
            return _jsonrpc_result(request_id, {
                "protocolVersion": "2024-11-05",
                "capabilities": {
                    "tools": {}
                },
                "serverInfo": {
                    "name": "json-reader",
                    "version": "1.0.0"
                }
            })

        if method == "ping":
            return _jsonrpc_result(request_id, {"pong": True})

        if method == "tools/list":
            # Tool definitions come from the JSON file next to this module.
            return _jsonrpc_result(request_id, {"tools": load_tools_from_json()})

        if method == "tools/call":
            tool_name = params.get("name")
            arguments = params.get("arguments") or {}
            if not isinstance(arguments, dict):
                return _jsonrpc_error(request_id, -32602, "arguments must be an object")
            if tool_name == "get_all_keys":
                return _tool_get_all_keys(request_id, arguments)
            if tool_name == "get_value":
                return _tool_get_value(request_id, arguments)
            if tool_name == "get_multiple_values":
                return _tool_get_multiple_values(request_id, arguments)
            return _jsonrpc_error(request_id, -32601, f"Unknown tool: {tool_name}")

        return _jsonrpc_error(request_id, -32601, f"Unknown method: {method}")
    except Exception as e:
        return _jsonrpc_error(request_id, -32603, f"Internal error: {str(e)}")
def _write_message(message: Dict[str, Any]) -> None:
    """Write one JSON-RPC message to stdout as a single line and flush it."""
    sys.stdout.write(json.dumps(message) + "\n")
    sys.stdout.flush()


async def main():
    """Serve newline-delimited JSON-RPC requests from stdin until EOF.

    Each non-blank input line is decoded and passed to ``handle_request``;
    the response is written to stdout as one JSON line. Undecodable input
    yields a -32700 "Parse error" response, and an exception escaping the
    handler yields a -32603 response. Messages that carry a ``method`` but no
    ``id`` are notifications and are never answered, as JSON-RPC 2.0
    requires. ``KeyboardInterrupt`` ends the loop quietly.
    """
    loop = asyncio.get_running_loop()
    try:
        while True:
            # readline() blocks, so run it in the default executor.
            line = await loop.run_in_executor(None, sys.stdin.readline)
            if not line:
                # An empty string (not even a newline) means EOF.
                break
            line = line.strip()
            if not line:
                continue

            request = None
            try:
                request = json.loads(line)
                response = await handle_request(request)
            except json.JSONDecodeError:
                response = {
                    "jsonrpc": "2.0",
                    "id": None,
                    "error": {
                        "code": -32700,
                        "message": "Parse error"
                    }
                }
            except Exception as e:
                response = {
                    "jsonrpc": "2.0",
                    "id": request.get("id") if isinstance(request, dict) else None,
                    "error": {
                        "code": -32603,
                        "message": f"Internal error: {str(e)}"
                    }
                }

            is_notification = (
                isinstance(request, dict)
                and "method" in request
                and "id" not in request
            )
            if is_notification:
                # The peer does not expect, and may reject, a reply here.
                continue
            _write_message(response)
    except KeyboardInterrupt:
        pass
# Start the stdin/stdout request loop only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())