catalog-agent/mcp/json_reader_server.py
2025-10-07 12:25:41 +08:00

487 lines
20 KiB
Python

#!/usr/bin/env python3
"""
MCP Server for JSON file operations.
Provides functions to:
1. Get top-level keys from a JSON file
2. Get value of a specific key from a JSON file
"""
import json
import os
import sys
import asyncio
from typing import Any, Dict, List
def validate_file_path(file_path: str, allowed_dir: str) -> str:
    """Resolve ``file_path`` and verify that it lies inside ``allowed_dir``.

    Both paths are canonicalised with ``os.path.realpath`` (made absolute
    relative to the current working directory, ``..`` components collapsed,
    symlinks followed) before they are compared, so the check is performed
    on the location that would actually be opened.

    Args:
        file_path: Path requested by the caller; may be relative or absolute.
        allowed_dir: Directory the caller is allowed to read from.

    Returns:
        The canonical absolute path of ``file_path``.

    Raises:
        ValueError: If the resolved path is not ``allowed_dir`` itself or a
            path below it.
    """
    resolved_path = os.path.realpath(file_path)
    resolved_dir = os.path.realpath(allowed_dir)

    try:
        # Compare whole path components rather than string prefixes, so that
        # e.g. "/data/projects-evil" is not accepted for "/data/projects".
        common = os.path.commonpath([resolved_path, resolved_dir])
        is_inside = os.path.normcase(common) == os.path.normcase(resolved_dir)
    except ValueError:
        # commonpath() raises when the paths cannot share a prefix at all
        # (for example different drives on Windows).
        is_inside = False

    if not is_inside:
        raise ValueError(f"访问被拒绝: 路径 {resolved_path} 不在允许的目录 {resolved_dir}")
    return resolved_path
def get_allowed_directory():
    """Return the absolute path of the directory the server may read from.

    The location is taken from the ``PROJECT_DATA_DIR`` environment
    variable; when that variable is not set, ``./projects`` is used.
    """
    configured_dir = os.environ.get("PROJECT_DATA_DIR", "./projects")
    absolute_dir = os.path.abspath(configured_dir)
    return absolute_dir
def _error_response(request_id: Any, code: int, message: str) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error response."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {
            "code": code,
            "message": message,
        },
    }


def _result_response(request_id: Any, result: Dict[str, Any]) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 success response."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "result": result,
    }


def _text_result(request_id: Any, payload: Any) -> Dict[str, Any]:
    """Wrap ``payload``, pretty-printed as JSON, in a text-content tool result."""
    text = json.dumps(payload, indent=2, ensure_ascii=False)
    return _result_response(request_id, {"content": [{"type": "text", "text": text}]})


def _tool_definitions() -> List[Dict[str, Any]]:
    """Return the tool descriptors advertised by ``tools/list``."""
    return [
        {
            "name": "get_all_keys",
            "description": "Get keys from a JSON file. If keypath is provided, get keys under that path. Otherwise, get top-level keys.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "Path to the JSON file"
                    },
                    "key_path": {
                        "type": "string",
                        "description": "Optional key path to get keys from (e.g., 'user.address' or 'items[0]')"
                    }
                },
                "required": ["file_path"]
            }
        },
        {
            "name": "get_value",
            "description": "Get value of a specific key from a JSON file using dot notation (e.g., 'user.name' or 'items[0].price')",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "Path to the JSON file"
                    },
                    "key_path": {
                        "type": "string",
                        "description": "Path to the key using dot notation (e.g., 'user.name' or 'items[0].price')"
                    }
                },
                "required": ["file_path", "key_path"]
            }
        },
        {
            "name": "get_multiple_values",
            "description": "Get values of multiple keys from a JSON file using dot notation (e.g., ['user.name', 'items[0].price'])",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "Path to the JSON file"
                    },
                    "key_paths": {
                        "type": "array",
                        "items": {
                            "type": "string"
                        },
                        "description": "Array of key paths using dot notation (e.g., ['user.name', 'items[0].price'])"
                    }
                },
                "required": ["file_path", "key_paths"]
            }
        },
    ]


