#!/usr/bin/env python3
"""
MCP Server for JSON file operations.

Provides tools to:
1. Get the keys of a JSON file, either top-level or under a given key path
2. Get the value of a specific key from a JSON file
3. Get the values of multiple keys from a JSON file in a single call
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import asyncio
|
|
from typing import Any, Dict, List
|
|
|
|
|
|
def validate_file_path(file_path: str, allowed_dir: str) -> str:
    """Return ``file_path`` as a normalized absolute path inside ``allowed_dir``.

    Both arguments are made absolute and normalized with ``os.path.abspath``
    (relative paths are resolved against the current working directory), so
    ``.`` and ``..`` segments are collapsed before the containment check.
    Containment is decided on whole path components rather than on a string
    prefix, so a sibling such as ``/srv/projects_private`` is not treated as
    being inside ``/srv/projects``.

    Symbolic links are not resolved by this function.

    Args:
        file_path: Path requested by the client, absolute or relative.
        allowed_dir: Directory the returned path must be located in.

    Returns:
        The normalized absolute form of ``file_path``.

    Raises:
        ValueError: If the normalized path is not inside ``allowed_dir``.
    """
    # Normalize unconditionally: the original only normalized relative input,
    # which left "/allowed/../elsewhere" to be caught by a substring test that
    # also rejected harmless file names containing "..".
    file_path = os.path.abspath(file_path)
    allowed_dir = os.path.abspath(allowed_dir)

    # normcase only affects the comparison (case-insensitive platforms); the
    # returned path keeps the caller's spelling.
    candidate = os.path.normcase(file_path)
    root = os.path.normcase(allowed_dir)
    try:
        inside = os.path.commonpath([candidate, root]) == root
    except ValueError:
        # commonpath raises when the paths have nothing in common it can
        # express (for example different drives on Windows).
        inside = False

    if not inside:
        raise ValueError(f"访问被拒绝: 路径 {file_path} 不在允许的目录 {allowed_dir} 内")

    return file_path
|
|
|
|
|
|
def get_allowed_directory():
    """Return the absolute path of the directory this server may read from.

    The directory is taken from the ``PROJECT_DATA_DIR`` environment variable;
    when the variable is not set, ``./projects`` is used. A relative value is
    resolved against the current working directory.
    """
    configured_dir = os.environ.get("PROJECT_DATA_DIR", "./projects")
    return os.path.abspath(configured_dir)
|
|
|
|
def _error_response(request_id, code, message):
    """Build a JSON-RPC 2.0 error envelope for ``request_id``."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {
            "code": code,
            "message": message,
        },
    }


