qwen_agent/routes/chat.py
2026-02-04 17:39:51 +08:00

797 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import asyncio
from typing import Union, Optional, Any, List, Dict
from fastapi import APIRouter, HTTPException, Header
from fastapi.responses import StreamingResponse
import logging
logger = logging.getLogger('app')
from utils import (
Message, ChatRequest, ChatResponse, BatchSaveChatRequest, BatchSaveChatResponse
)
from utils.api_models import ChatRequestV2
from utils.fastapi_utils import (
process_messages,
create_project_directory, extract_api_key_from_auth, generate_v2_auth_token, fetch_bot_config,
call_preamble_llm,
create_stream_chunk
)
from langchain_core.messages import AIMessageChunk, ToolMessage, AIMessage, HumanMessage
from utils.settings import MAX_OUTPUT_TOKENS
from agent.agent_config import AgentConfig
from agent.deep_assistant import init_agent
router = APIRouter()
async def enhanced_generate_stream_response(
    config: AgentConfig
):
    """Enhanced progressive streaming response generator (concurrency-optimized).

    Runs the preamble LLM call and the agent stream concurrently; an output
    controller guarantees that preamble chunks are emitted before any agent
    chunks reach the client.

    Args:
        config: AgentConfig object containing all parameters.

    Yields:
        SSE-formatted ``data: ...`` strings, terminated by ``data: [DONE]``.
    """
    # Accumulates the full response so it can be saved to the database afterwards.
    full_response_content = []
    try:
        # Output queue and coordination event shared by the two producer tasks.
        output_queue = asyncio.Queue()
        preamble_completed = asyncio.Event()
        # Persist the user message before streaming begins (fire-and-forget).
        if config.session_id:
            asyncio.create_task(_save_user_messages(config))

        async def preamble_task():
            """Generate the preamble text and enqueue it."""
            try:
                preamble_result = await call_preamble_llm(config)
                # Only emit when the preamble is non-empty and not the "<empty>" sentinel.
                if preamble_result and preamble_result.strip() and preamble_result != "<empty>":
                    preamble_content = f"[PREAMBLE]\n{preamble_result}\n"
                    chunk_data = create_stream_chunk("chatcmpl-preamble", config.model_name, preamble_content)
                    await output_queue.put(("preamble", f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"))
                    logger.info(f"Stream mode: Generated preamble text ({len(preamble_result)} chars)")
                else:
                    logger.info("Stream mode: Skipped empty preamble text")
                # Mark the preamble as finished.
                preamble_completed.set()
                await output_queue.put(("preamble_done", None))
            except Exception as e:
                logger.error(f"Error generating preamble text: {e}")
                # Mark completion even on failure so the controller never blocks.
                preamble_completed.set()
                await output_queue.put(("preamble_done", None))

        async def agent_task():
            """Initialize the agent, run its stream, and enqueue the chunks."""
            try:
                logger.info(f"Starting agent stream response")
                chunk_id = 0
                message_tag = ""
                agent, checkpointer = await init_agent(config)
                async for msg, metadata in agent.astream({"messages": config.messages}, stream_mode="messages", config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS):
                    new_content = ""
                    if isinstance(msg, AIMessageChunk):
                        # Tool-call chunks.
                        if msg.tool_call_chunks:
                            message_tag = "TOOL_CALL"
                            if config.tool_response:
                                for tool_call_chunk in msg.tool_call_chunks:
                                    if tool_call_chunk["name"]:
                                        new_content = f"[{message_tag}] {tool_call_chunk['name']}\n"
                                    if tool_call_chunk['args']:
                                        new_content += tool_call_chunk['args']
                        # Plain text content.
                        elif msg.content:
                            # Agent text is ready: unblock the output controller
                            # even if the preamble is still running.
                            preamble_completed.set()
                            await output_queue.put(("preamble_done", None))
                            meta_message_tag = metadata.get("message_tag", "ANSWER")
                            # SUMMARY content is never emitted.
                            if meta_message_tag == "SUMMARY":
                                continue
                            if meta_message_tag != message_tag:
                                message_tag = meta_message_tag
                                new_content = f"[{meta_message_tag}]\n"
                            if msg.text:
                                new_content += msg.text
                    # Tool responses.
                    elif isinstance(msg, ToolMessage) and msg.content:
                        message_tag = "TOOL_RESPONSE"
                        if config.tool_response:
                            new_content = f"[{message_tag}] {msg.name}\n{msg.text}\n"
                    # Collect and forward the chunk.
                    if new_content:
                        full_response_content.append(new_content)
                        if chunk_id == 0:
                            logger.info(f"Agent首个Token已生成, 开始流式输出")
                        chunk_id += 1
                        chunk_data = create_stream_chunk(f"chatcmpl-{chunk_id}", config.model_name, new_content)
                        await output_queue.put(("agent", f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"))
                # Final chunk carrying finish_reason="stop".
                final_chunk = create_stream_chunk(f"chatcmpl-{chunk_id + 1}", config.model_name, finish_reason="stop")
                await output_queue.put(("agent", f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"))
                await output_queue.put(("agent_done", None))
            except Exception as e:
                logger.error(f"Error in agent task: {e}")
                # Bug fix: encode via json.dumps instead of f-string interpolation
                # so quotes/newlines in the message cannot corrupt the SSE JSON.
                await output_queue.put(("agent", f"data: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n"))
                # Always signal completion so the output controller can finish.
                await output_queue.put(("agent_done", None))

        # Launch the producers concurrently.
        # The preamble task runs only when enable_thinking is True.
        if config.enable_thinking:
            preamble_task_handle = asyncio.create_task(preamble_task())
        else:
            # Thinking disabled: substitute a no-op task and mark the
            # preamble done immediately.
            preamble_task_handle = asyncio.create_task(asyncio.sleep(0))
            preamble_completed.set()
        agent_task_handle = asyncio.create_task(agent_task())
        # Output controller: preamble first, then the agent stream.
        preamble_output_done = False
        while True:
            try:
                # Timeout so we never wait forever on a stalled queue.
                item_type, item_data = await asyncio.wait_for(output_queue.get(), timeout=1.0)
                if item_type == "preamble":
                    # Emit preamble content immediately.
                    if item_data:
                        yield item_data
                    preamble_output_done = True
                elif item_type == "preamble_done":
                    # Preamble finished; mark and continue.
                    preamble_output_done = True
                elif item_type == "agent":
                    # Agent chunks must wait for the preamble output.
                    if preamble_output_done:
                        yield item_data
                    else:
                        # Preamble not emitted yet: requeue and wait for it.
                        await output_queue.put((item_type, item_data))
                        await preamble_completed.wait()
                        preamble_output_done = True
                elif item_type == "agent_done":
                    # Agent stream finished; leave the loop.
                    break
            except asyncio.TimeoutError:
                # Exit once both producer tasks have finished.
                if all(task.done() for task in [preamble_task_handle, agent_task_handle]):
                    break
                continue
        # Terminate the SSE stream.
        yield "data: [DONE]\n\n"
        logger.info(f"Enhanced stream response completed")
        # Persist the assistant response after streaming completes.
        if full_response_content and config.session_id:
            asyncio.create_task(_save_assistant_response(config, "".join(full_response_content)))
    except Exception as e:
        logger.error(f"Error in enhanced_generate_stream_response: {e}")
        # Bug fix: safe JSON encoding of the error message (see agent_task).
        yield f"data: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n"
        yield "data: [DONE]\n\n"
async def create_agent_and_generate_response(
    config: AgentConfig
) -> Union[ChatResponse, StreamingResponse]:
    """Shared logic: create the agent and generate a response.

    Args:
        config: AgentConfig object containing all parameters.

    Returns:
        A StreamingResponse (SSE) in stream mode, otherwise a ChatResponse.

    Raises:
        HTTPException: 500 when the agent produces no response text.
    """
    # Streaming mode: delegate to the enhanced streaming generator.
    if config.stream:
        return StreamingResponse(
            enhanced_generate_stream_response(config),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
        )
    agent, checkpointer = await init_agent(config)
    # Invoke with the (already processed) messages.
    agent_responses = await agent.ainvoke({"messages": config.messages}, config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS)
    # Walk backwards to the most recent HumanMessage; everything after it is
    # the content produced in this turn.
    all_messages = agent_responses["messages"]
    first_human_idx = None
    for i in range(len(all_messages) - 1, -1, -1):
        if isinstance(all_messages[i], HumanMessage):
            first_human_idx = i
            break
    if first_human_idx is not None:
        append_messages = all_messages[first_human_idx + 1:]
    else:
        # No HumanMessage found: take every message.
        append_messages = all_messages
    response_text = ""
    for msg in append_messages:
        if isinstance(msg, AIMessage):
            if len(msg.text) > 0:
                meta_message_tag = msg.additional_kwargs.get("message_tag", "ANSWER")
                # SUMMARY content is never emitted.
                if meta_message_tag == "SUMMARY":
                    continue
                # THINK segments have code fences stripped (the original applied
                # the identical replace twice; once is equivalent).
                output_text = msg.text.replace("````", "") if meta_message_tag == "THINK" else msg.text
                response_text += f"[{meta_message_tag}]\n" + output_text + "\n"
            if len(msg.tool_calls) > 0 and config.tool_response:
                # Single-quoted subscripts inside the f-string: the original's
                # double quotes are a syntax error on Python < 3.12.
                response_text += "".join(
                    f"[TOOL_CALL] {tool['name']}\n{json.dumps(tool['args']) if isinstance(tool['args'], dict) else tool['args']}\n"
                    for tool in msg.tool_calls
                )
        elif isinstance(msg, ToolMessage) and config.tool_response:
            response_text += f"[TOOL_RESPONSE] {msg.name}\n{msg.text}\n"
    if len(response_text) > 0:
        # NOTE: character counts are used as a cheap stand-in for token counts.
        prompt_chars = sum(len(msg.get("content", "")) for msg in config.messages)
        # Build an OpenAI-style response.
        result = ChatResponse(
            choices=[{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": response_text
                },
                "finish_reason": "stop"
            }],
            usage={
                "prompt_tokens": prompt_chars,
                "completion_tokens": len(response_text),
                "total_tokens": prompt_chars + len(response_text)
            }
        )
        # Persist chat history (same behavior as the streaming endpoint).
        await _save_user_messages(config)
        await _save_assistant_response(config, response_text)
    else:
        raise HTTPException(status_code=500, detail="No response from agent")
    return result
async def _save_user_messages(config: AgentConfig) -> None:
    """Persist the most recent user message (used by both stream and
    non-stream endpoints).

    Args:
        config: AgentConfig object.
    """
    # Nothing to persist without a session.
    if not config.session_id:
        return
    try:
        from agent.chat_history_manager import get_chat_history_manager
        history = get_chat_history_manager()
        # Scan from the newest message backwards; store only the last user turn.
        for entry in reversed(config.messages):
            if not isinstance(entry, dict):
                continue
            role = entry.get("role", "")
            content = entry.get("content", "")
            if role == "user" and content:
                await history.manager.save_message(
                    session_id=config.session_id,
                    role=role,
                    content=content,
                    bot_id=config.bot_id,
                    user_identifier=config.user_identifier
                )
                # Only the most recent user message is saved.
                break
        logger.debug(f"Saved last user message for session_id={config.session_id}")
    except Exception as e:
        # A persistence failure must never break the chat flow.
        logger.error(f"Failed to save user messages: {e}")
async def _save_assistant_response(config: AgentConfig, assistant_response: str) -> None:
    """Persist the assistant's reply (used by both stream and non-stream
    endpoints).

    Args:
        config: AgentConfig object.
        assistant_response: The assistant's reply text.
    """
    # Skip when there is no session or nothing to store.
    if not config.session_id or not assistant_response:
        return
    try:
        from agent.chat_history_manager import get_chat_history_manager
        history = get_chat_history_manager()
        # Store the assistant's reply.
        await history.manager.save_message(
            session_id=config.session_id,
            role="assistant",
            content=assistant_response,
            bot_id=config.bot_id,
            user_identifier=config.user_identifier
        )
        logger.debug(f"Saved assistant response for session_id={config.session_id}")
    except Exception as e:
        # A persistence failure must never break the chat flow.
        logger.error(f"Failed to save assistant response: {e}")
@router.post("/api/v1/chat/completions")
async def chat_completions(request: ChatRequest, authorization: Optional[str] = Header(None)):
"""
Chat completions API similar to OpenAI, supports both streaming and non-streaming
Args:
request: ChatRequest containing messages, model, optional dataset_ids list, required bot_id, system_prompt, mcp_settings, and files
authorization: Authorization header containing API key (Bearer <API_KEY>)
Returns:
Union[ChatResponse, StreamingResponse]: Chat completion response or stream
Notes:
- dataset_ids: 可选参数当提供时必须是项目ID列表单个项目也使用数组格式
- bot_id: 必需参数机器人ID
- 只有当 robot_type == "catalog_agent" 且 dataset_ids 为非空数组时才会创建机器人项目目录projects/robot/{bot_id}/
- robot_type 为其他值(包括默认的 "agent")时不创建任何目录
- dataset_ids 为空数组 []、None 或未提供时不创建任何目录
- 支持多知识库合并,自动处理文件夹重名冲突
Required Parameters:
- bot_id: str - 目标机器人ID
- messages: List[Message] - 对话消息列表
Optional Parameters:
- dataset_ids: List[str] - 源知识库项目ID列表单个项目也使用数组格式
- robot_type: str - 机器人类型,默认为 "agent"
Example:
{"bot_id": "my-bot-001", "messages": [{"role": "user", "content": "Hello"}]}
{"dataset_ids": ["project-123"], "bot_id": "my-bot-001", "messages": [{"role": "user", "content": "Hello"}]}
{"dataset_ids": ["project-123", "project-456"], "bot_id": "my-bot-002", "messages": [{"role": "user", "content": "Hello"}]}
{"dataset_ids": ["project-123"], "bot_id": "my-catalog-bot", "robot_type": "catalog_agent", "messages": [{"role": "user", "content": "Hello"}]}
"""
try:
# v1接口从Authorization header中提取API key作为模型API密钥
api_key = extract_api_key_from_auth(authorization)
# 获取bot_id必需参数
bot_id = request.bot_id
if not bot_id:
raise HTTPException(status_code=400, detail="bot_id is required")
# 创建项目目录如果有dataset_ids且不是agent类型
project_dir = create_project_directory(request.dataset_ids, bot_id, request.robot_type, request.skills)
# 收集额外参数作为 generate_cfg
exclude_fields = {'messages', 'model', 'model_server', 'dataset_ids', 'language', 'tool_response', 'system_prompt', 'mcp_settings' ,'stream', 'robot_type', 'bot_id', 'user_identifier', 'session_id', 'enable_thinking', 'skills', 'enable_memory'}
generate_cfg = {k: v for k, v in request.model_dump().items() if k not in exclude_fields}
# 处理消息
messages = process_messages(request.messages, request.language)
# 创建 AgentConfig 对象
config = await AgentConfig.from_v1_request(request, api_key, project_dir, generate_cfg, messages)
# 调用公共的agent创建和响应生成逻辑
return await create_agent_and_generate_response(config)
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Error in chat_completions: {str(e)}")
logger.error(f"Full traceback: {error_details}")
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.post("/api/v1/chat/warmup")
async def chat_warmup_v1(request: ChatRequest, authorization: Optional[str] = Header(None)):
"""
预热接口 - 初始化agent但不处理消息用于后续请求的快速响应
Args:
request: ChatRequest containing configuration (messages will be ignored for warmup)
authorization: Authorization header containing API key (Bearer <API_KEY>)
Returns:
JSON response with warmup status and cache key
Required Parameters:
- bot_id: str - 目标机器人ID
Notes:
- 此接口会预先生成并缓存agent后续的chat请求可以复用缓存的agent
- messages参数在预热阶段不会被处理仅用于配置验证
- 预热的agent会基于提供的配置参数生成唯一的缓存键
"""
try:
# v1接口从Authorization header中提取API key作为模型API密钥
api_key = extract_api_key_from_auth(authorization)
# 获取bot_id必需参数
bot_id = request.bot_id
if not bot_id:
raise HTTPException(status_code=400, detail="bot_id is required")
# 创建项目目录如果有dataset_ids且不是agent类型
project_dir = create_project_directory(request.dataset_ids, bot_id, request.robot_type, request.skills)
# 收集额外参数作为 generate_cfg
exclude_fields = {'messages', 'model', 'model_server', 'dataset_ids', 'language', 'tool_response', 'system_prompt', 'mcp_settings' ,'stream', 'robot_type', 'bot_id', 'user_identifier', 'session_id', 'enable_thinking', 'skills', 'enable_memory'}
generate_cfg = {k: v for k, v in request.model_dump().items() if k not in exclude_fields}
# 创建一个空的消息列表用于预热实际消息不会在warmup中处理
empty_messages = []
# 处理消息(即使是空的)
messages = process_messages(empty_messages, request.language or "ja")
# 创建 AgentConfig 对象
config = await AgentConfig.from_v1_request(request, api_key, project_dir, generate_cfg, messages)
# 预热 mcp_tools 缓存
logger.info(f"Warming up mcp_tools for bot_id: {bot_id}")
from agent.deep_assistant import get_tools_from_mcp
from agent.prompt_loader import load_mcp_settings_async
# 加载 mcp_settings
final_mcp_settings = await load_mcp_settings_async(
config.project_dir, config.mcp_settings, config.bot_id, config.robot_type
)
mcp_settings = final_mcp_settings if final_mcp_settings else []
if not isinstance(mcp_settings, list) or len(mcp_settings) == 0:
mcp_settings = []
# 预热 mcp_tools缓存逻辑已内置到 get_tools_from_mcp
mcp_tools = await get_tools_from_mcp(mcp_settings)
return {
"status": "warmed_up",
"bot_id": bot_id,
"mcp_tools_count": len(mcp_tools),
"message": "MCP tools have been cached successfully"
}
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Error in chat_warmup_v1: {str(e)}")
logger.error(f"Full traceback: {error_details}")
raise HTTPException(status_code=500, detail=f"Warmup failed: {str(e)}")
@router.post("/api/v2/chat/warmup")
async def chat_warmup_v2(request: ChatRequestV2, authorization: Optional[str] = Header(None)):
"""
预热接口 v2 - 初始化agent但不处理消息用于后续请求的快速响应
使用与 /api/v2/chat/completions 相同的认证和配置获取方式
Args:
request: ChatRequestV2 containing essential parameters (messages will be ignored for warmup)
authorization: Authorization header for authentication (same as v2 chat endpoint)
Returns:
JSON response with warmup status and cache key
Required Parameters:
- bot_id: str - 目标机器人ID
Authentication:
- Requires valid MD5 hash token: MD5(MASTERKEY:bot_id)
- Authorization header should contain: Bearer {token}
Notes:
- 此接口会预先生成并缓存agent后续的chat请求可以复用缓存的agent
- messages参数在预热阶段不会被处理仅用于配置验证
- 预热的agent会基于从后端获取的完整配置生成唯一的缓存键
"""
try:
# 获取bot_id必需参数
bot_id = request.bot_id
if not bot_id:
raise HTTPException(status_code=400, detail="bot_id is required")
# v2接口鉴权验证与chat_completions_v2相同的认证逻辑
expected_token = generate_v2_auth_token(bot_id)
provided_token = extract_api_key_from_auth(authorization)
if not provided_token:
raise HTTPException(
status_code=401,
detail="Authorization header is required for v2 API"
)
if provided_token != expected_token:
raise HTTPException(
status_code=403,
detail=f"Invalid authentication token. Expected: {expected_token[:8]}..., Provided: {provided_token[:8]}..."
)
# 从后端API获取机器人配置使用v2的鉴权方式
bot_config = await fetch_bot_config(bot_id)
# 创建项目目录从后端配置获取dataset_ids和skills
project_dir = create_project_directory(
bot_config.get("dataset_ids", []),
bot_id,
bot_config.get("robot_type", "general_agent"),
bot_config.get("skills")
)
# 创建一个空的消息列表用于预热实际消息不会在warmup中处理
empty_messages = []
# 处理消息
messages = process_messages(empty_messages, request.language or "ja")
# 创建 AgentConfig 对象
config = await AgentConfig.from_v2_request(request, bot_config, project_dir, messages)
# 预热 mcp_tools 缓存
logger.info(f"Warming up mcp_tools for bot_id: {bot_id}")
from agent.deep_assistant import get_tools_from_mcp
from agent.prompt_loader import load_mcp_settings_async
# 加载 mcp_settings
final_mcp_settings = await load_mcp_settings_async(
config.project_dir, config.mcp_settings, config.bot_id, config.robot_type
)
mcp_settings = final_mcp_settings if final_mcp_settings else []
if not isinstance(mcp_settings, list) or len(mcp_settings) == 0:
mcp_settings = []
# 预热 mcp_tools缓存逻辑已内置到 get_tools_from_mcp
mcp_tools = await get_tools_from_mcp(mcp_settings)
return {
"status": "warmed_up",
"bot_id": bot_id,
"mcp_tools_count": len(mcp_tools),
"message": "MCP tools have been cached successfully"
}
except HTTPException:
raise
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Error in chat_warmup_v2: {str(e)}")
logger.error(f"Full traceback: {error_details}")
raise HTTPException(status_code=500, detail=f"Warmup failed: {str(e)}")
@router.post("/api/v2/chat/completions")
async def chat_completions_v2(request: ChatRequestV2, authorization: Optional[str] = Header(None)):
"""
Chat completions API v2 with simplified parameters.
Only requires messages, stream, tool_response, bot_id, and language parameters.
Other parameters are fetched from the backend bot configuration API.
Args:
request: ChatRequestV2 containing only essential parameters
authorization: Authorization header for authentication (different from v1)
Returns:
Union[ChatResponse, StreamingResponse]: Chat completion response or stream
Required Parameters:
- bot_id: str - 目标机器人ID
- messages: List[Message] - 对话消息列表
Optional Parameters:
- stream: bool - 是否流式输出默认false
- tool_response: bool - 是否包含工具响应默认false
- language: str - 回复语言,默认"ja"
Authentication:
- Requires valid MD5 hash token: MD5(MASTERKEY:bot_id)
- Authorization header should contain: Bearer {token}
- Uses MD5 hash of MASTERKEY:bot_id for backend API authentication
- Optionally uses API key from bot config for model access
"""
try:
# 获取bot_id必需参数
bot_id = request.bot_id
if not bot_id:
raise HTTPException(status_code=400, detail="bot_id is required")
# v2接口鉴权验证
expected_token = generate_v2_auth_token(bot_id)
provided_token = extract_api_key_from_auth(authorization)
if not provided_token:
raise HTTPException(
status_code=401,
detail="Authorization header is required for v2 API"
)
if provided_token != expected_token:
raise HTTPException(
status_code=403,
detail=f"Invalid authentication token. Expected: {expected_token[:8]}..., Provided: {provided_token[:8]}..."
)
# 从后端API获取机器人配置使用v2的鉴权方式
bot_config = await fetch_bot_config(bot_id)
# 创建项目目录从后端配置获取dataset_ids和skills
project_dir = create_project_directory(
bot_config.get("dataset_ids", []),
bot_id,
bot_config.get("robot_type", "general_agent"),
bot_config.get("skills")
)
# 处理消息
messages = process_messages(request.messages, request.language)
# 创建 AgentConfig 对象
config = await AgentConfig.from_v2_request(request, bot_config, project_dir, messages)
# 调用公共的agent创建和响应生成逻辑
return await create_agent_and_generate_response(config)
except HTTPException:
raise
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Error in chat_completions_v2: {str(e)}")
logger.error(f"Full traceback: {error_details}")
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
# ============================================================================
# 聊天历史查询接口
# ============================================================================
@router.get("/api/v1/chat/history", response_model=dict)
async def get_chat_history(
session_id: str,
last_message_id: Optional[str] = None,
limit: int = 20
):
"""
获取聊天历史记录
从独立的聊天历史表查询,返回完整的原始消息(不受 checkpoint summary 影响)
参数:
session_id: 会话ID
last_message_id: 上一页最后一条消息的ID用于获取更早的消息
limit: 每次返回的消息数量,默认 20最大 100
返回:
{
"messages": [
{
"id": "唯一消息ID",
"role": "user 或 assistant",
"content": "消息内容",
"timestamp": "ISO 8601 格式的时间戳"
},
...
],
"has_more": true/false // 是否还有更多历史消息
}
"""
try:
from agent.chat_history_manager import get_chat_history_manager
# 参数验证
limit = min(max(1, limit), 100)
manager = get_chat_history_manager()
result = await manager.manager.get_history_by_message_id(
session_id=session_id,
last_message_id=last_message_id,
limit=limit
)
return {
"messages": result["messages"],
"has_more": result["has_more"]
}
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Error in get_chat_history: {str(e)}")
logger.error(f"Full traceback: {error_details}")
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.post("/api/v1/chat/history/batch", response_model=BatchSaveChatResponse)
async def batch_save_chat_history(request: BatchSaveChatRequest):
"""
批量保存聊天记录
支持自定义批量保存多条聊天消息到数据库
参数:
session_id: 会话ID
messages: 要保存的消息列表,每条消息包含 role 和 content
bot_id: 机器人ID可选
请求体示例:
{
"session_id": "test-session-123",
"messages": [
{"role": "user", "content": "你好"},
{"role": "assistant", "content": "你好!有什么可以帮助你的吗?"},
{"role": "user", "content": "咖啡多少钱一杯"}
],
"bot_id": "63069654-7750-409d-9a58-a0960d899a20"
}
返回:
{
"success": true,
"message": "成功保存 3 条消息",
"session_id": "test-session-123",
"saved_count": 3,
"message_ids": ["uuid1", "uuid2", "uuid3"]
}
"""
try:
from agent.chat_history_manager import get_chat_history_manager
# 参数验证
if not request.session_id:
raise HTTPException(status_code=400, detail="session_id is required")
if not request.messages or len(request.messages) == 0:
raise HTTPException(status_code=400, detail="messages list is empty")
# 转换消息格式
messages_dict = [
{"role": msg.role, "content": msg.content}
for msg in request.messages
]
manager = get_chat_history_manager()
message_ids = await manager.manager.save_messages(
session_id=request.session_id,
messages=messages_dict,
bot_id=request.bot_id
)
# 过滤掉 None 值
valid_message_ids = [mid for mid in message_ids if mid is not None]
saved_count = len(valid_message_ids)
return BatchSaveChatResponse(
success=True,
message=f"成功保存 {saved_count} 条消息",
session_id=request.session_id,
saved_count=saved_count,
message_ids=valid_message_ids
)
except HTTPException:
raise
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Error in batch_save_chat_history: {str(e)}")
logger.error(f"Full traceback: {error_details}")
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")