catalog-agent/README.md
2025-10-18 09:20:59 +08:00

10 KiB
Raw Permalink Blame History

Qwen Agent - 智能数据检索专家系统

Python FastAPI License

📋 项目概述

Qwen Agent 是一个基于 FastAPI 构建的智能数据检索专家系统,专门用于处理和分析结构化数据集。系统通过无状态的 ZIP 项目加载机制,支持动态加载多种数据集,并提供类似 OpenAI 的聊天接口,便于与现有 AI 应用集成。

🌟 核心特性

  • 🔍 智能数据检索 - 基于倒排索引和多层数据架构的专业数据检索
  • 📦 无状态项目加载 - 通过 ZIP URL 动态加载数据集,自动缓存和解压
  • 🏗️ 多层架构数据处理 - 文档层、序列化层、索引层的分层存储
  • 🚀 异步文件处理队列 - 基于 huey 和 SQLite 的高性能异步任务队列
  • 📊 任务状态管理 - 实时任务状态查询和 SQLite 数据持久化
  • 🤖 兼容 OpenAI API - 完全兼容 OpenAI chat/completions 接口

🚀 快速开始

环境要求

  • Python 3.8+
  • Poetry (推荐) 或 pip
  • 足够的磁盘空间用于缓存

安装依赖

# 使用 Poetry (推荐)
poetry install
poetry run python fastapi_app.py

# 或使用 pip
pip install -r requirements.txt
python fastapi_app.py

Docker 部署

# 构建镜像
docker build -t qwen-agent .

# 运行容器
docker run -p 8001:8001 qwen-agent

# 或使用 Docker Compose
docker-compose up -d

📖 使用指南

1. 聊天接口 (OpenAI 兼容)

端点: POST /api/v1/chat/completions

curl -X POST "http://localhost:8001/api/v1/chat/completions" \
  -H "Content-Type: application/json" \
  -d '{
    "messages": [
      {
        "role": "user",
        "content": "HP Elite Mini 800 G9ってートPC"
      }
    ],
    "model": "qwen3-next",
    "zip_url": "http://127.0.0.1:8080/all_hp_product_spec_book2506.zip",
    "stream": false
  }'

2. 异步文件处理队列

启动队列系统

# 终端1启动队列消费者
poetry run python task_queue/consumer.py --workers 2

# 终端2启动API服务器
poetry run python fastapi_app.py

提交异步任务

curl -X POST "http://localhost:8001/api/v1/files/process/async" \
  -H "Content-Type: application/json" \
  -d '{
    "unique_id": "my_project_123",
    "files": {
      "documents": ["public/document.txt"],
      "reports": ["public/data.zip"]
    },
    "system_prompt": "处理这些文档"
  }'

响应:

{
  "success": true,
  "task_id": "abc-123-def",
  "unique_id": "my_project_123",
  "task_status": "pending",
  "estimated_processing_time": 30
}

查询任务状态

# 🎯 主要接口 - 只需要记住这一个
curl "http://localhost:8001/api/v1/task/abc-123-def/status"

状态响应:

{
  "success": true,
  "task_id": "abc-123-def",
  "status": "completed",
  "unique_id": "my_project_123",
  "result": {
    "status": "success",
    "message": "成功处理了 2 个文档文件",
    "processed_files": ["projects/my_project_123/dataset/docs/document.txt"]
  }
}

3. Python 客户端示例

import requests
import time

def submit_and_monitor_task():
    # 1. 提交任务
    response = requests.post(
        "http://localhost:8001/api/v1/files/process/async",
        json={
            "unique_id": "my_project",
            "files": {"docs": ["public/file.txt"]}
        }
    )
    
    task_id = response.json()["task_id"]
    print(f"任务已提交: {task_id}")
    
    # 2. 监控任务状态
    while True:
        response = requests.get(f"http://localhost:8001/api/v1/task/{task_id}/status")
        data = response.json()
        
        status = data["status"]
        print(f"任务状态: {status}")
        
        if status == "completed":
            print("🎉 任务完成!")
            break
        elif status == "failed":
            print("❌ 任务失败!")
            break
        
        time.sleep(2)

submit_and_monitor_task()

🗃️ 数据包结构

ZIP 数据集格式

dataset_name/
├── README.md              # 数据集说明文档
├── dataset/
│   └── data_collection/
│       ├── document.txt      # 原始文本内容
│       ├── serialization.txt # 结构化数据
│       └── schema.json       # 字段定义和元数据
├── mcp_settings.json      # MCP 工具配置
└── system_prompt.md       # 系统提示词(可选)

文件说明

  • document.txt: 原始 Markdown 文本,提供完整上下文
  • serialization.txt: 格式化结构数据,每行 字段1:值1;字段2:值2
  • schema.json: 字段定义、枚举值映射和文件关联关系

📊 数据存储和管理

任务状态存储

任务状态存储在 SQLite 数据库中:

queue_data/task_status.db

数据库结构:

CREATE TABLE task_status (
    task_id TEXT PRIMARY KEY,      -- 任务ID
    unique_id TEXT NOT NULL,      -- 项目ID
    status TEXT NOT NULL,         -- 任务状态
    created_at REAL NOT NULL,     -- 创建时间
    updated_at REAL NOT NULL,     -- 更新时间
    result TEXT,                  -- 处理结果(JSON)
    error TEXT                    -- 错误信息
);

数据库管理工具

# 查看数据库内容
poetry run python db_manager.py view

# 交互式管理
poetry run python db_manager.py interactive

# 获取统计信息
curl "http://localhost:8001/api/v1/tasks/statistics"

数据备份

# 备份数据库
cp queue_data/task_status.db queue_data/backup_$(date +%Y%m%d).db

# 清理旧记录
curl -X POST "http://localhost:8001/api/v1/tasks/cleanup?older_than_days=7"

🛠️ API 接口总览

聊天接口

  • POST /api/v1/chat/completions - OpenAI 兼容的聊天接口

文件处理接口

  • POST /api/v1/files/process - 同步文件处理
  • POST /api/v1/files/process/async - 异步文件处理
  • GET /api/v1/files/{unique_id}/status - 文件处理状态

任务管理接口

  • GET /api/v1/task/{task_id}/status - 主要接口 - 查询任务状态
  • GET /api/v1/tasks - 列出任务(支持筛选)
  • GET /api/v1/tasks/statistics - 获取统计信息
  • DELETE /api/v1/task/{task_id} - 删除任务记录

系统管理接口

  • GET /api/health - 健康检查
  • GET /system/status - 系统状态
  • POST /system/cleanup-cache - 清理缓存

🔧 配置和部署

环境变量

# 模型配置
MODEL_SERVER=https://openrouter.ai/api/v1
API_KEY=your-api-key

# 队列配置  
MAX_CACHED_AGENTS=20

# 其他配置
TOKENIZERS_PARALLELISM=false

生产部署建议

  1. 队列配置

    # 设置合适的工作线程数
    poetry run python task_queue/consumer.py --workers 4 --worker-type threads
    
  2. 性能优化

    • 使用 Redis 作为队列后端(可选)
    • 配置 nginx 作为反向代理
    • 设置适当的缓存策略
  3. 监控

    • 定期检查任务状态
    • 监控磁盘空间使用
    • 设置日志轮转

📈 性能特性

智能检索策略

  • 探索性查询: 结构分析 → 模式发现 → 结果扩展
  • 精确性查询: 目标定位 → 直接搜索 → 结果验证
  • 分析性查询: 多维度分析 → 深度挖掘 → 洞察提取

缓存机制

  • ZIP 文件基于 URL 的 MD5 哈希值进行缓存
  • 助手实例缓存,提高响应速度
  • SQLite 查询缓存

并发处理

  • 异步文件处理队列
  • 多线程任务执行
  • 支持批量操作

📁 项目结构

qwen-agent/
├── fastapi_app.py              # FastAPI 主应用
├── gbase_agent.py              # 助手服务逻辑
├── task_queue/                 # 队列系统
│   ├── config.py              # 队列配置
│   ├── manager.py             # 队列管理器
│   ├── tasks.py               # 文件处理任务
│   ├── integration_tasks.py   # 集成任务
│   ├── task_status.py         # 任务状态存储
│   └── consumer.py            # 队列消费者
├── utils/                      # 工具模块
├── models/                     # 模型文件
├── projects/                   # 项目目录
├── queue_data/                 # 队列数据
├── public/                     # 静态文件
├── db_manager.py               # 数据库管理工具
├── requirements.txt            # 依赖列表
├── pyproject.toml             # Poetry 配置
├── Dockerfile                  # Docker 构建文件
└── docker-compose.yml          # Docker Compose 配置

🎯 使用场景

适用场景

  • 产品规格检索 - 快速查找产品技术规格
  • 文档分析 - 大量文档的智能检索和分析
  • 数据问答 - 基于结构化数据的问答系统
  • 知识库构建 - 企业知识库的智能检索

示例数据集

项目包含 HP 产品规格书数据集:

  • 商用/个人笔记本电脑 (EliteBook/OmniBook)
  • 台式机 (Elite/OMEN)
  • 工作站 (Z系列)
  • 显示器 (Series 3/5/OMEN)
  • Poly 通信设备
  • HyperX 游戏配件

🤝 贡献指南

  1. Fork 项目
  2. 创建特性分支 (git checkout -b feature/AmazingFeature)
  3. 提交更改 (git commit -m 'Add some AmazingFeature')
  4. 推送到分支 (git push origin feature/AmazingFeature)
  5. 打开 Pull Request

📄 许可证

本项目采用 MIT 许可证 - 查看 LICENSE 文件了解详情。


🆘 支持


🎉 开始使用

  1. 克隆项目

    git clone https://github.com/your-repo/qwen-agent.git
    cd qwen-agent
    
  2. 安装依赖

    poetry install
    
  3. 启动服务

    # 启动队列消费者
    poetry run python task_queue/consumer.py --workers 2
    
    # 启动API服务器
    poetry run python fastapi_app.py
    
  4. 测试接口

    # 运行测试脚本
    poetry run python test_simple_task.py
    

现在您可以开始使用 Qwen Agent 进行智能数据检索了!🚀