def _text_response(request_id, payload):
    """Build a tool result whose single text item is ``payload`` rendered as JSON."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "result": {
            "content": [
                {
                    "type": "text",
                    "text": json.dumps(payload, indent=2, ensure_ascii=False),
                }
            ]
        },
    }


def _tool_definitions():
    """Return the tool descriptors advertised by ``tools/list``."""
    return [
        {
            "name": "get_all_keys",
            "description": "Get keys from a JSON file. If keypath is provided, get keys under that path. Otherwise, get top-level keys.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "Path to the JSON file",
                    },
                    "key_path": {
                        "type": "string",
                        "description": "Optional key path to get keys from (e.g., 'user.address' or 'items[0]')",
                    },
                },
                "required": ["file_path"],
            },
        },
        {
            "name": "get_value",
            "description": "Get value of a specific key from a JSON file using dot notation (e.g., 'user.name' or 'items[0].price')",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "Path to the JSON file",
                    },
                    "key_path": {
                        "type": "string",
                        "description": "Path to the key using dot notation (e.g., 'user.name' or 'items[0].price')",
                    },
                },
                "required": ["file_path", "key_path"],
            },
        },
        {
            "name": "get_multiple_values",
            "description": "Get values of multiple keys from a JSON file using dot notation (e.g., ['user.name', 'items[0].price'])",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "Path to the JSON file",
                    },
                    "key_paths": {
                        "type": "array",
                        "items": {
                            "type": "string",
                        },
                        "description": "Array of key paths using dot notation (e.g., ['user.name', 'items[0].price'])",
                    },
                },
                "required": ["file_path", "key_paths"],
            },
        },
    ]


def _load_json_file(file_path):
    """Check ``file_path`` against the allowed directory and return its parsed JSON."""
    safe_path = validate_file_path(file_path, get_allowed_directory())
    with open(safe_path, 'r', encoding='utf-8') as f:
        return json.load(f)


def _resolve_key_path(data, key_path):
    """Walk ``data`` along a dotted ``key_path`` and return the value found there.

    Each dot-separated segment is either a plain dict key (``user``) or an
    optional dict key followed by one or more list indices (``items[0]``,
    ``[2]``, ``grid[1][3]``).

    Raises:
        ValueError: If a key is missing, an index is not an integer, or an
            index does not address an element of a list.
    """
    current = data
    for segment in key_path.split('.'):
        if '[' in segment and segment.endswith(']'):
            base_key, _, bracketed = segment.partition('[')
            if base_key:
                if isinstance(current, dict) and base_key in current:
                    current = current[base_key]
                else:
                    raise ValueError(f"Key '{base_key}' not found")

            # ``bracketed`` is "0]" or "0][1]"; dropping the final "]" and
            # splitting on "][" yields every index, so chained indices are
            # applied instead of being ignored after the first one.
            for index_str in bracketed[:-1].split(']['):
                try:
                    index = int(index_str)
                except ValueError:
                    raise ValueError(f"Invalid array index: {index_str}") from None
                # Checked outside the try block so this message is not
                # rewritten into "Invalid array index" by the handler above.
                if isinstance(current, list) and 0 <= index < len(current):
                    current = current[index]
                else:
                    raise ValueError(f"Array index {index} out of bounds")
        else:
            if isinstance(current, dict) and segment in current:
                current = current[segment]
            else:
                raise ValueError(f"Key '{segment}' not found")
    return current


def _list_keys(node):
    """Return dict keys, ``"[i]"`` labels for list elements, or ``[]`` for scalars."""
    if isinstance(node, dict):
        return list(node.keys())
    if isinstance(node, list):
        return [f"[{i}]" for i in range(len(node))]
    return []


def _call_tool(request_id, tool_name, arguments):
    """Execute one ``tools/call`` invocation and return its JSON-RPC response.

    Missing or mistyped arguments yield error -32602, failures while reading
    or navigating the file yield -32603 with the exception text as message,
    and an unrecognized tool name yields -32601.
    """
    if tool_name == "get_all_keys":
        file_path = arguments.get("file_path")
        key_path = arguments.get("key_path")

        if not file_path:
            return _error_response(request_id, -32602, "file_path is required")

        try:
            data = _load_json_file(file_path)
            # Without a key path the keys of the document root are listed.
            target = _resolve_key_path(data, key_path) if key_path else data
            return _text_response(request_id, _list_keys(target))
        except Exception as e:
            return _error_response(request_id, -32603, str(e))

    if tool_name == "get_value":
        file_path = arguments.get("file_path")
        key_path = arguments.get("key_path")

        if not file_path or not key_path:
            return _error_response(request_id, -32602, "file_path and key_path are required")

        try:
            data = _load_json_file(file_path)
            return _text_response(request_id, _resolve_key_path(data, key_path))
        except Exception as e:
            return _error_response(request_id, -32603, str(e))

    if tool_name == "get_multiple_values":
        file_path = arguments.get("file_path")
        key_paths = arguments.get("key_paths")

        if not file_path or not key_paths:
            return _error_response(request_id, -32602, "file_path and key_paths are required")

        if not isinstance(key_paths, list):
            return _error_response(request_id, -32602, "key_paths must be an array")

        try:
            data = _load_json_file(file_path)
            results = {}
            errors = {}
            # A failing path is recorded under "errors" and does not prevent
            # the remaining paths from being resolved.
            for key_path in key_paths:
                try:
                    results[key_path] = _resolve_key_path(data, key_path)
                except Exception as e:
                    errors[key_path] = str(e)
            return _text_response(request_id, {"results": results, "errors": errors})
        except Exception as e:
            return _error_response(request_id, -32603, str(e))

    return _error_response(request_id, -32601, f"Unknown tool: {tool_name}")


async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Handle one decoded MCP (JSON-RPC 2.0) request and return the response.

    Supported methods are ``initialize``, ``ping``, ``tools/list`` and
    ``tools/call``; any other method yields error -32601.

    Args:
        request: The decoded JSON-RPC request object.

    Returns:
        A JSON-RPC 2.0 response dict carrying the request's ``id`` and either
        a ``result`` or an ``error`` member. A request that is not a JSON
        object yields error -32600 with a null id, and an unexpected
        exception yields error -32603.
    """
    if not isinstance(request, dict):
        # Nothing to read an id from; previously the error handler itself
        # failed on ``request.get`` for such input.
        return _error_response(None, -32600, "Invalid Request")

    request_id = request.get("id")
    try:
        method = request.get("method")
        # Treat an explicit null the same as an omitted member.
        params = request.get("params") or {}

        if method == "initialize":
            return {
                "jsonrpc": "2.0",
                "id": request_id,
                "result": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {
                        "tools": {}
                    },
                    "serverInfo": {
                        "name": "json-reader",
                        "version": "1.0.0"
                    }
                }
            }

        if method == "ping":
            return {
                "jsonrpc": "2.0",
                "id": request_id,
                "result": {
                    "pong": True
                }
            }

        if method == "tools/list":
            return {
                "jsonrpc": "2.0",
                "id": request_id,
                "result": {
                    "tools": _tool_definitions()
                }
            }

        if method == "tools/call":
            return _call_tool(
                request_id,
                params.get("name"),
                params.get("arguments") or {},
            )

        return _error_response(request_id, -32601, f"Unknown method: {method}")

    except Exception as e:
        return _error_response(request_id, -32603, f"Internal error: {str(e)}")
