"""FastAPI routes for chat completions (v1/v2/v3), warmup, cancel and chat history."""
import asyncio
import json
import logging
import os
import shutil
import time
import traceback
from typing import Union, Optional, Any, List, Dict

from fastapi import APIRouter, HTTPException, Header, Body
from fastapi.responses import StreamingResponse
from langchain_core.messages import AIMessageChunk, ToolMessage, AIMessage, HumanMessage

from utils import (
    Message, ChatRequest, ChatResponse, BatchSaveChatRequest, BatchSaveChatResponse
)
from utils.api_models import ChatRequestV2, ChatRequestV3
from utils.fastapi_utils import (
    process_messages,
    create_project_directory,
    extract_api_key_from_auth,
    generate_v2_auth_token,
    fetch_bot_config,
    fetch_bot_config_from_db,
    call_preamble_llm,
    create_stream_chunk
)
from utils.settings import MAX_OUTPUT_TOKENS
from agent.agent_config import AgentConfig
from agent.deep_assistant import init_agent

logger = logging.getLogger('app')

router = APIRouter()

# Request fields that are consumed explicitly and therefore must not be
# forwarded to the model as extra generation parameters (generate_cfg).
_V1_GENERATE_CFG_EXCLUDE = frozenset({
    'messages', 'model', 'model_server', 'dataset_ids', 'language', 'tool_response',
    'system_prompt', 'mcp_settings', 'stream', 'robot_type', 'bot_id', 'user_identifier',
    'session_id', 'enable_thinking', 'skills', 'enable_memory', 'n', 'shell_env',
})
_V2_WARMUP_GENERATE_CFG_EXCLUDE = frozenset({
    'messages', 'stream', 'tool_response', 'bot_id', 'language', 'user_identifier',
    'session_id', 'n', 'model', 'model_server', 'api_key', 'shell_env',
})
_V2_GENERATE_CFG_EXCLUDE = frozenset({
    'messages', 'dataset_ids', 'language', 'tool_response', 'system_prompt', 'mcp_settings',
    'stream', 'robot_type', 'bot_id', 'user_identifier', 'session_id', 'enable_thinking',
    'skills', 'enable_memory', 'n', 'model', 'model_server', 'api_key', 'shell_env',
})

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an otherwise unreferenced task may be garbage-collected
# before it finishes.
_background_tasks: set = set()


