commit 1602509026
朱潮 2025-10-06 19:51:39 +08:00
13 changed files with 42047 additions and 0 deletions

.DS_Store (binary file, not shown)

agent_prompt.txt (new file, 195 lines)

@@ -0,0 +1,195 @@
# Intelligent Data Retrieval Assistant
## Role
You are an intelligent retrieval expert built on an inverted index and a multi-layer data architecture, specializing in efficient querying and analysis of large-scale, multi-source, heterogeneous data.
## Response Language
**Important: respond to all user requests and query results in Chinese.**
## Core Capabilities
- **Inverted-index retrieval**: millisecond-level field queries over pre-built indexes
- **Multi-layer data fusion**: combines the index, serialization, and document layers
- **Smart query optimization**: dynamically adjusts the query strategy to balance performance and precision
- **Regular-expression proficiency**: precise pattern matching and complex condition composition
- **Result aggregation and analysis**: structured output and deep insight mining
## System Architecture
### Data Storage Layout
```
./data/
├── [dataset folder]/
│   ├── schema.json          # inverted-index layer
│   ├── serialization.txt    # serialized-data layer
│   └── document.txt         # raw-document layer
```
### Three-Layer Data Model
#### 1. Index layer (schema.json)
- **Purpose**: inverted index of field enum values; the query entry point
- **Access**: `json-reader-get_all_keys({"file_path": "./data/[dataset folder]/schema.json", "key_path": "schema"})`
- **Structure**:
```json
{
  "schema": {
    "<field name>": {
      "txt_file_name": "document.txt",
      "serialization_file_name": "serialization.txt",
      "enums": ["enum value 1", "enum value 2", ...],
      "description": "additional field description"
    }
  }
}
```
#### 2. Serialization layer (serialization.txt)
- **Purpose**: structured product data that supports fast regex matching
- **Format**: `field1:value1;field2:value2;field3:value3`
- **Access**: pattern matching via the ripgrep tools
#### 3. Document layer (document.txt)
- **Purpose**: full PDF-parsed text with detailed specifications and descriptions
- **Access**: keyword-based deep search
- **Use**: supplements the serialized data with full context
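A minimal sketch of how the serialization format above is queried, using a hypothetical record line with ASCII separators; per the strategy below, match one simple field and parse the rest of the record in post-processing:

```python
import re

# Hypothetical serialization.txt record in the "field:value;" format above
line = "type:笔记本电脑;brand:Lenovo;memory_gb:16;price:29800"

# Match one simple field, then split the whole record for post-filtering
m = re.search(r"memory_gb:(\d+)", line)
fields = dict(kv.split(":", 1) for kv in line.split(";"))

print(m.group(1))        # "16"
print(fields["brand"])   # "Lenovo"
```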
## Query Execution Framework
### Stage 0: Dataset Discovery
**Goal**: identify the available datasets and pick the query target
**Steps**:
1. **Directory scan**: list all dataset folders under the data directory
2. **Dataset selection**: choose the dataset folder that fits the user's request
### Stage 1: Index Analysis
**Goal**: build the query strategy and determine the optimal path
**Steps**:
1. **Load the index**: read schema.json to obtain field metadata
2. **Field analysis**: identify numeric, text, and enum fields
3. **Field details**: for relevant fields, call `json-reader-get_value({"file_path": "./data/[dataset folder]/schema.json", "key_path": "schema.[field name]"})` to inspect concrete enum values and value ranges
4. **Strategy selection**: pick the optimal retrieval path based on the query conditions
5. **Scope estimation**: assess the data distribution and selectivity of each condition
### Stage 2: Precise Data Matching
**Goal**: extract matching records from the serialized data
**Steps**:
1. **Pre-check**: `ripgrep-count-matches({"path": "./data/[dataset folder]/serialization.txt", "pattern": "<pattern>"})`
2. **Smart throttling**:
   - match count > 1000: add filter conditions and re-run the pre-check
   - match count 100–1000: `ripgrep-search({"maxResults": 30})`
   - match count < 100: search normally
3. **Pattern construction**: build a precise regular-expression pattern
   - **Important**: avoid assembling complex regex patterns; differing field order, formatting variations, or missing fields will break direct matches
   - **Recommended**: match on a simple single-field pattern, then filter the results in post-processing
4. **Data extraction**: retrieve the complete product record lines
5. **Persistent search strategy**:
   - **Key principle**: do not stop as soon as partial matches are found
   - **Search expansion**: after initial matches, widen the search to ensure no relevant data is missed
   - **Multi-round validation**: cross-check with different query patterns and keyword combinations
   - **Completeness check**: terminate only after all plausible query paths have been exhausted
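The throttling rules in step 2 can be sketched as a small decision helper (illustrative only; not one of the agent's actual tools):

```python
def choose_strategy(match_count: int) -> dict:
    # Thresholds mirror the smart-throttling rules above
    if match_count > 1000:
        return {"action": "refine", "note": "add filters, then re-run the pre-check"}
    if match_count >= 100:
        return {"action": "search", "maxResults": 30}
    return {"action": "search", "maxResults": match_count}

print(choose_strategy(250))   # search capped at 30 results
print(choose_strategy(1500))  # too broad: refine first
```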
### Stage 3: Deep Document Retrieval
**Goal**: obtain complete product details and context
**Steps**:
1. **Keyword extraction**: pull product identifiers from the matched records
2. **Context control**:
   - high match count (>50): `rg -C 5`
   - medium match count (10–50): `rg -C 10`
   - low match count (<10): `rg -C 20`
3. **Detail retrieval**: search document.txt for the full descriptions
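The context-control table maps directly onto ripgrep's `-C` flag; a sketch of the same thresholds:

```python
def rg_context_flag(match_count: int) -> str:
    # Fewer matches afford a wider context window per match
    if match_count > 50:
        return "-C 5"
    if match_count >= 10:
        return "-C 10"
    return "-C 20"

print(rg_context_flag(7))  # "-C 20"
```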
### Stage 4: Result Aggregation
**Goal**: produce a structured query-result report
**Steps**:
1. **Data fusion**: merge results from all retrieval layers
2. **Dedupe and rank**: order by relevance and completeness
3. **Structured output**: emit a standardized result format
4. **Quality assessment**: annotate result confidence and completeness
## Advanced Query Strategies
### Compound-Condition Queries
**Pattern**: multi-field AND/OR condition combinations
**Approach**:
```python
# Illustrative sketch
conditions = [
    "type:笔记本电脑",
    "price:[25000-35000]日元",
    "memory_gb:16",
]
# Avoid assembling one complex regex over all conditions; field order or
# formatting differences would break it. Search on the primary (most
# selective) condition and verify the rest in post-processing.
query_pattern = conditions[0]  # pass this simple pattern to ripgrep-search
```
### Numeric-Range Queries
**Strategy**:
1. **Index analysis**: identify the distribution of the numeric field
2. **Range partitioning**: discretize continuous values into intervals
3. **Exact matching**: compare values using the MCP tools
4. **Dynamic tuning**: adjust query granularity based on result-set size
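For example, a continuous range such as price 25000–35000 can be discretized into a regex alternation over digit prefixes (a sketch; the field name is illustrative):

```python
import re

# 25000–35000 as three digit-prefix intervals: 25000–29999, 30000–34999, 35000
# The trailing (?:;|$) anchors the value so longer numbers don't match.
pattern = r"price:(2[5-9]\d{3}|3[0-4]\d{3}|35000)(?:;|$)"

print(bool(re.search(pattern, "price:29800")))  # True
print(bool(re.search(pattern, "price:24999")))  # False
```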
### Fuzzy Matching and Synonym Expansion
**Capabilities**:
- **Edit-distance matching**: tolerates spelling errors
- **Synonym expansion**: expands query terms using domain knowledge
- **Fuzzy regex**: approximate matching patterns
- **Note**: even for fuzzy matching, avoid overly complex regular expressions; prefer simple patterns
### Announcing Tool Calls
Before every tool call, state the reason in natural language, for example:
```
I will now use `[tool name]` to [explain the purpose of this call and the information it is expected to return].
```
- Use natural, fluent language; avoid stiff, formulaic phrasing
- Emoji may be added sparingly for readability
- Keep the explanation brief and focused on the call's purpose
### Available Tools
#### JSON reading tools
- **json-reader-get_all_keys**: list all keys in a JSON file, or the keys under a given path
- **json-reader-get_value**: fetch a single value at a given key path
- **json-reader-get_multiple_values**: 🆕 fetch values for multiple key paths (batch queries for efficiency)
### Call Sequence
1. **Directory tree** → `deep-directory-tree-get_deep_directory_tree`
2. **Index query** → `json-reader-get_all_keys`
3. **Field details** → `json-reader-get_value` or `json-reader-get_multiple_values` (prefer the multi-value tool to batch-fetch enum values and ranges for related fields)
4. **Count estimate** → `ripgrep-count-matches`
5. **Data retrieval** → `ripgrep-search`
6. **Detail search** → `ripgrep-search` (document.txt)
### Tool Usage Tips
- **Batch queries**: when analyzing several related fields, prefer `json-reader-get_multiple_values` to fetch them in one call and reduce the number of tool invocations
- **Field-combination analysis**: query `[field1, field2, field3]` together to quickly survey enum ranges and constraints across fields
- **Efficiency**: the multi-value tool significantly speeds up the field-analysis stage
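For instance, a single batched call in the same calling convention used above (the field names are illustrative):

```
json-reader-get_multiple_values({"file_path": "./data/[dataset folder]/schema.json", "key_paths": ["schema.type", "schema.price", "schema.memory_gb"]})
```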
## Quality Assurance
### Query Accuracy
- **Result validation**: cross-check results across the retrieval layers
- **Consistency check**: ensure logical consistency of the data
- **Completeness check**: verify that key fields are populated
### Query Design Principles
1. **Broad to narrow**: refine from broad conditions toward precise ones
2. **Index first**: use the index to minimize data scans
3. **Batch operations**: merge similar queries to cut overhead
4. **Result estimation**: predict result size to avoid overruns
5. **Per-query limit**: ≤ 100 rows
6. **Exhaustive-search principle**:
   - **Don't settle for partial results**: keep exploring other query paths even after partial matches are found
   - **Multi-angle search**: approach from different fields and keyword combinations
   - **Progressive relaxation**: gradually loosen query conditions to surface more relevant data
   - **Cross-validation**: verify result completeness with multiple methods
## Important Notes
1. The queried device type (e.g. laptop vs. desktop) has the highest priority.
2. Queries on "CPU" and "GPU" fields have the lowest priority, because their naming varies widely.
3. If no exact match can be found, a higher-performing (or lower-performing, per the user's request) CPU or GPU is acceptable as a substitute.
---
**Execution reminder**: always call tools with full file-path arguments to ensure accurate and safe data access. Adjust the strategy dynamically during execution to fit the data characteristics and the query at hand.

data/.DS_Store (binary file, not shown)

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

fastapi_app.py (new file, 70 lines)

@@ -0,0 +1,70 @@
from typing import Optional
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from gbase_agent import init_agent_service
app = FastAPI(title="Database Assistant API", version="1.0.0")
# Initialize agent globally at startup
bot = init_agent_service()
class QueryRequest(BaseModel):
question: str
file_url: Optional[str] = None
class QueryResponse(BaseModel):
answer: str
@app.post("/query", response_model=QueryResponse)
async def query_database(request: QueryRequest):
"""
Process a database query using the assistant agent.
Args:
request: QueryRequest containing the query and optional file URL
Returns:
QueryResponse containing the assistant's response
"""
try:
messages = []
if request.file_url:
messages.append(
{
"role": "user",
"content": [{"text":"使用sqlite数据库用日语回答下面问题"+request.question}, {"file": request.file_url}],
}
)
else:
messages.append({"role": "user", "content": request.question})
responses = []
for response in bot.run(messages):
responses.append(response)
if responses:
final_response = responses[-1][-1]
return QueryResponse(answer=final_response["content"])
else:
raise HTTPException(status_code=500, detail="No response from agent")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@app.get("/")
async def root():
"""Health check endpoint"""
return {"message": "Database Assistant API is running"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
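A minimal client sketch for the `/query` endpoint above (the URL assumes the default uvicorn host/port from this file; the network call is left commented out so the snippet is self-contained):

```python
import json
import urllib.request

# Build the request body matching the QueryRequest model
payload = {"question": "数据库里有几张表", "file_url": None}
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
    "http://localhost:8000/query",
    data=body,
    headers={"Content-Type": "application/json"},
)
# with urllib.request.urlopen(req) as resp:
#     print(json.loads(resp.read())["answer"])
print(body.decode("utf-8"))
```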

gbase_agent.py (new file, 342 lines)

@@ -0,0 +1,342 @@
# Copyright 2023 The Qwen team, Alibaba Group. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A sqlite database assistant implemented by assistant"""
import argparse
import asyncio
import copy
import json
import os
from typing import Dict, List, Optional, Union
from qwen_agent.agents import Assistant
from qwen_agent.gui import WebUI
from qwen_agent.llm.oai import TextChatAtOAI
from qwen_agent.llm.schema import ASSISTANT, FUNCTION, Message
from qwen_agent.utils.output_beautify import typewriter_print
ROOT_RESOURCE = os.path.join(os.path.dirname(__file__), "resource")
class GPT4OChat(TextChatAtOAI):
"""Custom GPT-4o chat class that fixes the tool_call_id issue."""
def convert_messages_to_dicts(self, messages: List[Message]) -> List[dict]:
# Base conversion via the parent class
messages = super().convert_messages_to_dicts(messages)
# Apply the fixed message conversion
messages = self._fixed_conv_qwen_agent_messages_to_oai(messages)
return messages
@staticmethod
def _fixed_conv_qwen_agent_messages_to_oai(messages: List[Union[Message, Dict]]):
"""Fixed conversion that ensures tool messages carry a tool_call_id field."""
new_messages = []
i = 0
while i < len(messages):
msg = messages[i]
if msg['role'] == ASSISTANT:
# Handle assistant messages
assistant_msg = {'role': 'assistant'}
# Set content
content = msg.get('content', '')
if isinstance(content, (list, dict)):
assistant_msg['content'] = json.dumps(content, ensure_ascii=False)
elif content is None:
assistant_msg['content'] = ''
else:
assistant_msg['content'] = content
# Set reasoning_content
if msg.get('reasoning_content'):
assistant_msg['reasoning_content'] = msg['reasoning_content']
# Check whether tool_calls need to be constructed
has_tool_call = False
tool_calls = []
# Case 1: the current message carries a function_call
if msg.get('function_call'):
has_tool_call = True
tool_calls.append({
'id': msg.get('extra', {}).get('function_id', '1'),
'type': 'function',
'function': {
'name': msg['function_call']['name'],
'arguments': msg['function_call']['arguments']
}
})
# Note: no longer fabricate a tool_call for orphaned tool messages
if has_tool_call:
assistant_msg['tool_calls'] = tool_calls
new_messages.append(assistant_msg)
# Check whether the next message is the corresponding tool message
if i + 1 < len(messages) and messages[i + 1]['role'] == 'tool':
tool_msg = copy.deepcopy(messages[i + 1])
# Ensure tool_call_id matches
tool_msg['tool_call_id'] = tool_calls[0]['id']
# Remove extraneous fields
for field in ['id', 'extra', 'function_call']:
if field in tool_msg:
del tool_msg[field]
# Ensure content is valid and a string
content = tool_msg.get('content', '')
if isinstance(content, (list, dict)):
tool_msg['content'] = json.dumps(content, ensure_ascii=False)
elif content is None:
tool_msg['content'] = ''
new_messages.append(tool_msg)
i += 2
else:
i += 1
else:
new_messages.append(assistant_msg)
i += 1
elif msg['role'] == 'tool':
# Orphaned tool message: convert it into an assistant + user message sequence
# First append an assistant message carrying the tool result
assistant_result = {'role': 'assistant'}
content = msg.get('content', '')
if isinstance(content, (list, dict)):
content = json.dumps(content, ensure_ascii=False)
assistant_result['content'] = f"工具查询结果: {content}"
new_messages.append(assistant_result)
# Then append a user message to continue the conversation
new_messages.append({'role': 'user', 'content': '请继续分析以上结果'})
i += 1
else:
# Handle messages with other roles
new_msg = copy.deepcopy(msg)
# Ensure content is valid and a string
content = new_msg.get('content', '')
if isinstance(content, (list, dict)):
new_msg['content'] = json.dumps(content, ensure_ascii=False)
elif content is None:
new_msg['content'] = ''
new_messages.append(new_msg)
i += 1
return new_messages
def read_mcp_settings():
with open("./mcp/mcp_settings.json", "r") as f:
mcp_settings_json = json.load(f)
return mcp_settings_json
def read_system_prompt():
with open("./agent_prompt.txt", "r", encoding="utf-8") as f:
return f.read().strip()
def init_agent_service():
llm_cfg = {
"llama-33": {
"model": "gbase-llama-33",
"model_server": "http://llmapi:9009/v1",
"api_key": "any",
},
"gpt-oss-120b": {
"model": "openai/gpt-oss-120b",
"model_server": "https://openrouter.ai/api/v1", # base_url, also known as api_base
"api_key": "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",
"generate_cfg": {
"use_raw_api": True, # GPT-OSS: true, Qwen: false
"fncall_prompt_type": "nous", # nous-style function-calling prompts
}
},
"claude-3.7": {
"model": "claude-3-7-sonnet-20250219",
"model_server": "https://one.felo.me/v1",
"api_key": "sk-9gtHriq7C3jAvepq5dA0092a5cC24a54Aa83FbC99cB88b21-2",
"generate_cfg": {
"use_raw_api": True, # GPT-OSS: true, Qwen: false
},
},
"gpt-4o": {
"model": "gpt-4o",
"model_server": "https://one-dev.felo.me/v1",
"api_key": "sk-hsKClH0Z695EkK5fDdB2Ec2fE13f4fC1B627BdBb8e554b5b-4",
"generate_cfg": {
"use_raw_api": True, # enable raw_api; the custom class fixes the tool_call_id issue
"fncall_prompt_type": "nous", # nous-style function-calling prompts
},
},
"Gpt-4o-back": {
"model_type": "oai", # use the oai type so the custom class can be applied
"model": "gpt-4o",
"model_server": "https://one-dev.felo.me/v1",
"api_key": "sk-hsKClH0Z695EkK5fDdB2Ec2fE13f4fC1B627BdBb8e554b5b-4",
"generate_cfg": {
"use_raw_api": True, # enable raw_api; the custom class fixes the tool_call_id issue
"fncall_prompt_type": "nous", # nous-style function-calling prompts
},
# Use the custom GPT4OChat class
"llm_class": GPT4OChat,
},
"glm-45": {
"model_server": "https://open.bigmodel.cn/api/paas/v4",
"api_key": "0c9cbaca9d2bbf864990f1e1decdf340.dXRMsZCHTUbPQ0rm",
"model": "glm-4.5",
"generate_cfg": {
"use_raw_api": True, # GPT-OSS: true, Qwen: false
},
},
"qwen3-next": {
"model": "qwen/qwen3-next-80b-a3b-instruct",
"model_server": "https://openrouter.ai/api/v1", # base_url, also known as api_base
"api_key": "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",
},
"deepresearch": {
"model": "alibaba/tongyi-deepresearch-30b-a3b",
"model_server": "https://openrouter.ai/api/v1", # base_url, also known as api_base
"api_key": "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",
},
"qwen3-coder":{
"model": "Qwen/Qwen3-Coder-30B-A3B-Instruct",
"model_server": "https://api-inference.modelscope.cn/v1", # base_url, also known as api_base
"api_key": "ms-92027446-2787-4fd6-af01-f002459ec556",
},
"openrouter-gpt4o":{
"model": "openai/gpt-4o",
"model_server": "https://openrouter.ai/api/v1", # base_url, also known as api_base
"api_key": "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",
"generate_cfg": {
"use_raw_api": True, # GPT-OSS: true, Qwen: false
"fncall_prompt_type": "nous", # nous-style function-calling prompts
},
}
}
system = read_system_prompt()
# Load the MCP tool settings
tools = read_mcp_settings()
# Select the active LLM configuration (currently qwen3-next)
llm_instance = llm_cfg["qwen3-next"]
if "llm_class" in llm_instance:
llm_instance = llm_instance.get("llm_class", TextChatAtOAI)(llm_instance)
bot = Assistant(
llm=llm_instance, # the selected LLM configuration
name="数据库助手",
description="数据库查询",
system_message=system,
function_list=tools,
)
return bot
def test(query="数据库里有几张表"):
# Define the agent
bot = init_agent_service()
# Chat
messages = []
messages.append({"role": "user", "content": query})
responses = []
for response in bot.run(messages):
responses.append(response)
# Print only the final result, skipping intermediate steps
if responses:
final_response = responses[-1][-1] # take the last response as the final answer
print("Answer:", final_response["content"])
def app_tui():
# Define the agent
bot = init_agent_service()
# Chat
messages = []
while True:
# Query example: 数据库里有几张表
query = input("user question: ")
# File example: resource/poem.pdf
file = input("file url (press enter if no file): ").strip()
if not query:
print("user question cannot be empty")
continue
if not file:
messages.append({"role": "user", "content": query})
else:
messages.append(
{"role": "user", "content": [{"text": query}, {"file": file}]}
)
response = []
for response in bot.run(messages):
print("bot response:", response)
messages.extend(response)
def app_gui():
# Define the agent
bot = init_agent_service()
chatbot_config = {
"prompt.suggestions": [
"数据库里有几张表",
"创建一个学生表包括学生的姓名、年龄",
"增加一个学生名字叫韩梅梅今年6岁",
]
}
WebUI(
bot,
chatbot_config=chatbot_config,
).run()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="数据库助手")
parser.add_argument(
"--query", type=str, default="数据库里有几张表", help="用户问题"
)
parser.add_argument(
"--mode",
type=str,
choices=["test", "tui", "gui"],
default="test",
help="运行模式",
)
args = parser.parse_args()
if args.mode == "test":
test(args.query)
elif args.mode == "tui":
app_tui()
elif args.mode == "gui":
app_gui()

mcp/.DS_Store (binary file, not shown)

mcp/json_reader_server.py (new file, 461 lines)

@@ -0,0 +1,461 @@
#!/usr/bin/env python3
"""
MCP Server for JSON file operations.
Provides functions to:
1. Get top-level keys from a JSON file
2. Get value of a specific key from a JSON file
"""
import json
import os
import sys
import asyncio
from typing import Any, Dict, List
async def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
"""Handle MCP request"""
try:
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
if method == "initialize":
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "json-reader",
"version": "1.0.0"
}
}
}
elif method == "ping":
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"pong": True
}
}
elif method == "tools/list":
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"tools": [
{
"name": "get_all_keys",
"description": "Get keys from a JSON file. If keypath is provided, get keys under that path. Otherwise, get top-level keys.",
"inputSchema": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the JSON file"
},
"key_path": {
"type": "string",
"description": "Optional key path to get keys from (e.g., 'user.address' or 'items[0]')"
}
},
"required": ["file_path"]
}
},
{
"name": "get_value",
"description": "Get value of a specific key from a JSON file using dot notation (e.g., 'user.name' or 'items[0].price')",
"inputSchema": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the JSON file"
},
"key_path": {
"type": "string",
"description": "Path to the key using dot notation (e.g., 'user.name' or 'items[0].price')"
}
},
"required": ["file_path", "key_path"]
}
},
{
"name": "get_multiple_values",
"description": "Get values of multiple keys from a JSON file using dot notation (e.g., ['user.name', 'items[0].price'])",
"inputSchema": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the JSON file"
},
"key_paths": {
"type": "array",
"items": {
"type": "string"
},
"description": "Array of key paths using dot notation (e.g., ['user.name', 'items[0].price'])"
}
},
"required": ["file_path", "key_paths"]
}
}
]
}
}
elif method == "tools/call":
tool_name = params.get("name")
arguments = params.get("arguments", {})
if tool_name == "get_all_keys":
file_path = arguments.get("file_path")
key_path = arguments.get("key_path")
if not file_path:
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32602,
"message": "file_path is required"
}
}
try:
# Convert relative path to absolute path
if not os.path.isabs(file_path):
file_path = os.path.abspath(file_path)
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# If key_path is provided, navigate to that path first
if key_path:
keys = key_path.split('.')
current = data
for key in keys:
# Handle array indices like [0], [1], etc.
if '[' in key and key.endswith(']'):
base_key = key.split('[')[0]
if base_key:
if isinstance(current, dict) and base_key in current:
current = current[base_key]
else:
raise ValueError(f"Key '{base_key}' not found")
# Extract array index
index_str = key.split('[')[1].rstrip(']')
try:
index = int(index_str)
except ValueError:
raise ValueError(f"Invalid array index: {index_str}")
# Bounds check outside the try so its error is not masked as a parse error
if isinstance(current, list) and 0 <= index < len(current):
current = current[index]
else:
raise ValueError(f"Array index {index} out of bounds")
else:
if isinstance(current, dict) and key in current:
current = current[key]
else:
raise ValueError(f"Key '{key}' not found")
# Get keys from the target location
if isinstance(current, dict):
keys = list(current.keys())
elif isinstance(current, list):
keys = [f"[{i}]" for i in range(len(current))]
else:
keys = []
else:
# Get top-level keys
if isinstance(data, dict):
keys = list(data.keys())
elif isinstance(data, list):
keys = [f"[{i}]" for i in range(len(data))]
else:
keys = []
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": json.dumps(keys, indent=2, ensure_ascii=False)
}
]
}
}
except Exception as e:
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32603,
"message": str(e)
}
}
elif tool_name == "get_value":
file_path = arguments.get("file_path")
key_path = arguments.get("key_path")
if not file_path or not key_path:
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32602,
"message": "file_path and key_path are required"
}
}
try:
# Convert relative path to absolute path
if not os.path.isabs(file_path):
file_path = os.path.abspath(file_path)
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Parse the key path (supports dot notation and array indices)
keys = key_path.split('.')
current = data
for key in keys:
# Handle array indices like [0], [1], etc.
if '[' in key and key.endswith(']'):
base_key = key.split('[')[0]
if base_key:
if isinstance(current, dict) and base_key in current:
current = current[base_key]
else:
raise ValueError(f"Key '{base_key}' not found")
# Extract array index
index_str = key.split('[')[1].rstrip(']')
try:
index = int(index_str)
except ValueError:
raise ValueError(f"Invalid array index: {index_str}")
# Bounds check outside the try so its error is not masked as a parse error
if isinstance(current, list) and 0 <= index < len(current):
current = current[index]
else:
raise ValueError(f"Array index {index} out of bounds")
else:
if isinstance(current, dict) and key in current:
current = current[key]
else:
raise ValueError(f"Key '{key}' not found")
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": json.dumps(current, indent=2, ensure_ascii=False)
}
]
}
}
except Exception as e:
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32603,
"message": str(e)
}
}
elif tool_name == "get_multiple_values":
file_path = arguments.get("file_path")
key_paths = arguments.get("key_paths")
if not file_path or not key_paths:
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32602,
"message": "file_path and key_paths are required"
}
}
if not isinstance(key_paths, list):
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32602,
"message": "key_paths must be an array"
}
}
try:
# Convert relative path to absolute path
if not os.path.isabs(file_path):
file_path = os.path.abspath(file_path)
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
results = {}
errors = {}
# Process each key path
for key_path in key_paths:
try:
# Parse the key path (supports dot notation and array indices)
keys = key_path.split('.')
current = data
for key in keys:
# Handle array indices like [0], [1], etc.
if '[' in key and key.endswith(']'):
base_key = key.split('[')[0]
if base_key:
if isinstance(current, dict) and base_key in current:
current = current[base_key]
else:
raise ValueError(f"Key '{base_key}' not found")
# Extract array index
index_str = key.split('[')[1].rstrip(']')
try:
index = int(index_str)
except ValueError:
raise ValueError(f"Invalid array index: {index_str}")
# Bounds check outside the try so its error is not masked as a parse error
if isinstance(current, list) and 0 <= index < len(current):
current = current[index]
else:
raise ValueError(f"Array index {index} out of bounds")
else:
if isinstance(current, dict) and key in current:
current = current[key]
else:
raise ValueError(f"Key '{key}' not found")
results[key_path] = current
except Exception as e:
errors[key_path] = str(e)
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": json.dumps({
"results": results,
"errors": errors
}, indent=2, ensure_ascii=False)
}
]
}
}
except Exception as e:
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32603,
"message": str(e)
}
}
else:
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32601,
"message": f"Unknown tool: {tool_name}"
}
}
else:
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32601,
"message": f"Unknown method: {method}"
}
}
except Exception as e:
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
}
}
async def main():
"""Main entry point."""
try:
while True:
# Read from stdin
line = await asyncio.get_event_loop().run_in_executor(None, sys.stdin.readline)
if not line:
break
line = line.strip()
if not line:
continue
try:
request = json.loads(line)
response = await handle_request(request)
# Write to stdout
sys.stdout.write(json.dumps(response) + "\n")
sys.stdout.flush()
except json.JSONDecodeError:
error_response = {
"jsonrpc": "2.0",
"error": {
"code": -32700,
"message": "Parse error"
}
}
sys.stdout.write(json.dumps(error_response) + "\n")
sys.stdout.flush()
except Exception as e:
error_response = {
"jsonrpc": "2.0",
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
}
}
sys.stdout.write(json.dumps(error_response) + "\n")
sys.stdout.flush()
except KeyboardInterrupt:
pass
if __name__ == "__main__":
asyncio.run(main())
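The dot-notation traversal above is duplicated across all three tool handlers; a standalone sketch of the same path semantics (`'user.name'`, `'items[0].price'`, bare `'[0]'`) factored into one helper:

```python
import re

def resolve_key_path(data, key_path: str):
    """Traverse 'a.b[0].c'-style paths over nested dicts and lists."""
    for part in key_path.split("."):
        m = re.fullmatch(r"(\w*)\[(\d+)\]", part)
        if m:
            base, idx = m.group(1), int(m.group(2))
            if base:
                data = data[base]  # dict lookup before the index
            data = data[idx]       # then the list index
        else:
            data = data[part]
    return data

doc = {"user": {"name": "a"}, "items": [{"price": 9.5}]}
print(resolve_key_path(doc, "items[0].price"))  # 9.5
```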

mcp/mcp_settings.json (new file, 33 lines)

@@ -0,0 +1,33 @@
[
{
"mcpServers": {
"deep-directory-tree": {
"command": "npx",
"args": [
"-y",
"@andredezzy/deep-directory-tree-mcp"
]
},
"mcp-server-code-runner": {
"command": "npx",
"args": [
"-y",
"mcp-server-code-runner@latest"
]
},
"ripgrep": {
"command": "npx",
"args": [
"-y",
"mcp-ripgrep@latest"
]
},
"json-reader": {
"command": "python",
"args": [
"./mcp/json_reader_server.py"
]
}
}
}
]