def _load_json_file(file_path: str) -> Any:
    """Check ``file_path`` against the allowed directory and parse it as JSON."""
    safe_path = validate_file_path(file_path, get_allowed_directory())
    with open(safe_path, 'r', encoding='utf-8') as f:
        return json.load(f)


def _resolve_key_path(data: Any, key_path: Any) -> Any:
    """Walk ``data`` along a dotted key path and return the value found there.

    Segments are separated by ``.``. A segment may carry one or more array
    indices (``items[0]``, ``matrix[1][2]``) or consist only of indices
    (``[0]``) to index the current list directly.

    Raises:
        ValueError: If the path is not a string, a key is missing, an index
            is not an integer, or an index is outside the list.
    """
    if not isinstance(key_path, str):
        raise ValueError(f"Key path must be a string, got: {key_path!r}")

    current = data
    for segment in key_path.split('.'):
        if '[' in segment and segment.endswith(']'):
            base_key, _, index_part = segment.partition('[')
            if base_key:
                if not (isinstance(current, dict) and base_key in current):
                    raise ValueError(f"Key '{base_key}' not found")
                current = current[base_key]
            # "0][1]" -> "0][1" -> ["0", "1"]: apply every index in turn so
            # chained indices are honoured instead of silently dropped.
            for index_str in index_part[:-1].split(']['):
                try:
                    index = int(index_str)
                except ValueError:
                    raise ValueError(f"Invalid array index: {index_str}") from None
                # Checked outside the try above so an out-of-range index is
                # not misreported as an invalid one.
                if not (isinstance(current, list) and 0 <= index < len(current)):
                    raise ValueError(f"Array index {index} out of bounds")
                current = current[index]
        else:
            if not (isinstance(current, dict) and segment in current):
                raise ValueError(f"Key '{segment}' not found")
            current = current[segment]
    return current


def _list_keys(node: Any) -> List[str]:
    """Return dict keys, ``[i]`` labels for list items, or ``[]`` for scalars."""
    if isinstance(node, dict):
        return list(node)
    if isinstance(node, list):
        return [f"[{i}]" for i in range(len(node))]
    return []


def _handle_tool_call(request_id: Any, params: Dict[str, Any]) -> Dict[str, Any]:
    """Dispatch a ``tools/call`` request to the matching JSON tool."""
    tool_name = params.get("name")
    # "arguments" may be missing or explicitly null; treat both as empty.
    arguments = params.get("arguments") or {}

    if tool_name == "get_all_keys":
        file_path = arguments.get("file_path")
        key_path = arguments.get("key_path")
        if not file_path:
            return _error_response(request_id, -32602, "file_path is required")
        try:
            data = _load_json_file(file_path)
            target = _resolve_key_path(data, key_path) if key_path else data
            return _text_result(request_id, _list_keys(target))
        except Exception as e:
            # Any failure while reading or navigating the file is reported
            # to the client as a JSON-RPC internal error with its message.
            return _error_response(request_id, -32603, str(e))

    if tool_name == "get_value":
        file_path = arguments.get("file_path")
        key_path = arguments.get("key_path")
        if not file_path or not key_path:
            return _error_response(request_id, -32602, "file_path and key_path are required")
        try:
            data = _load_json_file(file_path)
            return _text_result(request_id, _resolve_key_path(data, key_path))
        except Exception as e:
            return _error_response(request_id, -32603, str(e))

    if tool_name == "get_multiple_values":
        file_path = arguments.get("file_path")
        key_paths = arguments.get("key_paths")
        if not file_path or not key_paths:
            return _error_response(request_id, -32602, "file_path and key_paths are required")
        if not isinstance(key_paths, list):
            return _error_response(request_id, -32602, "key_paths must be an array")
        try:
            data = _load_json_file(file_path)
            results = {}
            errors = {}
            # A failing key path is recorded under "errors" and does not
            # prevent the remaining paths from being resolved.
            for key_path in key_paths:
                try:
                    results[key_path] = _resolve_key_path(data, key_path)
                except Exception as e:
                    errors[key_path] = str(e)
            return _text_result(request_id, {"results": results, "errors": errors})
        except Exception as e:
            return _error_response(request_id, -32603, str(e))

    return _error_response(request_id, -32601, f"Unknown tool: {tool_name}")