def _spawn_background(coro) -> "asyncio.Task":
    """Schedule *coro* as a fire-and-forget task and keep it referenced until done."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


def _sse_data(payload: Any) -> str:
    """Serialize *payload* as one Server-Sent-Events ``data:`` frame."""
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"


def _sse_error(exc: Exception) -> str:
    """Build an SSE error frame; json.dumps escapes quotes/newlines in the message."""
    return _sse_data({"error": str(exc)})


def _internal_error(endpoint: str, exc: Exception, prefix: str = "Internal server error") -> HTTPException:
    """Log *exc* with its traceback and return the HTTP 500 error to raise.

    Must be called from inside the ``except`` block handling *exc* so that
    ``traceback.format_exc()`` sees the active exception.
    """
    logger.error(f"Error in {endpoint}: {str(exc)}")
    logger.error(f"Full traceback: {traceback.format_exc()}")
    return HTTPException(status_code=500, detail=f"{prefix}: {str(exc)}")


def _build_generate_cfg(request_data: Dict[str, Any], exclude_fields) -> Dict[str, Any]:
    """Return the request fields that are not listed in *exclude_fields*."""
    return {k: v for k, v in request_data.items() if k not in exclude_fields}


def _request_override(request_data: Dict[str, Any], key: str) -> Optional[str]:
    """Return ``request_data[key]`` unless it is empty or the placeholder "whatever"."""
    value = request_data.get(key) or ""
    return value if value and value != "whatever" else None


def _verify_v2_auth(bot_id: str, authorization: Optional[str]) -> None:
    """Validate the v2 bearer token for *bot_id*.

    Raises:
        HTTPException: 401 when no token was provided, 403 when it does not
            match the token produced by ``generate_v2_auth_token(bot_id)``.
    """
    expected_token = generate_v2_auth_token(bot_id)
    provided_token = extract_api_key_from_auth(authorization)

    if not provided_token:
        raise HTTPException(
            status_code=401,
            detail="Authorization header is required for v2 API"
        )
    if provided_token != expected_token:
        # Never echo any part of the expected token back to the caller.
        raise HTTPException(
            status_code=403,
            detail="Invalid authentication token"
        )


async def _warm_up_mcp_tools(config: AgentConfig, bot_id: str) -> Dict[str, Any]:
    """Load the MCP settings for *config*, fetch the tools and return the warmup payload."""
    logger.info(f"Warming up mcp_tools for bot_id: {bot_id}")

    from agent.deep_assistant import get_tools_from_mcp
    from agent.prompt_loader import load_mcp_settings_async

    loaded_settings = await load_mcp_settings_async(config)
    # Anything that is not a non-empty list is treated as "no MCP servers".
    mcp_settings = loaded_settings if isinstance(loaded_settings, list) and loaded_settings else []

    # Per the original comment, caching is built into get_tools_from_mcp.
    mcp_tools = await get_tools_from_mcp(mcp_settings)

    return {
        "status": "warmed_up",
        "bot_id": bot_id,
        "mcp_tools_count": len(mcp_tools),
        "message": "MCP tools have been cached successfully"
    }


async def enhanced_generate_stream_response(
    config: AgentConfig
):
    """Stream the agent response as SSE frames, with an optional preamble.

    The preamble LLM call and the agent run concurrently; both push frames
    onto a shared queue and this generator drains it. A heartbeat chunk is
    emitted after 15 seconds without output.

    Args:
        config: AgentConfig carrying all request parameters.

    Yields:
        str: SSE ``data:`` frames, terminated by ``data: [DONE]``.
    """
    # Collected response pieces, persisted once streaming has finished.
    full_response_content = []

    # Cancellation handle (only registered when a session_id is present).
    cancel_event = None

    try:
        output_queue = asyncio.Queue()
        preamble_completed = asyncio.Event()

        if config.session_id:
            from utils.cancel_manager import register_cancel_event
            cancel_event = register_cancel_event(config.session_id)

        # Persist the user message before streaming starts.
        if config.session_id:
            _spawn_background(_save_user_messages(config))

        async def preamble_task():
            """Generate the preamble text and enqueue it, always signalling completion."""
            try:
                preamble_result = await call_preamble_llm(config)
                # Only emit a preamble that has non-whitespace content.
                if preamble_result and preamble_result.strip():
                    preamble_content = f"[PREAMBLE]\n{preamble_result}\n"
                    chunk_data = create_stream_chunk("chatcmpl-preamble", config.model_name, preamble_content)
                    await output_queue.put(("preamble", _sse_data(chunk_data)))
                    logger.info(f"Stream mode: Generated preamble text ({len(preamble_result)} chars)")
                else:
                    logger.info("Stream mode: Skipped empty preamble text")

                preamble_completed.set()
                await output_queue.put(("preamble_done", None))

            except Exception as e:
                logger.error(f"Error generating preamble text: {e}")
                # Signal completion even on failure so the output loop is never blocked.
                preamble_completed.set()
                await output_queue.put(("preamble_done", None))

        async def agent_task():
            """Run the agent stream and enqueue every produced chunk."""
            checkpointer = None
            try:
                logger.info("Starting agent stream response")
                chunk_id = 0
                message_tag = ""

                agent, checkpointer = await init_agent(config)
                async for msg, metadata in agent.astream(
                    {"messages": config.messages},
                    stream_mode="messages",
                    config=config.invoke_config(),
                    max_tokens=MAX_OUTPUT_TOKENS,
                ):
                    # Stop as soon as a cancel signal has been received.
                    if cancel_event and cancel_event.is_set():
                        logger.info(f"Agent stream cancelled for session_id={config.session_id}")
                        break

                    new_content = ""

                    if isinstance(msg, AIMessageChunk):
                        # Tool-call chunks.
                        if msg.tool_call_chunks:
                            message_tag = "TOOL_CALL"
                            if config.tool_response:
                                for tool_call_chunk in msg.tool_call_chunks:
                                    # Chunks may be dicts or objects depending on the provider.
                                    if isinstance(tool_call_chunk, dict):
                                        chunk_name = tool_call_chunk.get("name")
                                        chunk_args = tool_call_chunk.get("args")
                                    else:
                                        chunk_name = getattr(tool_call_chunk, "name", None)
                                        chunk_args = getattr(tool_call_chunk, "args", None)

                                    if chunk_name:
                                        new_content = f"[{message_tag}] {chunk_name}\n"
                                    if chunk_args:
                                        new_content += chunk_args
                        # Plain text content.
                        elif msg.content:
                            # Agent text is available: stop holding output back for the preamble.
                            preamble_completed.set()
                            await output_queue.put(("preamble_done", None))

                            meta_message_tag = metadata.get("message_tag", "ANSWER")
                            # SUMMARY content is never sent to the client.
                            if meta_message_tag == "SUMMARY":
                                continue
                            if meta_message_tag != message_tag:
                                message_tag = meta_message_tag
                                new_content = f"[{meta_message_tag}]\n"
                            if msg.text:
                                new_content += msg.text

                    # Tool responses.
                    elif isinstance(msg, ToolMessage) and msg.content:
                        message_tag = "TOOL_RESPONSE"
                        if config.tool_response:
                            new_content = f"[{message_tag}] {msg.name}\n{msg.text}\n"

                    if new_content:
                        full_response_content.append(new_content)

                        if chunk_id == 0:
                            logger.info("Agent首个Token已生成, 开始流式输出")
                        chunk_id += 1
                        chunk_data = create_stream_chunk(f"chatcmpl-{chunk_id}", config.model_name, new_content)
                        await output_queue.put(("agent", _sse_data(chunk_data)))

                # Final chunk carrying the finish reason.
                finish = "cancelled" if (cancel_event and cancel_event.is_set()) else "stop"
                final_chunk = create_stream_chunk(f"chatcmpl-{chunk_id + 1}", config.model_name, finish_reason=finish)
                await output_queue.put(("agent", _sse_data(final_chunk)))

                # PostAgent hooks run in their own task so they do not delay the stream.
                full_response = "".join(full_response_content)
                _spawn_background(_execute_post_agent_hooks(config, full_response))

                await output_queue.put(("agent_done", None))

            except Exception as e:
                logger.error(f"Error in agent task: {e}")
                # Report the error to the client, then let the output loop terminate.
                await output_queue.put(("agent", _sse_error(e)))
                await output_queue.put(("agent_done", None))

        # The preamble is only generated when thinking is enabled.
        if config.enable_thinking:
            preamble_task_handle = asyncio.create_task(preamble_task())
        else:
            # Placeholder task that completes immediately; mark the preamble as done.
            preamble_task_handle = asyncio.create_task(asyncio.sleep(0))
            preamble_completed.set()

        agent_task_handle = asyncio.create_task(agent_task())

        # Output controller: preamble first, then the agent stream.
        preamble_output_done = False
        last_yield_time = time.time()

        while True:
            try:
                # Time out periodically so cancellation/heartbeat checks can run.
                item_type, item_data = await asyncio.wait_for(output_queue.get(), timeout=1.0)

                if item_type == "preamble":
                    if item_data:
                        yield item_data
                        last_yield_time = time.time()
                    preamble_output_done = True

                elif item_type == "preamble_done":
                    preamble_output_done = True

                elif item_type == "agent":
                    if preamble_output_done:
                        yield item_data
                        last_yield_time = time.time()
                    else:
                        # Preamble not emitted yet: put the chunk back and wait for it.
                        # NOTE(review): re-queuing appends the chunk at the tail of the
                        # FIFO queue, so agent chunks already waiting behind it would be
                        # emitted first — confirm whether chunk order can be affected.
                        await output_queue.put((item_type, item_data))
                        await preamble_completed.wait()
                        preamble_output_done = True

                elif item_type == "agent_done":
                    break

            except asyncio.TimeoutError:
                if cancel_event and cancel_event.is_set():
                    logger.info(f"Output loop cancelled for session_id={config.session_id}")
                    break
                # Nothing left that could produce output.
                if all(task.done() for task in [preamble_task_handle, agent_task_handle]):
                    break
                # Send a heartbeat only after 15 seconds of silence.
                if time.time() - last_yield_time >= 15:
                    heartbeat_chunk = create_stream_chunk("chatcmpl-heartbeat", config.model_name, "")
                    yield _sse_data(heartbeat_chunk)
                    last_yield_time = time.time()
                continue

        yield "data: [DONE]\n\n"

        logger.info("Enhanced stream response completed")

        # Persist the assistant response once streaming has finished.
        if full_response_content and config.session_id:
            _spawn_background(_save_assistant_response(config, "".join(full_response_content)))

    except Exception as e:
        logger.error(f"Error in enhanced_generate_stream_response: {e}")
        yield _sse_error(e)
        yield "data: [DONE]\n\n"

    finally:
        # Runs on normal completion, on errors AND when the generator is closed
        # or cancelled, so the cancel event is unregistered on every exit path.
        if config.session_id:
            from utils.cancel_manager import unregister_cancel_event
            unregister_cancel_event(config.session_id)


async def create_agent_and_generate_response(
    config: AgentConfig
) -> Union[ChatResponse, StreamingResponse]:
    """Create the agent and produce a streaming or a complete response.

    Args:
        config: AgentConfig carrying all request parameters.

    Returns:
        A StreamingResponse when ``config.stream`` is set, otherwise a
        ChatResponse in OpenAI format.

    Raises:
        HTTPException: 500 when the agent produced no output text.
    """
    if config.stream:
        return StreamingResponse(
            enhanced_generate_stream_response(config),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
        )

    agent, checkpointer = await init_agent(config)
    agent_responses = await agent.ainvoke(
        {"messages": config.messages},
        config=config.invoke_config(),
        max_tokens=MAX_OUTPUT_TOKENS,
    )

    # Everything after the most recent HumanMessage belongs to this turn.
    all_messages = agent_responses["messages"]
    last_human_idx = None
    for idx in range(len(all_messages) - 1, -1, -1):
        if isinstance(all_messages[idx], HumanMessage):
            last_human_idx = idx
            break

    if last_human_idx is not None:
        append_messages = all_messages[last_human_idx + 1:]
    else:
        # No HumanMessage found: use every message.
        append_messages = all_messages

    parts = []
    for msg in append_messages:
        if isinstance(msg, AIMessage):
            if len(msg.text) > 0:
                meta_message_tag = msg.additional_kwargs.get("message_tag", "ANSWER")
                # SUMMARY messages are skipped entirely (including their tool calls).
                if meta_message_tag == "SUMMARY":
                    continue
                if meta_message_tag == "THINK":
                    # NOTE(review): the second replace looks like a duplicate of the
                    # first — kept as-is; confirm the intended fence pattern.
                    output_text = msg.text.replace("````", "").replace("````", "")
                else:
                    output_text = msg.text
                parts.append(f"[{meta_message_tag}]\n" + output_text + "\n")
            if len(msg.tool_calls) > 0 and config.tool_response:
                for tool in msg.tool_calls:
                    tool_args = tool["args"]
                    rendered_args = json.dumps(tool_args) if isinstance(tool_args, dict) else tool_args
                    parts.append(f"[TOOL_CALL] {tool['name']}\n{rendered_args}\n")
        elif isinstance(msg, ToolMessage) and config.tool_response:
            parts.append(f"[TOOL_RESPONSE] {msg.name}\n{msg.text}\n")
    response_text = "".join(parts)

    # PostAgent hooks are awaited inline in non-streaming mode and receive the
    # assembled response text.
    await _execute_post_agent_hooks(config, response_text)

    if len(response_text) == 0:
        raise HTTPException(status_code=500, detail="No response from agent")

    # Usage is approximated by character counts, not real token counts.
    prompt_chars = sum(len(msg.get("content") or "") for msg in config.messages)
    result = ChatResponse(
        choices=[{
            "index": 0,
            "message": {
                "role": "assistant",
                "content": response_text
            },
            "finish_reason": "stop"
        }],
        usage={
            "prompt_tokens": prompt_chars,
            "completion_tokens": len(response_text),
            "total_tokens": prompt_chars + len(response_text)
        }
    )

    # Persist chat history (same logic as the streaming path).
    await _save_user_messages(config)
    await _save_assistant_response(config, response_text)

    return result


async def _save_user_messages(config: AgentConfig) -> None:
    """Persist the most recent user message of the request.

    Does nothing when ``config.session_id`` is not set. Failures are logged
    and swallowed so they never affect the main flow.

    Args:
        config: AgentConfig of the current request.
    """
    if not config.session_id:
        return

    try:
        from agent.chat_history_manager import get_chat_history_manager
        from agent.plugin_hook_loader import execute_hooks

        manager = get_chat_history_manager()

        # Walk backwards and save only the last non-empty user message.
        for msg in reversed(config.messages):
            if isinstance(msg, dict):
                role = msg.get("role", "")
                content = msg.get("content", "")
                if role == "user" and content:
                    # PreSave hooks may replace the content; a falsy result keeps the original.
                    processed_content = await execute_hooks('PreSave', config, content=content, role=role)
                    if processed_content:
                        content = processed_content

                    await manager.manager.save_message(
                        session_id=config.session_id,
                        role=role,
                        content=content,
                        bot_id=config.bot_id,
                        user_identifier=config.user_identifier
                    )
                    break

        logger.debug(f"Saved last user message for session_id={config.session_id}")

    except Exception as e:
        logger.error(f"Failed to save user messages: {e}")


async def _save_assistant_response(config: AgentConfig, assistant_response: str) -> None:
    """Persist the assistant response for the current session.

    Does nothing when the session_id or the response is empty. Failures are
    logged and swallowed so they never affect the main flow.

    Args:
        config: AgentConfig of the current request.
        assistant_response: Full text of the assistant response.
    """
    if not config.session_id:
        return

    if not assistant_response:
        return

    try:
        from agent.chat_history_manager import get_chat_history_manager
        from agent.plugin_hook_loader import execute_hooks

        manager = get_chat_history_manager()

        # PreSave hooks may replace the content; a falsy result keeps the original.
        processed_response = await execute_hooks('PreSave', config, content=assistant_response, role='assistant')
        if processed_response:
            assistant_response = processed_response

        await manager.manager.save_message(
            session_id=config.session_id,
            role="assistant",
            content=assistant_response,
            bot_id=config.bot_id,
            user_identifier=config.user_identifier
        )

        logger.debug(f"Saved assistant response for session_id={config.session_id}")

    except Exception as e:
        logger.error(f"Failed to save assistant response: {e}")


async def _execute_post_agent_hooks(config: AgentConfig, response: str) -> None:
    """Run the PostAgent hooks, then clean up the executable_code/tmp folder.

    Hook failures are logged and swallowed so they never affect the main flow.

    Args:
        config: AgentConfig of the current request.
        response: Full text of the agent response.
    """
    try:
        from agent.plugin_hook_loader import execute_hooks

        metadata = {
            "bot_id": config.bot_id,
            "user_identifier": config.user_identifier,
            "session_id": config.session_id,
            "language": config.language,
        }

        await execute_hooks('PostAgent', config, response=response, metadata=metadata)
        logger.debug(f"Executed PostAgent hooks for session_id={config.session_id}")
    except Exception as e:
        logger.error(f"Failed to execute PostAgent hooks: {e}")

    await _cleanup_tmp_folder(config)


async def _cleanup_tmp_folder(config: AgentConfig) -> None:
    """Delete entries older than three days from ``<project_dir>/executable_code/tmp``.

    Failures are logged and swallowed so they never affect the main flow; a
    failure on one entry does not stop the remaining entries from being
    processed.

    Args:
        config: AgentConfig of the current request.
    """
    try:
        if not (config.project_dir and config.bot_id):
            return

        tmp_dir = os.path.join(config.project_dir, "executable_code", "tmp")
        if not os.path.exists(tmp_dir):
            return

        # Anything last modified before this timestamp is removed.
        three_days_ago = time.time() - (3 * 24 * 60 * 60)
        deleted_count = 0

        for item in os.listdir(tmp_dir):
            item_path = os.path.join(tmp_dir, item)
            try:
                if os.path.getmtime(item_path) >= three_days_ago:
                    continue
                if os.path.isfile(item_path) or os.path.islink(item_path):
                    os.remove(item_path)
                else:
                    shutil.rmtree(item_path)
            except OSError as e:
                # E.g. the entry vanished in the meantime or is a dangling symlink.
                logger.warning(f"Failed to remove old item {item_path}: {e}")
                continue
            deleted_count += 1
            logger.debug(f"Deleted old item: {item_path}")

        logger.info(f"Cleaned up {deleted_count} old item(s) from tmp folder: {tmp_dir}")

    except Exception as e:
        logger.error(f"Failed to cleanup tmp folder: {e}")


@router.post("/api/v1/chat/completions")
async def chat_completions(request: ChatRequest, authorization: Optional[str] = Header(None)):
    """
    Chat completions API similar to OpenAI, supports both streaming and non-streaming

    Args:
        request: ChatRequest containing messages, model, optional dataset_ids list, required bot_id, system_prompt, mcp_settings, and files
        authorization: Authorization header containing API key (Bearer <API_KEY>)

    Returns:
        Union[ChatResponse, StreamingResponse]: Chat completion response or stream

    Notes:
        - dataset_ids: optional; when provided it must be a list of project IDs (use a list even for a single project)
        - bot_id: required, the bot ID
        - no directory is created when dataset_ids is [], None or omitted
        - multiple knowledge bases are merged; folder name conflicts are handled automatically

    Required Parameters:
        - bot_id: str - target bot ID
        - messages: List[Message] - conversation messages

    Optional Parameters:
        - dataset_ids: List[str] - source knowledge-base project IDs (use a list even for a single project)

    Example:
        {"bot_id": "my-bot-001", "messages": [{"role": "user", "content": "Hello"}]}
        {"dataset_ids": ["project-123"], "bot_id": "my-bot-001", "messages": [{"role": "user", "content": "Hello"}]}
        {"dataset_ids": ["project-123", "project-456"], "bot_id": "my-bot-002", "messages": [{"role": "user", "content": "Hello"}]}
        {"dataset_ids": ["project-123"], "bot_id": "my-catalog-bot", "messages": [{"role": "user", "content": "Hello"}]}
    """
    try:
        # v1: the API key from the Authorization header is used as the model API key.
        api_key = extract_api_key_from_auth(authorization)

        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")

        project_dir = create_project_directory(request.dataset_ids, bot_id, request.skills)

        # Remaining request fields are passed through as generate_cfg.
        generate_cfg = _build_generate_cfg(request.model_dump(), _V1_GENERATE_CFG_EXCLUDE)

        messages = process_messages(request.messages, request.language)

        config = await AgentConfig.from_v1_request(request, api_key, project_dir, generate_cfg, messages)
        return await create_agent_and_generate_response(config)

    except HTTPException:
        # Keep deliberate HTTP errors (e.g. the 400 above) instead of turning them into 500.
        raise
    except Exception as e:
        raise _internal_error("chat_completions", e)


@router.post("/api/v1/chat/warmup")
async def chat_warmup_v1(request: ChatRequest, authorization: Optional[str] = Header(None)):
    """
    Warmup endpoint - prepares the bot's MCP tools without processing any messages

    Args:
        request: ChatRequest containing configuration (messages will be ignored for warmup)
        authorization: Authorization header containing API key (Bearer <API_KEY>)

    Returns:
        JSON response with the warmup status and the number of MCP tools

    Required Parameters:
        - bot_id: str - target bot ID

    Notes:
        - messages are not processed during warmup; an empty message list is used
    """
    try:
        # v1: the API key from the Authorization header is used as the model API key.
        api_key = extract_api_key_from_auth(authorization)

        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")

        project_dir = create_project_directory(request.dataset_ids, bot_id, request.skills)

        generate_cfg = _build_generate_cfg(request.model_dump(), _V1_GENERATE_CFG_EXCLUDE)

        # Warmup never processes real messages; run the pipeline on an empty list.
        messages = process_messages([], request.language or "ja")

        config = await AgentConfig.from_v1_request(request, api_key, project_dir, generate_cfg, messages)

        return await _warm_up_mcp_tools(config, bot_id)

    except HTTPException:
        # Keep deliberate HTTP errors (e.g. the 400 above) instead of turning them into 500.
        raise
    except Exception as e:
        raise _internal_error("chat_warmup_v1", e, prefix="Warmup failed")


@router.post("/api/v2/chat/warmup")
async def chat_warmup_v2(request: ChatRequestV2, authorization: Optional[str] = Header(None)):
    """
    Warmup endpoint v2 - prepares the bot's MCP tools without processing any messages

    Uses the same authentication and configuration lookup as /api/v2/chat/completions

    Args:
        request: ChatRequestV2 containing essential parameters (messages will be ignored for warmup)
        authorization: Authorization header for authentication (same as v2 chat endpoint)

    Returns:
        JSON response with the warmup status and the number of MCP tools

    Required Parameters:
        - bot_id: str - target bot ID

    Authentication:
        - Requires valid MD5 hash token: MD5(MASTERKEY:bot_id)
        - Authorization header should contain: Bearer {token}

    Notes:
        - messages are not processed during warmup; an empty message list is used
    """
    try:
        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")

        _verify_v2_auth(bot_id, authorization)

        # Bot configuration comes from the backend API.
        bot_config = await fetch_bot_config(bot_id)

        project_dir = create_project_directory(
            bot_config.get("dataset_ids", []),
            bot_id,
            bot_config.get("skills")
        )

        # Warmup never processes real messages; run the pipeline on an empty list.
        messages = process_messages([], request.language or "ja")

        req_data = request.model_dump()
        generate_cfg = _build_generate_cfg(req_data, _V2_WARMUP_GENERATE_CFG_EXCLUDE)

        # Request-level model/model_server/api_key are passed as overrides
        # (empty values and the placeholder "whatever" are ignored).
        config = await AgentConfig.from_v2_request(
            request, bot_config, project_dir, messages, generate_cfg,
            model_name=_request_override(req_data, "model"),
            model_server=_request_override(req_data, "model_server"),
            api_key=_request_override(req_data, "api_key"),
        )

        return await _warm_up_mcp_tools(config, bot_id)

    except HTTPException:
        raise
    except Exception as e:
        raise _internal_error("chat_warmup_v2", e, prefix="Warmup failed")


@router.post("/api/v2/chat/completions")
async def chat_completions_v2(request: ChatRequestV2, authorization: Optional[str] = Header(None)):
    """
    Chat completions API v2 with simplified parameters.
    Only requires messages, stream, tool_response, bot_id, and language parameters.
    Other parameters are fetched from the backend bot configuration API.

    Args:
        request: ChatRequestV2 containing only essential parameters
        authorization: Authorization header for authentication (different from v1)

    Returns:
        Union[ChatResponse, StreamingResponse]: Chat completion response or stream

    Required Parameters:
        - bot_id: str - target bot ID
        - messages: List[Message] - conversation messages

    Optional Parameters:
        - stream: bool - whether to stream the output, default false
        - tool_response: bool - whether to include tool responses, default false
        - language: str - reply language, default "ja"

    Authentication:
        - Requires valid MD5 hash token: MD5(MASTERKEY:bot_id)
        - Authorization header should contain: Bearer {token}
        - Uses MD5 hash of MASTERKEY:bot_id for backend API authentication
        - Optionally uses API key from bot config for model access
    """
    try:
        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")

        _verify_v2_auth(bot_id, authorization)

        # Bot configuration comes from the backend API.
        bot_config = await fetch_bot_config(bot_id)

        project_dir = create_project_directory(
            bot_config.get("dataset_ids", []),
            bot_id,
            bot_config.get("skills")
        )

        messages = process_messages(request.messages, request.language)

        req_data = request.model_dump()
        generate_cfg = _build_generate_cfg(req_data, _V2_GENERATE_CFG_EXCLUDE)

        # Request-level model/model_server/api_key are passed as overrides
        # (empty values and the placeholder "whatever" are ignored).
        config = await AgentConfig.from_v2_request(
            request, bot_config, project_dir, messages, generate_cfg,
            model_name=_request_override(req_data, "model"),
            model_server=_request_override(req_data, "model_server"),
            api_key=_request_override(req_data, "api_key"),
        )

        return await create_agent_and_generate_response(config)

    except HTTPException:
        raise
    except Exception as e:
        raise _internal_error("chat_completions_v2", e)


@router.post("/api/v1/chat/cancel")
async def cancel_chat(session_id: str = Body(..., embed=True)):
    """
    Cancel a running agent inference

    Request body: {"session_id": "xxxxx"}
    Response: {"success": true/false, "message": "..."}
    """
    from utils.cancel_manager import trigger_cancel

    if not session_id:
        raise HTTPException(status_code=400, detail="session_id is required")

    if trigger_cancel(session_id):
        return {"success": True, "message": f"Cancel signal sent for session_id={session_id}"}
    return {"success": False, "message": f"No active inference found for session_id={session_id}"}


@router.post("/api/v3/chat/completions")
async def chat_completions_v3(request: ChatRequestV3, authorization: Optional[str] = Header(None)):
    """
    Chat completions API v3 - configuration is read from the database

    Unlike v2, v3 reads the bot configuration from the local database instead of the backend API.
    The client only needs to send bot_id and messages.

    Args:
        request: ChatRequestV3 containing bot_id, messages, stream, session_id
        authorization: optional Authorization header

    Returns:
        Union[ChatResponse, StreamingResponse]: Chat completion response or stream

    Required Parameters:
        - bot_id: str - target bot ID (the ID entered by the user at creation time)
        - messages: List[Message] - conversation messages

    Optional Parameters:
        - stream: bool - whether to stream the output, default false
        - session_id: str - session ID used to store chat history

    Configuration (from database):
        - model: model name
        - api_key: API key
        - model_server: model server address
        - language: reply language
        - tool_response: whether to include tool responses
        - system_prompt: system prompt
        - dataset_ids: dataset ID list
        - mcp_settings: MCP server configuration
        - user_identifier: user identifier

    Authentication:
        - optional Authorization header; a mismatching token is only logged
    """
    try:
        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")

        # Optional auth check: a wrong token is logged but does not reject the request.
        if authorization:
            expected_token = generate_v2_auth_token(bot_id)
            provided_token = extract_api_key_from_auth(authorization)

            if provided_token and provided_token != expected_token:
                logger.warning("Invalid auth token provided for v3 API, but continuing anyway")

        # Bot configuration comes from the database.
        bot_config = await fetch_bot_config_from_db(bot_id, request.user_identifier)

        language = bot_config.get("language", "zh")

        project_dir = create_project_directory(
            bot_config.get("dataset_ids", []),
            bot_id,
            bot_config.get("skills", [])
        )

        messages = process_messages(request.messages, language)

        config = await AgentConfig.from_v3_request(
            request,
            bot_config,
            project_dir,
            messages,
            language
        )

        return await create_agent_and_generate_response(config)

    except HTTPException:
        raise
    except Exception as e:
        raise _internal_error("chat_completions_v3", e)


# ============================================================================
# Chat history endpoints
# ============================================================================

@router.get("/api/v1/chat/history", response_model=dict)
async def get_chat_history(
    session_id: str,
    last_message_id: Optional[str] = None,
    limit: int = 20
):
    """
    Get chat history

    Reads from the dedicated chat history store via the chat history manager

    Parameters:
        session_id: session ID
        last_message_id: ID of the last message of the previous page, used to fetch older messages
        limit: number of messages per call, default 20, clamped to 1..100

    Returns:
        {
            "messages": [
                {
                    "id": "unique message ID",
                    "role": "user or assistant",
                    "content": "message content",
                    "timestamp": "ISO 8601 timestamp"
                },
                ...
            ],
            "has_more": true/false  // whether older messages exist
        }
    """
    try:
        from agent.chat_history_manager import get_chat_history_manager

        # Clamp the page size to the supported range.
        limit = min(max(1, limit), 100)

        manager = get_chat_history_manager()
        result = await manager.manager.get_history_by_message_id(
            session_id=session_id,
            last_message_id=last_message_id,
            limit=limit
        )

        return {
            "messages": result["messages"],
            "has_more": result["has_more"]
        }

    except Exception as e:
        raise _internal_error("get_chat_history", e)


@router.post("/api/v1/chat/history/batch", response_model=BatchSaveChatResponse)
async def batch_save_chat_history(request: BatchSaveChatRequest):
    """
    Batch-save chat messages

    Saves several chat messages to the database in one call

    Parameters:
        session_id: session ID
        messages: messages to save, each with role and content
        bot_id: bot ID (optional)

    Request body example:
        {
            "session_id": "test-session-123",
            "messages": [
                {"role": "user", "content": "Hello"},
                {"role": "assistant", "content": "Hello! How can I help you?"},
                {"role": "user", "content": "How much is a cup of coffee?"}
            ],
            "bot_id": "63069654-7750-409d-9a58-a0960d899a20"
        }

    Returns:
        {
            "success": true,
            "message": "成功保存 3 条消息",
            "session_id": "test-session-123",
            "saved_count": 3,
            "message_ids": ["uuid1", "uuid2", "uuid3"]
        }
    """
    try:
        from agent.chat_history_manager import get_chat_history_manager

        if not request.session_id:
            raise HTTPException(status_code=400, detail="session_id is required")

        if not request.messages:
            raise HTTPException(status_code=400, detail="messages list is empty")

        # Convert the request models into plain dicts for the history manager.
        messages_dict = [
            {"role": msg.role, "content": msg.content}
            for msg in request.messages
        ]

        manager = get_chat_history_manager()
        message_ids = await manager.manager.save_messages(
            session_id=request.session_id,
            messages=messages_dict,
            bot_id=request.bot_id
        )

        # None entries are dropped and do not count as saved.
        valid_message_ids = [mid for mid in message_ids if mid is not None]
        saved_count = len(valid_message_ids)

        return BatchSaveChatResponse(
            success=True,
            message=f"成功保存 {saved_count} 条消息",
            session_id=request.session_id,
            saved_count=saved_count,
            message_ids=valid_message_ids
        )

    except HTTPException:
        raise
    except Exception as e:
        raise _internal_error("batch_save_chat_history", e)