Replace Memori with Mem0 for memory management:
- Delete memori_config.py, memori_manager.py, memori_middleware.py
- Add mem0_config.py, mem0_manager.py, mem0_middleware.py
- Update environment variables (MEMORI_* -> MEM0_*)
- Integrate Mem0 with LangGraph middleware
- Add sync connection pool for Mem0 in DBPoolManager
- Move checkpoint message prep to config creation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
792 lines
32 KiB
Python
import json
|
||
import os
|
||
import asyncio
|
||
from typing import Union, Optional, Any, List, Dict
|
||
from fastapi import APIRouter, HTTPException, Header
|
||
from fastapi.responses import StreamingResponse
|
||
import logging
|
||
|
||
logger = logging.getLogger('app')
|
||
from utils import (
|
||
Message, ChatRequest, ChatResponse, BatchSaveChatRequest, BatchSaveChatResponse
|
||
)
|
||
from utils.api_models import ChatRequestV2
|
||
from utils.fastapi_utils import (
|
||
process_messages,
|
||
create_project_directory, extract_api_key_from_auth, generate_v2_auth_token, fetch_bot_config,
|
||
call_preamble_llm,
|
||
create_stream_chunk
|
||
)
|
||
from langchain_core.messages import AIMessageChunk, ToolMessage, AIMessage, HumanMessage
|
||
from utils.settings import MAX_OUTPUT_TOKENS
|
||
from agent.agent_config import AgentConfig
|
||
from agent.deep_assistant import init_agent
|
||
|
||
router = APIRouter()
|
||
|
||
|
||
async def enhanced_generate_stream_response(
    config: AgentConfig
):
    """Enhanced progressive SSE stream generator (concurrency-optimized).

    Runs the preamble LLM call and the agent stream concurrently, but guarantees
    that preamble output is emitted to the client before any agent output.

    Args:
        config: AgentConfig object carrying all request parameters.

    Yields:
        SSE-formatted strings (``data: {...}\\n\\n``), terminated by ``data: [DONE]``.
    """
    # Accumulates the full response text so it can be persisted after streaming.
    full_response_content = []

    try:
        # Output queue shared by both producer tasks, plus an event that marks
        # the point after which agent output may be released to the client.
        output_queue = asyncio.Queue()
        preamble_completed = asyncio.Event()

        # Persist the user message before streaming starts.
        # NOTE(review): fire-and-forget task — no reference is kept, so it could
        # be garbage-collected before completion; confirm this is acceptable.
        if config.session_id:
            asyncio.create_task(_save_user_messages(config))

        async def preamble_task():
            # Producer: generates the optional preamble text.
            try:
                preamble_result = await call_preamble_llm(config)
                # Only emit when the preamble is non-empty and not the "<empty>" sentinel.
                if preamble_result and preamble_result.strip() and preamble_result != "<empty>":
                    preamble_content = f"[PREAMBLE]\n{preamble_result}\n"
                    chunk_data = create_stream_chunk(f"chatcmpl-preamble", config.model_name, preamble_content)
                    await output_queue.put(("preamble", f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"))
                    logger.info(f"Stream mode: Generated preamble text ({len(preamble_result)} chars)")
                else:
                    logger.info("Stream mode: Skipped empty preamble text")

                # Mark preamble as finished.
                preamble_completed.set()
                await output_queue.put(("preamble_done", None))

            except Exception as e:
                logger.error(f"Error generating preamble text: {e}")
                # Mark completion even on failure so the consumer never blocks.
                preamble_completed.set()
                await output_queue.put(("preamble_done", None))

        async def agent_task():
            # Producer: prepares the agent and streams its output.
            checkpointer = None  # kept for symmetry with init_agent's return; not used here
            try:
                logger.info(f"Starting agent stream response")
                chunk_id = 0
                message_tag = ""
                agent, checkpointer = await init_agent(config)
                async for msg, metadata in agent.astream({"messages": config.messages}, stream_mode="messages", config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS):
                    new_content = ""

                    if isinstance(msg, AIMessageChunk):
                        # Tool-call chunks: emit tool name + streamed arguments.
                        if msg.tool_call_chunks:
                            message_tag = "TOOL_CALL"
                            if config.tool_response:
                                for tool_call_chunk in msg.tool_call_chunks:
                                    if tool_call_chunk["name"]:
                                        new_content = f"[{message_tag}] {tool_call_chunk['name']}\n"
                                    if tool_call_chunk['args']:
                                        new_content += tool_call_chunk['args']
                        # Plain text content: the agent is answering, so release
                        # the output gate even if the preamble never produced text.
                        elif msg.content:
                            preamble_completed.set()
                            await output_queue.put(("preamble_done", None))
                            meta_message_tag = metadata.get("message_tag", "ANSWER")
                            if meta_message_tag != message_tag:
                                message_tag = meta_message_tag
                                new_content = f"[{meta_message_tag}]\n"
                            if msg.text:
                                new_content += msg.text
                    # Tool responses.
                    elif isinstance(msg, ToolMessage) and msg.content:
                        message_tag = "TOOL_RESPONSE"
                        if config.tool_response:
                            new_content = f"[{message_tag}] {msg.name}\n{msg.text}\n"

                    if new_content:
                        # Collect for post-stream persistence.
                        full_response_content.append(new_content)

                        if chunk_id == 0:
                            logger.info(f"Agent首个Token已生成, 开始流式输出")
                        chunk_id += 1
                        chunk_data = create_stream_chunk(f"chatcmpl-{chunk_id}", config.model_name, new_content)
                        await output_queue.put(("agent", f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"))

                # Final chunk with finish_reason="stop".
                final_chunk = create_stream_chunk(f"chatcmpl-{chunk_id + 1}", config.model_name, finish_reason="stop")
                await output_queue.put(("agent", f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"))
                await output_queue.put(("agent_done", None))

            except Exception as e:
                logger.error(f"Error in agent task: {e}")
                # FIX: serialize the error with json.dumps so quotes/newlines in the
                # message cannot produce malformed JSON in the SSE payload.
                await output_queue.put(("agent", f"data: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n"))
                # Always signal completion so the output controller can terminate.
                await output_queue.put(("agent_done", None))

        # Launch producers. The preamble task only runs when thinking is enabled.
        if config.enable_thinking:
            preamble_task_handle = asyncio.create_task(preamble_task())
        else:
            # No thinking: substitute an already-trivial task and open the gate now.
            preamble_task_handle = asyncio.create_task(asyncio.sleep(0))
            preamble_completed.set()

        agent_task_handle = asyncio.create_task(agent_task())

        # Output controller: preamble output first, then the agent stream.
        preamble_output_done = False

        while True:
            try:
                # Timeout prevents waiting forever if both producers die silently.
                item_type, item_data = await asyncio.wait_for(output_queue.get(), timeout=1.0)

                if item_type == "preamble":
                    # Preamble content is forwarded immediately.
                    if item_data:
                        yield item_data
                    preamble_output_done = True

                elif item_type == "preamble_done":
                    preamble_output_done = True

                elif item_type == "agent":
                    # Agent content must wait until the preamble has been emitted.
                    if preamble_output_done:
                        yield item_data
                    else:
                        # Re-queue the item and block until the preamble finishes.
                        await output_queue.put((item_type, item_data))
                        await preamble_completed.wait()
                        preamble_output_done = True

                elif item_type == "agent_done":
                    break

            except asyncio.TimeoutError:
                # Exit once every producer has finished; otherwise keep polling.
                if all(task.done() for task in [preamble_task_handle, agent_task_handle]):
                    break
                continue

        # SSE termination marker.
        yield "data: [DONE]\n\n"
        logger.info(f"Enhanced stream response completed")

        # Persist the assistant response after the stream ends (fire-and-forget).
        if full_response_content and config.session_id:
            asyncio.create_task(_save_assistant_response(config, "".join(full_response_content)))

    except Exception as e:
        logger.error(f"Error in enhanced_generate_stream_response: {e}")
        # FIX: JSON-escape the error message (see agent_task above).
        yield f"data: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n"
        yield "data: [DONE]\n\n"
|
||
async def create_agent_and_generate_response(
    config: AgentConfig
) -> Union[ChatResponse, StreamingResponse]:
    """Shared logic: create the agent and generate a response.

    Args:
        config: AgentConfig object carrying all request parameters.

    Returns:
        A StreamingResponse in stream mode, otherwise an OpenAI-style ChatResponse.

    Raises:
        HTTPException: 500 when the agent produces no output.
    """
    # Streaming mode delegates to the enhanced SSE generator.
    if config.stream:
        return StreamingResponse(
            enhanced_generate_stream_response(config),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
        )

    agent, checkpointer = await init_agent(config)
    # Invoke with the (possibly updated) messages.
    agent_responses = await agent.ainvoke({"messages": config.messages}, config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS)

    # Scan backwards for the last HumanMessage; everything after it is the
    # agent's contribution for this turn.
    all_messages = agent_responses["messages"]
    first_human_idx = None
    for i in range(len(all_messages) - 1, -1, -1):
        if isinstance(all_messages[i], HumanMessage):
            first_human_idx = i
            break

    if first_human_idx is not None:
        append_messages = all_messages[first_human_idx + 1:]
    else:
        # No HumanMessage found: take every message.
        append_messages = all_messages
    response_text = ""
    for msg in append_messages:
        if isinstance(msg, AIMessage):
            if len(msg.text) > 0:
                meta_message_tag = msg.additional_kwargs.get("message_tag", "ANSWER")
                # Strip code-fence markers from THINK segments.
                # FIX: the original chained two identical .replace("````","") calls;
                # the duplicate was a no-op and has been removed.
                output_text = msg.text.replace("````", "") if meta_message_tag == "THINK" else msg.text
                response_text += f"[{meta_message_tag}]\n" + output_text + "\n"
            if len(msg.tool_calls) > 0 and config.tool_response:
                # FIX: use single quotes for dict keys inside the f-string — nested
                # double quotes are a SyntaxError on Python < 3.12.
                response_text += "".join(
                    f"[TOOL_CALL] {tool['name']}\n"
                    f"{json.dumps(tool['args']) if isinstance(tool['args'], dict) else tool['args']}\n"
                    for tool in msg.tool_calls
                )
        elif isinstance(msg, ToolMessage) and config.tool_response:
            response_text += f"[TOOL_RESPONSE] {msg.name}\n{msg.text}\n"

    if len(response_text) > 0:
        # Build an OpenAI-format response.
        # NOTE(review): token counts are character counts, not real token counts.
        result = ChatResponse(
            choices=[{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": response_text
                },
                "finish_reason": "stop"
            }],
            usage={
                "prompt_tokens": sum(len(msg.get("content", "")) for msg in config.messages),
                "completion_tokens": len(response_text),
                "total_tokens": sum(len(msg.get("content", "")) for msg in config.messages) + len(response_text)
            }
        )

        # Persist chat history (mirrors the streaming endpoint's behavior).
        await _save_user_messages(config)
        await _save_assistant_response(config, response_text)
    else:
        raise HTTPException(status_code=500, detail="No response from agent")

    return result
||
async def _save_user_messages(config: AgentConfig) -> None:
    """Persist the most recent user message (shared by stream and non-stream paths).

    Args:
        config: AgentConfig object; must carry a session_id for anything to happen.
    """
    # Persist only when a session is attached.
    if not config.session_id:
        return

    try:
        from agent.chat_history_manager import get_chat_history_manager

        manager = get_chat_history_manager()

        # Walk the conversation backwards and save the last user-authored entry.
        for msg in reversed(config.messages):
            if not isinstance(msg, dict):
                continue
            role = msg.get("role", "")
            content = msg.get("content", "")
            if role != "user" or not content:
                continue
            await manager.manager.save_message(
                session_id=config.session_id,
                role=role,
                content=content,
                bot_id=config.bot_id,
                user_identifier=config.user_identifier
            )
            break  # one message is enough

        logger.debug(f"Saved last user message for session_id={config.session_id}")
    except Exception as e:
        # Persistence failures must not break the request flow.
        logger.error(f"Failed to save user messages: {e}")
||
async def _save_assistant_response(config: AgentConfig, assistant_response: str) -> None:
    """Persist the assistant's reply (shared by stream and non-stream paths).

    Args:
        config: AgentConfig object; must carry a session_id for anything to happen.
        assistant_response: full assistant reply text to store.
    """
    # Nothing to do without a session id.
    if not config.session_id:
        return

    # Nothing to do without content.
    if not assistant_response:
        return

    try:
        from agent.chat_history_manager import get_chat_history_manager

        history = get_chat_history_manager()

        # Store the assistant turn.
        await history.manager.save_message(
            session_id=config.session_id,
            role="assistant",
            content=assistant_response,
            bot_id=config.bot_id,
            user_identifier=config.user_identifier
        )

        logger.debug(f"Saved assistant response for session_id={config.session_id}")
    except Exception as e:
        # Persistence failures must not break the request flow.
        logger.error(f"Failed to save assistant response: {e}")
||
@router.post("/api/v1/chat/completions")
async def chat_completions(request: ChatRequest, authorization: Optional[str] = Header(None)):
    """
    Chat completions API similar to OpenAI, supports both streaming and non-streaming.

    Args:
        request: ChatRequest containing messages, model, optional dataset_ids list,
            required bot_id, system_prompt, mcp_settings, and files.
        authorization: Authorization header containing API key (Bearer <API_KEY>).

    Returns:
        Union[ChatResponse, StreamingResponse]: Chat completion response or stream.

    Notes:
        - dataset_ids: optional; when provided it must be a list of project IDs
          (a single project also uses the array form).
        - bot_id: required bot ID.
        - A bot project directory (projects/robot/{bot_id}/) is created only when
          robot_type == "catalog_agent" and dataset_ids is a non-empty array.
        - Any other robot_type (including the default "agent") creates no directory,
          and neither do dataset_ids of [], None, or absent.
        - Multiple knowledge bases are merged with automatic folder-name
          de-duplication.

    Required Parameters:
        - bot_id: str - target bot ID
        - messages: List[Message] - conversation messages
    Optional Parameters:
        - dataset_ids: List[str] - source knowledge-base project IDs
        - robot_type: str - bot type, defaults to "agent"

    Example:
        {"bot_id": "my-bot-001", "messages": [{"role": "user", "content": "Hello"}]}
        {"dataset_ids": ["project-123"], "bot_id": "my-bot-001", "messages": [{"role": "user", "content": "Hello"}]}
        {"dataset_ids": ["project-123", "project-456"], "bot_id": "my-bot-002", "messages": [{"role": "user", "content": "Hello"}]}
        {"dataset_ids": ["project-123"], "bot_id": "my-catalog-bot", "robot_type": "catalog_agent", "messages": [{"role": "user", "content": "Hello"}]}
    """
    try:
        # v1 auth: the bearer token in the Authorization header is the model API key.
        api_key = extract_api_key_from_auth(authorization)

        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")

        # Create the project directory when dataset_ids/robot_type require one.
        project_dir = create_project_directory(request.dataset_ids, bot_id, request.robot_type, request.skills)

        # Any request field not consumed explicitly is forwarded as generate_cfg.
        exclude_fields = {'messages', 'model', 'model_server', 'dataset_ids', 'language', 'tool_response', 'system_prompt', 'mcp_settings' ,'stream', 'robot_type', 'bot_id', 'user_identifier', 'session_id', 'enable_thinking', 'skills'}
        generate_cfg = {
            field: value
            for field, value in request.model_dump().items()
            if field not in exclude_fields
        }

        # Normalize the incoming messages, then build the agent configuration.
        messages = process_messages(request.messages, request.language)
        config = await AgentConfig.from_v1_request(request, api_key, project_dir, generate_cfg, messages)

        # Shared agent creation + response generation.
        return await create_agent_and_generate_response(config)

    except Exception as e:
        import traceback
        logger.error(f"Error in chat_completions: {str(e)}")
        logger.error(f"Full traceback: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
|
||
@router.post("/api/v1/chat/warmup")
async def chat_warmup_v1(request: ChatRequest, authorization: Optional[str] = Header(None)):
    """
    Warm-up endpoint: initialize the agent without processing messages, so later
    requests respond quickly.

    Args:
        request: ChatRequest containing configuration (messages will be ignored for warmup)
        authorization: Authorization header containing API key (Bearer <API_KEY>)

    Returns:
        JSON response with warmup status and cache key

    Required Parameters:
        - bot_id: str - target bot ID

    Notes:
        - Pre-generates and caches the agent; later chat requests reuse the cache.
        - messages are not processed during warmup, only used for config validation.
        - The warmed agent's cache key is derived from the provided configuration.
    """
    try:
        # v1 auth: the bearer token in the Authorization header is the model API key.
        api_key = extract_api_key_from_auth(authorization)

        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")

        # Create the project directory when dataset_ids/robot_type require one.
        project_dir = create_project_directory(request.dataset_ids, bot_id, request.robot_type, request.skills)

        # Any request field not consumed explicitly is forwarded as generate_cfg.
        exclude_fields = {'messages', 'model', 'model_server', 'dataset_ids', 'language', 'tool_response', 'system_prompt', 'mcp_settings' ,'stream', 'robot_type', 'bot_id', 'user_identifier', 'session_id', 'enable_thinking', 'skills'}
        generate_cfg = {
            field: value
            for field, value in request.model_dump().items()
            if field not in exclude_fields
        }

        # Warmup never processes real messages — feed an empty list through the
        # normal pipeline anyway so the config is fully validated.
        empty_messages = []
        messages = process_messages(empty_messages, request.language or "ja")

        config = await AgentConfig.from_v1_request(request, api_key, project_dir, generate_cfg, messages)

        # Warm the mcp_tools cache.
        logger.info(f"Warming up mcp_tools for bot_id: {bot_id}")
        from agent.deep_assistant import get_tools_from_mcp
        from agent.prompt_loader import load_mcp_settings_async

        # Resolve mcp_settings, normalizing anything non-list/empty to [].
        final_mcp_settings = await load_mcp_settings_async(
            config.project_dir, config.mcp_settings, config.bot_id, config.robot_type
        )
        mcp_settings = final_mcp_settings if final_mcp_settings else []
        if not isinstance(mcp_settings, list) or len(mcp_settings) == 0:
            mcp_settings = []

        # get_tools_from_mcp caches internally, so this call is the warmup itself.
        mcp_tools = await get_tools_from_mcp(mcp_settings)

        return {
            "status": "warmed_up",
            "bot_id": bot_id,
            "mcp_tools_count": len(mcp_tools),
            "message": "MCP tools have been cached successfully"
        }

    except Exception as e:
        import traceback
        logger.error(f"Error in chat_warmup_v1: {str(e)}")
        logger.error(f"Full traceback: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=f"Warmup failed: {str(e)}")
|
||
@router.post("/api/v2/chat/warmup")
async def chat_warmup_v2(request: ChatRequestV2, authorization: Optional[str] = Header(None)):
    """
    Warm-up endpoint v2: initialize the agent without processing messages, so later
    requests respond quickly. Uses the same authentication and configuration
    retrieval as /api/v2/chat/completions.

    Args:
        request: ChatRequestV2 containing essential parameters (messages will be ignored for warmup)
        authorization: Authorization header for authentication (same as v2 chat endpoint)

    Returns:
        JSON response with warmup status and cache key

    Required Parameters:
        - bot_id: str - target bot ID

    Authentication:
        - Requires valid MD5 hash token: MD5(MASTERKEY:bot_id)
        - Authorization header should contain: Bearer {token}

    Notes:
        - Pre-generates and caches the agent; later chat requests reuse the cache.
        - messages are not processed during warmup, only used for config validation.
        - The cache key is derived from the full backend-fetched configuration.
    """
    try:
        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")

        # v2 auth check (same scheme as chat_completions_v2).
        expected_token = generate_v2_auth_token(bot_id)
        provided_token = extract_api_key_from_auth(authorization)

        if not provided_token:
            raise HTTPException(
                status_code=401,
                detail="Authorization header is required for v2 API"
            )

        if provided_token != expected_token:
            # SECURITY FIX: do not echo any portion of the expected token back to
            # the caller — the original leaked its first 8 characters.
            raise HTTPException(
                status_code=403,
                detail="Invalid authentication token"
            )

        # Fetch the bot configuration from the backend API (v2 auth).
        bot_config = await fetch_bot_config(bot_id)
        # Create the project directory from backend-provided dataset_ids/skills.
        project_dir = create_project_directory(
            bot_config.get("dataset_ids", []),
            bot_id,
            bot_config.get("robot_type", "general_agent"),
            bot_config.get("skills")
        )

        # Warmup never processes real messages — run an empty list through the
        # normal pipeline so the config is fully validated.
        empty_messages = []
        messages = process_messages(empty_messages, request.language or "ja")

        config = await AgentConfig.from_v2_request(request, bot_config, project_dir, messages)

        # Warm the mcp_tools cache.
        logger.info(f"Warming up mcp_tools for bot_id: {bot_id}")
        from agent.deep_assistant import get_tools_from_mcp
        from agent.prompt_loader import load_mcp_settings_async

        # Resolve mcp_settings, normalizing anything non-list/empty to [].
        final_mcp_settings = await load_mcp_settings_async(
            config.project_dir, config.mcp_settings, config.bot_id, config.robot_type
        )
        mcp_settings = final_mcp_settings if final_mcp_settings else []
        if not isinstance(mcp_settings, list) or len(mcp_settings) == 0:
            mcp_settings = []

        # get_tools_from_mcp caches internally, so this call is the warmup itself.
        mcp_tools = await get_tools_from_mcp(mcp_settings)

        return {
            "status": "warmed_up",
            "bot_id": bot_id,
            "mcp_tools_count": len(mcp_tools),
            "message": "MCP tools have been cached successfully"
        }

    except HTTPException:
        # Preserve deliberate HTTP errors (400/401/403) instead of wrapping as 500.
        raise
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in chat_warmup_v2: {str(e)}")
        logger.error(f"Full traceback: {error_details}")
        raise HTTPException(status_code=500, detail=f"Warmup failed: {str(e)}")
|
||
@router.post("/api/v2/chat/completions")
async def chat_completions_v2(request: ChatRequestV2, authorization: Optional[str] = Header(None)):
    """
    Chat completions API v2 with simplified parameters.
    Only requires messages, stream, tool_response, bot_id, and language parameters.
    Other parameters are fetched from the backend bot configuration API.

    Args:
        request: ChatRequestV2 containing only essential parameters
        authorization: Authorization header for authentication (different from v1)

    Returns:
        Union[ChatResponse, StreamingResponse]: Chat completion response or stream

    Required Parameters:
        - bot_id: str - target bot ID
        - messages: List[Message] - conversation messages

    Optional Parameters:
        - stream: bool - stream the output, default false
        - tool_response: bool - include tool responses, default false
        - language: str - reply language, default "ja"

    Authentication:
        - Requires valid MD5 hash token: MD5(MASTERKEY:bot_id)
        - Authorization header should contain: Bearer {token}
        - Uses MD5 hash of MASTERKEY:bot_id for backend API authentication
        - Optionally uses API key from bot config for model access
    """
    try:
        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")

        # v2 auth check.
        expected_token = generate_v2_auth_token(bot_id)
        provided_token = extract_api_key_from_auth(authorization)

        if not provided_token:
            raise HTTPException(
                status_code=401,
                detail="Authorization header is required for v2 API"
            )

        if provided_token != expected_token:
            # SECURITY FIX: do not echo any portion of the expected token back to
            # the caller — the original leaked its first 8 characters.
            raise HTTPException(
                status_code=403,
                detail="Invalid authentication token"
            )

        # Fetch the bot configuration from the backend API (v2 auth).
        bot_config = await fetch_bot_config(bot_id)
        # Create the project directory from backend-provided dataset_ids/skills.
        project_dir = create_project_directory(
            bot_config.get("dataset_ids", []),
            bot_id,
            bot_config.get("robot_type", "general_agent"),
            bot_config.get("skills")
        )
        # Normalize the incoming messages, then build the agent configuration.
        messages = process_messages(request.messages, request.language)
        config = await AgentConfig.from_v2_request(request, bot_config, project_dir, messages)
        # Shared agent creation + response generation.
        return await create_agent_and_generate_response(config)

    except HTTPException:
        # Preserve deliberate HTTP errors (400/401/403) instead of wrapping as 500.
        raise
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in chat_completions_v2: {str(e)}")
        logger.error(f"Full traceback: {error_details}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
|
||
# ============================================================================
|
||
# 聊天历史查询接口
|
||
# ============================================================================
|
||
|
||
@router.get("/api/v1/chat/history", response_model=dict)
async def get_chat_history(
    session_id: str,
    last_message_id: Optional[str] = None,
    limit: int = 20
):
    """
    Fetch chat history.

    Reads from the dedicated chat-history table, returning the complete original
    messages (unaffected by checkpoint summarization).

    Args:
        session_id: session ID
        last_message_id: ID of the last message of the previous page, used to page
            back to earlier messages
        limit: number of messages per page, default 20, max 100

    Returns:
        {
            "messages": [
                {
                    "id": "unique message ID",
                    "role": "user or assistant",
                    "content": "message content",
                    "timestamp": "ISO 8601 timestamp"
                },
                ...
            ],
            "has_more": true/false  // whether earlier history exists
        }
    """
    try:
        from agent.chat_history_manager import get_chat_history_manager

        # Clamp the page size into [1, 100].
        if limit < 1:
            limit = 1
        elif limit > 100:
            limit = 100

        history = get_chat_history_manager()
        page = await history.manager.get_history_by_message_id(
            session_id=session_id,
            last_message_id=last_message_id,
            limit=limit
        )

        return {
            "messages": page["messages"],
            "has_more": page["has_more"]
        }

    except Exception as e:
        import traceback
        logger.error(f"Error in get_chat_history: {str(e)}")
        logger.error(f"Full traceback: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
||
|
||
@router.post("/api/v1/chat/history/batch", response_model=BatchSaveChatResponse)
async def batch_save_chat_history(request: BatchSaveChatRequest):
    """
    Batch-save chat messages.

    Saves multiple chat messages to the database in one call.

    Args:
        session_id: session ID
        messages: list of messages to save, each with role and content
        bot_id: bot ID (optional)

    Request body example:
        {
            "session_id": "test-session-123",
            "messages": [
                {"role": "user", "content": "你好"},
                {"role": "assistant", "content": "你好!有什么可以帮助你的吗?"},
                {"role": "user", "content": "咖啡多少钱一杯"}
            ],
            "bot_id": "63069654-7750-409d-9a58-a0960d899a20"
        }

    Returns:
        {
            "success": true,
            "message": "成功保存 3 条消息",
            "session_id": "test-session-123",
            "saved_count": 3,
            "message_ids": ["uuid1", "uuid2", "uuid3"]
        }
    """
    try:
        from agent.chat_history_manager import get_chat_history_manager

        # Validate required inputs.
        if not request.session_id:
            raise HTTPException(status_code=400, detail="session_id is required")
        if not request.messages or len(request.messages) == 0:
            raise HTTPException(status_code=400, detail="messages list is empty")

        # Convert the Pydantic message models into plain dicts for the manager.
        payload = [{"role": m.role, "content": m.content} for m in request.messages]

        history = get_chat_history_manager()
        message_ids = await history.manager.save_messages(
            session_id=request.session_id,
            messages=payload,
            bot_id=request.bot_id
        )

        # Drop None entries (messages the manager declined to save).
        valid_message_ids = [mid for mid in message_ids if mid is not None]
        saved_count = len(valid_message_ids)

        return BatchSaveChatResponse(
            success=True,
            message=f"成功保存 {saved_count} 条消息",
            session_id=request.session_id,
            saved_count=saved_count,
            message_ids=valid_message_ids
        )

    except HTTPException:
        # Preserve deliberate HTTP errors (400) instead of wrapping as 500.
        raise
    except Exception as e:
        import traceback
        logger.error(f"Error in batch_save_chat_history: {str(e)}")
        logger.error(f"Full traceback: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")