19 KiB
Catalog Agent - 智能数据检索专家系统
📋 项目概述
Catalog Agent 是一个基于 FastAPI 构建的智能数据检索专家系统,专门用于处理和分析结构化数据集。系统通过无状态的 ZIP 项目加载机制,支持动态加载多种数据集,并提供类似 OpenAI 的聊天接口,便于与现有 AI 应用集成。
🌟 核心特性
- 🔍 智能数据检索 - 基于倒排索引和多层数据架构的专业数据检索
- 📦 无状态项目加载 - 通过 ZIP URL 动态加载数据集,自动缓存和解压
- 🏗️ 多层架构数据处理 - 文档层、序列化层、索引层的分层存储
- 🚀 异步文件处理队列 - 基于 huey 和 SQLite 的高性能异步任务队列
- 📊 任务状态管理 - 实时任务状态查询和 SQLite 数据持久化
- 🤖 兼容 OpenAI API - 完全兼容 OpenAI chat/completions 接口
- 🔧 MCP 工具集成 - 集成多种 Model Context Protocol 工具,支持语义搜索、关键词搜索、Excel/CSV 操作等
🚀 快速开始
环境要求
- Python 3.12+
- Poetry (推荐) 或 pip
- 足够的磁盘空间用于缓存
安装依赖
# 使用 Poetry (推荐)
poetry install
poetry run python fastapi_app.py
# 或使用 pip
pip install -r requirements.txt
python fastapi_app.py
# 启动队列消费者(另一个终端)
poetry run python task_queue/consumer.py --workers 2
Docker 部署
# 构建镜像
docker build -t qwen-agent .
# 运行容器
docker run -p 8001:8001 qwen-agent
# 或使用 Docker Compose
docker-compose up -d
📖 使用指南
Catalog Agent API 文档
概述
本文档提供 Catalog Agent 服务的 API 接口说明,支持多语言对话、文件上传和解析功能。
服务地址
公网地址
- 开发环境:
https://catalog-agent-dev.gbase.ai - 生产环境:
https://catalog-agent.gbase.ai
内网地址
- 生产环境:
http://catalog-agent.default.svc.cluster.local - 开发环境:
http://catalog-agent.gbase-dev.svc.cluster.local
接口列表
1. 通用聊天接口 V1 (OpenAI 兼容)
端点: POST /api/v1/chat/completions
认证方式: Bearer Token (使用大语言模型的 API Key)
请求示例:
curl -X POST "{host}/api/v1/chat/completions" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer {api_key}" \
-d '{
"messages": [
{
"role": "user",
"content": "1kg未満のノートPCを知りたいので表で出力してください"
}
],
"stream": true,
"language": "ja",
"robot_type": "catalog_agent",
"model": "gpt-4.1",
"model_server": "https://one-dev.felo.me/v1",
"bot_id": "f4aecffd4e9c-624be71-5432-40bf-9758",
"dataset_ids": ["624be71-5432-40bf-9758-f4aecffd4e9c"],
"tool_response": false
}'
请求参数说明:
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
| messages | array | 是 | 对话消息列表 |
| stream | boolean | 是 | 是否启用流式输出 |
| language | string | 是 | 语言代码: zh/en/ja |
| robot_type | string | 是 | 固定值: catalog_agent |
| model | string | 是 | AI 模型名称 |
| model_server | string | 是 | AI 模型服务器地址 |
| bot_id | string | 是 | 机器人唯一标识 |
| dataset_ids | array | 是 | 知识库 ID 数组 |
| tool_response | boolean | 是 | 是否返回工具响应 |
2. GBase 聊天接口 V2
端点: POST /api/v2/chat/completions
认证方式: Bearer Token (使用 md5(master:{bot_id}) 生成)
注意:此接口的模型、服务器、API Key、数据集信息会自动从 GBase 读取,无需手动传入。
请求示例:
curl -X POST "{host}/api/v2/chat/completions" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer {md5(master:{bot_id})}" \
-d '{
"messages": [
{
"role": "user",
"content": "1kg未満のノートPCを知りたいので表で出力してください"
}
],
"stream": true,
"language": "ja",
"bot_id": "f4aecffd4e9c-624be71-5432-40bf-9758",
"tool_response": false
}'
请求参数说明:
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
| messages | array | 是 | 对话消息列表 |
| stream | boolean | 是 | 是否启用流式输出 |
| language | string | 是 | 语言代码: zh/en/ja |
| bot_id | string | 是 | 机器人唯一标识 |
| tool_response | boolean | 是 | 是否返回工具响应 |
3. 文件上传
端点: POST /api/v1/upload
请求方式: 表单数据上传
参数名: file
响应示例:
{
"success": true,
"message": "文件上传成功",
"filename": "12345678-1234-5678-9abc-123456789def.pdf",
"original_filename": "document.pdf",
"file_path": "projects/uploads/12345678-1234-5678-9abc-123456789def.pdf"
}
4. 文件解析
4.1 全量文件解析
端点: POST /api/v1/files/process/async
请求示例:
curl -X POST "{host}/api/v1/files/process/async" \
-H "Content-Type: application/json" \
-d '{
"dataset_id": "624be71-5432-40bf-9758-f4aecffd4e9c",
"files": {
"group_name1": [
"public/document.txt",
"public/data.zip",
"public/goods.xlsx"
],
"group_name2": [
"public/document.txt",
"public/data.zip",
"public/goods.xlsx"
]
}
}'
4.2 增量文件解析
端点: POST /api/v1/files/process/incremental
请求示例:
curl -X POST "{host}/api/v1/files/process/incremental" \
-H "Content-Type: application/json" \
-d '{
"dataset_id": "624be71-5432-40bf-9758-f4aecffd4e9c",
"files_to_add": {
"group_name1": [
"projects/uploads/report_list/1090571652550791207.md"
],
"group_name2": [
"projects/uploads/report_list/1090570966043889684.md"
]
},
"files_to_remove": {
"group_name1": [
"projects/uploads/report_list/1090570712888283212.md"
],
"group_name2": [
"projects/uploads/report_list/1090570712888283214.md"
]
}
}'
解析响应:
{
"success": true,
"task_id": "abc-123-def",
"dataset_id": "624be71-5432-40bf-9758-f4aecffd4e9c",
"task_status": "pending",
"estimated_processing_time": 30
}
5. 查询任务状态
端点: GET /api/v1/task/{task_id}/status
请求示例:
curl "{host}/api/v1/task/{task_id}/status"
响应示例:
{
"success": true,
"task_id": "abc-123-def",
"status": "completed",
"dataset_id": "my_project_123",
"result": {
"status": "success",
"message": "成功处理了 2 个文档文件",
"processed_files": ["projects/my_project_123/dataset/docs/document.txt"]
}
}
注意事项
- 认证方式: V1 和 V2 接口使用不同的认证机制,请根据接口版本选择合适的认证方式
- 必填参数: 所有标记为"必填"的参数必须提供
- 文件路径: 文件解析接口中使用的文件路径应为文件上传接口返回的
file_path - 任务状态: 文件解析为异步操作,需要通过任务状态接口查询处理进度
项目目录结构
文件处理后,会在 projects/{unique_id}/ 目录下生成以下结构:
projects/{unique_id}/
├── README.md # 项目说明文件
├── files/ # 原始文件存储
│ ├── document.txt # 原始文档
│ └── data.zip # 压缩文件
├── dataset/ # 处理后的数据集
│ └── default/ # 默认数据集
│ ├── document.txt # 原始markdown文本内容
│ ├── pagination.txt # 分页数据层,每页5000字符
│ └── embedding.pkl # 文档向量嵌入文件
└── processed_files.json # 文件处理日志
三层数据架构说明:
- 原始文档层 (document.txt): 完整的markdown文本内容,提供上下文信息
- 分页数据层 (pagination.txt): 按页分割的数据,每页5000字符,便于检索
- 向量嵌入层 (embedding.pkl): 文档的语义向量,支持语义搜索
4. 项目目录树接口
获取完整目录树
端点: GET /api/v1/projects/tree
# 获取完整目录树
curl "http://localhost:8001/api/v1/projects/tree"
# 只显示目录结构(不包含文件)
curl "http://localhost:8001/api/v1/projects/tree?include_files=false"
# 只显示data目录
curl "http://localhost:8001/api/v1/projects/tree?filter_type=data"
响应示例:
{
"success": true,
"message": "目录树获取成功",
"tree": {
"name": "projects",
"path": "",
"type": "directory",
"children": [
{
"name": "data",
"path": "data",
"type": "directory",
"children": [
{
"name": "1624be71-5432-40bf-9758-f4aecffd4e9c",
"path": "data/1624be71-5432-40bf-9758-f4aecffd4e9c",
"type": "directory",
"children": [...]
}
]
}
],
"size": 0,
"modified_time": 1234567890
},
"stats": {
"total_directories": 15,
"total_files": 32,
"total_size": 1048576
}
}
获取子目录树结构
端点: GET /api/v1/projects/subtree/{sub_path:path}
# 获取特定项目的目录结构
curl "http://localhost:8001/api/v1/projects/subtree/data/1624be71-5432-40bf-9758-f4aecffd4e9c"
# 只显示目录层级
curl "http://localhost:8001/api/v1/projects/subtree/data/1624be71-5432-40bf-9758-f4aecffd4e9c?include_files=false"
参数说明:
sub_path: 子目录路径,如 'data/1624be71-5432-40bf-9758-f4aecffd4e9c'include_files: 是否包含文件详情(默认true)max_depth: 最大深度限制(默认10)
功能特性:
- 递归构建完整的目录树结构
- 包含文件大小和修改时间信息
- 支持过滤文件类型和目录层级
- 提供统计信息(目录数、文件数、总大小)
- 安全的错误处理机制
---
## 🗃️ 数据包结构
### ZIP 数据集格式
dataset_name/ ├── README.md # 数据集说明文档 ├── dataset/ │ └── data_collection/ │ ├── document.txt # 原始文本内容 │ ├── serialization.txt # 结构化数据 │ └── schema.json # 字段定义和元数据 ├── mcp_settings.json # MCP 工具配置 └── system_prompt.md # 系统提示词(可选)
### 文件说明
- **document.txt**: 原始 Markdown 文本,提供完整上下文
- **serialization.txt**: 格式化结构数据,每行 `字段1:值1;字段2:值2`
- **schema.json**: 字段定义、枚举值映射和文件关联关系
- **mcp_settings.json**: MCP 工具配置,定义可用的数据处理工具
---
## 📊 数据存储和管理
### 任务状态存储
任务状态存储在 SQLite 数据库中:
queue_data/task_status.db
**数据库结构**:
```sql
CREATE TABLE task_status (
task_id TEXT PRIMARY KEY, -- 任务ID
unique_id TEXT NOT NULL, -- 项目ID
status TEXT NOT NULL, -- 任务状态
created_at REAL NOT NULL, -- 创建时间
updated_at REAL NOT NULL, -- 更新时间
result TEXT, -- 处理结果(JSON)
error TEXT -- 错误信息
);
数据库管理工具
# 查看数据库内容
poetry run python db_manager.py view
# 交互式管理
poetry run python db_manager.py interactive
# 获取统计信息
curl "http://localhost:8001/api/v1/tasks/statistics"
数据备份
# 备份数据库
cp queue_data/task_status.db queue_data/backup_$(date +%Y%m%d).db
# 清理旧记录
curl -X POST "http://localhost:8001/api/v1/tasks/cleanup?older_than_days=7"
🛠️ API 接口总览
聊天接口
POST /api/v1/chat/completions- OpenAI 兼容的聊天接口
文件处理接口
POST /api/v1/files/process/async- 异步文件处理GET /api/v1/files/{unique_id}/status- 文件处理状态
任务管理接口
GET /api/v1/task/{task_id}/status- 主要接口 - 查询任务状态GET /api/v1/tasks- 列出任务(支持筛选)GET /api/v1/tasks/statistics- 获取统计信息DELETE /api/v1/task/{task_id}- 删除任务记录POST /api/v1/project/cleanup- 清理项目数据
项目目录树接口
GET /api/v1/projects/tree- 获取projects文件夹完整目录树结构GET /api/v1/projects/subtree/{sub_path:path}- 获取指定子目录的树结构
系统管理接口
GET /api/health- 健康检查GET /system/status- 系统状态POST /system/cleanup-cache- 清理缓存
🔧 配置和部署
环境变量
# 模型配置
MODEL_SERVER=https://openrouter.ai/api/v1
API_KEY=your-api-key
# 队列配置
TOOL_CACHE_MAX_SIZE=20
# 其他配置
TOKENIZERS_PARALLELISM=false
生产部署建议
-
队列配置
# 设置合适的工作线程数 poetry run python task_queue/consumer.py --workers 4 --worker-type threads -
性能优化
- 使用 Redis 作为队列后端(可选)
- 配置 nginx 作为反向代理
- 设置适当的缓存策略
-
监控
- 定期检查任务状态
- 监控磁盘空间使用
- 设置日志轮转
📈 性能特性
智能检索策略
- 探索性查询: 结构分析 → 模式发现 → 结果扩展
- 精确性查询: 目标定位 → 直接搜索 → 结果验证
- 分析性查询: 多维度分析 → 深度挖掘 → 洞察提取
缓存机制
- ZIP 文件基于 URL 的 MD5 哈希值进行缓存
- 助手实例缓存,提高响应速度
- SQLite 查询缓存
并发处理
- 异步文件处理队列
- 多线程任务执行
- 支持批量操作
📁 项目结构
qwen-agent/
├── fastapi_app.py # FastAPI 主应用
├── modified_assistant.py # 修改版助手服务逻辑
├── task_queue/ # 队列系统
│ ├── config.py # 队列配置
│ ├── manager.py # 队列管理器
│ ├── tasks.py # 文件处理任务
│ ├── integration_tasks.py # 集成任务
│ ├── task_status.py # 任务状态存储
│ └── consumer.py # 队列消费者
├── utils/ # 工具模块
│ ├── agent_pool.py # 助手池管理
│ ├── api_models.py # API 数据模型
│ ├── dataset_manager.py # 数据集管理
│ ├── excel_csv_processor.py # Excel/CSV 处理器
│ ├── file_utils.py # 文件操作工具
│ ├── project_manager.py # 项目管理器
│ └── prompt_loader.py # 提示词加载器
├── mcp/ # MCP 工具服务器
│ ├── semantic_search_server.py # 语义搜索服务
│ ├── multi_keyword_search_server.py # 多关键词搜索服务
│ ├── excel_csv_operator_server.py # Excel/CSV 操作服务
│ ├── json_reader_server.py # JSON 读取服务
│ ├── mcp_settings.json # MCP 配置文件
│ └── tools/ # 工具定义文件
├── models/ # 模型文件
├── projects/ # 项目目录
│ └── queue_data/ # 队列数据
├── public/ # 静态文件
├── embedding/ # 嵌入模型模块
├── prompt/ # 系统提示词
├── db_manager.py # 数据库管理工具
├── requirements.txt # 依赖列表
├── pyproject.toml # Poetry 配置
├── Dockerfile # Docker 构建文件
└── docker-compose.yml # Docker Compose 配置
🎯 使用场景
适用场景
- 产品规格检索 - 快速查找产品技术规格
- 文档分析 - 大量文档的智能检索和分析
- 数据问答 - 基于结构化数据的问答系统
- 知识库构建 - 企业知识库的智能检索
示例数据集
项目包含 HP 产品规格书数据集:
- 商用/个人笔记本电脑 (EliteBook/OmniBook)
- 台式机 (Elite/OMEN)
- 工作站 (Z系列)
- 显示器 (Series 3/5/OMEN)
- Poly 通信设备
- HyperX 游戏配件
MCP 工具支持
系统集成了以下 MCP 工具:
- 语义搜索工具 - 基于句子嵌入的语义相似度搜索
- 多关键词搜索工具 - 支持多字段、多条件的组合搜索
- Excel/CSV 操作工具 - 读取、搜索、分析 Excel 和 CSV 文件
- JSON 读取工具 - 解析和查询 JSON 格式数据
🤝 贡献指南
- Fork 项目
- 创建特性分支 (
git checkout -b feature/AmazingFeature) - 提交更改 (
git commit -m 'Add some AmazingFeature') - 推送到分支 (
git push origin feature/AmazingFeature) - 打开 Pull Request
📄 许可证
本项目采用 MIT 许可证 - 查看 LICENSE 文件了解详情。
🆘 支持
🎉 开始使用
-
克隆项目
git clone https://github.com/your-repo/qwen-agent.git cd qwen-agent -
安装依赖
poetry install -
启动服务
# 启动队列消费者 poetry run python task_queue/consumer.py --workers 2 # 启动API服务器 poetry run python fastapi_app.py -
测试接口
# 测试聊天接口 curl -X POST "http://localhost:8001/api/v1/chat/completions" \ -H "Content-Type: application/json" \ -d '{"messages": [{"role": "user", "content": "你好"}], "model": "qwen3-next"}' # 测试任务状态查询 curl "http://localhost:8001/api/v1/tasks/statistics"
现在您可以开始使用 Catalog Agent 进行智能数据检索了!🚀
提示词工程
指南
1) Condition: 用户意图为“执行设备控制”。 Action: 如果用户在当前交互中已回复肯定确认(例如,同意、确认、好的等),则立即调用 Iot Control-dxcore_update_device_status 工具执行设备操作。否则(用户尚未确认),则向用户发送确认询问:“即将为您[操作内容][设备名称][具体参数],是否确认?”并等待用户回复。
3) Condition: 当用户意图为查询设备状态或信息时。 Action: 调用 Iot Control 设备相关的工具进行查询(不调用 rag_retrieve-rag_retrieve)。
4) Condition: 当用户意图为非设备相关的问题(如提问、遇到难处、咨询事实性问题、价格等)时。 Action: 优先调用 rag_retrieve-rag_retrieve 查询知识库;如果无结果,则调用 WebSearch-web_search;最后综合两个工具的内容进行回复。
术语表
1) Name: term_name1, Description: desc, Synonyms: syn1, syn2
2) Name: term_name2, Description: desc, Synonyms: syn1, syn2