add rag_retrieve autoload

This commit is contained in:
朱潮 2026-04-16 19:38:13 +08:00
parent 53fb98e44e
commit e1bf685314
10 changed files with 525 additions and 77 deletions

View File

@@ -1,14 +1,5 @@
[
  {
    "mcpServers": {
      "rag_retrieve": {
        "transport": "stdio",
        "command": "python",
        "args": [
          "./mcp/rag_retrieve_server.py",
          "{bot_id}"
        ]
      }
    }
    "mcpServers": {}
  }
]

View File

@@ -69,60 +69,6 @@ When creating scripts in `executable_code/`, follow these organization rules:
- Temporary script (when needed): `{agent_dir_path}/executable_code/tmp/test.py`
- Downloaded file: `{agent_dir_path}/download/report.pdf`
# Retrieval Policy
### 1. Retrieval Order and Tool Selection
- Follow this section for source choice, tool choice, query rewrite, `top_k`, fallback, result handling, and citations.
- Use this default retrieval order and execute it sequentially: skill-enabled knowledge retrieval tools > `rag_retrieve` / `table_rag_retrieve` > local filesystem retrieval.
- Do NOT answer from model knowledge first.
- Do NOT skip directly to local filesystem retrieval when an earlier retrieval source may answer the question.
- When a suitable skill-enabled knowledge retrieval tool is available, use it first.
- If no suitable skill-enabled retrieval tool is available, or if its result is insufficient, continue with `rag_retrieve` or `table_rag_retrieve`.
- Use `table_rag_retrieve` first for values, prices, quantities, inventory, specifications, rankings, comparisons, summaries, extraction, lists, tables, name lookup, historical coverage, mixed questions, and unclear cases.
- Use `rag_retrieve` first only for clearly pure concept, definition, workflow, policy, or explanation questions without structured data needs.
- After each retrieval step, evaluate sufficiency before moving to the next source. Do NOT run these retrieval sources in parallel.
### 2. Query Preparation
- Do NOT pass the raw user question unless it already works well for retrieval.
- Rewrite for recall: extract entity, time scope, attributes, and intent.
- Add useful variants: synonyms, aliases, abbreviations, related titles, historical names, and category terms.
- Expand list-style, extraction, overview, historical, roster, timeline, and archive queries more aggressively.
- Preserve meaning. Do NOT introduce unrelated topics.
### 3. Retrieval Breadth (`top_k`)
- Apply `top_k` only to `rag_retrieve`. Use the smallest sufficient value, then expand only if coverage is insufficient.
- Use `30` for simple fact lookup.
- Use `50` for moderate synthesis, comparison, summarization, or disambiguation.
- Use `100` for broad recall, such as comprehensive analysis, scattered knowledge, multiple entities or periods, or list / catalog / timeline / roster / overview requests.
- Raise `top_k` when keyword branches are many or results are too few, repetitive, incomplete, sparse, or too narrow.
- Use this expansion order: `30 -> 50 -> 100`. If unsure, use `100`.
### 4. Result Evaluation
- Treat results as insufficient if they are empty, start with `Error:`, say `no excel files found`, are off-topic, miss the core entity or scope, or provide no usable evidence.
- Also treat results as insufficient when they cover only part of the request, or when full-list, historical, comparison, or mixed data + explanation requests return only partial or truncated coverage.
### 5. Fallback and Sequential Retry
- If the first retrieval result is insufficient, call the next retrieval source in the default order before replying.
- If the first RAG tool is insufficient, call the other RAG tool next before moving to local filesystem retrieval.
- If `table_rag_retrieve` is insufficient or empty, continue with `rag_retrieve`.
- If `rag_retrieve` is insufficient or empty, continue with `table_rag_retrieve`.
- If both `rag_retrieve` and `table_rag_retrieve` are insufficient, continue with local filesystem retrieval.
- Say no relevant information was found only after all applicable skill-enabled retrieval tools, both `rag_retrieve` and `table_rag_retrieve`, and local filesystem retrieval have been tried and still do not provide enough evidence.
- Do NOT reply that no relevant information was found before the final local filesystem fallback has also been tried.
### 6. Table RAG Result Handling
- Follow all `[INSTRUCTION]` and `[EXTRA_INSTRUCTION]` content in `table_rag_retrieve` results.
- If results are truncated, explicitly tell the user total matches (`N+M`), displayed count (`N`), and omitted count (`M`).
- Cite data sources using filenames from `file_ref_table`.
### 7. Citation Requirements for Retrieved Knowledge
- When using knowledge from `rag_retrieve` or `table_rag_retrieve`, you MUST generate `<CITATION ... />` tags.
- Follow the citation format returned by each tool.
- Place citations immediately after the paragraph or bullet list that uses the knowledge.
- Do NOT collect citations at the end.
- Use 1-2 citations per paragraph or bullet list when possible.
- If learned knowledge is used, include at least 1 `<CITATION ... />`.
# System Information
<env>
Working directory: {agent_dir_path}

View File

@@ -0,0 +1,22 @@
{
  "name": "rag-retrieve",
  "description": "rag-retrieve and table-rag-retrieve",
  "hooks": {
    "PrePrompt": [
      {
        "type": "command",
        "command": "python hooks/pre_prompt.py"
      }
    ]
  },
  "mcpServers": {
    "rag_retrieve": {
      "transport": "stdio",
      "command": "python",
      "args": [
        "./skills_autoload/rag-retrieve/rag_retrieve_server.py",
        "{bot_id}"
      ]
    }
  }
}

View File

@@ -0,0 +1,153 @@
# User Context Loader
An example user context loader Skill that demonstrates the hooks mechanism of the Claude Plugins mode.
## Overview
This Skill demonstrates three hook types:
### PrePrompt Hook
Runs when the system_prompt is loaded; dynamically injects user context information.
- File: `hooks/pre_prompt.py`
- Purpose: look up user information, preferences, history, etc., and inject them into the prompt
### PostAgent Hook
Runs after the agent finishes executing; used for post-processing.
- File: `hooks/post_agent.py`
- Purpose: record analytics data, trigger asynchronous tasks, send notifications, etc.
### PreSave Hook
Runs before a message is saved; used for content processing.
- File: `hooks/pre_save.py`
- Purpose: content filtering, masking sensitive information, format conversion, etc.
## Directory Structure
```
user-context-loader/
├── README.md                # Skill documentation
├── .claude-plugin/
│   └── plugin.json          # Hook and MCP configuration
└── hooks/
    ├── pre_prompt.py        # PrePrompt hook script
    ├── post_agent.py        # PostAgent hook script
    └── pre_save.py          # PreSave hook script
```
## plugin.json Format
```json
{
  "name": "user-context-loader",
  "description": "Example user context loader Skill",
  "hooks": {
    "PrePrompt": [
      {
        "type": "command",
        "command": "python hooks/pre_prompt.py"
      }
    ],
    "PostAgent": [
      {
        "type": "command",
        "command": "python hooks/post_agent.py"
      }
    ],
    "PreSave": [
      {
        "type": "command",
        "command": "python hooks/pre_save.py"
      }
    ]
  },
  "mcpServers": {
    "server-name": {
      "command": "node",
      "args": ["path/to/server.js"],
      "env": {
        "API_KEY": "${API_KEY}"
      }
    }
  }
}
```
## Hook Script Format
Hook scripts run as subprocesses: they receive parameters through environment variables and return results via stdout.
### Available Environment Variables
| Environment Variable | Description | Available In |
|---------|------|--------|
| `ASSISTANT_ID` | Bot ID | all hooks |
| `USER_IDENTIFIER` | User identifier | all hooks |
| `SESSION_ID` | Session ID | all hooks |
| `LANGUAGE` | Language code | all hooks |
| `HOOK_TYPE` | Hook type | all hooks |
| `CONTENT` | Message content | PreSave |
| `ROLE` | Message role | PreSave |
| `RESPONSE` | Agent response | PostAgent |
| `METADATA` | Metadata JSON | PostAgent |
### PrePrompt Example
```python
#!/usr/bin/env python3
import os
import sys

def main():
    user_identifier = os.environ.get('USER_IDENTIFIER', '')
    bot_id = os.environ.get('ASSISTANT_ID', '')
    # Print the content to inject into the prompt
    print(f"## User Context\n\nUser: {user_identifier}")
    return 0

if __name__ == '__main__':
    sys.exit(main())
```
### PreSave Example
```python
#!/usr/bin/env python3
import os
import sys

def main():
    content = os.environ.get('CONTENT', '')
    # Process the content, then print the result
    print(content)  # stdout becomes the processed content
    return 0

if __name__ == '__main__':
    sys.exit(main())
```
### PostAgent Example
```python
#!/usr/bin/env python3
import os
import sys

def main():
    response = os.environ.get('RESPONSE', '')
    session_id = os.environ.get('SESSION_ID', '')
    # Log to stderr (stdout is reserved for hook output)
    print(f"Session {session_id}: Response length {len(response)}", file=sys.stderr)
    return 0

if __name__ == '__main__':
    sys.exit(main())
```
## Use Cases
1. **PrePrompt**: automatically load a user's preferences, order history, etc. when they log in
2. **PostAgent**: record conversation analytics and trigger downstream business workflows
3. **PreSave**: mask sensitive information such as phone numbers and email addresses before storage
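As a sketch of use case 3, a PreSave hook might mask sensitive fields before the message is stored. This is illustrative only and not part of the skill: the `mask_sensitive` helper and its regular expressions are assumptions, and real masking rules would depend on the data formats involved.

```python
#!/usr/bin/env python3
# Hypothetical PreSave hook: masks phone numbers and email addresses.
# The patterns below are illustrative assumptions, not shipped rules.
import os
import re
import sys

def mask_sensitive(text: str) -> str:
    # Mask 11-digit phone numbers, keeping the first 3 and last 2 digits
    text = re.sub(r'\b(\d{3})\d{6}(\d{2})\b', r'\1******\2', text)
    # Mask the local part of email addresses, keeping the domain
    text = re.sub(r'\b[\w.+-]+@([\w-]+\.[\w.]+)\b', r'***@\1', text)
    return text

def main():
    content = os.environ.get('CONTENT', '')
    print(mask_sensitive(content))  # stdout becomes the stored content
    return 0

if __name__ == '__main__':
    main()
```

Because stdout replaces the saved content, the hook should print exactly one processed payload and write any diagnostics to stderr.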

View File

@@ -0,0 +1,20 @@
#!/usr/bin/env python3
"""
PreMemoryPrompt Hook - rag-retrieve skill

Runs when the memory-extraction prompt (FACT_RETRIEVAL_PROMPT) is loaded.
Reads retrieval-policy.md from the same directory and prints it as the
custom memory-extraction prompt template.
"""
import sys
from pathlib import Path

def main():
    prompt_file = Path(__file__).parent / "retrieval-policy.md"
    if prompt_file.exists():
        print(prompt_file.read_text(encoding="utf-8"))
    return 0

if __name__ == '__main__':
    sys.exit(main())

View File

@@ -0,0 +1,53 @@
# Retrieval Policy
### 1. Retrieval Order and Tool Selection
- Follow this section for source choice, tool choice, query rewrite, `top_k`, fallback, result handling, and citations.
- Use this default retrieval order and execute it sequentially: skill-enabled knowledge retrieval tools > `rag_retrieve` / `table_rag_retrieve` > local filesystem retrieval.
- Do NOT answer from model knowledge first.
- Do NOT skip directly to local filesystem retrieval when an earlier retrieval source may answer the question.
- When a suitable skill-enabled knowledge retrieval tool is available, use it first.
- If no suitable skill-enabled retrieval tool is available, or if its result is insufficient, continue with `rag_retrieve` or `table_rag_retrieve`.
- Use `table_rag_retrieve` first for values, prices, quantities, inventory, specifications, rankings, comparisons, summaries, extraction, lists, tables, name lookup, historical coverage, mixed questions, and unclear cases.
- Use `rag_retrieve` first only for clearly pure concept, definition, workflow, policy, or explanation questions without structured data needs.
- After each retrieval step, evaluate sufficiency before moving to the next source. Do NOT run these retrieval sources in parallel.
### 2. Query Preparation
- Do NOT pass the raw user question unless it already works well for retrieval.
- Rewrite for recall: extract entity, time scope, attributes, and intent.
- Add useful variants: synonyms, aliases, abbreviations, related titles, historical names, and category terms.
- Expand list-style, extraction, overview, historical, roster, timeline, and archive queries more aggressively.
- Preserve meaning. Do NOT introduce unrelated topics.
### 3. Retrieval Breadth (`top_k`)
- Apply `top_k` only to `rag_retrieve`. Use the smallest sufficient value, then expand only if coverage is insufficient.
- Use `30` for simple fact lookup.
- Use `50` for moderate synthesis, comparison, summarization, or disambiguation.
- Use `100` for broad recall, such as comprehensive analysis, scattered knowledge, multiple entities or periods, or list / catalog / timeline / roster / overview requests.
- Raise `top_k` when keyword branches are many or results are too few, repetitive, incomplete, sparse, or too narrow.
- Use this expansion order: `30 -> 50 -> 100`. If unsure, use `100`.
### 4. Result Evaluation
- Treat results as insufficient if they are empty, start with `Error:`, say `no excel files found`, are off-topic, miss the core entity or scope, or provide no usable evidence.
- Also treat results as insufficient when they cover only part of the request, or when full-list, historical, comparison, or mixed data + explanation requests return only partial or truncated coverage.
### 5. Fallback and Sequential Retry
- If the first retrieval result is insufficient, call the next retrieval source in the default order before replying.
- If the first RAG tool is insufficient, call the other RAG tool next before moving to local filesystem retrieval.
- If `table_rag_retrieve` is insufficient or empty, continue with `rag_retrieve`.
- If `rag_retrieve` is insufficient or empty, continue with `table_rag_retrieve`.
- If both `rag_retrieve` and `table_rag_retrieve` are insufficient, continue with local filesystem retrieval.
- Say no relevant information was found only after all applicable skill-enabled retrieval tools, both `rag_retrieve` and `table_rag_retrieve`, and local filesystem retrieval have been tried and still do not provide enough evidence.
- Do NOT reply that no relevant information was found before the final local filesystem fallback has also been tried.
### 6. Table RAG Result Handling
- Follow all `[INSTRUCTION]` and `[EXTRA_INSTRUCTION]` content in `table_rag_retrieve` results.
- If results are truncated, explicitly tell the user total matches (`N+M`), displayed count (`N`), and omitted count (`M`).
- Cite data sources using filenames from `file_ref_table`.
### 7. Citation Requirements for Retrieved Knowledge
- When using knowledge from `rag_retrieve` or `table_rag_retrieve`, you MUST generate `<CITATION ... />` tags.
- Follow the citation format returned by each tool.
- Place citations immediately after the paragraph or bullet list that uses the knowledge.
- Do NOT collect citations at the end.
- Use 1-2 citations per paragraph or bullet list when possible.
- If learned knowledge is used, include at least 1 `<CITATION ... />`.

View File

@@ -0,0 +1,251 @@
#!/usr/bin/env python3
"""
Shared utility functions for MCP servers.

Provides common helpers for path handling, file validation, and request
processing.
"""
import json
import os
import sys
import asyncio
from typing import Any, Dict, List, Optional, Union
import re

def get_allowed_directory():
    """Return the directory the server is allowed to access."""
    # Prefer the dataset_dir passed as a command-line argument
    if len(sys.argv) > 1:
        dataset_dir = sys.argv[1]
        return os.path.abspath(dataset_dir)
    # Otherwise read the project data directory from the environment
    project_dir = os.getenv("PROJECT_DATA_DIR", "./projects/data")
    return os.path.abspath(project_dir)
def resolve_file_path(file_path: str, default_subfolder: str = "default") -> str:
    """
    Resolve a file path; accepts both "folder/document.txt" and "document.txt".

    Args:
        file_path: Input file path.
        default_subfolder: Default subfolder to use when only a filename is given.

    Returns:
        The resolved full file path.
    """
    # If the path contains a folder separator, use it directly
    if '/' in file_path or '\\' in file_path:
        clean_path = file_path.replace('\\', '/')
        # Strip the projects/ prefix if present
        if clean_path.startswith('projects/'):
            clean_path = clean_path[9:]  # drop the 'projects/' prefix
        elif clean_path.startswith('./projects/'):
            clean_path = clean_path[11:]  # drop the './projects/' prefix
    else:
        # Bare filename: prepend the default subfolder
        clean_path = f"{default_subfolder}/{file_path}"
    # Resolve against the allowed directory
    project_data_dir = get_allowed_directory()
    # Try the path directly under the project directory
    full_path = os.path.join(project_data_dir, clean_path.lstrip('./'))
    if os.path.exists(full_path):
        return full_path
    # If the direct path does not exist, search recursively
    found = find_file_in_project(clean_path, project_data_dir)
    if found:
        return found
    # For a bare filename missing from the default subfolder, try the root
    if '/' not in file_path and '\\' not in file_path:
        root_path = os.path.join(project_data_dir, file_path)
        if os.path.exists(root_path):
            return root_path
    raise FileNotFoundError(f"File not found: {file_path} (searched in {project_data_dir})")
def find_file_in_project(filename: str, project_dir: str) -> Optional[str]:
    """Recursively search the project directory for a file."""
    # If filename contains a path, search only that subdirectory
    if '/' in filename:
        parts = filename.split('/')
        target_file = parts[-1]
        search_dir = os.path.join(project_dir, *parts[:-1])
        if os.path.exists(search_dir):
            target_path = os.path.join(search_dir, target_file)
            if os.path.exists(target_path):
                return target_path
    else:
        # Bare filename: walk the whole project directory
        for root, dirs, files in os.walk(project_dir):
            if filename in files:
                return os.path.join(root, filename)
    return None
def load_tools_from_json(tools_file_name: str) -> List[Dict[str, Any]]:
    """Load tool definitions from a JSON file."""
    try:
        tools_file = os.path.join(os.path.dirname(__file__), tools_file_name)
        if os.path.exists(tools_file):
            with open(tools_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        else:
            # Fall back to an empty definition when the JSON file is missing
            return []
    except Exception as e:
        print(f"Warning: Unable to load tool definition JSON file: {str(e)}")
        return []
def create_error_response(request_id: Any, code: int, message: str) -> Dict[str, Any]:
    """Build a standard JSON-RPC error response."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {
            "code": code,
            "message": message
        }
    }

def create_success_response(request_id: Any, result: Any) -> Dict[str, Any]:
    """Build a standard JSON-RPC success response."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "result": result
    }

def create_initialize_response(request_id: Any, server_name: str, server_version: str = "1.0.0") -> Dict[str, Any]:
    """Build a standard MCP initialize response."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "result": {
            "protocolVersion": "2024-11-05",
            "capabilities": {
                "tools": {}
            },
            "serverInfo": {
                "name": server_name,
                "version": server_version
            }
        }
    }

def create_ping_response(request_id: Any) -> Dict[str, Any]:
    """Build a standard ping response."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "result": {
            "pong": True
        }
    }

def create_tools_list_response(request_id: Any, tools: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build a standard tools/list response."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "result": {
            "tools": tools
        }
    }
def is_regex_pattern(pattern: str) -> bool:
    """Heuristically detect whether a string is a regular-expression pattern."""
    # /pattern/ form
    if pattern.startswith('/') and pattern.endswith('/') and len(pattern) > 2:
        return True
    # r"pattern" or r'pattern' form
    if pattern.startswith(('r"', "r'")) and pattern.endswith(('"', "'")) and len(pattern) > 3:
        return True
    # Otherwise, look for regex metacharacters
    regex_chars = {'*', '+', '?', '|', '(', ')', '[', ']', '{', '}', '^', '$', '\\', '.'}
    return any(char in pattern for char in regex_chars)

def compile_pattern(pattern: str) -> Union[re.Pattern, str, None]:
    """Compile a regex pattern; return the original string if it is not a regex."""
    if not is_regex_pattern(pattern):
        return pattern
    try:
        # /pattern/ form
        if pattern.startswith('/') and pattern.endswith('/'):
            regex_body = pattern[1:-1]
            return re.compile(regex_body)
        # r"pattern" or r'pattern' form
        if pattern.startswith(('r"', "r'")) and pattern.endswith(('"', "'")):
            regex_body = pattern[2:-1]
            return re.compile(regex_body)
        # Otherwise compile the string as-is
        return re.compile(pattern)
    except re.error as e:
        # Compilation failed: return None to signal an invalid regex
        print(f"Warning: Regular expression '{pattern}' compilation failed: {e}")
        return None
async def handle_mcp_streaming(request_handler):
    """Standard main loop for handling MCP requests over stdio."""
    try:
        while True:
            # Read one line from stdin without blocking the event loop
            line = await asyncio.get_event_loop().run_in_executor(None, sys.stdin.readline)
            if not line:
                break
            line = line.strip()
            if not line:
                continue
            try:
                request = json.loads(line)
                response = await request_handler(request)
                # Write the response to stdout
                sys.stdout.write(json.dumps(response, ensure_ascii=False) + "\n")
                sys.stdout.flush()
            except json.JSONDecodeError:
                error_response = {
                    "jsonrpc": "2.0",
                    "error": {
                        "code": -32700,
                        "message": "Parse error"
                    }
                }
                sys.stdout.write(json.dumps(error_response, ensure_ascii=False) + "\n")
                sys.stdout.flush()
            except Exception as e:
                error_response = {
                    "jsonrpc": "2.0",
                    "error": {
                        "code": -32603,
                        "message": f"Internal error: {str(e)}"
                    }
                }
                sys.stdout.write(json.dumps(error_response, ensure_ascii=False) + "\n")
                sys.stdout.flush()
    except KeyboardInterrupt:
        pass
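To show how `handle_mcp_streaming` is meant to be driven, here is a hedged sketch of a request handler that could be passed to it. The `tools/call` echo behavior and the exact response shapes are illustrative assumptions; a real server would build its responses with the `create_*_response` helpers above, and the dicts are spelled out here only so the example is self-contained.

```python
import asyncio

# Hypothetical request handler for handle_mcp_streaming (sketch only).
async def handle_request(request):
    method = request.get("method")
    request_id = request.get("id")
    if method == "ping":
        return {"jsonrpc": "2.0", "id": request_id, "result": {"pong": True}}
    if method == "tools/call":
        # Invented "echo" behavior: return the input text unchanged
        text = request.get("params", {}).get("arguments", {}).get("text", "")
        return {"jsonrpc": "2.0", "id": request_id,
                "result": {"content": [{"type": "text", "text": text}]}}
    # Unknown method: standard JSON-RPC "method not found" error
    return {"jsonrpc": "2.0", "id": request_id,
            "error": {"code": -32601, "message": f"Method not found: {method}"}}

# Wiring it into the stdio loop (not run here):
# asyncio.run(handle_mcp_streaming(handle_request))
```

The handler receives one parsed JSON-RPC request per line and returns one response dict, which the loop serializes back to stdout.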

View File

@@ -320,6 +320,12 @@ def create_robot_project(dataset_ids: List[str], bot_id: str, force_rebuild: boo
        str: Path to the robot project directory
    """
    skills = list(skills or [])
    has_rag_retrieve = any(Path(skill.lstrip("@")).name == "rag-retrieve" for skill in skills)
    if dataset_ids and not has_rag_retrieve:
        skills.append("@skills_autoload/rag-retrieve")
        logger.info("Auto loaded skill '@skills_autoload/rag-retrieve' because dataset_ids is not empty")
    logger.info(f"Ensuring robot project exists: {bot_id}, skills: {skills}")

    # Create the robot directory structure (if it does not exist)
@@ -375,27 +381,27 @@ def _extract_skills_to_robot(bot_id: str, skills: List[str], project_path: Path)
    - For a simple name like "rag-retrieve", search these directories in priority order:
      1. projects/uploads/{bot_id}/skills/
      2. skills/
    - For a repo-relative path starting with @, e.g. "@skills_autoload/rag-retrieve", resolve it directly from the repository root
    Search priority: projects/uploads/{bot_id}/skills/ first, then skills/
    Args:
        bot_id: Robot ID
        skills: List of skill names, e.g. ["rag-retrieve", "projects/uploads/{bot_id}/skills/rag-retrieve"]
        skills: List of skill names, e.g. ["rag-retrieve", "@skills_autoload/rag-retrieve", "projects/uploads/{bot_id}/skills/rag-retrieve"]
        project_path: Project path
    """
    import zipfile
    # Skill source directories (in priority order)
    repo_root = Path(__file__).resolve().parent.parent
    skills_source_dirs = [
        project_path / "uploads" / bot_id / "skills",
        Path("skills"),
        repo_root / "skills",
    ]
    skills_target_dir = project_path / "robot" / bot_id / "skills"
    skills_target_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Copying skills to {skills_target_dir}")
    # Remove leftover skill folders that are not in the expected list
    expected_skill_names = {os.path.basename(skill) for skill in skills}
    expected_skill_names = {Path(skill.lstrip("@")).name for skill in skills}
    if skills_target_dir.exists():
        for item in skills_target_dir.iterdir():
            if item.is_dir() and item.name not in expected_skill_names:
@@ -403,7 +409,8 @@ def _extract_skills_to_robot(bot_id: str, skills: List[str], project_path: Path)
                shutil.rmtree(item)
    for skill in skills:
        target_dir = skills_target_dir / os.path.basename(skill)
        skill_name = Path(skill.lstrip("@")).name
        target_dir = skills_target_dir / skill_name
        # Skip copying if the target directory already exists
        if target_dir.exists():
@@ -412,20 +419,25 @@ def _extract_skills_to_robot(bot_id: str, skills: List[str], project_path: Path)
        source_dir = None
        # Simple name: search multiple directories in priority order
        for base_dir in skills_source_dirs:
            candidate_dir = base_dir / skill
        if skill.startswith("@"):
            candidate_dir = repo_root / skill.lstrip("@")
            if candidate_dir.exists():
                source_dir = candidate_dir
                logger.info(f"  Found skill '{skill}' in {base_dir}")
                break
                logger.info(f"  Found skill '{skill}' at {candidate_dir}")
        # Simple name: search multiple directories in priority order
        if source_dir is None:
            for base_dir in skills_source_dirs:
                candidate_dir = base_dir / skill
                if candidate_dir.exists():
                    source_dir = candidate_dir
                    logger.info(f"  Found skill '{skill}' in {base_dir}")
                    break
        if source_dir is None:
            logger.warning(f"  Skill directory '{skill}' not found in any source directory: {[str(d) for d in skills_source_dirs]}")
            continue
        target_dir = skills_target_dir / os.path.basename(skill)
        try:
            shutil.copytree(source_dir, target_dir)
            logger.info(f"  Copied: {source_dir} -> {target_dir}")