import asyncio
import hmac
import json
import logging
import os
import shutil
import time
import traceback
from typing import Union, Optional, Any, List, Dict

from fastapi import APIRouter, HTTPException, Header, Body
from fastapi.responses import StreamingResponse

logger = logging.getLogger('app')

from utils import (
    Message, ChatRequest, ChatResponse, BatchSaveChatRequest, BatchSaveChatResponse
)
from utils.api_models import ChatRequestV2
from utils.fastapi_utils import (
    process_messages,
    create_project_directory,
    extract_api_key_from_auth,
    generate_v2_auth_token,
    fetch_bot_config,
    call_preamble_llm,
    create_stream_chunk,
    extract_text_from_content
)
from langchain_core.messages import AIMessageChunk, ToolMessage, AIMessage, HumanMessage
from utils.settings import MAX_OUTPUT_TOKENS
from agent.agent_config import AgentConfig
from agent.deep_assistant import init_agent
from utils.daytona_sync import sync_sandbox_to_local
from utils.settings import DAYTONA_ENABLED
from utils.structured_log import emit_question_metric

router = APIRouter()

# Request fields that are consumed explicitly by the v1 endpoints and must
# therefore not be forwarded as part of generate_cfg.
_V1_GENERATE_CFG_EXCLUDE = frozenset({
    'messages', 'model', 'model_server', 'dataset_ids', 'language',
    'tool_response', 'system_prompt', 'mcp_settings', 'stream', 'robot_type',
    'bot_id', 'user_identifier', 'session_id', 'enable_thinking', 'skills',
    'enable_memory', 'enable_self_knowledge', 'n', 'shell_env', 'max_tokens',
})

# Same as above for the v2 endpoints (which additionally accept api_key).
_V2_GENERATE_CFG_EXCLUDE = frozenset({
    'messages', 'dataset_ids', 'language', 'tool_response', 'system_prompt',
    'mcp_settings', 'stream', 'robot_type', 'bot_id', 'user_identifier',
    'session_id', 'enable_thinking', 'skills', 'enable_memory',
    'enable_self_knowledge', 'n', 'model', 'model_server', 'api_key',
    'shell_env', 'max_tokens',
})

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected before
# it finishes.
_background_tasks = set()


