10 KiB
10 KiB
Qwen Agent - 智能数据检索专家系统
📋 项目概述
Qwen Agent 是一个基于 FastAPI 构建的智能数据检索专家系统,专门用于处理和分析结构化数据集。系统通过无状态的 ZIP 项目加载机制,支持动态加载多种数据集,并提供类似 OpenAI 的聊天接口,便于与现有 AI 应用集成。
🌟 核心特性
- 🔍 智能数据检索 - 基于倒排索引和多层数据架构的专业数据检索
- 📦 无状态项目加载 - 通过 ZIP URL 动态加载数据集,自动缓存和解压
- 🏗️ 多层架构数据处理 - 文档层、序列化层、索引层的分层存储
- 🚀 异步文件处理队列 - 基于 huey 和 SQLite 的高性能异步任务队列
- 📊 任务状态管理 - 实时任务状态查询和 SQLite 数据持久化
- 🤖 兼容 OpenAI API - 完全兼容 OpenAI chat/completions 接口
🚀 快速开始
环境要求
- Python 3.8+
- Poetry (推荐) 或 pip
- 足够的磁盘空间用于缓存
安装依赖
# 使用 Poetry (推荐)
poetry install
poetry run python fastapi_app.py
# 或使用 pip
pip install -r requirements.txt
python fastapi_app.py
Docker 部署
# 构建镜像
docker build -t qwen-agent .
# 运行容器
docker run -p 8001:8001 qwen-agent
# 或使用 Docker Compose
docker-compose up -d
📖 使用指南
1. 聊天接口 (OpenAI 兼容)
端点: POST /api/v1/chat/completions
curl -X POST "http://localhost:8001/api/v1/chat/completions" \
-H "Content-Type: application/json" \
-d '{
"messages": [
{
"role": "user",
"content": "HP Elite Mini 800 G9ってノートPC?"
}
],
"model": "qwen3-next",
"zip_url": "http://127.0.0.1:8080/all_hp_product_spec_book2506.zip",
"stream": false
}'
2. 异步文件处理队列
启动队列系统
# 终端1:启动队列消费者
poetry run python task_queue/consumer.py --workers 2
# 终端2:启动API服务器
poetry run python fastapi_app.py
提交异步任务
curl -X POST "http://localhost:8001/api/v1/files/process/async" \
-H "Content-Type: application/json" \
-d '{
"unique_id": "my_project_123",
"files": {
"documents": ["public/document.txt"],
"reports": ["public/data.zip"]
},
"system_prompt": "处理这些文档"
}'
响应:
{
"success": true,
"task_id": "abc-123-def",
"unique_id": "my_project_123",
"task_status": "pending",
"estimated_processing_time": 30
}
查询任务状态
# 🎯 主要接口 - 只需要记住这一个
curl "http://localhost:8001/api/v1/task/abc-123-def/status"
状态响应:
{
"success": true,
"task_id": "abc-123-def",
"status": "completed",
"unique_id": "my_project_123",
"result": {
"status": "success",
"message": "成功处理了 2 个文档文件",
"processed_files": ["projects/my_project_123/dataset/docs/document.txt"]
}
}
3. Python 客户端示例
import requests
import time
def submit_and_monitor_task():
# 1. 提交任务
response = requests.post(
"http://localhost:8001/api/v1/files/process/async",
json={
"unique_id": "my_project",
"files": {"docs": ["public/file.txt"]}
}
)
task_id = response.json()["task_id"]
print(f"任务已提交: {task_id}")
# 2. 监控任务状态
while True:
response = requests.get(f"http://localhost:8001/api/v1/task/{task_id}/status")
data = response.json()
status = data["status"]
print(f"任务状态: {status}")
if status == "completed":
print("🎉 任务完成!")
break
elif status == "failed":
print("❌ 任务失败!")
break
time.sleep(2)
submit_and_monitor_task()
🗃️ 数据包结构
ZIP 数据集格式
dataset_name/
├── README.md # 数据集说明文档
├── dataset/
│ └── data_collection/
│ ├── document.txt # 原始文本内容
│ ├── serialization.txt # 结构化数据
│ └── schema.json # 字段定义和元数据
├── mcp_settings.json # MCP 工具配置
└── system_prompt.md # 系统提示词(可选)
文件说明
- document.txt: 原始 Markdown 文本,提供完整上下文
- serialization.txt: 格式化结构数据,每行
字段1:值1;字段2:值2 - schema.json: 字段定义、枚举值映射和文件关联关系
📊 数据存储和管理
任务状态存储
任务状态存储在 SQLite 数据库中:
queue_data/task_status.db
数据库结构:
CREATE TABLE task_status (
task_id TEXT PRIMARY KEY, -- 任务ID
unique_id TEXT NOT NULL, -- 项目ID
status TEXT NOT NULL, -- 任务状态
created_at REAL NOT NULL, -- 创建时间
updated_at REAL NOT NULL, -- 更新时间
result TEXT, -- 处理结果(JSON)
error TEXT -- 错误信息
);
数据库管理工具
# 查看数据库内容
poetry run python db_manager.py view
# 交互式管理
poetry run python db_manager.py interactive
# 获取统计信息
curl "http://localhost:8001/api/v1/tasks/statistics"
数据备份
# 备份数据库
cp queue_data/task_status.db queue_data/backup_$(date +%Y%m%d).db
# 清理旧记录
curl -X POST "http://localhost:8001/api/v1/tasks/cleanup?older_than_days=7"
🛠️ API 接口总览
聊天接口
POST /api/v1/chat/completions- OpenAI 兼容的聊天接口
文件处理接口
POST /api/v1/files/process- 同步文件处理POST /api/v1/files/process/async- 异步文件处理GET /api/v1/files/{unique_id}/status- 文件处理状态
任务管理接口
GET /api/v1/task/{task_id}/status- 主要接口 - 查询任务状态GET /api/v1/tasks- 列出任务(支持筛选)GET /api/v1/tasks/statistics- 获取统计信息DELETE /api/v1/task/{task_id}- 删除任务记录
系统管理接口
GET /api/health- 健康检查GET /system/status- 系统状态POST /system/cleanup-cache- 清理缓存
🔧 配置和部署
环境变量
# 模型配置
MODEL_SERVER=https://openrouter.ai/api/v1
API_KEY=your-api-key
# 队列配置
MAX_CACHED_AGENTS=20
# 其他配置
TOKENIZERS_PARALLELISM=false
生产部署建议
-
队列配置
# 设置合适的工作线程数 poetry run python task_queue/consumer.py --workers 4 --worker-type threads -
性能优化
- 使用 Redis 作为队列后端(可选)
- 配置 nginx 作为反向代理
- 设置适当的缓存策略
-
监控
- 定期检查任务状态
- 监控磁盘空间使用
- 设置日志轮转
📈 性能特性
智能检索策略
- 探索性查询: 结构分析 → 模式发现 → 结果扩展
- 精确性查询: 目标定位 → 直接搜索 → 结果验证
- 分析性查询: 多维度分析 → 深度挖掘 → 洞察提取
缓存机制
- ZIP 文件基于 URL 的 MD5 哈希值进行缓存
- 助手实例缓存,提高响应速度
- SQLite 查询缓存
并发处理
- 异步文件处理队列
- 多线程任务执行
- 支持批量操作
📁 项目结构
qwen-agent/
├── fastapi_app.py # FastAPI 主应用
├── gbase_agent.py # 助手服务逻辑
├── task_queue/ # 队列系统
│ ├── config.py # 队列配置
│ ├── manager.py # 队列管理器
│ ├── tasks.py # 文件处理任务
│ ├── integration_tasks.py # 集成任务
│ ├── task_status.py # 任务状态存储
│ └── consumer.py # 队列消费者
├── utils/ # 工具模块
├── models/ # 模型文件
├── projects/ # 项目目录
├── queue_data/ # 队列数据
├── public/ # 静态文件
├── db_manager.py # 数据库管理工具
├── requirements.txt # 依赖列表
├── pyproject.toml # Poetry 配置
├── Dockerfile # Docker 构建文件
└── docker-compose.yml # Docker Compose 配置
🎯 使用场景
适用场景
- 产品规格检索 - 快速查找产品技术规格
- 文档分析 - 大量文档的智能检索和分析
- 数据问答 - 基于结构化数据的问答系统
- 知识库构建 - 企业知识库的智能检索
示例数据集
项目包含 HP 产品规格书数据集:
- 商用/个人笔记本电脑 (EliteBook/OmniBook)
- 台式机 (Elite/OMEN)
- 工作站 (Z系列)
- 显示器 (Series 3/5/OMEN)
- Poly 通信设备
- HyperX 游戏配件
🤝 贡献指南
- Fork 项目
- 创建特性分支 (
git checkout -b feature/AmazingFeature) - 提交更改 (
git commit -m 'Add some AmazingFeature') - 推送到分支 (
git push origin feature/AmazingFeature) - 打开 Pull Request
📄 许可证
本项目采用 MIT 许可证 - 查看 LICENSE 文件了解详情。
🆘 支持
🎉 开始使用
-
克隆项目
git clone https://github.com/your-repo/qwen-agent.git cd qwen-agent -
安装依赖
poetry install -
启动服务
# 启动队列消费者 poetry run python task_queue/consumer.py --workers 2 # 启动API服务器 poetry run python fastapi_app.py -
测试接口
# 运行测试脚本 poetry run python test_simple_task.py
现在您可以开始使用 Qwen Agent 进行智能数据检索了!🚀