# qwen_agent/routes/chat.py
#
# Commit 766b9becda — feat(deep-agent): add skills support and improve project structure
# - Add skills parameter to ChatRequest for skill file processing
# - Extract and unzip skill files to robot project skills directory
# - Add robot_config.json with bot_id and environment variables
# - Update symlink setup to skip if ~/.deepagents already exists
# - Enhance system prompt with directory access restrictions
# - Refactor _get_robot_dir to handle symlink paths correctly
#
# Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
# 2025-12-31 13:21:58 +08:00
import json
import os
import asyncio
from typing import Union, Optional
from fastapi import APIRouter, HTTPException, Header
from fastapi.responses import StreamingResponse
import logging
logger = logging.getLogger('app')
from utils import (
Message, ChatRequest, ChatResponse
)
from utils.api_models import ChatRequestV2
from utils.fastapi_utils import (
process_messages,
create_project_directory, extract_api_key_from_auth, generate_v2_auth_token, fetch_bot_config,
call_preamble_llm,
create_stream_chunk
)
from langchain_core.messages import AIMessageChunk, ToolMessage, AIMessage
from utils.settings import MAX_OUTPUT_TOKENS
from agent.agent_config import AgentConfig
from agent.deep_assistant import init_agent
router = APIRouter()
async def enhanced_generate_stream_response(
    config: AgentConfig
):
    """Progressive SSE stream generator that runs preamble and agent concurrently.

    The preamble LLM call and the agent stream are executed as concurrent tasks
    feeding a shared queue; an ordering gate guarantees that preamble output (if
    any) is emitted before any agent output.

    Args:
        config: AgentConfig object carrying all request parameters.

    Yields:
        SSE-formatted ``data: ...`` chunk strings, terminated by ``data: [DONE]``,
        or a single error payload if the stream fails.
    """
    preamble_task_handle = None
    agent_task_handle = None
    try:
        # Shared queue carrying (item_type, payload) tuples, plus an event used
        # to gate agent output behind preamble completion.
        output_queue = asyncio.Queue()
        preamble_completed = asyncio.Event()

        # Preamble task
        async def preamble_task():
            try:
                preamble_result = await call_preamble_llm(config)
                # Only emit when the preamble is non-empty and not the "<empty>" sentinel.
                if preamble_result and preamble_result.strip() and preamble_result != "<empty>":
                    preamble_content = f"[PREAMBLE]\n{preamble_result}\n"
                    chunk_data = create_stream_chunk("chatcmpl-preamble", config.model_name, preamble_content)
                    await output_queue.put(("preamble", f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"))
                    logger.info(f"Stream mode: Generated preamble text ({len(preamble_result)} chars)")
                else:
                    logger.info("Stream mode: Skipped empty preamble text")
                # Mark preamble as finished.
                preamble_completed.set()
                await output_queue.put(("preamble_done", None))
            except Exception as e:
                logger.error(f"Error generating preamble text: {e}")
                # Mark completion even on failure so agent output is never blocked.
                preamble_completed.set()
                await output_queue.put(("preamble_done", None))

        # Agent task (initialization + streaming).
        async def agent_task():
            try:
                logger.info(f"Starting agent stream response")
                chunk_id = 0
                message_tag = ""
                agent, checkpointer = await init_agent(config)
                async for msg, metadata in agent.astream({"messages": config.messages}, stream_mode="messages", config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS):
                    new_content = ""
                    if isinstance(msg, AIMessageChunk):
                        # Tool-call chunks: emit the tool name header once, then stream args.
                        if msg.tool_call_chunks:
                            message_tag = "TOOL_CALL"
                            for tool_call_chunk in msg.tool_call_chunks:
                                if tool_call_chunk["name"]:
                                    new_content = f"[{message_tag}] {tool_call_chunk['name']}\n"
                                if tool_call_chunk['args']:
                                    new_content += tool_call_chunk['args']
                        # Text content: once the agent starts answering, force the
                        # preamble gate open so output is never held back.
                        elif msg.content:
                            preamble_completed.set()
                            await output_queue.put(("preamble_done", None))
                            meta_message_tag = metadata.get("message_tag", "ANSWER")
                            if meta_message_tag != message_tag:
                                message_tag = meta_message_tag
                                new_content = f"[{meta_message_tag}]\n"
                            if msg.text:
                                new_content += msg.text
                    # Tool responses (only when the caller asked for them).
                    elif isinstance(msg, ToolMessage) and config.tool_response and msg.content:
                        message_tag = "TOOL_RESPONSE"
                        new_content = f"[{message_tag}] {msg.name}\n{msg.text}\n"
                    # Enqueue the content chunk.
                    if new_content:
                        if chunk_id == 0:
                            logger.info(f"Agent首个Token已生成, 开始流式输出")
                        chunk_id += 1
                        chunk_data = create_stream_chunk(f"chatcmpl-{chunk_id}", config.model_name, new_content)
                        await output_queue.put(("agent", f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"))
                # Final chunk carrying finish_reason="stop".
                final_chunk = create_stream_chunk(f"chatcmpl-{chunk_id + 1}", config.model_name, finish_reason="stop")
                await output_queue.put(("agent", f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"))
                await output_queue.put(("agent_done", None))
            except Exception as e:
                logger.error(f"Error in agent task: {e}")
                await output_queue.put(("agent_done", None))

        # Run the tasks concurrently; the preamble task only runs when
        # enable_thinking is set, otherwise the gate is opened immediately.
        if config.enable_thinking:
            preamble_task_handle = asyncio.create_task(preamble_task())
        else:
            # No thinking: schedule a no-op task so the timeout check below
            # can treat both handles uniformly.
            preamble_task_handle = asyncio.create_task(asyncio.sleep(0))
            preamble_completed.set()
        agent_task_handle = asyncio.create_task(agent_task())

        # Output controller: ensure preamble is emitted before agent stream.
        preamble_output_done = False
        while True:
            try:
                # Timeout so we can detect both tasks finishing without a sentinel.
                item_type, item_data = await asyncio.wait_for(output_queue.get(), timeout=1.0)
                if item_type == "preamble":
                    # Emit preamble content immediately.
                    if item_data:
                        yield item_data
                    preamble_output_done = True
                elif item_type == "preamble_done":
                    # Preamble finished (possibly with no output); open the gate.
                    preamble_output_done = True
                elif item_type == "agent":
                    # Agent content must wait until the preamble has been emitted.
                    if preamble_output_done:
                        yield item_data
                    else:
                        # Re-queue the agent chunk, then block until the preamble finishes.
                        await output_queue.put((item_type, item_data))
                        await preamble_completed.wait()
                        preamble_output_done = True
                elif item_type == "agent_done":
                    # Agent stream complete; leave the loop.
                    break
            except asyncio.TimeoutError:
                # If every task is done and the queue stayed empty, exit.
                if all(task.done() for task in [preamble_task_handle, agent_task_handle]):
                    break
                continue
        # End-of-stream marker.
        yield "data: [DONE]\n\n"
        logger.info(f"Enhanced stream response completed")
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in enhanced_generate_stream_response: {str(e)}")
        logger.error(f"Full traceback: {error_details}")
        error_data = {
            "error": {
                "message": f"Stream error: {str(e)}",
                "type": "internal_error"
            }
        }
        yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"
    finally:
        # Don't leak background tasks: if the generator exits early (client
        # disconnect, agent finishing before the preamble, error path), the
        # remaining task would otherwise keep running and writing to the queue.
        for task in (preamble_task_handle, agent_task_handle):
            if task is not None and not task.done():
                task.cancel()
async def create_agent_and_generate_response(
    config: AgentConfig
) -> Union[ChatResponse, StreamingResponse]:
    """Shared agent-creation and response-generation logic for v1/v2 endpoints.

    Args:
        config: AgentConfig object carrying all request parameters.

    Returns:
        StreamingResponse in stream mode, otherwise a ChatResponse.

    Raises:
        HTTPException: 500 when the agent produced no usable output.
    """
    # Stream mode: delegate to the enhanced streaming generator.
    if config.stream:
        return StreamingResponse(
            enhanced_generate_stream_response(config),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
        )
    agent, checkpointer = await init_agent(config)
    agent_responses = await agent.ainvoke({"messages": config.messages}, config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS)
    # Only the messages appended by this invocation belong to the answer.
    append_messages = agent_responses["messages"][len(config.messages):]
    response_text = ""
    for msg in append_messages:
        if isinstance(msg, AIMessage):
            if len(msg.text) > 0:
                meta_message_tag = msg.additional_kwargs.get("message_tag", "ANSWER")
                # Strip quadruple-backtick fence markers from THINK segments.
                output_text = msg.text.replace("````", "") if meta_message_tag == "THINK" else msg.text
                response_text += f"[{meta_message_tag}]\n" + output_text + "\n"
            if len(msg.tool_calls) > 0:
                # NOTE: quotes around tool keys kept consistent — the original nested
                # double quotes inside a double-quoted f-string, a SyntaxError on
                # Python < 3.12.
                for tool in msg.tool_calls:
                    tool_args = json.dumps(tool['args']) if isinstance(tool['args'], dict) else tool['args']
                    response_text += f"[TOOL_CALL] {tool['name']}\n{tool_args}\n"
        elif isinstance(msg, ToolMessage) and config.tool_response:
            response_text += f"[TOOL_RESPONSE] {msg.name}\n{msg.text}\n"
    if len(response_text) > 0:
        # Character-count based usage accounting (computed once, reused below).
        prompt_tokens = sum(len(m.get("content", "")) for m in config.messages)
        # Build an OpenAI-style response.
        result = ChatResponse(
            choices=[{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": response_text
                },
                "finish_reason": "stop"
            }],
            usage={
                "prompt_tokens": prompt_tokens,
                "completion_tokens": len(response_text),
                "total_tokens": prompt_tokens + len(response_text)
            }
        )
    else:
        raise HTTPException(status_code=500, detail="No response from agent")
    return result
@router.post("/api/v1/chat/completions")
async def chat_completions(request: ChatRequest, authorization: Optional[str] = Header(None)):
    """
    Chat completions API similar to OpenAI, supports both streaming and non-streaming
    Args:
        request: ChatRequest containing messages, model, optional dataset_ids list, required bot_id, system_prompt, mcp_settings, and files
        authorization: Authorization header containing API key (Bearer <API_KEY>)
    Returns:
        Union[ChatResponse, StreamingResponse]: Chat completion response or stream
    Notes:
        - dataset_ids: optional; when provided it must be a list of project IDs (use a list even for a single project)
        - bot_id: required robot ID
        - A robot project directory (projects/robot/{bot_id}/) is created only when robot_type == "catalog_agent" and dataset_ids is a non-empty list
        - No directory is created for any other robot_type (including the default "agent")
        - No directory is created when dataset_ids is an empty list, None, or absent
        - Multiple knowledge bases are merged; folder-name collisions are resolved automatically
    Required Parameters:
        - bot_id: str - target robot ID
        - messages: List[Message] - conversation message list
    Optional Parameters:
        - dataset_ids: List[str] - source knowledge-base project IDs (list even for a single project)
        - robot_type: str - robot type, defaults to "agent"
    Example:
        {"bot_id": "my-bot-001", "messages": [{"role": "user", "content": "Hello"}]}
        {"dataset_ids": ["project-123"], "bot_id": "my-bot-001", "messages": [{"role": "user", "content": "Hello"}]}
        {"dataset_ids": ["project-123", "project-456"], "bot_id": "my-bot-002", "messages": [{"role": "user", "content": "Hello"}]}
        {"dataset_ids": ["project-123"], "bot_id": "my-catalog-bot", "robot_type": "catalog_agent", "messages": [{"role": "user", "content": "Hello"}]}
    """
    try:
        # v1: the Authorization header carries the model API key.
        api_key = extract_api_key_from_auth(authorization)
        # bot_id is required.
        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")
        # Create the project directory (only when dataset_ids is set and type is not plain agent).
        project_dir = create_project_directory(request.dataset_ids, bot_id, request.robot_type, request.skills)
        # Collect the remaining request fields as generate_cfg.
        exclude_fields = {'messages', 'model', 'model_server', 'dataset_ids', 'language', 'tool_response', 'system_prompt', 'mcp_settings' ,'stream', 'robot_type', 'bot_id', 'user_identifier', 'session_id', 'enable_thinking', 'skills'}
        generate_cfg = {k: v for k, v in request.model_dump().items() if k not in exclude_fields}
        # Process the messages.
        messages = process_messages(request.messages, request.language)
        # Build the AgentConfig object.
        config = AgentConfig.from_v1_request(request, api_key, project_dir, generate_cfg, messages)
        # Delegate to the shared agent-creation/response logic.
        return await create_agent_and_generate_response(config)
    except HTTPException:
        # Preserve deliberate HTTP errors (e.g. the 400 above) instead of
        # collapsing them into a 500 — matches the v2 endpoints.
        raise
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in chat_completions: {str(e)}")
        logger.error(f"Full traceback: {error_details}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.post("/api/v1/chat/warmup")
async def chat_warmup_v1(request: ChatRequest, authorization: Optional[str] = Header(None)):
    """
    Warmup endpoint - initializes the agent without processing messages, so later requests respond faster.
    Args:
        request: ChatRequest containing configuration (messages will be ignored for warmup)
        authorization: Authorization header containing API key (Bearer <API_KEY>)
    Returns:
        JSON response with warmup status and cache key
    Required Parameters:
        - bot_id: str - target robot ID
    Notes:
        - Pre-generates and caches the agent; subsequent chat requests reuse the cached agent
        - The messages parameter is not processed during warmup; it is only used for config validation
        - The warmed-up agent's cache key is derived from the provided configuration parameters
    """
    try:
        # v1: the Authorization header carries the model API key.
        api_key = extract_api_key_from_auth(authorization)
        # bot_id is required.
        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")
        # Create the project directory (only when dataset_ids is set and type is not plain agent).
        project_dir = create_project_directory(request.dataset_ids, bot_id, request.robot_type, request.skills)
        # Collect the remaining request fields as generate_cfg.
        exclude_fields = {'messages', 'model', 'model_server', 'dataset_ids', 'language', 'tool_response', 'system_prompt', 'mcp_settings' ,'stream', 'robot_type', 'bot_id', 'user_identifier', 'session_id', 'enable_thinking', 'skills'}
        generate_cfg = {k: v for k, v in request.model_dump().items() if k not in exclude_fields}
        # Warmup ignores real messages; use an empty list.
        empty_messages = []
        messages = process_messages(empty_messages, request.language or "ja")
        # Build the AgentConfig object.
        config = AgentConfig.from_v1_request(request, api_key, project_dir, generate_cfg, messages)
        # Warm up the mcp_tools cache.
        logger.info(f"Warming up mcp_tools for bot_id: {bot_id}")
        from agent.deep_assistant import get_tools_from_mcp
        from agent.prompt_loader import load_mcp_settings_async
        # Load mcp_settings.
        final_mcp_settings = await load_mcp_settings_async(
            config.project_dir, config.mcp_settings, config.bot_id, config.robot_type
        )
        # Normalize to a list (anything falsy or non-list becomes []).
        mcp_settings = final_mcp_settings if isinstance(final_mcp_settings, list) else []
        # Warm up mcp_tools (caching is built into get_tools_from_mcp).
        mcp_tools = await get_tools_from_mcp(mcp_settings)
        return {
            "status": "warmed_up",
            "bot_id": bot_id,
            "mcp_tools_count": len(mcp_tools),
            "message": "MCP tools have been cached successfully"
        }
    except HTTPException:
        # Preserve deliberate HTTP errors (e.g. the 400 above) instead of
        # collapsing them into a 500 — matches the v2 endpoints.
        raise
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in chat_warmup_v1: {str(e)}")
        logger.error(f"Full traceback: {error_details}")
        raise HTTPException(status_code=500, detail=f"Warmup failed: {str(e)}")
@router.post("/api/v2/chat/warmup")
async def chat_warmup_v2(request: ChatRequestV2, authorization: Optional[str] = Header(None)):
    """
    Warmup endpoint v2 - initializes the agent without processing messages, so later requests respond faster.
    Uses the same authentication and configuration retrieval as /api/v2/chat/completions.
    Args:
        request: ChatRequestV2 containing essential parameters (messages will be ignored for warmup)
        authorization: Authorization header for authentication (same as v2 chat endpoint)
    Returns:
        JSON response with warmup status and cache key
    Required Parameters:
        - bot_id: str - target robot ID
    Authentication:
        - Requires valid MD5 hash token: MD5(MASTERKEY:bot_id)
        - Authorization header should contain: Bearer {token}
    Notes:
        - Pre-generates and caches the agent; subsequent chat requests reuse the cached agent
        - The messages parameter is not processed during warmup; it is only used for config validation
        - The warmed-up agent's cache key is derived from the full backend-fetched configuration
    """
    try:
        # bot_id is required.
        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")
        # v2 authentication (same logic as chat_completions_v2).
        expected_token = generate_v2_auth_token(bot_id)
        provided_token = extract_api_key_from_auth(authorization)
        if not provided_token:
            raise HTTPException(
                status_code=401,
                detail="Authorization header is required for v2 API"
            )
        if provided_token != expected_token:
            # SECURITY: log the mismatch server-side only; never echo any part of
            # the expected token back to the caller.
            logger.warning(f"v2 warmup auth failed for bot_id={bot_id}, provided token prefix: {provided_token[:8]}...")
            raise HTTPException(
                status_code=403,
                detail="Invalid authentication token"
            )
        # Fetch the robot configuration from the backend API (v2 auth).
        bot_config = await fetch_bot_config(bot_id)
        # Create the project directory from the backend-provided dataset_ids and skills.
        project_dir = create_project_directory(
            bot_config.get("dataset_ids", []),
            bot_id,
            bot_config.get("robot_type", "general_agent"),
            bot_config.get("skills")
        )
        # Warmup ignores real messages; use an empty list.
        empty_messages = []
        messages = process_messages(empty_messages, request.language or "ja")
        # Build the AgentConfig object.
        config = AgentConfig.from_v2_request(request, bot_config, project_dir, messages)
        # Warm up the mcp_tools cache.
        logger.info(f"Warming up mcp_tools for bot_id: {bot_id}")
        from agent.deep_assistant import get_tools_from_mcp
        from agent.prompt_loader import load_mcp_settings_async
        # Load mcp_settings.
        final_mcp_settings = await load_mcp_settings_async(
            config.project_dir, config.mcp_settings, config.bot_id, config.robot_type
        )
        # Normalize to a list (anything falsy or non-list becomes []).
        mcp_settings = final_mcp_settings if isinstance(final_mcp_settings, list) else []
        # Warm up mcp_tools (caching is built into get_tools_from_mcp).
        mcp_tools = await get_tools_from_mcp(mcp_settings)
        return {
            "status": "warmed_up",
            "bot_id": bot_id,
            "mcp_tools_count": len(mcp_tools),
            "message": "MCP tools have been cached successfully"
        }
    except HTTPException:
        raise
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in chat_warmup_v2: {str(e)}")
        logger.error(f"Full traceback: {error_details}")
        raise HTTPException(status_code=500, detail=f"Warmup failed: {str(e)}")
@router.post("/api/v2/chat/completions")
async def chat_completions_v2(request: ChatRequestV2, authorization: Optional[str] = Header(None)):
    """
    Chat completions API v2 with simplified parameters.
    Only requires messages, stream, tool_response, bot_id, and language parameters.
    Other parameters are fetched from the backend bot configuration API.
    Args:
        request: ChatRequestV2 containing only essential parameters
        authorization: Authorization header for authentication (different from v1)
    Returns:
        Union[ChatResponse, StreamingResponse]: Chat completion response or stream
    Required Parameters:
        - bot_id: str - target robot ID
        - messages: List[Message] - conversation message list
    Optional Parameters:
        - stream: bool - whether to stream output, defaults to false
        - tool_response: bool - whether to include tool responses, defaults to false
        - language: str - reply language, defaults to "ja"
    Authentication:
        - Requires valid MD5 hash token: MD5(MASTERKEY:bot_id)
        - Authorization header should contain: Bearer {token}
        - Uses MD5 hash of MASTERKEY:bot_id for backend API authentication
        - Optionally uses API key from bot config for model access
    """
    try:
        # bot_id is required.
        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")
        # v2 authentication.
        expected_token = generate_v2_auth_token(bot_id)
        provided_token = extract_api_key_from_auth(authorization)
        if not provided_token:
            raise HTTPException(
                status_code=401,
                detail="Authorization header is required for v2 API"
            )
        if provided_token != expected_token:
            # SECURITY: log the mismatch server-side only; never echo any part of
            # the expected token back to the caller.
            logger.warning(f"v2 auth failed for bot_id={bot_id}, provided token prefix: {provided_token[:8]}...")
            raise HTTPException(
                status_code=403,
                detail="Invalid authentication token"
            )
        # Fetch the robot configuration from the backend API (v2 auth).
        bot_config = await fetch_bot_config(bot_id)
        # Create the project directory from the backend-provided dataset_ids and skills.
        project_dir = create_project_directory(
            bot_config.get("dataset_ids", []),
            bot_id,
            bot_config.get("robot_type", "general_agent"),
            bot_config.get("skills")
        )
        # Process the messages.
        messages = process_messages(request.messages, request.language)
        # Build the AgentConfig object.
        config = AgentConfig.from_v2_request(request, bot_config, project_dir, messages)
        # Delegate to the shared agent-creation/response logic.
        return await create_agent_and_generate_response(config)
    except HTTPException:
        raise
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in chat_completions_v2: {str(e)}")
        logger.error(f"Full traceback: {error_details}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")