朱潮 2025-12-18 00:38:04 +08:00
parent 96d9ee5af7
commit 6bad1743b3
5 changed files with 261 additions and 32 deletions


@@ -30,6 +30,7 @@ class AgentConfig:
project_dir: Optional[str] = None
user_identifier: Optional[str] = None
session_id: Optional[str] = None
dataset_ids: Optional[List[str]] = field(default_factory=list)
# Response control parameters
stream: bool = False
@@ -56,6 +57,7 @@ class AgentConfig:
'project_dir': self.project_dir,
'user_identifier': self.user_identifier,
'session_id': self.session_id,
'dataset_ids': self.dataset_ids,
'stream': self.stream,
'tool_response': self.tool_response,
'preamble_text': self.preamble_text,
@@ -77,7 +79,7 @@ class AgentConfig:
from utils.fastapi_utils import get_preamble_text
if messages is None:
messages = []
preamble_text, system_prompt = get_preamble_text(request.language, request.system_prompt)
config = cls(
bot_id=request.bot_id,
@@ -99,6 +101,7 @@ class AgentConfig:
messages=messages,
_origin_messages=messages,
preamble_text=preamble_text,
dataset_ids=request.dataset_ids,
)
config.safe_print()
return config
@@ -127,13 +130,14 @@ class AgentConfig:
session_id=request.session_id,
enable_thinking=request.enable_thinking,
project_dir=project_dir,
stream=request.stream,
tool_response=request.tool_response,
generate_cfg={},  # the v2 endpoint does not pass an extra generate_cfg
logging_handler=LoggingCallbackHandler(),
messages=messages,
_origin_messages=messages,
preamble_text=preamble_text,
dataset_ids=bot_config.get("dataset_ids", []),  # dataset_ids come from the backend config
)
config.safe_print()
return config
@@ -151,7 +155,7 @@ class AgentConfig:
"""
Generate a unique cache key.
Generate a unique cache key based on session_id, bot_id, system_prompt, and mcp_settings.
Generate a unique cache key based on session_id, bot_id, system_prompt, mcp_settings, dataset_ids, etc.
If there is no session_id, return None to indicate that caching is not used.
Returns:
@@ -168,6 +172,8 @@
'enable_thinking': self.enable_thinking,
'user_identifier': self.user_identifier,
'session_id': self.session_id,
'dataset_ids': self.dataset_ids,  # include dataset_ids in cache key generation
'project_dir': self.project_dir,  # project_dir should also be included, since dataset_ids affects project_dir
}
# Convert the components to strings and concatenate them
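For context, the final step of this hunk (converting the key components to strings and concatenating them) could be implemented roughly as the sketch below. This is a hedged illustration only: the helper name build_cache_key and the choice of sorted JSON serialization plus an MD5 digest are assumptions, not taken from this commit.

import hashlib
import json

def build_cache_key(key_components: dict) -> str:
    # Serialize deterministically (sorted keys) so the same configuration
    # always produces the same key, then hash it to a fixed-length id.
    serialized = json.dumps(key_components, sort_keys=True, ensure_ascii=False, default=str)
    return hashlib.md5(serialized.encode("utf-8")).hexdigest()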


@@ -95,4 +95,13 @@ def update_agent_config_for_checkpoint(
Returns:
List[Dict]: the updated message list
"""
return prepare_messages_for_agent(config_messages, has_history)
async def prepare_checkpoint_message(config, checkpointer):
# If a checkpointer is available, check whether this session already has history
if config.session_id and checkpointer and len(config.messages) > 0:
has_history = await check_checkpoint_history(checkpointer, config.session_id)
config.messages = prepare_messages_for_agent(config.messages, has_history)
logger.info(f"Session {config.session_id}: has_history={has_history}, sending {len(config.messages)} messages")
else:
logger.debug("No session_id or checkpointer available, skipping checkpoint check")
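check_checkpoint_history is referenced here but not shown in this diff. A minimal sketch of how it might be implemented on top of a LangGraph checkpointer follows; the use of aget with a thread_id-keyed config and the channel_values lookup are assumptions based on the standard checkpointer interface, not this project's code.

async def check_checkpoint_history(checkpointer, session_id: str) -> bool:
    # LangGraph checkpointers key their state by thread_id inside "configurable".
    checkpoint = await checkpointer.aget({"configurable": {"thread_id": session_id}})
    # Treat any stored checkpoint that already contains messages as prior history.
    return bool(checkpoint and checkpoint.get("channel_values", {}).get("messages"))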


@@ -15,6 +15,7 @@ from utils.settings import SUMMARIZATION_MAX_TOKENS, TOOL_OUTPUT_MAX_LENGTH
from agent.agent_config import AgentConfig
from agent.prompt_loader import load_system_prompt_async, load_mcp_settings_async
from agent.agent_memory_cache import get_memory_cache_manager
from .checkpoint_utils import prepare_checkpoint_message
logger = logging.getLogger('app')
@@ -70,6 +71,12 @@ async def init_agent(config: AgentConfig):
config: AgentConfig object containing all initialization parameters
mcp: MCP configuration; if None, the mcp_settings from the config is used
"""
# Initialize the checkpointer and prepare the checkpoint-aware messages
checkpointer = None
if config.session_id:
checkpointer = _global_checkpointer
await prepare_checkpoint_message(config, checkpointer)
# Get the cache manager
cache_manager = get_memory_cache_manager()
@@ -137,11 +144,7 @@ async def init_agent(config: AgentConfig):
)
middleware.append(tool_output_middleware)
# Initialize the checkpointer and middleware
checkpointer = None
if config.session_id:
checkpointer = _global_checkpointer
if checkpointer:
summarization_middleware = SummarizationMiddleware(
model=llm_instance,
max_tokens_before_summary=SUMMARIZATION_MAX_TOKENS,
@@ -157,7 +160,7 @@ async def init_agent(config: AgentConfig):
middleware=middleware,
checkpointer=checkpointer  # pass the checkpointer to enable persistence
)
# If there is a cache key, add the agent to the cache
if cache_key:
# Use the DiskCache cache manager to store the agent
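The cache manager returned by get_memory_cache_manager is outside this diff; the sketch below shows one way a DiskCache-backed agent cache could look. The class name AgentDiskCache, the cache directory, and the TTL are hypothetical, and a live agent object may not pickle cleanly, so a real implementation might store a serializable agent configuration instead.

from diskcache import Cache

class AgentDiskCache:
    """Hypothetical DiskCache-backed store for initialized agents."""

    def __init__(self, directory: str = "/tmp/agent_cache", ttl_seconds: int = 3600):
        self._cache = Cache(directory)
        self._ttl = ttl_seconds

    def set(self, cache_key: str, agent) -> None:
        # diskcache pickles values by default; entries expire after the TTL.
        self._cache.set(cache_key, agent, expire=self._ttl)

    def get(self, cache_key: str):
        # Returns None when the key is missing or has expired.
        return self._cache.get(cache_key)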


@@ -91,7 +91,6 @@ def format_messages_to_chat_history(messages: list) -> str:
async def enhanced_generate_stream_response(
agent,
config: AgentConfig
):
"""Enhanced progressive streaming response generator (concurrency-optimized version)
@@ -135,7 +134,7 @@ async def enhanced_generate_stream_response(
logger.info(f"Starting agent stream response")
chunk_id = 0
message_tag = ""
agent = await init_agent(config)
async for msg, metadata in agent.astream({"messages": config.messages}, stream_mode="messages", config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS):
new_content = ""
@@ -261,32 +260,15 @@ async def create_agent_and_generate_response(
Args:
config: AgentConfig object containing all parameters
"""
# Get or create the agent (the agent must be created first so its checkpointer can be accessed)
agent = await init_agent(config)
# If a checkpointer is available, check whether history exists
if config.session_id and agent.checkpointer:
from agent.checkpoint_utils import check_checkpoint_history, prepare_messages_for_agent
has_history = await check_checkpoint_history(agent.checkpointer, config.session_id)
# Update config.messages
config.messages = prepare_messages_for_agent(config.messages, has_history)
logger.info(f"Session {config.session_id}: has_history={has_history}, sending {len(config.messages)} messages")
else:
logger.debug(f"No session_id provided, skipping checkpoint check")
# In streaming mode, use the enhanced streaming response generator
if config.stream:
return StreamingResponse(
enhanced_generate_stream_response(
agent=agent,
config=config
),
enhanced_generate_stream_response(config),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
)
agent = await init_agent(config)
# Use the updated messages
agent_responses = await agent.ainvoke({"messages": config.messages}, config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS)
append_messages = agent_responses["messages"][len(config.messages):]
@@ -386,6 +368,162 @@ async def chat_completions(request: ChatRequest, authorization: Optional[str] =
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.post("/api/v1/chat/warmup")
async def chat_warmup_v1(request: ChatRequest, authorization: Optional[str] = Header(None)):
"""
Warmup endpoint - initializes the agent without processing messages, so subsequent requests respond faster
Args:
request: ChatRequest containing configuration (messages will be ignored for warmup)
authorization: Authorization header containing API key (Bearer <API_KEY>)
Returns:
JSON response with warmup status and cache key
Required Parameters:
- bot_id: str - target bot ID
Notes:
- This endpoint pre-creates and caches the agent; subsequent chat requests can reuse the cached agent
- The messages parameter is not processed during warmup; it is only used for configuration validation
- The warmed-up agent's unique cache key is derived from the provided configuration parameters
"""
try:
# v1 endpoint: extract the API key from the Authorization header and use it as the model API key
api_key = extract_api_key_from_auth(authorization)
# Get bot_id (required parameter)
bot_id = request.bot_id
if not bot_id:
raise HTTPException(status_code=400, detail="bot_id is required")
# Create the project directory (when dataset_ids are provided and the robot type is not an agent type)
project_dir = create_project_directory(request.dataset_ids, bot_id, request.robot_type)
# Collect the extra parameters as generate_cfg
exclude_fields = {'messages', 'model', 'model_server', 'dataset_ids', 'language', 'tool_response', 'system_prompt', 'mcp_settings', 'stream', 'robot_type', 'bot_id', 'user_identifier', 'session_id', 'enable_thinking'}
generate_cfg = {k: v for k, v in request.model_dump().items() if k not in exclude_fields}
# Create an empty message list for warmup (actual messages are not processed during warmup)
empty_messages = []
# Process the messages (even if empty)
messages = process_messages(empty_messages, request.language or "ja")
# Create the AgentConfig object
config = AgentConfig.from_v1_request(request, api_key, project_dir, generate_cfg, messages)
# Warm up by initializing the agent, which triggers caching
logger.info(f"Warming up agent for bot_id: {bot_id}")
agent = await init_agent(config)
# Get the cache key
cache_key = config.get_unique_cache_id() if hasattr(config, 'get_unique_cache_id') else None
return {
"status": "warmed_up",
"bot_id": bot_id,
"cache_key": cache_key,
"message": "Agent has been initialized and cached successfully"
}
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Error in chat_warmup_v1: {str(e)}")
logger.error(f"Full traceback: {error_details}")
raise HTTPException(status_code=500, detail=f"Warmup failed: {str(e)}")
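To show the intended call sequence (warm up once, then let the real chat request reuse the cached agent), here is a hedged client-side sketch using httpx. The base URL, API key, and payload values are placeholders that mirror the test script added at the end of this commit.

import httpx

BASE_URL = "http://localhost:8001"                        # placeholder host/port
HEADERS = {"Authorization": "Bearer your-api-key-here"}   # placeholder API key
payload = {
    "bot_id": "test-bot-001",
    "model": "gpt-4",
    "messages": [{"role": "user", "content": "Hello"}],
    "dataset_ids": ["project-123"],
    "robot_type": "catalog_agent",
}

with httpx.Client(base_url=BASE_URL, headers=HEADERS, timeout=60) as client:
    # Warm up: the agent is initialized and cached; messages are ignored here.
    warmup = client.post("/api/v1/chat/warmup", json=payload).json()
    print("warmup:", warmup.get("status"), warmup.get("cache_key"))

    # Real request: should hit the agent cached by the warmup call above.
    chat = client.post("/api/v1/chat/completions", json={**payload, "stream": False}).json()
    print("chat:", chat["choices"][0]["message"]["content"][:100])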
@router.post("/api/v2/chat/warmup")
async def chat_warmup_v2(request: ChatRequestV2, authorization: Optional[str] = Header(None)):
"""
Warmup endpoint v2 - initializes the agent without processing messages, so subsequent requests respond faster
Uses the same authentication and configuration retrieval as /api/v2/chat/completions
Args:
request: ChatRequestV2 containing essential parameters (messages will be ignored for warmup)
authorization: Authorization header for authentication (same as v2 chat endpoint)
Returns:
JSON response with warmup status and cache key
Required Parameters:
- bot_id: str - target bot ID
Authentication:
- Requires valid MD5 hash token: MD5(MASTERKEY:bot_id)
- Authorization header should contain: Bearer {token}
Notes:
- This endpoint pre-creates and caches the agent; subsequent chat requests can reuse the cached agent
- The messages parameter is not processed during warmup; it is only used for configuration validation
- The warmed-up agent's unique cache key is derived from the full configuration fetched from the backend
"""
try:
# Get bot_id (required parameter)
bot_id = request.bot_id
if not bot_id:
raise HTTPException(status_code=400, detail="bot_id is required")
# v2 endpoint auth check (same authentication logic as chat_completions_v2)
expected_token = generate_v2_auth_token(bot_id)
provided_token = extract_api_key_from_auth(authorization)
if not provided_token:
raise HTTPException(
status_code=401,
detail="Authorization header is required for v2 API"
)
if provided_token != expected_token:
raise HTTPException(
status_code=403,
detail=f"Invalid authentication token. Expected: {expected_token[:8]}..., Provided: {provided_token[:8]}..."
)
# Fetch the bot configuration from the backend API (using the v2 auth scheme)
bot_config = await fetch_bot_config(bot_id)
# Create the project directory (dataset_ids come from the backend config)
project_dir = create_project_directory(
bot_config.get("dataset_ids", []),
bot_id,
bot_config.get("robot_type", "general_agent")
)
# Create an empty message list for warmup (actual messages are not processed during warmup)
empty_messages = []
# Process the messages
messages = process_messages(empty_messages, request.language or "ja")
# Create the AgentConfig object
config = AgentConfig.from_v2_request(request, bot_config, project_dir, messages)
# Warm up by initializing the agent, which triggers caching
logger.info(f"Warming up agent for bot_id: {bot_id}")
agent = await init_agent(config)
# Get the cache key
cache_key = config.get_unique_cache_id() if hasattr(config, 'get_unique_cache_id') else None
return {
"status": "warmed_up",
"bot_id": bot_id,
"cache_key": cache_key,
"message": "Agent has been initialized and cached successfully"
}
except HTTPException:
raise
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Error in chat_warmup_v2: {str(e)}")
logger.error(f"Full traceback: {error_details}")
raise HTTPException(status_code=500, detail=f"Warmup failed: {str(e)}")
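The docstrings above describe the v2 token as MD5(MASTERKEY:bot_id). A minimal sketch of how generate_v2_auth_token could compute that token is shown below; the MASTER_KEY constant and the exact "key:bot_id" formatting are assumptions derived from the docstring, not from the implementation.

import hashlib

MASTER_KEY = "change-me"  # placeholder; the real master key comes from server settings

def generate_v2_auth_token(bot_id: str) -> str:
    # Token format described in the docstring: MD5 over "MASTERKEY:bot_id".
    raw = f"{MASTER_KEY}:{bot_id}".encode("utf-8")
    return hashlib.md5(raw).hexdigest()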
@router.post("/api/v2/chat/completions")
async def chat_completions_v2(request: ChatRequestV2, authorization: Optional[str] = Header(None)):
"""

test_warmup.sh (new executable file, 73 lines)

@@ -0,0 +1,73 @@
#!/bin/bash
echo "Testing v1 warmup endpoint..."
echo "======================================="
# Test v1 warmup endpoint
curl --request POST \
--url http://localhost:8001/api/v1/chat/warmup \
--header 'authorization: Bearer your-api-key-here' \
--header 'content-type: application/json' \
--data '{
"bot_id": "test-bot-001",
"model": "gpt-4",
"messages": [{"role": "user", "content": "This message will be ignored"}],
"dataset_ids": ["project-123"],
"robot_type": "catalog_agent"
}'
echo -e "\n\nTesting v2 warmup endpoint..."
echo "======================================="
# Test v2 warmup endpoint (using same auth as chat_completions_v2)
curl --request POST \
--url http://localhost:8001/api/v2/chat/warmup \
--header 'authorization: Bearer a21c99620a8ef61d69563afe05ccce89' \
--header 'content-type: application/json' \
--data '{
"bot_id": "63069654-7750-409d-9a58-a0960d899a20",
"messages": [{"role": "user", "content": "This message will be ignored"}],
"stream": false,
"language": "ja",
"tool_response": true,
"user_identifier": "test-user"
}'
echo -e "\n\nNow testing actual chat endpoints to see if they use cached agents..."
echo "==============================================================="
echo "Testing v1 chat completions (should be faster after warmup)..."
curl --request POST \
--url http://localhost:8001/api/v1/chat/completions \
--header 'authorization: Bearer your-api-key-here' \
--header 'content-type: application/json' \
--data '{
"bot_id": "test-bot-001",
"model": "gpt-4",
"messages": [{"role": "user", "content": "Hello, how are you?"}],
"dataset_ids": ["project-123"],
"robot_type": "catalog_agent",
"stream": false
}' | jq -r '.choices[0].message.content' | head -c 100
echo -e "\n\nTesting v2 chat completions (should be faster after warmup)..."
curl --request POST \
--url http://localhost:8001/api/v2/chat/completions \
--header 'authorization: Bearer a21c99620a8ef61d69563afe05ccce89' \
--header 'content-type: application/json' \
--data '{
"messages": [
{
"role": "user",
"content": "咖啡多少钱一杯"
}
],
"stream": false,
"model": "whatever",
"language": "ja",
"bot_id": "63069654-7750-409d-9a58-a0960d899a20",
"tool_response": false,
"user_identifier": "及川"
}' | jq -r '.choices[0].message.content' | head -c 100
echo -e "\n\nDone!"