async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Handle one decoded JSON-RPC request and return the response object.

    Supported methods are ``initialize``, ``ping``, ``tools/list`` and
    ``tools/call`` (tools: ``get_all_keys``, ``get_value``,
    ``get_multiple_values``).

    Args:
        request: The decoded JSON-RPC request.

    Returns:
        A JSON-RPC 2.0 response dict carrying either ``result`` or ``error``.
        Error codes used: -32601 for an unknown method or tool, -32602 for
        missing or invalid arguments, -32603 for any failure while serving
        the request.
    """
    try:
        method = request.get("method")
        # "params" may be missing or explicitly null; treat both as empty.
        params = request.get("params") or {}
        request_id = request.get("id")

        if method == "initialize":
            return _result_response(request_id, {
                "protocolVersion": "2024-11-05",
                "capabilities": {
                    "tools": {}
                },
                "serverInfo": {
                    "name": "json-reader",
                    "version": "1.0.0"
                }
            })
        if method == "ping":
            return _result_response(request_id, {"pong": True})
        if method == "tools/list":
            return _result_response(request_id, {"tools": _tool_definitions()})
        if method == "tools/call":
            return _handle_tool_call(request_id, params)
        return _error_response(request_id, -32601, f"Unknown method: {method}")
    except Exception as e:
        # The request itself may be malformed (not a dict), so do not assume
        # that it supports .get() while building the error response.
        fallback_id = request.get("id") if isinstance(request, dict) else None
        return _error_response(fallback_id, -32603, f"Internal error: {str(e)}")
async def main():
    """Serve newline-delimited JSON-RPC requests from stdin until EOF.

    Each non-empty input line is decoded as JSON and passed to
    ``handle_request``; the response is written to stdout as a single line
    and flushed immediately. Requests without an ``id`` member are JSON-RPC
    notifications and receive no reply. Undecodable lines produce a -32700
    "Parse error" response and any other failure a -32603 response.
    """
    loop = asyncio.get_running_loop()

    def write_message(message):
        # One JSON document per line, flushed so the client sees it at once.
        sys.stdout.write(json.dumps(message) + "\n")
        sys.stdout.flush()

    try:
        while True:
            # readline() blocks, so run it in the default executor rather
            # than on the event loop thread.
            line = await loop.run_in_executor(None, sys.stdin.readline)
            if not line:
                break  # EOF: the client closed stdin
            line = line.strip()
            if not line:
                continue
            request = None
            try:
                request = json.loads(line)
                response = await handle_request(request)
                # JSON-RPC 2.0: a request object without "id" is a
                # notification and must not be answered.
                if isinstance(request, dict) and "id" not in request:
                    continue
                write_message(response)
            except json.JSONDecodeError:
                # The id cannot be determined for unparseable input, so the
                # response carries a null id.
                write_message({
                    "jsonrpc": "2.0",
                    "id": None,
                    "error": {
                        "code": -32700,
                        "message": "Parse error"
                    }
                })
            except Exception as e:
                write_message({
                    "jsonrpc": "2.0",
                    "id": request.get("id") if isinstance(request, dict) else None,
                    "error": {
                        "code": -32603,
                        "message": f"Internal error: {str(e)}"
                    }
                })
    except KeyboardInterrupt:
        pass


if __name__ == "__main__":
    asyncio.run(main())