# qwen_agent/routes/chat.py
import json
import os
import asyncio
from typing import Union, Optional
from fastapi import APIRouter, HTTPException, Header
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import logging
logger = logging.getLogger('app')
from utils import (
    Message, ChatRequest, ChatResponse
)
from agent.sharded_agent_manager import init_global_sharded_agent_manager
from utils.api_models import ChatRequestV2
from agent.prompt_loader import load_guideline_prompt
from utils.fastapi_utils import (
    process_messages, extract_block_from_system_prompt, format_messages_to_chat_history,
    create_project_directory, extract_api_key_from_auth, generate_v2_auth_token, fetch_bot_config,
    _get_optimal_batch_size, process_guideline, get_content_from_messages, call_preamble_llm,
    get_preamble_text, get_language_text, create_stream_chunk
)
router = APIRouter()
# Initialize the global agent manager
agent_manager = init_global_sharded_agent_manager(
    max_cached_agents=int(os.getenv("MAX_CACHED_AGENTS", "50")),
    shard_count=int(os.getenv("SHARD_COUNT", "16"))
)
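# Example configuration (illustrative only; the actual entry-point module may
# differ from `main:app`):
#   MAX_CACHED_AGENTS=100 SHARD_COUNT=32 uvicorn main:app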


def get_user_last_message_content(messages: list) -> str:
    """Return the content of the last message if it was sent by the user, else ""."""
    if not messages:
        return ""
    last_message = messages[-1]
    if last_message and last_message.get('role') == 'user':
        return last_message["content"]
    return ""


def append_user_last_message(messages: list, content: str) -> list:
    """Append content to the last message if it was sent by the user.

    Args:
        messages: Message list (mutated in place).
        content: Content to append.

    Returns:
        list: The (possibly modified) message list.
    """
    if not messages:
        return messages
    last_message = messages[-1]
    if last_message and last_message.get('role') == 'user':
        messages[-1]['content'] += content
    return messages
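

# Example usage of the two helpers above (illustrative values only):
#   msgs = [{"role": "user", "content": "Hi"}]
#   get_user_last_message_content(msgs)        # -> "Hi"
#   append_user_last_message(msgs, " there")   # mutates msgs in place
#   msgs[-1]["content"]                        # -> "Hi there"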


async def process_guidelines_and_terms(
    bot_id: str,
    api_key: str,
    model_name: str,
    model_server: str,
    system_prompt: str,
    messages: list,
    agent_manager,
    project_dir: Optional[str],
    generate_cfg: Optional[dict],
    language: str,
    mcp_settings: Optional[list],
    robot_type: str,
    user_identifier: Optional[str]
) -> tuple:
    """
    Shared helper: run guideline analysis and terms processing, returning the
    agent and the analysis results.

    Returns:
        tuple: (agent, processed_system_prompt, guideline_reasoning)
    """
    # Extract guidelines and terms from the system prompt
    processed_system_prompt, guidelines, tools, scenarios, terms_list = extract_block_from_system_prompt(system_prompt)
    # Process terms
    terms_analysis = ""
    if terms_list:
        logger.info(f"Processing terms: {len(terms_list)} terms")
        try:
            from embedding.embedding import process_terms_with_embedding
            query_text = get_user_last_message_content(messages)
            terms_analysis = process_terms_with_embedding(terms_list, bot_id, query_text)
            if terms_analysis:
                processed_system_prompt = processed_system_prompt.replace("{terms}", terms_analysis)
                logger.info(f"Terms analysis completed: {len(terms_analysis)} chars")
        except Exception as e:
            logger.error(f"Error processing terms with embedding: {e}")
            terms_analysis = ""
    else:
        # When terms_list is empty, delete the corresponding pickle cache file
        try:
            cache_file = f"projects/cache/{bot_id}_terms.pkl"
            if os.path.exists(cache_file):
                os.remove(cache_file)
                logger.info(f"Removed empty terms cache file: {cache_file}")
        except Exception as e:
            logger.error(f"Error removing terms cache file: {e}")
    # Create all tasks
    tasks = []
    # Agent-creation task
    agent_task = agent_manager.get_or_create_agent(
        bot_id=bot_id,
        project_dir=project_dir,
        model_name=model_name,
        api_key=api_key,
        model_server=model_server,
        generate_cfg=generate_cfg,
        language=language,
        system_prompt=processed_system_prompt,
        mcp_settings=mcp_settings,
        robot_type=robot_type,
        user_identifier=user_identifier
    )
    tasks.append(agent_task)
    guideline_prompt = ""
    if robot_type == "general_agent":
        if not guidelines:
            guidelines = """
1. General Inquiries
Condition: User inquiries about products, policies, troubleshooting, factual questions, etc.
Action: Priority given to invoking the 【Knowledge Base Retrieval】 tool to query the knowledge base.
2. Social Dialogue
Condition: User intent involves small talk, greetings, expressions of thanks, compliments, or other non-substantive conversations.
Action: Provide concise, friendly, and personified natural responses.
"""
        if not tools:
            tools = """
- **Knowledge Base Retrieval**: For knowledge queries/other inquiries, prioritize searching the knowledge base → rag_retrieve-rag_retrieve
"""
    if guidelines:
        chat_history = format_messages_to_chat_history(messages)
        guideline_prompt = await load_guideline_prompt(chat_history, guidelines, tools, scenarios, terms_analysis, language, user_identifier)
        guideline_task = process_guideline(
            chat_history=chat_history,
            guideline_prompt=guideline_prompt,
            model_name=model_name,
            api_key=api_key,
            model_server=model_server
        )
        tasks.append(guideline_task)
    # Run all tasks concurrently
    all_results = await asyncio.gather(*tasks, return_exceptions=True)
    # Unpack results. With return_exceptions=True, failed tasks come back as
    # exception objects instead of raising inside gather.
    agent = all_results[0] if len(all_results) > 0 else None
    if isinstance(agent, Exception):
        # Agent creation is mandatory; surface the failure to the caller
        raise agent
    guideline_reasoning = all_results[1] if len(all_results) > 1 else ""
    if isinstance(guideline_reasoning, Exception):
        # Guideline analysis is best-effort; fall back to no reasoning
        logger.error(f"Error processing guideline: {guideline_reasoning}")
        guideline_reasoning = ""
    if guideline_prompt or guideline_reasoning:
        logger.info("Guideline Prompt: %s, Reasoning: %s",
                    guideline_prompt.replace('\n', '\\n') if guideline_prompt else "None",
                    guideline_reasoning.replace('\n', '\\n') if guideline_reasoning else "None")
    logger.info("System Prompt: %s", processed_system_prompt.replace('\n', '\\n'))
    return agent, processed_system_prompt, guideline_reasoning


async def enhanced_generate_stream_response(
    agent_manager,
    bot_id: str,
    api_key: str,
    messages: list,
    tool_response: bool,
    model_name: str,
    model_server: str,
    language: str,
    system_prompt: str,
    mcp_settings: Optional[list],
    robot_type: str,
    project_dir: Optional[str],
    generate_cfg: Optional[dict],
    user_identifier: Optional[str]
):
    """Enhanced progressive streaming response generator."""
    try:
        # Stage 1: start preamble-text generation and stage-2 processing in parallel
        query_text = get_user_last_message_content(messages)
        chat_history = format_messages_to_chat_history(messages)
        # Create the preamble-text generation task
        preamble_text, system_prompt = get_preamble_text(language, system_prompt)
        preamble_task = asyncio.create_task(
            call_preamble_llm(chat_history, query_text, preamble_text, language, model_name, api_key, model_server)
        )
        # Create the guideline-analysis and agent-creation task
        guidelines_task = asyncio.create_task(
            process_guidelines_and_terms(
                bot_id=bot_id,
                api_key=api_key,
                model_name=model_name,
                model_server=model_server,
                system_prompt=system_prompt,
                messages=messages,
                agent_manager=agent_manager,
                project_dir=project_dir,
                generate_cfg=generate_cfg,
                language=language,
                mcp_settings=mcp_settings,
                robot_type=robot_type,
                user_identifier=user_identifier
            )
        )
        # Wait for the preamble task to finish
        try:
            preamble_text = await preamble_task
            # Emit the preamble only when it is non-empty and not "<empty>"
            if preamble_text and preamble_text.strip() and preamble_text != "<empty>":
                preamble_content = get_content_from_messages([{"role": "preamble", "content": preamble_text + "\n"}], tool_response=tool_response)
                chunk_data = create_stream_chunk("chatcmpl-preamble", model_name, preamble_content)
                yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
                logger.info(f"Stream mode: Generated preamble text ({len(preamble_text)} chars)")
            else:
                logger.info("Stream mode: Skipped empty preamble text")
        except Exception as e:
            logger.error(f"Error generating preamble text: {e}")
        # Wait for the guideline-analysis task to finish
        agent, system_prompt, guideline_reasoning = await guidelines_task
        # Send guideline_reasoning immediately
        if guideline_reasoning:
            guideline_content = get_content_from_messages([{"role": "assistant", "reasoning_content": guideline_reasoning + "\n"}], tool_response=tool_response)
            chunk_data = create_stream_chunk("chatcmpl-guideline", model_name, guideline_content)
            yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
        # Prepare the final messages
        final_messages = messages.copy()
        final_messages = append_user_last_message(final_messages, f"\n\nlanguage:{get_language_text(language)}")
        if guideline_reasoning:
            # Split guideline_reasoning on ### and take the last segment as the Guidelines
            guidelines_text = guideline_reasoning.split('###')[-1].strip()
            final_messages = append_user_last_message(final_messages, f"\n\nGuidelines:\n{guidelines_text}\nPlease follow these guidelines step by step.")
        # Stage 3: stream the agent response
        logger.info("Starting agent stream response")
        accumulated_content = ""
        chunk_id = 0
        for response in agent.run(messages=final_messages):
            previous_content = accumulated_content
            accumulated_content = get_content_from_messages(response, tool_response=tool_response)
            # Compute the newly added content: agent.run yields cumulative
            # snapshots, so the delta is normally the suffix beyond what was
            # already sent; if the snapshot diverges, resend it whole.
            if accumulated_content.startswith(previous_content):
                new_content = accumulated_content[len(previous_content):]
            else:
                new_content = accumulated_content
            # Only send a chunk when there is new content
            if new_content:
                if chunk_id == 0:
                    logger.info("Agent produced its first token; starting streamed output")
                chunk_id += 1
                chunk_data = create_stream_chunk(f"chatcmpl-{chunk_id}", model_name, new_content)
                yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
        final_chunk = create_stream_chunk(f"chatcmpl-{chunk_id + 1}", model_name, finish_reason="stop")
        yield f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"
        # Send the end-of-stream marker
        yield "data: [DONE]\n\n"
        logger.info(f"Enhanced stream response completed, total chunks: {chunk_id}")
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in enhanced_generate_stream_response: {str(e)}")
        logger.error(f"Full traceback: {error_details}")
        error_data = {
            "error": {
                "message": f"Stream error: {str(e)}",
                "type": "internal_error"
            }
        }
        yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"


async def create_agent_and_generate_response(
    bot_id: str,
    api_key: str,
    messages: list,
    stream: bool,
    tool_response: bool,
    model_name: str,
    model_server: str,
    language: str,
    system_prompt: Optional[str],
    mcp_settings: Optional[list],
    robot_type: str,
    project_dir: Optional[str] = None,
    generate_cfg: Optional[dict] = None,
    user_identifier: Optional[str] = None
) -> Union[ChatResponse, StreamingResponse]:
    """Shared logic for creating the agent and generating the response."""
    if generate_cfg is None:
        generate_cfg = {}
    # In streaming mode, use the enhanced streaming response generator
    if stream:
        return StreamingResponse(
            enhanced_generate_stream_response(
                agent_manager=agent_manager,
                bot_id=bot_id,
                api_key=api_key,
                messages=messages,
                tool_response=tool_response,
                model_name=model_name,
                model_server=model_server,
                language=language,
                system_prompt=system_prompt or "",
                mcp_settings=mcp_settings,
                robot_type=robot_type,
                project_dir=project_dir,
                generate_cfg=generate_cfg,
                user_identifier=user_identifier
            ),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
        )
    _, system_prompt = get_preamble_text(language, system_prompt)
    # Handle guideline/terms processing and agent creation via the shared helper
    agent, system_prompt, guideline_reasoning = await process_guidelines_and_terms(
        bot_id=bot_id,
        api_key=api_key,
        model_name=model_name,
        model_server=model_server,
        system_prompt=system_prompt,
        messages=messages,
        agent_manager=agent_manager,
        project_dir=project_dir,
        generate_cfg=generate_cfg,
        language=language,
        mcp_settings=mcp_settings,
        robot_type=robot_type,
        user_identifier=user_identifier
    )
    # Prepare the final messages
    final_messages = messages.copy()
    final_messages = append_user_last_message(final_messages, f"\n\nlanguage:{get_language_text(language)}")
    pre_message_list = []
    if guideline_reasoning:
        # Split guideline_reasoning on ### and take the last segment as the Guidelines
        guidelines_text = guideline_reasoning.split('###')[-1].strip()
        final_messages = append_user_last_message(final_messages, f"\n\nGuidelines:\n{guidelines_text}\nPlease follow these guidelines step by step.")
        pre_message_list.append({"role": "assistant", "reasoning_content": guideline_reasoning + "\n"})
    # Non-streaming response
    agent_responses = agent.run_nonstream(final_messages)
    final_responses = pre_message_list + agent_responses
    if final_responses:
        # Flatten the responses via get_content_from_messages, honoring tool_response
        content = get_content_from_messages(final_responses, tool_response=tool_response)
        # Build an OpenAI-style response. Note: the usage figures are rough
        # character counts, not real token counts.
        return ChatResponse(
            choices=[{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": content
                },
                "finish_reason": "stop"
            }],
            usage={
                "prompt_tokens": sum(len(msg.get("content", "")) for msg in messages),
                "completion_tokens": len(content),
                "total_tokens": sum(len(msg.get("content", "")) for msg in messages) + len(content)
            }
        )
    else:
        raise HTTPException(status_code=500, detail="No response from agent")
@router.post("/api/v1/chat/completions")
async def chat_completions(request: ChatRequest, authorization: Optional[str] = Header(None)):
"""
Chat completions API similar to OpenAI, supports both streaming and non-streaming
Args:
request: ChatRequest containing messages, model, optional dataset_ids list, required bot_id, system_prompt, mcp_settings, and files
authorization: Authorization header containing API key (Bearer <API_KEY>)
Returns:
Union[ChatResponse, StreamingResponse]: Chat completion response or stream
Notes:
- dataset_ids: 可选参数当提供时必须是项目ID列表单个项目也使用数组格式
- bot_id: 必需参数机器人ID
- 只有当 robot_type == "catalog_agent" 且 dataset_ids 为非空数组时才会创建机器人项目目录projects/robot/{bot_id}/
- robot_type 为其他值(包括默认的 "agent")时不创建任何目录
- dataset_ids 为空数组 []、None 或未提供时不创建任何目录
- 支持多知识库合并,自动处理文件夹重名冲突
Required Parameters:
- bot_id: str - 目标机器人ID
- messages: List[Message] - 对话消息列表
Optional Parameters:
- dataset_ids: List[str] - 源知识库项目ID列表单个项目也使用数组格式
- robot_type: str - 机器人类型,默认为 "agent"
Example:
{"bot_id": "my-bot-001", "messages": [{"role": "user", "content": "Hello"}]}
{"dataset_ids": ["project-123"], "bot_id": "my-bot-001", "messages": [{"role": "user", "content": "Hello"}]}
{"dataset_ids": ["project-123", "project-456"], "bot_id": "my-bot-002", "messages": [{"role": "user", "content": "Hello"}]}
{"dataset_ids": ["project-123"], "bot_id": "my-catalog-bot", "robot_type": "catalog_agent", "messages": [{"role": "user", "content": "Hello"}]}
"""
    try:
        # v1: extract the API key from the Authorization header and use it as the model API key
        api_key = extract_api_key_from_auth(authorization)
        # Get bot_id (required)
        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")
        # Create the project directory (when dataset_ids is provided and the type is not "agent")
        project_dir = create_project_directory(request.dataset_ids, bot_id, request.robot_type)
        # Collect the remaining parameters as generate_cfg
        exclude_fields = {'messages', 'model', 'model_server', 'dataset_ids', 'language', 'tool_response', 'system_prompt', 'mcp_settings', 'stream', 'robot_type', 'bot_id', 'user_identifier'}
        generate_cfg = {k: v for k, v in request.model_dump().items() if k not in exclude_fields}
        # Process the messages
        messages = process_messages(request.messages, request.language)
        # Delegate to the shared agent-creation and response-generation logic
        return await create_agent_and_generate_response(
            bot_id=bot_id,
            api_key=api_key,
            messages=messages,
            stream=request.stream,
            tool_response=True,
            model_name=request.model,
            model_server=request.model_server,
            language=request.language,
            system_prompt=request.system_prompt,
            mcp_settings=request.mcp_settings,
            robot_type=request.robot_type,
            project_dir=project_dir,
            generate_cfg=generate_cfg,
            user_identifier=request.user_identifier
        )
    except HTTPException:
        # Re-raise HTTPExceptions (e.g. the 400 above) instead of wrapping them in a 500
        raise
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in chat_completions: {str(e)}")
        logger.error(f"Full traceback: {error_details}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.post("/api/v2/chat/completions")
async def chat_completions_v2(request: ChatRequestV2, authorization: Optional[str] = Header(None)):
"""
Chat completions API v2 with simplified parameters.
Only requires messages, stream, tool_response, bot_id, and language parameters.
Other parameters are fetched from the backend bot configuration API.
Args:
request: ChatRequestV2 containing only essential parameters
authorization: Authorization header for authentication (different from v1)
Returns:
Union[ChatResponse, StreamingResponse]: Chat completion response or stream
Required Parameters:
- bot_id: str - 目标机器人ID
- messages: List[Message] - 对话消息列表
Optional Parameters:
- stream: bool - 是否流式输出默认false
- tool_response: bool - 是否包含工具响应默认false
- language: str - 回复语言,默认"ja"
Authentication:
- Requires valid MD5 hash token: MD5(MASTERKEY:bot_id)
- Authorization header should contain: Bearer {token}
- Uses MD5 hash of MASTERKEY:bot_id for backend API authentication
- Optionally uses API key from bot config for model access
"""
    try:
        # Get bot_id (required)
        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")
        # v2 authentication check
        expected_token = generate_v2_auth_token(bot_id)
        provided_token = extract_api_key_from_auth(authorization)
        if not provided_token:
            raise HTTPException(
                status_code=401,
                detail="Authorization header is required for v2 API"
            )
        if provided_token != expected_token:
            raise HTTPException(
                status_code=403,
                detail=f"Invalid authentication token. Expected: {expected_token[:8]}..., Provided: {provided_token[:8]}..."
            )
        # Fetch the bot configuration from the backend API (using v2 authentication)
        bot_config = await fetch_bot_config(bot_id)
        # v2: the API key comes from the backend config. The Authorization
        # header has already been consumed for authentication and is not
        # reused as the API key.
        api_key = bot_config.get("api_key")
        # Create the project directory (dataset_ids come from the backend config)
        project_dir = create_project_directory(
            bot_config.get("dataset_ids", []),
            bot_id,
            bot_config.get("robot_type", "general_agent")
        )
        # Process the messages
        messages = process_messages(request.messages, request.language)
        # Delegate to the shared agent-creation and response-generation logic
        return await create_agent_and_generate_response(
            bot_id=bot_id,
            api_key=api_key,
            messages=messages,
            stream=request.stream,
            tool_response=request.tool_response,
            model_name=bot_config.get("model", "qwen/qwen3-next-80b-a3b-instruct"),
            model_server=bot_config.get("model_server", ""),
            language=request.language or bot_config.get("language", "ja"),
            system_prompt=bot_config.get("system_prompt"),
            mcp_settings=bot_config.get("mcp_settings", []),
            robot_type=bot_config.get("robot_type", "general_agent"),
            project_dir=project_dir,
            generate_cfg={},  # v2 does not pass extra generate_cfg
            user_identifier=request.user_identifier
        )
    except HTTPException:
        raise
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in chat_completions_v2: {str(e)}")
        logger.error(f"Full traceback: {error_details}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
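

# --- Illustrative only: an end-to-end v2 client sketch (not used by the app).
# It assumes the caller knows the server's MASTERKEY, that the `requests`
# library is installed, and that the router is mounted at the app root.
def _example_call_v2(base_url: str, bot_id: str, masterkey: str) -> dict:
    import hashlib
    import requests  # assumption: installed in the client environment
    # Derive the auth token per the MD5(MASTERKEY:bot_id) scheme above
    token = hashlib.md5(f"{masterkey}:{bot_id}".encode("utf-8")).hexdigest()
    resp = requests.post(
        f"{base_url}/api/v2/chat/completions",
        headers={"Authorization": f"Bearer {token}"},
        json={"bot_id": bot_id,
              "messages": [{"role": "user", "content": "Hello"}]},
    )
    resp.raise_for_status()
    return resp.json()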