import json
import os
import asyncio
from typing import Union, Optional

from fastapi import APIRouter, HTTPException, Header
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import logging

logger = logging.getLogger('app')

from utils import (
    Message,
    ChatRequest,
    ChatResponse
)
from agent.sharded_agent_manager import init_global_sharded_agent_manager
from utils.api_models import ChatRequestV2
from utils.fastapi_utils import (
    process_messages,
    create_project_directory,
    extract_api_key_from_auth,
    generate_v2_auth_token,
    fetch_bot_config,
    call_preamble_llm,
    get_preamble_text,
    create_stream_chunk
)
from langchain_core.messages import AIMessageChunk, HumanMessage, ToolMessage, AIMessage
from utils.settings import MAX_OUTPUT_TOKENS, MAX_CACHED_AGENTS, SHARD_COUNT
from agent.agent_config import AgentConfig

router = APIRouter()

# Initialize the global agent manager shared by every route in this module.
agent_manager = init_global_sharded_agent_manager(
    max_cached_agents=MAX_CACHED_AGENTS,
    shard_count=SHARD_COUNT
)


def append_user_last_message(messages: list, content: str) -> list:
    """Append content to the last message when it was sent by the user.

    Args:
        messages: Chat message dicts, each with 'role' and 'content' keys.
        content: Text to append to the last user message.

    Returns:
        list: The same messages list (mutated in place when the last
        message has role 'user'; otherwise unchanged).
    """
    if not messages:
        return messages
    last_message = messages[-1]
    if last_message and last_message.get('role') == 'user':
        messages[-1]['content'] += content
    return messages


def append_assistant_last_message(messages: list, content: str) -> list:
    """Append content to the last assistant message, or add a new one.

    If the last message has role 'assistant' the content is concatenated
    onto it; otherwise a fresh assistant message is appended.

    Args:
        messages: Chat message dicts, each with 'role' and 'content' keys.
        content: Text to append.

    Returns:
        list: The same messages list, mutated in place.
    """
    if not messages:
        return messages
    last_message = messages[-1]
    if last_message and last_message.get('role') == 'assistant':
        messages[-1]['content'] += content
    else:
        messages.append({"role": "assistant", "content": content})
    return messages


def get_user_last_message_content(messages: list) -> str:
    """Return the content of the most recent user message, or ""."""
    if not messages:
        return ""
    for msg in reversed(messages):
        if msg.get('role') == 'user':
            return msg.get('content', '')
    return ""


def format_messages_to_chat_history(messages: list) -> str:
    """Format messages into a plain-text chat-history string.

    Only 'user' and 'assistant' messages are included; other roles are
    silently skipped.
    """
    chat_history = ""
    for msg in messages:
        role = msg.get('role', '')
        content = msg.get('content', '')
        if role == 'user':
            chat_history += f"用户: {content}\n"
        elif role == 'assistant':
            chat_history += f"助手: {content}\n"
    return chat_history


