From 6bad1743b3d18bb2563c841340eda6fc33a37d96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Thu, 18 Dec 2025 00:38:04 +0800 Subject: [PATCH] warm_up --- agent/agent_config.py | 12 ++- agent/checkpoint_utils.py | 11 ++- agent/deep_assistant.py | 15 ++-- routes/chat.py | 182 +++++++++++++++++++++++++++++++++----- test_warmup.sh | 73 +++++++++++++++ 5 files changed, 261 insertions(+), 32 deletions(-) create mode 100755 test_warmup.sh diff --git a/agent/agent_config.py b/agent/agent_config.py index 5315abf..2ffa312 100644 --- a/agent/agent_config.py +++ b/agent/agent_config.py @@ -30,6 +30,7 @@ class AgentConfig: project_dir: Optional[str] = None user_identifier: Optional[str] = None session_id: Optional[str] = None + dataset_ids: Optional[List[str]] = field(default_factory=list) # 响应控制参数 stream: bool = False @@ -56,6 +57,7 @@ class AgentConfig: 'project_dir': self.project_dir, 'user_identifier': self.user_identifier, 'session_id': self.session_id, + 'dataset_ids': self.dataset_ids, 'stream': self.stream, 'tool_response': self.tool_response, 'preamble_text': self.preamble_text, @@ -77,7 +79,7 @@ class AgentConfig: from utils.fastapi_utils import get_preamble_text if messages is None: messages = [] - + preamble_text, system_prompt = get_preamble_text(request.language, request.system_prompt) config = cls( bot_id=request.bot_id, @@ -99,6 +101,7 @@ class AgentConfig: messages=messages, _origin_messages=messages, preamble_text=preamble_text, + dataset_ids=request.dataset_ids, ) config.safe_print() return config @@ -127,13 +130,14 @@ class AgentConfig: session_id=request.session_id, enable_thinking=request.enable_thinking, project_dir=project_dir, - stream=request.stream, + stream=request.stream, tool_response=request.tool_response, generate_cfg={}, # v2接口不传递额外的generate_cfg logging_handler=LoggingCallbackHandler(), messages=messages, _origin_messages=messages, preamble_text=preamble_text, + dataset_ids=bot_config.get("dataset_ids", []), # 从后端配置获取dataset_ids ) config.safe_print() return config @@ -151,7 +155,7 @@ class AgentConfig: """ 生成唯一的缓存键 - 基于session_id, bot_id, system_prompt, mcp_settings生成唯一的缓存键。 + 基于session_id, bot_id, system_prompt, mcp_settings, dataset_ids等生成唯一的缓存键。 如果没有session_id,返回None,表示不使用缓存。 Returns: @@ -168,6 +172,8 @@ class AgentConfig: 'enable_thinking': self.enable_thinking, 'user_identifier': self.user_identifier, 'session_id': self.session_id, + 'dataset_ids': self.dataset_ids, # 添加dataset_ids到缓存键生成 + 'project_dir': self.project_dir, # 也应该包含project_dir因为dataset_ids影响project_dir } # 将组件转换为字符串并连接 diff --git a/agent/checkpoint_utils.py b/agent/checkpoint_utils.py index d4ee1da..8bc781a 100644 --- a/agent/checkpoint_utils.py +++ b/agent/checkpoint_utils.py @@ -95,4 +95,13 @@ def update_agent_config_for_checkpoint( Returns: List[Dict]: 更新后的消息列表 """ - return prepare_messages_for_agent(config_messages, has_history) \ No newline at end of file + return prepare_messages_for_agent(config_messages, has_history) + +async def prepare_checkpoint_message(config,checkpointer): + # 如果有 checkpointer,检查是否有历史记录 + if config.session_id and checkpointer and len(config.messages) > 0: + has_history = await check_checkpoint_history(checkpointer, config.session_id) + config.messages = prepare_messages_for_agent(config.messages, has_history) + logger.info(f"Session {config.session_id}: has_history={has_history}, sending {len(config.messages)} messages") + else: + logger.debug(f"No session_id provided, skipping checkpoint check") diff --git a/agent/deep_assistant.py b/agent/deep_assistant.py index 9c6acc1..b7971ab 100644 --- a/agent/deep_assistant.py +++ b/agent/deep_assistant.py @@ -15,6 +15,7 @@ from utils.settings import SUMMARIZATION_MAX_TOKENS, TOOL_OUTPUT_MAX_LENGTH from agent.agent_config import AgentConfig from agent.prompt_loader import load_system_prompt_async, load_mcp_settings_async from agent.agent_memory_cache import get_memory_cache_manager +from .checkpoint_utils import prepare_checkpoint_message logger = logging.getLogger('app') @@ -70,6 +71,12 @@ async def init_agent(config: AgentConfig): config: AgentConfig 对象,包含所有初始化参数 mcp: MCP配置(如果为None则使用配置中的mcp_settings) """ + + # 初始化 checkpointer 和中间件 + checkpointer = None + if config.session_id: + checkpointer = _global_checkpointer + await prepare_checkpoint_message(config, checkpointer) # 获取缓存管理器 cache_manager = get_memory_cache_manager() @@ -137,11 +144,7 @@ async def init_agent(config: AgentConfig): ) middleware.append(tool_output_middleware) - # 初始化 checkpointer 和中间件 - checkpointer = None - - if config.session_id: - checkpointer = _global_checkpointer + if checkpointer: summarization_middleware = SummarizationMiddleware( model=llm_instance, max_tokens_before_summary=SUMMARIZATION_MAX_TOKENS, @@ -157,7 +160,7 @@ async def init_agent(config: AgentConfig): middleware=middleware, checkpointer=checkpointer # 传入 checkpointer 以启用持久化 ) - + # 如果有缓存键,将 agent 加入缓存 if cache_key: # 使用 DiskCache 缓存管理器存储 agent diff --git a/routes/chat.py b/routes/chat.py index d50e153..4ea6e72 100644 --- a/routes/chat.py +++ b/routes/chat.py @@ -91,7 +91,6 @@ def format_messages_to_chat_history(messages: list) -> str: async def enhanced_generate_stream_response( - agent, config: AgentConfig ): """增强的渐进式流式响应生成器 - 并发优化版本 @@ -135,7 +134,7 @@ async def enhanced_generate_stream_response( logger.info(f"Starting agent stream response") chunk_id = 0 message_tag = "" - + agent = await init_agent(config) async for msg, metadata in agent.astream({"messages": config.messages}, stream_mode="messages", config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS): new_content = "" @@ -261,32 +260,15 @@ async def create_agent_and_generate_response( Args: config: AgentConfig 对象,包含所有参数 """ - # 获取或创建 agent(需要先创建 agent 才能访问 checkpointer) - agent = await init_agent(config) - - # 如果有 checkpointer,检查是否有历史记录 - if config.session_id and agent.checkpointer: - from agent.checkpoint_utils import check_checkpoint_history, prepare_messages_for_agent - has_history = await check_checkpoint_history(agent.checkpointer, config.session_id) - - # 更新 config.messages - config.messages = prepare_messages_for_agent(config.messages, has_history) - - logger.info(f"Session {config.session_id}: has_history={has_history}, sending {len(config.messages)} messages") - else: - logger.debug(f"No session_id provided, skipping checkpoint check") - # 如果是流式模式,使用增强的流式响应生成器 if config.stream: return StreamingResponse( - enhanced_generate_stream_response( - agent=agent, - config=config - ), + enhanced_generate_stream_response(config), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "Connection": "keep-alive"} ) - + + agent = await init_agent(config) # 使用更新后的 messages agent_responses = await agent.ainvoke({"messages": config.messages}, config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS) append_messages = agent_responses["messages"][len(config.messages):] @@ -386,6 +368,162 @@ async def chat_completions(request: ChatRequest, authorization: Optional[str] = raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") +@router.post("/api/v1/chat/warmup") +async def chat_warmup_v1(request: ChatRequest, authorization: Optional[str] = Header(None)): + """ + 预热接口 - 初始化agent但不处理消息,用于后续请求的快速响应 + + Args: + request: ChatRequest containing configuration (messages will be ignored for warmup) + authorization: Authorization header containing API key (Bearer ) + + Returns: + JSON response with warmup status and cache key + + Required Parameters: + - bot_id: str - 目标机器人ID + + Notes: + - 此接口会预先生成并缓存agent,后续的chat请求可以复用缓存的agent + - messages参数在预热阶段不会被处理,仅用于配置验证 + - 预热的agent会基于提供的配置参数生成唯一的缓存键 + """ + try: + # v1接口:从Authorization header中提取API key作为模型API密钥 + api_key = extract_api_key_from_auth(authorization) + + # 获取bot_id(必需参数) + bot_id = request.bot_id + if not bot_id: + raise HTTPException(status_code=400, detail="bot_id is required") + + # 创建项目目录(如果有dataset_ids且不是agent类型) + project_dir = create_project_directory(request.dataset_ids, bot_id, request.robot_type) + + # 收集额外参数作为 generate_cfg + exclude_fields = {'messages', 'model', 'model_server', 'dataset_ids', 'language', 'tool_response', 'system_prompt', 'mcp_settings' ,'stream', 'robot_type', 'bot_id', 'user_identifier', 'session_id', 'enable_thinking'} + generate_cfg = {k: v for k, v in request.model_dump().items() if k not in exclude_fields} + + # 创建一个空的消息列表用于预热(实际消息不会在warmup中处理) + empty_messages = [] + + # 处理消息(即使是空的) + messages = process_messages(empty_messages, request.language or "ja") + + # 创建 AgentConfig 对象 + config = AgentConfig.from_v1_request(request, api_key, project_dir, generate_cfg, messages) + + # 预热:初始化agent(这会触发缓存) + logger.info(f"Warming up agent for bot_id: {bot_id}") + agent = await init_agent(config) + + # 获取缓存键 + cache_key = config.get_unique_cache_id() if hasattr(config, 'get_unique_cache_id') else None + + return { + "status": "warmed_up", + "bot_id": bot_id, + "cache_key": cache_key, + "message": "Agent has been initialized and cached successfully" + } + + except Exception as e: + import traceback + error_details = traceback.format_exc() + logger.error(f"Error in chat_warmup_v1: {str(e)}") + logger.error(f"Full traceback: {error_details}") + raise HTTPException(status_code=500, detail=f"Warmup failed: {str(e)}") + + +@router.post("/api/v2/chat/warmup") +async def chat_warmup_v2(request: ChatRequestV2, authorization: Optional[str] = Header(None)): + """ + 预热接口 v2 - 初始化agent但不处理消息,用于后续请求的快速响应 + 使用与 /api/v2/chat/completions 相同的认证和配置获取方式 + + Args: + request: ChatRequestV2 containing essential parameters (messages will be ignored for warmup) + authorization: Authorization header for authentication (same as v2 chat endpoint) + + Returns: + JSON response with warmup status and cache key + + Required Parameters: + - bot_id: str - 目标机器人ID + + Authentication: + - Requires valid MD5 hash token: MD5(MASTERKEY:bot_id) + - Authorization header should contain: Bearer {token} + + Notes: + - 此接口会预先生成并缓存agent,后续的chat请求可以复用缓存的agent + - messages参数在预热阶段不会被处理,仅用于配置验证 + - 预热的agent会基于从后端获取的完整配置生成唯一的缓存键 + """ + try: + # 获取bot_id(必需参数) + bot_id = request.bot_id + if not bot_id: + raise HTTPException(status_code=400, detail="bot_id is required") + + # v2接口鉴权验证(与chat_completions_v2相同的认证逻辑) + expected_token = generate_v2_auth_token(bot_id) + provided_token = extract_api_key_from_auth(authorization) + + if not provided_token: + raise HTTPException( + status_code=401, + detail="Authorization header is required for v2 API" + ) + + if provided_token != expected_token: + raise HTTPException( + status_code=403, + detail=f"Invalid authentication token. Expected: {expected_token[:8]}..., Provided: {provided_token[:8]}..." + ) + + # 从后端API获取机器人配置(使用v2的鉴权方式) + bot_config = await fetch_bot_config(bot_id) + # 创建项目目录(从后端配置获取dataset_ids) + project_dir = create_project_directory( + bot_config.get("dataset_ids", []), + bot_id, + bot_config.get("robot_type", "general_agent") + ) + + # 创建一个空的消息列表用于预热(实际消息不会在warmup中处理) + empty_messages = [] + + # 处理消息 + messages = process_messages(empty_messages, request.language or "ja") + + # 创建 AgentConfig 对象 + config = AgentConfig.from_v2_request(request, bot_config, project_dir, messages) + + # 预热:初始化agent(这会触发缓存) + logger.info(f"Warming up agent for bot_id: {bot_id}") + agent = await init_agent(config) + + # 获取缓存键 + cache_key = config.get_unique_cache_id() if hasattr(config, 'get_unique_cache_id') else None + + return { + "status": "warmed_up", + "bot_id": bot_id, + "cache_key": cache_key, + "message": "Agent has been initialized and cached successfully" + } + + except HTTPException: + raise + except Exception as e: + import traceback + error_details = traceback.format_exc() + logger.error(f"Error in chat_warmup_v2: {str(e)}") + logger.error(f"Full traceback: {error_details}") + raise HTTPException(status_code=500, detail=f"Warmup failed: {str(e)}") + + @router.post("/api/v2/chat/completions") async def chat_completions_v2(request: ChatRequestV2, authorization: Optional[str] = Header(None)): """ diff --git a/test_warmup.sh b/test_warmup.sh new file mode 100755 index 0000000..6944473 --- /dev/null +++ b/test_warmup.sh @@ -0,0 +1,73 @@ +#!/bin/bash + +echo "Testing v1 warmup endpoint..." +echo "=======================================" + +# Test v1 warmup endpoint +curl --request POST \ + --url http://localhost:8001/api/v1/chat/warmup \ + --header 'authorization: Bearer your-api-key-here' \ + --header 'content-type: application/json' \ + --data '{ + "bot_id": "test-bot-001", + "model": "gpt-4", + "messages": [{"role": "user", "content": "This message will be ignored"}], + "dataset_ids": ["project-123"], + "robot_type": "catalog_agent" +}' + +echo -e "\n\nTesting v2 warmup endpoint..." +echo "=======================================" + +# Test v2 warmup endpoint (using same auth as chat_completions_v2) +curl --request POST \ + --url http://localhost:8001/api/v2/chat/warmup \ + --header 'authorization: Bearer a21c99620a8ef61d69563afe05ccce89' \ + --header 'content-type: application/json' \ + --data '{ + "bot_id": "63069654-7750-409d-9a58-a0960d899a20", + "messages": [{"role": "user", "content": "This message will be ignored"}], + "stream": false, + "language": "ja", + "tool_response": true, + "user_identifier": "test-user" +}' + +echo -e "\n\nNow testing actual chat endpoints to see if they use cached agents..." +echo "===============================================================" + +echo "Testing v1 chat completions (should be faster after warmup)..." +curl --request POST \ + --url http://localhost:8001/api/v1/chat/completions \ + --header 'authorization: Bearer your-api-key-here' \ + --header 'content-type: application/json' \ + --data '{ + "bot_id": "test-bot-001", + "model": "gpt-4", + "messages": [{"role": "user", "content": "Hello, how are you?"}], + "dataset_ids": ["project-123"], + "robot_type": "catalog_agent", + "stream": false +}' | jq -r '.choices[0].message.content' | head -c 100 + +echo -e "\n\nTesting v2 chat completions (should be faster after warmup)..." +curl --request POST \ + --url http://localhost:8001/api/v2/chat/completions \ + --header 'authorization: Bearer a21c99620a8ef61d69563afe05ccce89' \ + --header 'content-type: application/json' \ + --data '{ + "messages": [ + { + "role": "user", + "content": "咖啡多少钱一杯" + } + ], + "stream": false, + "model": "whatever", + "language": "ja", + "bot_id": "63069654-7750-409d-9a58-a0960d899a20", + "tool_response": false, + "user_identifier": "及川" +}' | jq -r '.choices[0].message.content' | head -c 100 + +echo -e "\n\nDone!" \ No newline at end of file