def _spawn_background(coro) -> "asyncio.Task":
    """Schedule *coro* as a task and keep it referenced until it completes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


def _log_endpoint_error(endpoint: str, exc: Exception) -> None:
    """Log *exc* and the active traceback; must be called from an ``except`` block."""
    logger.error(f"Error in {endpoint}: {str(exc)}")
    logger.error(f"Full traceback: {traceback.format_exc()}")


def _authenticate_v2(bot_id: str, authorization: Optional[str]) -> None:
    """Validate the v2 bearer token for *bot_id*.

    Raises:
        HTTPException: 401 when no token was supplied, 403 when it does not
            match the token produced by ``generate_v2_auth_token``.
    """
    expected_token = generate_v2_auth_token(bot_id)
    provided_token = extract_api_key_from_auth(authorization)

    if not provided_token:
        raise HTTPException(
            status_code=401,
            detail="Authorization header is required for v2 API"
        )

    # Constant-time comparison; the response deliberately reveals nothing
    # about the expected token.
    if not hmac.compare_digest(provided_token.encode("utf-8"), expected_token.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Invalid authentication token")


def _v2_request_overrides(request: ChatRequestV2) -> Dict[str, Optional[str]]:
    """Return model/model_server/api_key overrides taken from the request.

    Empty values and the placeholder ``"whatever"`` are mapped to ``None`` so
    that the bot configuration is used instead.
    """
    req_data = request.model_dump()

    def pick(key: str) -> Optional[str]:
        value = req_data.get(key)
        return value if value and value != "whatever" else None

    return {
        "model_name": pick("model"),
        "model_server": pick("model_server"),
        "api_key": pick("api_key"),
    }


async def _warmup_mcp_tools(config: AgentConfig, bot_id: str) -> dict:
    """Load the MCP settings for *config*, fetch the tools and build the warmup response."""
    logger.info(f"Warming up mcp_tools for bot_id: {bot_id}")

    from agent.deep_assistant import get_tools_from_mcp
    from agent.prompt_loader import load_mcp_settings_async

    loaded_settings = await load_mcp_settings_async(config)
    # Anything that is not a non-empty list is treated as "no MCP settings".
    mcp_settings = loaded_settings if isinstance(loaded_settings, list) else []

    # Caching is handled inside get_tools_from_mcp.
    mcp_tools = await get_tools_from_mcp(mcp_settings)

    return {
        "status": "warmed_up",
        "bot_id": bot_id,
        "mcp_tools_count": len(mcp_tools),
        "message": "MCP tools have been cached successfully"
    }


def _format_tool_call(tool) -> str:
    """Render one tool call as a ``[TOOL_CALL]`` section of the response text."""
    args = tool["args"]
    rendered_args = json.dumps(args) if isinstance(args, dict) else args
    return f"[TOOL_CALL] {tool['name']}\n{rendered_args}\n"


async def enhanced_generate_stream_response(
    config: AgentConfig
):
    """Stream the agent response as server-sent events.

    A preamble task (only when ``config.enable_thinking`` is set) and the agent
    task run concurrently and push frames into a queue; this generator drains
    the queue, holds back agent frames until the preamble is finished, emits a
    heartbeat frame after 15 seconds of silence and ends with ``[DONE]``.

    Args:
        config: AgentConfig object containing all parameters

    Yields:
        SSE-formatted ``data: ...`` strings.
    """
    # Collect the full response content for saving to the database
    full_response_content = []

    # Cancellation management
    cancel_event = None
    request_started_at = config.request_started_at or time.monotonic()

    try:
        # Create output queue and control events
        output_queue = asyncio.Queue()
        preamble_completed = asyncio.Event()

        if config.session_id:
            # Register cancellation event
            from utils.cancel_manager import register_cancel_event
            cancel_event = register_cancel_event(config.session_id)

            # Save user message before streaming starts
            _spawn_background(_save_user_messages(config))

        # Preamble task
        async def preamble_task():
            try:
                preamble_result = await call_preamble_llm(config)
                # Only output when the preamble text is non-empty
                if preamble_result and preamble_result.strip():
                    preamble_content = f"[PREAMBLE]\n{preamble_result}\n"
                    chunk_data = create_stream_chunk("chatcmpl-preamble", config.model_name, preamble_content)
                    await output_queue.put(("preamble", f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"))
                    logger.info(f"Stream mode: Generated preamble text ({len(preamble_result)} chars)")
                else:
                    logger.info("Stream mode: Skipped empty preamble text")
            except Exception as e:
                logger.error(f"Error generating preamble text: {e}")
            # Mark completion even on error to avoid blocking the agent output
            preamble_completed.set()
            await output_queue.put(("preamble_done", None))

        # Agent task (setup + streaming)
        async def agent_task():
            try:
                logger.info("Starting agent stream response")
                chunk_id = 0
                message_tag = ""
                last_answer_first_char_duration_ms = None
                waiting_for_answer_first_char = False

                agent, _checkpointer, sandbox = await init_agent(config)
                async for msg, metadata in agent.astream(
                    {"messages": config.messages},
                    stream_mode="messages",
                    config=config.invoke_config(),
                    max_tokens=MAX_OUTPUT_TOKENS,
                ):
                    # Check whether a cancellation signal was received
                    if cancel_event and cancel_event.is_set():
                        logger.info(f"Agent stream cancelled for session_id={config.session_id}")
                        break

                    new_content = ""

                    if isinstance(msg, AIMessageChunk):
                        # Handle tool calls
                        if msg.tool_call_chunks:
                            message_tag = "TOOL_CALL"
                            waiting_for_answer_first_char = False
                            for tool_call_chunk in msg.tool_call_chunks:
                                if isinstance(tool_call_chunk, dict):
                                    chunk_name = tool_call_chunk.get("name")
                                    chunk_args = tool_call_chunk.get("args")
                                else:
                                    chunk_name = getattr(tool_call_chunk, "name", None)
                                    chunk_args = getattr(tool_call_chunk, "args", None)
                                if config.tool_response:
                                    if chunk_name:
                                        new_content = f"[{message_tag}] {chunk_name}\n"
                                    if chunk_args:
                                        new_content += chunk_args
                        # Handle text content
                        elif msg.content:
                            # Agent text must not wait for a slow preamble:
                            # release the output gate if it is still closed.
                            if not preamble_completed.is_set():
                                preamble_completed.set()
                                await output_queue.put(("preamble_done", None))

                            meta_message_tag = metadata.get("message_tag", "ANSWER")
                            # Do not output content for SUMMARY
                            if meta_message_tag == "SUMMARY":
                                continue
                            if meta_message_tag != message_tag:
                                message_tag = meta_message_tag
                                waiting_for_answer_first_char = meta_message_tag == "ANSWER"
                                new_content = f"[{meta_message_tag}]\n"
                            if msg.text:
                                if (
                                    meta_message_tag == "ANSWER"
                                    and waiting_for_answer_first_char
                                    and msg.text.strip()
                                ):
                                    # Time from request start to the first
                                    # visible character of the latest answer
                                    last_answer_first_char_duration_ms = max(
                                        int((time.monotonic() - request_started_at) * 1000),
                                        0,
                                    )
                                    waiting_for_answer_first_char = False
                                new_content += msg.text

                    # Handle tool responses
                    elif isinstance(msg, ToolMessage) and msg.content:
                        message_tag = "TOOL_RESPONSE"
                        waiting_for_answer_first_char = False
                        # Always output MCP App responses even when tool_response is disabled
                        is_ui_resource = (
                            msg.text
                            and msg.text.lstrip().startswith('{"')
                            and '"type":"app"' in msg.text
                        )
                        if config.tool_response or is_ui_resource:
                            new_content = f"[{message_tag}] {msg.name}\n{msg.text}\n"

                    # NOTE(review): the collapsed source does not show whether
                    # the send below was nested under this condition; it is
                    # assumed that empty chunks are not sent - confirm.
                    if new_content:
                        # Collect full content
                        full_response_content.append(new_content)

                        # Send content chunk
                        if chunk_id == 0:
                            logger.info("Agent first token generated, starting stream output")
                        chunk_id += 1
                        chunk_data = create_stream_chunk(f"chatcmpl-{chunk_id}", config.model_name, new_content)
                        await output_queue.put(("agent", f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"))

                # Send final chunk
                finish = "cancelled" if (cancel_event and cancel_event.is_set()) else "stop"
                if last_answer_first_char_duration_ms is not None:
                    emit_question_metric(
                        stage="catalog_agent.final_answer_first_char",
                        status="cancel" if finish == "cancelled" else "success",
                        duration_ms=last_answer_first_char_duration_ms,
                        first_response_time_ms=last_answer_first_char_duration_ms,
                        trace_id=config.trace_id,
                        ai_id=config.bot_id,
                        session_id=config.session_id,
                        robot_type="agent",
                        model=config.model_name,
                        stream=config.stream,
                        extra={
                            "bot_id": config.bot_id,
                            "tool_response": config.tool_response,
                            "enable_thinking": config.enable_thinking,
                            "response_mode": "final_answer_first_char",
                        },
                    )
                final_chunk = create_stream_chunk(f"chatcmpl-{chunk_id + 1}", config.model_name, finish_reason=finish)
                await output_queue.put(("agent", f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"))

                # ============ Execute PostAgent hooks ============
                # Runs as a separate task so it does not block streaming
                full_response = "".join(full_response_content)
                _spawn_background(_execute_post_agent_hooks(config, full_response, sandbox))
                # =================================================

                await output_queue.put(("agent_done", None))

            except Exception as e:
                logger.error(f"Error in agent task: {e}\n{traceback.format_exc()}")
                # Send error information to the client
                error_payload = json.dumps({"error": str(e)}, ensure_ascii=False)
                await output_queue.put(("agent", f"data: {error_payload}\n\n"))
                # Send completion signal to ensure the output controller exits normally
                await output_queue.put(("agent_done", None))

        # Execute tasks concurrently.
        # Only run the preamble task when enable_thinking is True
        if config.enable_thinking:
            preamble_task_handle = _spawn_background(preamble_task())
        else:
            # Thinking disabled: use an already-trivial task and open the gate
            preamble_task_handle = _spawn_background(asyncio.sleep(0))
            preamble_completed.set()

        agent_task_handle = _spawn_background(agent_task())

        # Output controller: ensure preamble is emitted before the agent stream.
        # Without a preamble task there is nothing to wait for.
        preamble_output_done = not config.enable_thinking
        # Agent frames that arrived before the preamble finished. They are kept
        # in arrival order here instead of being re-queued, which would reorder
        # them relative to later frames.
        held_agent_frames = []
        flush_held_on_exit = True
        last_yield_time = time.time()

        while True:
            try:
                # Set a timeout to avoid waiting forever
                item_type, item_data = await asyncio.wait_for(output_queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                # Check whether a cancellation signal was received
                if cancel_event and cancel_event.is_set():
                    logger.info(f"Output loop cancelled for session_id={config.session_id}")
                    flush_held_on_exit = False
                    break

                # All tasks are done and the queue is idle: exit the loop
                if preamble_task_handle.done() and agent_task_handle.done():
                    break

                # Only send a heartbeat if nothing has been output for 15
                # seconds, to keep the connection alive
                if time.time() - last_yield_time >= 15:
                    heartbeat_chunk = create_stream_chunk("chatcmpl-heartbeat", config.model_name, "")
                    yield f"data: {json.dumps(heartbeat_chunk, ensure_ascii=False)}\n\n"
                    last_yield_time = time.time()
                continue

            if item_type == "agent_done":
                # Agent stream finished, end the loop
                break

            if item_type == "agent":
                if preamble_output_done:
                    yield item_data
                    last_yield_time = time.time()
                else:
                    held_agent_frames.append(item_data)
                continue

            if item_type in ("preamble", "preamble_done"):
                # Emit preamble content immediately
                if item_type == "preamble" and item_data:
                    yield item_data
                    last_yield_time = time.time()
                preamble_output_done = True
                # Release everything that was waiting for the preamble
                for frame in held_agent_frames:
                    yield frame
                    last_yield_time = time.time()
                held_agent_frames.clear()

        if flush_held_on_exit:
            # The agent finished before the preamble did: do not drop its output
            for frame in held_agent_frames:
                yield frame

        # Send end marker
        yield "data: [DONE]\n\n"

        logger.info("Enhanced stream response completed")

        # Save AI response after streaming ends
        if full_response_content and config.session_id:
            _spawn_background(_save_assistant_response(config, "".join(full_response_content)))

    except Exception as e:
        logger.error(f"Error in enhanced_generate_stream_response: {e}")
        # json.dumps keeps the frame valid JSON even if the message has quotes
        yield f"data: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n"
        yield "data: [DONE]\n\n"
    finally:
        # Clean up the cancellation event on every exit path, including the
        # generator being closed because the client disconnected
        if config.session_id:
            from utils.cancel_manager import unregister_cancel_event
            unregister_cancel_event(config.session_id)


async def create_agent_and_generate_response(
    config: AgentConfig
) -> Union[ChatResponse, StreamingResponse]:
    """Create the agent and generate a streaming or non-streaming response.

    Args:
        config: AgentConfig object containing all parameters

    Returns:
        A StreamingResponse when ``config.stream`` is set, otherwise a
        ChatResponse in OpenAI format.

    Raises:
        HTTPException: 500 when the agent produced no output text.
    """
    # Use the enhanced streaming response generator for streaming mode
    if config.stream:
        return StreamingResponse(
            enhanced_generate_stream_response(config),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
        )

    agent, _checkpointer, sandbox = await init_agent(config)
    agent_responses = await agent.ainvoke(
        {"messages": config.messages},
        config=config.invoke_config(),
        max_tokens=MAX_OUTPUT_TOKENS,
    )

    # Everything after the last HumanMessage is the output of this turn.
    # If no HumanMessage is found, use all messages.
    all_messages = agent_responses["messages"]
    turn_start = 0
    for idx in range(len(all_messages) - 1, -1, -1):
        if isinstance(all_messages[idx], HumanMessage):
            turn_start = idx + 1
            break
    append_messages = all_messages[turn_start:]

    parts = []
    for msg in append_messages:
        if isinstance(msg, AIMessage):
            if len(msg.text) > 0:
                meta_message_tag = msg.additional_kwargs.get("message_tag", "ANSWER")
                # SUMMARY messages are skipped entirely (including tool calls)
                if meta_message_tag == "SUMMARY":
                    continue
                if meta_message_tag == "THINK":
                    output_text = msg.text.replace("````", "").replace("````", "")
                else:
                    output_text = msg.text
                parts.append(f"[{meta_message_tag}]\n" + output_text + "\n")
            if len(msg.tool_calls) > 0 and config.tool_response:
                parts.extend(_format_tool_call(tool) for tool in msg.tool_calls)
        elif isinstance(msg, ToolMessage) and config.tool_response:
            parts.append(f"[TOOL_RESPONSE] {msg.name}\n{msg.text}\n")
    response_text = "".join(parts)

    # ============ Execute PostAgent hooks ============
    # Runs as a separate task so it does not block the response. The hooks
    # receive the assembled response text, as in streaming mode.
    _spawn_background(_execute_post_agent_hooks(config, response_text, sandbox))
    # =================================================

    if not response_text:
        raise HTTPException(status_code=500, detail="No response from agent")

    # NOTE: usage figures are character counts, not tokenizer token counts.
    prompt_chars = sum(
        len(extract_text_from_content(msg.get("content", ""))) for msg in config.messages
    )
    completion_chars = len(response_text)

    # Build the OpenAI-format response
    result = ChatResponse(
        choices=[{
            "index": 0,
            "message": {
                "role": "assistant",
                "content": response_text
            },
            "finish_reason": "stop"
        }],
        usage={
            "prompt_tokens": prompt_chars,
            "completion_tokens": completion_chars,
            "total_tokens": prompt_chars + completion_chars
        }
    )

    # Save chat history to the database (same logic as the streaming endpoint)
    await _save_user_messages(config)
    await _save_assistant_response(config, response_text)

    return result


async def _save_user_messages(config: AgentConfig) -> None:
    """
    Save the last user message (for both streaming and non-streaming endpoints).

    Does nothing without a session_id. Failures are logged and swallowed so
    that persistence problems never affect the main flow.

    Args:
        config: AgentConfig object
    """
    # Save only when session_id exists
    if not config.session_id:
        return

    try:
        from agent.chat_history_manager import get_chat_history_manager
        from agent.plugin_hook_loader import execute_hooks

        manager = get_chat_history_manager()

        # Walk backwards so that only the most recent user message is saved
        for msg in reversed(config.messages):
            if not isinstance(msg, dict):
                continue
            role = msg.get("role", "")
            # Flatten multimodal list content to plain text before persisting,
            # so base64 image data is not stored in chat history.
            content = extract_text_from_content(msg.get("content", ""))
            if role != "user" or not content:
                continue

            # ============ Execute PreSave hooks ============
            processed_content = await execute_hooks('PreSave', config, content=content, role=role)
            if processed_content:
                content = processed_content
            # ===============================================

            await manager.manager.save_message(
                session_id=config.session_id,
                role=role,
                content=content,
                bot_id=config.bot_id,
                user_identifier=config.user_identifier
            )
            break  # Save only the last one, then exit

        logger.debug(f"Saved last user message for session_id={config.session_id}")
    except Exception as e:
        # Save failure should not affect the main flow
        logger.error(f"Failed to save user messages: {e}")


async def _save_assistant_response(config: AgentConfig, assistant_response: str) -> None:
    """
    Save the AI assistant response (for both streaming and non-streaming endpoints).

    Does nothing without a session_id or with an empty response. Failures are
    logged and swallowed.

    Args:
        config: AgentConfig object
        assistant_response: AI assistant response content
    """
    if not config.session_id or not assistant_response:
        return

    try:
        from agent.chat_history_manager import get_chat_history_manager
        from agent.plugin_hook_loader import execute_hooks

        manager = get_chat_history_manager()

        # ============ Execute PreSave hooks ============
        processed_response = await execute_hooks(
            'PreSave', config, content=assistant_response, role='assistant'
        )
        if processed_response:
            assistant_response = processed_response
        # ===============================================

        await manager.manager.save_message(
            session_id=config.session_id,
            role="assistant",
            content=assistant_response,
            bot_id=config.bot_id,
            user_identifier=config.user_identifier
        )

        logger.debug(f"Saved assistant response for session_id={config.session_id}")
    except Exception as e:
        # Save failure should not affect the main flow
        logger.error(f"Failed to save assistant response: {e}")


async def _execute_post_agent_hooks(config: AgentConfig, response: str, sandbox=None) -> None:
    """
    Execute PostAgent hooks (after agent execution), then clean up and sync.

    After the hooks, the executable_code/tmp folder is cleaned and, when a
    sandbox is given and DAYTONA_ENABLED is set, sandbox files are synced back
    to the local workspace. Each step logs its own failure and never raises.

    Args:
        config: AgentConfig object
        response: Full response content from the agent
        sandbox: Optional DaytonaSandbox instance used for reverse file sync
    """
    try:
        from agent.plugin_hook_loader import execute_hooks

        metadata = {
            "bot_id": config.bot_id,
            "user_identifier": config.user_identifier,
            "session_id": config.session_id,
            "language": config.language,
        }

        await execute_hooks('PostAgent', config, response=response, metadata=metadata)
        logger.debug(f"Executed PostAgent hooks for session_id={config.session_id}")
    except Exception as e:
        # Hook execution failure should not affect the main flow
        logger.error(f"Failed to execute PostAgent hooks: {e}")

    # Clean up the executable_code/tmp folder
    await _cleanup_tmp_folder(config)

    # Daytona: reverse-sync sandbox files to local
    if sandbox is not None and DAYTONA_ENABLED:
        try:
            from pathlib import Path
            local_workspace = str(Path.cwd() / "projects" / "robot" / config.bot_id)
            sync_sandbox_to_local(sandbox, local_workspace)
        except Exception as e:
            logger.error(f"Failed to sync sandbox to local: {e}")


async def _cleanup_tmp_folder(config: AgentConfig) -> None:
    """
    Clean entries older than 3 days from the executable_code/tmp folder.

    Age is judged by modification time. An entry that cannot be removed is
    logged and skipped; the remaining entries are still processed.

    Args:
        config: AgentConfig object
    """
    try:
        if not (config.project_dir and config.bot_id):
            return

        tmp_dir = os.path.join(config.project_dir, "executable_code", "tmp")
        if not os.path.exists(tmp_dir):
            return

        # Cut-off timestamp: 3 days ago (3 * 24 * 60 * 60 = 259200 seconds)
        three_days_ago = time.time() - (3 * 24 * 60 * 60)
        deleted_count = 0

        for item in os.listdir(tmp_dir):
            item_path = os.path.join(tmp_dir, item)
            try:
                # Check modification time
                if os.path.getmtime(item_path) >= three_days_ago:
                    continue
                if os.path.isfile(item_path) or os.path.islink(item_path):
                    os.remove(item_path)
                else:
                    shutil.rmtree(item_path)
            except OSError as e:
                # e.g. removed concurrently or not permitted: keep going
                logger.error(f"Failed to delete old item {item_path}: {e}")
                continue
            deleted_count += 1
            logger.debug(f"Deleted old item: {item_path}")

        logger.info(f"Cleaned up {deleted_count} old item(s) from tmp folder: {tmp_dir}")
    except Exception as e:
        # Cleanup failure should not affect the main flow
        logger.error(f"Failed to cleanup tmp folder: {e}")


@router.post("/api/v1/chat/completions")
async def chat_completions(request: ChatRequest, authorization: Optional[str] = Header(None)):
    """
    Chat completions API similar to OpenAI, supports both streaming and non-streaming

    Args:
        request: ChatRequest containing messages, model, optional dataset_ids list, required bot_id, system_prompt, mcp_settings, and files
        authorization: Authorization header containing API key (Bearer {api_key})

    Returns:
        Union[ChatResponse, StreamingResponse]: Chat completion response or stream

    Notes:
        - dataset_ids: optional parameter; when provided it must be a project ID list (use array format even for a single project)
        - bot_id: required parameter, robot ID
        - no directories are created when dataset_ids is an empty array [], None, or omitted
        - supports multi-knowledge-base merging and automatically resolves duplicate folder names

    Required Parameters:
        - bot_id: str - target robot ID
        - messages: List[Message] - conversation message list
    Optional Parameters:
        - dataset_ids: List[str] - source knowledge-base project ID list (use array format even for a single project)

    Example:
        {"bot_id": "my-bot-001", "messages": [{"role": "user", "content": "Hello"}]}
        {"dataset_ids": ["project-123"], "bot_id": "my-bot-001", "messages": [{"role": "user", "content": "Hello"}]}
        {"dataset_ids": ["project-123", "project-456"], "bot_id": "my-bot-002", "messages": [{"role": "user", "content": "Hello"}]}
        {"dataset_ids": ["project-123"], "bot_id": "my-catalog-bot", "messages": [{"role": "user", "content": "Hello"}]}
    """
    request_started_at = time.monotonic()
    try:
        # v1 endpoint: extract the API key from the Authorization header as the model API key
        api_key = extract_api_key_from_auth(authorization)

        # Get bot_id (required parameter)
        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")

        # Create project directory (if dataset_ids exist and the type is not agent)
        project_dir = create_project_directory(request.dataset_ids, bot_id, request.skills)

        # Collect extra parameters as generate_cfg
        generate_cfg = {
            k: v for k, v in request.model_dump().items() if k not in _V1_GENERATE_CFG_EXCLUDE
        }
        logger.info("chat_completions generate_cfg_keys=%s model=%s", list(generate_cfg.keys()), request.model)

        # Process messages
        messages = process_messages(request.messages, request.language)

        # Create AgentConfig object
        config = await AgentConfig.from_v1_request(request, api_key, project_dir, generate_cfg, messages)
        config.request_started_at = request_started_at

        # Call the shared agent creation and response generation logic
        return await create_agent_and_generate_response(config)

    except HTTPException:
        # Preserve deliberate status codes (e.g. 400) instead of masking them as 500
        raise
    except Exception as e:
        _log_endpoint_error("chat_completions", e)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.post("/api/v1/chat/warmup")
async def chat_warmup_v1(request: ChatRequest, authorization: Optional[str] = Header(None)):
    """
    Warmup endpoint - loads the MCP tools for the given configuration so that subsequent requests are faster.

    Args:
        request: ChatRequest containing configuration (messages will be ignored for warmup)
        authorization: Authorization header containing API key (Bearer {api_key})

    Returns:
        JSON response with warmup status, bot_id and the number of MCP tools loaded

    Required Parameters:
        - bot_id: str - target robot ID

    Notes:
        - the messages parameter is not processed during warmup; an empty message list is used instead
        - caching of the tools is handled inside get_tools_from_mcp
    """
    try:
        # v1 endpoint: extract the API key from the Authorization header as the model API key
        api_key = extract_api_key_from_auth(authorization)

        # Get bot_id (required parameter)
        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")

        # Create project directory (if dataset_ids exist and the type is not agent)
        project_dir = create_project_directory(request.dataset_ids, bot_id, request.skills)

        # Collect extra parameters as generate_cfg (same rules as chat_completions)
        generate_cfg = {
            k: v for k, v in request.model_dump().items() if k not in _V1_GENERATE_CFG_EXCLUDE
        }

        # Actual messages are not processed during warmup
        messages = process_messages([], request.language or "ja")

        # Create AgentConfig object
        config = await AgentConfig.from_v1_request(request, api_key, project_dir, generate_cfg, messages)

        # Warm up the mcp_tools cache
        return await _warmup_mcp_tools(config, bot_id)

    except HTTPException:
        # Preserve deliberate status codes (e.g. 400) instead of masking them as 500
        raise
    except Exception as e:
        _log_endpoint_error("chat_warmup_v1", e)
        raise HTTPException(status_code=500, detail=f"Warmup failed: {str(e)}")


@router.post("/api/v2/chat/warmup")
async def chat_warmup_v2(request: ChatRequestV2, authorization: Optional[str] = Header(None)):
    """
    Warmup endpoint v2 - loads the MCP tools for the bot so that subsequent requests are faster.
    Uses the same authentication and configuration retrieval flow as /api/v2/chat/completions.

    Args:
        request: ChatRequestV2 containing essential parameters (messages will be ignored for warmup)
        authorization: Authorization header for authentication (same as v2 chat endpoint)

    Returns:
        JSON response with warmup status, bot_id and the number of MCP tools loaded

    Required Parameters:
        - bot_id: str - target robot ID

    Authentication:
        - Requires valid MD5 hash token: MD5(MASTERKEY:bot_id)
        - Authorization header should contain: Bearer {token}

    Notes:
        - the messages parameter is not processed during warmup; an empty message list is used instead
        - the configuration is fetched from the backend bot configuration API
        - caching of the tools is handled inside get_tools_from_mcp
    """
    try:
        # Get bot_id (required parameter)
        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")

        # v2 endpoint authentication validation (same auth logic as chat_completions_v2)
        _authenticate_v2(bot_id, authorization)

        # Fetch robot configuration from the backend API (using the v2 auth method)
        bot_config = await fetch_bot_config(bot_id)

        # Create project directory (using dataset_ids and skills from backend configuration)
        project_dir = create_project_directory(
            bot_config.get("dataset_ids", []),
            bot_id,
            bot_config.get("skills")
        )

        # Actual messages are not processed during warmup
        messages = process_messages([], request.language or "ja")

        generate_cfg = {
            k: v for k, v in request.model_dump().items() if k not in _V2_GENERATE_CFG_EXCLUDE
        }
        logger.info("chat_warmup_v2 generate_cfg_keys=%s requested_model=%s", list(generate_cfg.keys()), request.model)

        # model/model_server/api_key from the request take priority over
        # bot_config (excluding "whatever" and empty values)
        overrides = _v2_request_overrides(request)

        # Create AgentConfig object
        config = await AgentConfig.from_v2_request(
            request, bot_config, project_dir, messages, generate_cfg, **overrides
        )

        # Warm up the mcp_tools cache
        return await _warmup_mcp_tools(config, bot_id)

    except HTTPException:
        raise
    except Exception as e:
        _log_endpoint_error("chat_warmup_v2", e)
        raise HTTPException(status_code=500, detail=f"Warmup failed: {str(e)}")


@router.post("/api/v2/chat/completions")
async def chat_completions_v2(request: ChatRequestV2, authorization: Optional[str] = Header(None)):
    """
    Chat completions API v2 with simplified parameters.
    Only requires messages, stream, tool_response, bot_id, and language parameters.
    Other parameters are fetched from the backend bot configuration API.

    Args:
        request: ChatRequestV2 containing only essential parameters
        authorization: Authorization header for authentication (different from v1)

    Returns:
        Union[ChatResponse, StreamingResponse]: Chat completion response or stream

    Required Parameters:
        - bot_id: str - target robot ID
        - messages: List[Message] - conversation message list

    Optional Parameters:
        - stream: bool - whether to stream output, default false
        - tool_response: bool - whether to include tool responses, default false
        - language: str - response language, default "ja"

    Authentication:
        - Requires valid MD5 hash token: MD5(MASTERKEY:bot_id)
        - Authorization header should contain: Bearer {token}
        - Uses MD5 hash of MASTERKEY:bot_id for backend API authentication
        - Optionally uses API key from bot config for model access
    """
    request_started_at = time.monotonic()
    try:
        # Get bot_id (required parameter)
        bot_id = request.bot_id
        if not bot_id:
            raise HTTPException(status_code=400, detail="bot_id is required")

        # v2 endpoint authentication validation
        _authenticate_v2(bot_id, authorization)

        # Fetch robot configuration from the backend API (using the v2 auth method)
        bot_config = await fetch_bot_config(bot_id)

        # Create project directory (using dataset_ids and skills from backend configuration)
        project_dir = create_project_directory(
            bot_config.get("dataset_ids", []),
            bot_id,
            bot_config.get("skills")
        )

        # Process messages
        messages = process_messages(request.messages, request.language)

        # Collect extra parameters as generate_cfg
        generate_cfg = {
            k: v for k, v in request.model_dump().items() if k not in _V2_GENERATE_CFG_EXCLUDE
        }
        logger.info("chat_completions_v2 generate_cfg_keys=%s requested_model=%s", list(generate_cfg.keys()), request.model)

        # model/model_server/api_key from the request take priority over
        # bot_config (excluding "whatever" and empty values)
        overrides = _v2_request_overrides(request)

        # Create AgentConfig object
        config = await AgentConfig.from_v2_request(
            request, bot_config, project_dir, messages, generate_cfg, **overrides
        )
        config.request_started_at = request_started_at

        # Call the shared agent creation and response generation logic
        return await create_agent_and_generate_response(config)

    except HTTPException:
        raise
    except Exception as e:
        _log_endpoint_error("chat_completions_v2", e)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.post("/api/v1/chat/cancel")
async def cancel_chat(session_id: str = Body(..., embed=True)):
    """
    Cancel an ongoing agent inference.

    Request body: {"session_id": "xxxxx"}
    Response: {"success": true/false, "message": "..."}
    """
    from utils.cancel_manager import trigger_cancel

    if not session_id:
        raise HTTPException(status_code=400, detail="session_id is required")

    if trigger_cancel(session_id):
        return {"success": True, "message": f"Cancel signal sent for session_id={session_id}"}
    return {"success": False, "message": f"No active inference found for session_id={session_id}"}


# ============================================================================
# Chat history query endpoints
# ============================================================================

@router.get("/api/v1/chat/history", response_model=dict)
async def get_chat_history(
    session_id: str,
    last_message_id: Optional[str] = None,
    limit: int = 20
):
    """
    Get chat history records.

    Query from the dedicated chat history table and return full original messages (not affected by checkpoint summary).

    Parameters:
        session_id: Session ID
        last_message_id: ID of the last message from the previous page, used to fetch older messages
        limit: Number of messages returned per request, default 20, clamped to the range 1-100

    Returns:
        {
            "messages": [
                {
                    "id": "unique message ID",
                    "role": "user or assistant",
                    "content": "message content",
                    "timestamp": "ISO 8601 formatted timestamp"
                },
                ...
            ],
            "has_more": true/false  // whether more history messages are available
        }
    """
    try:
        from agent.chat_history_manager import get_chat_history_manager

        # Parameter validation: clamp limit to [1, 100]
        limit = min(max(1, limit), 100)

        manager = get_chat_history_manager()
        result = await manager.manager.get_history_by_message_id(
            session_id=session_id,
            last_message_id=last_message_id,
            limit=limit
        )

        return {
            "messages": result["messages"],
            "has_more": result["has_more"]
        }

    except Exception as e:
        _log_endpoint_error("get_chat_history", e)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.post("/api/v1/chat/history/batch", response_model=BatchSaveChatResponse)
async def batch_save_chat_history(request: BatchSaveChatRequest):
    """
    Save chat history in batch.

    Supports custom batch saving of multiple chat messages to the database.

    Parameters:
        session_id: Session ID
        messages: List of messages to save, each containing role and content
        bot_id: Robot ID (optional)

    Request body example:
        {
            "session_id": "test-session-123",
            "messages": [
                {"role": "user", "content": "Hello"},
                {"role": "assistant", "content": "Hello! How can I help you?"},
                {"role": "user", "content": "How much is a cup of coffee?"}
            ],
            "bot_id": "63069654-7750-409d-9a58-a0960d899a20"
        }

    Returns:
        {
            "success": true,
            "message": "Successfully saved 3 messages",
            "session_id": "test-session-123",
            "saved_count": 3,
            "message_ids": ["uuid1", "uuid2", "uuid3"]
        }
    """
    try:
        from agent.chat_history_manager import get_chat_history_manager

        # Parameter validation
        if not request.session_id:
            raise HTTPException(status_code=400, detail="session_id is required")

        if not request.messages:
            raise HTTPException(status_code=400, detail="messages list is empty")

        # Convert message format
        messages_dict = [
            {"role": msg.role, "content": msg.content}
            for msg in request.messages
        ]

        manager = get_chat_history_manager()
        message_ids = await manager.manager.save_messages(
            session_id=request.session_id,
            messages=messages_dict,
            bot_id=request.bot_id
        )

        # Entries that come back as None are not counted as saved
        valid_message_ids = [mid for mid in message_ids if mid is not None]
        saved_count = len(valid_message_ids)

        return BatchSaveChatResponse(
            success=True,
            message=f"Successfully saved {saved_count} messages",
            session_id=request.session_id,
            saved_count=saved_count,
            message_ids=valid_message_ids
        )

    except HTTPException:
        raise
    except Exception as e:
        _log_endpoint_error("batch_save_chat_history", e)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")