async def enhanced_generate_stream_response(
    agent_manager,
    config: AgentConfig
):
    """Progressive streaming response generator (concurrency-optimized).

    Runs the preamble LLM call and the agent stream concurrently, while
    guaranteeing that preamble output (if any) is yielded before any
    agent-stream content.

    Args:
        agent_manager: Agent manager used to build/fetch the agent.
        config: AgentConfig object carrying all request parameters.

    Yields:
        SSE-formatted "data: ..." strings, terminated by "data: [DONE]".
    """
    try:
        # Producer/consumer queue plus an event marking preamble completion.
        output_queue = asyncio.Queue()
        preamble_completed = asyncio.Event()

        async def preamble_task():
            """Generate the preamble text and enqueue it as an SSE chunk."""
            try:
                preamble_result = await call_preamble_llm(config)
                # Emit only when the preamble text is non-empty.
                if preamble_result and preamble_result.strip():
                    preamble_content = f"[PREAMBLE]\n{preamble_result}\n"
                    chunk_data = create_stream_chunk("chatcmpl-preamble", config.model_name, preamble_content)
                    await output_queue.put(("preamble", f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"))
                    logger.info(f"Stream mode: Generated preamble text ({len(preamble_result)} chars)")
                else:
                    logger.info("Stream mode: Skipped empty preamble text")
                # Signal completion so the consumer may release agent output.
                preamble_completed.set()
                await output_queue.put(("preamble_done", None))
            except Exception as e:
                logger.error(f"Error generating preamble text: {e}")
                # Mark complete even on failure so the consumer never blocks.
                preamble_completed.set()
                await output_queue.put(("preamble_done", None))

        async def agent_task():
            """Prepare the agent and stream its response into the queue."""
            try:
                agent = await agent_manager.get_or_create_agent(config)
                logger.info("Starting agent stream response")
                chunk_id = 0
                message_tag = ""
                async for msg, metadata in agent.astream(
                    {"messages": config.messages},
                    stream_mode="messages",
                    config=config.invoke_config(),
                    max_tokens=MAX_OUTPUT_TOKENS
                ):
                    new_content = ""
                    if isinstance(msg, AIMessageChunk):
                        # Tool-call chunks: emit the tool name once, then args.
                        if msg.tool_call_chunks:
                            message_tag = "TOOL_CALL"
                            for tool_call_chunk in msg.tool_call_chunks:
                                if tool_call_chunk["name"]:
                                    new_content = f"[{message_tag}] {tool_call_chunk['name']}\n"
                                if tool_call_chunk['args']:
                                    new_content += tool_call_chunk['args']
                        # Plain text content.
                        elif msg.content:
                            # First text token means the preamble phase is over.
                            preamble_completed.set()
                            await output_queue.put(("preamble_done", None))
                            meta_message_tag = metadata.get("message_tag", "ANSWER")
                            # Emit a section header whenever the tag changes.
                            if meta_message_tag != message_tag:
                                message_tag = meta_message_tag
                                new_content = f"[{meta_message_tag}]\n"
                            if msg.text:
                                new_content += msg.text
                    # Tool responses, only when the client asked for them.
                    elif isinstance(msg, ToolMessage) and config.tool_response and msg.content:
                        message_tag = "TOOL_RESPONSE"
                        new_content = f"[{message_tag}] {msg.name}\n{msg.text}\n"

                    if new_content:
                        if chunk_id == 0:
                            logger.info("Agent首个Token已生成, 开始流式输出")
                        chunk_id += 1
                        chunk_data = create_stream_chunk(f"chatcmpl-{chunk_id}", config.model_name, new_content)
                        await output_queue.put(("agent", f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"))

                # Final chunk carrying the stop reason.
                final_chunk = create_stream_chunk(f"chatcmpl-{chunk_id + 1}", config.model_name, finish_reason="stop")
                await output_queue.put(("agent", f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"))
                await output_queue.put(("agent_done", None))
            except Exception as e:
                logger.error(f"Error in agent task: {e}")
                await output_queue.put(("agent_done", None))

        # Run the preamble task only when thinking is enabled.
        if config.enable_thinking:
            preamble_task_handle = asyncio.create_task(preamble_task())
        else:
            # No thinking: substitute a no-op task and mark preamble done.
            preamble_task_handle = asyncio.create_task(asyncio.sleep(0))
            preamble_completed.set()

        agent_task_handle = asyncio.create_task(agent_task())

        # Output controller: preamble first, then the agent stream.
        preamble_output_done = False
        while True:
            try:
                # Timeout lets us detect finished tasks instead of hanging.
                item_type, item_data = await asyncio.wait_for(output_queue.get(), timeout=1.0)
                if item_type == "preamble":
                    # Emit preamble content immediately.
                    if item_data:
                        yield item_data
                        preamble_output_done = True
                elif item_type == "preamble_done":
                    preamble_output_done = True
                elif item_type == "agent":
                    # Agent content must wait for the preamble output.
                    if preamble_output_done:
                        yield item_data
                    else:
                        # Preamble not yet emitted: requeue and wait for it.
                        await output_queue.put((item_type, item_data))
                        await preamble_completed.wait()
                        preamble_output_done = True
                elif item_type == "agent_done":
                    break
            except asyncio.TimeoutError:
                # Exit once both producer tasks have finished.
                if all(task.done() for task in [preamble_task_handle, agent_task_handle]):
                    break
                continue

        # SSE terminator.
        yield "data: [DONE]\n\n"
        logger.info("Enhanced stream response completed")

    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in enhanced_generate_stream_response: {str(e)}")
        logger.error(f"Full traceback: {error_details}")
        error_data = {
            "error": {
                "message": f"Stream error: {str(e)}",
                "type": "internal_error"
            }
        }
        yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"


async def create_agent_and_generate_response(
    config: AgentConfig
) -> Union[ChatResponse, StreamingResponse]:
    """Shared logic: build the agent and produce a response.

    Args:
        config: AgentConfig object carrying all request parameters.

    Returns:
        StreamingResponse in stream mode, otherwise a ChatResponse.

    Raises:
        HTTPException: 500 when the agent produced no output.
    """
    config.safe_print()
    config.preamble_text, config.system_prompt = get_preamble_text(config.language, config.system_prompt)

    # Stream mode: delegate to the enhanced streaming generator.
    if config.stream:
        return StreamingResponse(
            enhanced_generate_stream_response(
                agent_manager=agent_manager,
                config=config
            ),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
        )

    messages = config.messages
    agent = await agent_manager.get_or_create_agent(config)
    agent_responses = await agent.ainvoke(
        {"messages": messages},
        config=config.invoke_config(),
        max_tokens=MAX_OUTPUT_TOKENS
    )

    # Only the messages the agent appended beyond the request history.
    append_messages = agent_responses["messages"][len(messages):]
    response_text = ""
    for msg in append_messages:
        if isinstance(msg, AIMessage):
            if len(msg.text) > 0:
                meta_message_tag = msg.additional_kwargs.get("message_tag", "ANSWER")
                # NOTE(review): the original code did
                #   msg.text.replace("", "").replace("", "")
                # for THINK output — both calls are no-ops, so the tag
                # literals to strip (presumably think-delimiters) appear to
                # have been lost. Behavior kept identical; confirm what
                # should be removed here.
                output_text = msg.text
                response_text += f"[{meta_message_tag}]\n" + output_text + "\n"
            if len(msg.tool_calls) > 0:
                # Serialize dict args as JSON; pass string args through.
                response_text += "".join(
                    f"[TOOL_CALL] {tool['name']}\n"
                    f"{json.dumps(tool['args']) if isinstance(tool['args'], dict) else tool['args']}\n"
                    for tool in msg.tool_calls
                )
        elif isinstance(msg, ToolMessage) and config.tool_response:
            response_text += f"[TOOL_RESPONSE] {msg.name}\n{msg.text}\n"

    if len(response_text) > 0:
        # OpenAI-style response payload; token counts are character-based
        # approximations, computed once.
        prompt_tokens = sum(len(msg.get("content", "")) for msg in messages)
        return ChatResponse(
            choices=[{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": response_text
                },
                "finish_reason": "stop"
            }],
            usage={
                "prompt_tokens": prompt_tokens,
                "completion_tokens": len(response_text),
                "total_tokens": prompt_tokens + len(response_text)
            }
        )
    raise HTTPException(status_code=500, detail="No response from agent")


@router.post("/api/v1/chat/completions")
async def chat_completions(request: ChatRequest, authorization: Optional[str] = Header(None)):
    """
    Chat completions API similar to OpenAI, supports both streaming and non-streaming.

    Args:
        request: ChatRequest containing messages, model, optional dataset_ids list,
            required bot_id, system_prompt, mcp_settings, and files
        authorization: Authorization header containing API key (Bearer <key>)

    Returns:
        Union[ChatResponse, StreamingResponse]: Chat completion response or stream

    Notes:
        - dataset_ids: optional; when provided it must be a list of project IDs
          (a single project also uses the array form)
        - bot_id: required robot ID
        - The robot project directory projects/robot/{bot_id}/ is created only
          when robot_type == "catalog_agent" AND dataset_ids is a non-empty array
        - Other robot_type values (including the default "agent") create no
          directory; dataset_ids of [], None, or absent also create no directory
        - Supports merging multiple knowledge bases with automatic handling of
          folder-name conflicts

    Required Parameters:
        - bot_id: str - target robot ID
        - messages: List[Message] - conversation messages

    Optional Parameters:
        - dataset_ids: List[str] - source knowledge-base project IDs
        - robot_type: str - robot type, defaults to "agent"

    Example:
        {"bot_id": "my-bot-001", "messages": [{"role": "user", "content": "Hello"}]}
        {"dataset_ids": ["project-123"], "bot_id": "my-bot-001", "messages": [{"role": "user", "content": "Hello"}]}
        {"dataset_ids": ["project-123", "project-456"], "bot_id": "my-bot-002", "messages": [{"role": "user", "content": "Hello"}]}
        {"dataset_ids": ["project-123"], "bot_id": "my-catalog-bot", "robot_type": "catalog_agent", "messages": [{"role": "user", "content": "Hello"}]}
    """
    try:
        # v1: the Authorization header carries the model API key.
        api_key = extract_api_key_from_auth(authorization)

        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")

        # Create the project directory when dataset_ids / robot_type warrant it.
        project_dir = create_project_directory(request.dataset_ids, bot_id, request.robot_type)

        # Everything outside the known fields is collected into generate_cfg.
        exclude_fields = {
            'messages', 'model', 'model_server', 'dataset_ids', 'language',
            'tool_response', 'system_prompt', 'mcp_settings', 'stream',
            'robot_type', 'bot_id', 'user_identifier', 'session_id',
            'enable_thinking'
        }
        generate_cfg = {k: v for k, v in request.model_dump().items() if k not in exclude_fields}

        messages = process_messages(request.messages, request.language)

        config = AgentConfig.from_v1_request(request, api_key, project_dir, generate_cfg, messages)

        return await create_agent_and_generate_response(
            config=config
        )
    except HTTPException:
        # Preserve deliberate HTTP errors (e.g. the 400 above) instead of
        # re-wrapping them as 500s — consistent with the v2 endpoint.
        raise
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in chat_completions: {str(e)}")
        logger.error(f"Full traceback: {error_details}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.post("/api/v2/chat/completions")
async def chat_completions_v2(request: ChatRequestV2, authorization: Optional[str] = Header(None)):
    """
    Chat completions API v2 with simplified parameters.

    Only requires messages, stream, tool_response, bot_id, and language
    parameters. Other parameters are fetched from the backend bot
    configuration API.

    Args:
        request: ChatRequestV2 containing only essential parameters
        authorization: Authorization header for authentication (different from v1)

    Returns:
        Union[ChatResponse, StreamingResponse]: Chat completion response or stream

    Required Parameters:
        - bot_id: str - target robot ID
        - messages: List[Message] - conversation messages

    Optional Parameters:
        - stream: bool - stream the output, defaults to false
        - tool_response: bool - include tool responses, defaults to false
        - language: str - reply language, defaults to "ja"

    Authentication:
        - Requires valid MD5 hash token: MD5(MASTERKEY:bot_id)
        - Authorization header should contain: Bearer {token}
        - Uses MD5 hash of MASTERKEY:bot_id for backend API authentication
        - Optionally uses API key from bot config for model access
    """
    try:
        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")

        # v2 authentication: compare the caller's token to the expected hash.
        expected_token = generate_v2_auth_token(bot_id)
        provided_token = extract_api_key_from_auth(authorization)

        if not provided_token:
            raise HTTPException(
                status_code=401,
                detail="Authorization header is required for v2 API"
            )

        if provided_token != expected_token:
            # NOTE(review): this detail leaks prefixes of both tokens to the
            # caller — consider a generic message in production.
            raise HTTPException(
                status_code=403,
                detail=f"Invalid authentication token. Expected: {expected_token[:8]}..., Provided: {provided_token[:8]}..."
            )

        # Fetch the bot configuration from the backend API (v2 auth).
        bot_config = await fetch_bot_config(bot_id)

        # Create the project directory from the backend-provided dataset_ids.
        project_dir = create_project_directory(
            bot_config.get("dataset_ids", []),
            bot_id,
            bot_config.get("robot_type", "general_agent")
        )

        messages = process_messages(request.messages, request.language)

        config = AgentConfig.from_v2_request(request, bot_config, project_dir, messages)

        return await create_agent_and_generate_response(
            config=config
        )
    except HTTPException:
        raise
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in chat_completions_v2: {str(e)}")
        logger.error(f"Full traceback: {error_details}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")