|
|
|
|
def _write_message(message):
    """Write ``message`` to stdout as a single JSON line and flush immediately."""
    sys.stdout.write(json.dumps(message) + "\n")
    sys.stdout.flush()


async def main():
    """Serve newline-delimited JSON-RPC requests from stdin until end of input.

    Each non-blank input line is decoded and passed to ``handle_request``;
    the response is written to stdout as one JSON line. A line that is not
    valid JSON is answered with error -32700, and an unexpected failure while
    handling or writing is answered with error -32603. Requests without an
    ``id`` member are notifications and receive no reply.
    """
    loop = asyncio.get_running_loop()
    try:
        while True:
            # readline blocks, so it runs in the default executor to keep the
            # event loop free.
            line = await loop.run_in_executor(None, sys.stdin.readline)
            if not line:
                # An empty string (as opposed to "\n") means end of input.
                break

            line = line.strip()
            if not line:
                continue

            try:
                request = json.loads(line)
            except json.JSONDecodeError:
                # JSON-RPC 2.0 requires a null id when the request id could
                # not be determined.
                _write_message({
                    "jsonrpc": "2.0",
                    "id": None,
                    "error": {
                        "code": -32700,
                        "message": "Parse error"
                    }
                })
                continue

            is_notification = isinstance(request, dict) and "id" not in request
            try:
                response = await handle_request(request)
                # JSON-RPC 2.0: the server must not reply to a notification.
                if not is_notification:
                    _write_message(response)
            except Exception as e:
                if not is_notification:
                    _write_message({
                        "jsonrpc": "2.0",
                        "id": request.get("id") if isinstance(request, dict) else None,
                        "error": {
                            "code": -32603,
                            "message": f"Internal error: {str(e)}"
                        }
                    })

    except KeyboardInterrupt:
        pass


# Start the stdio server only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())