朱潮 2025-11-28 00:17:48 +08:00
parent 95577c07a8
commit db71607ff1


@@ -148,6 +148,300 @@ async def generate_stream_response(agent, messages, pre_message_list, tool_respo
yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"
async def process_guidelines_and_terms(
bot_id: str,
api_key: str,
model_name: str,
model_server: str,
system_prompt: str,
messages: list,
agent_manager,
project_dir: Optional[str],
generate_cfg: Optional[dict],
language: str,
mcp_settings: Optional[list],
robot_type: str,
user_identifier: Optional[str]
) -> tuple:
"""
公共函数处理guideline分析和terms处理返回agent和analysis结果
Returns:
tuple: (agent, processed_system_prompt, guideline_analysis, terms_analysis)
"""
# 提取system_prompt中的guideline和terms
processed_system_prompt, guidelines_list, terms_list = extract_block_from_system_prompt(system_prompt)
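# Shape note (inferred from usage below, not stated in this diff): each guideline is a dict with "id",
# "condition" and "action" keys; terms_list presumably holds term dicts (name / description / synonyms)
# consumed by the embedding step.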
# Process terms
terms_analysis = ""
if terms_list:
logger.info(f"Processing terms: {len(terms_list)} terms")
try:
from embedding.embedding import process_terms_with_embedding
query_text = get_user_last_message_content(messages)
terms_analysis = process_terms_with_embedding(terms_list, bot_id, query_text)
if terms_analysis:
processed_system_prompt = processed_system_prompt.replace("{terms}", terms_analysis)
logger.info(f"Terms analysis completed: {len(terms_analysis)} chars")
except Exception as e:
logger.error(f"Error processing terms with embedding: {e}")
terms_analysis = ""
else:
# When terms_list is empty, delete the corresponding pkl cache file
try:
import os
cache_file = f"projects/cache/{bot_id}_terms.pkl"
if os.path.exists(cache_file):
os.remove(cache_file)
logger.info(f"Removed empty terms cache file: {cache_file}")
except Exception as e:
logger.error(f"Error removing terms cache file: {e}")
# Process guidelines
guideline_analysis = ""
agent = None
if guidelines_list:
logger.info(f"Processing guidelines: {len(guidelines_list)} guidelines")
chat_history = format_messages_to_chat_history(messages)
guidelines_count = len(guidelines_list)
batch_count = _get_optimal_batch_size(guidelines_count)
guidelines_per_batch = max(1, guidelines_count // batch_count)
# Split the guidelines into batches
batches = []
for i in range(0, guidelines_count, guidelines_per_batch):
batch_guidelines = guidelines_list[i:i + guidelines_per_batch]
batch_strings = []
for guideline in batch_guidelines:
guideline_str = f"{guideline['id']}) Condition: {guideline['condition']} Action: {guideline['action']}"
batch_strings.append(guideline_str)
batches.append(batch_strings)
# Make sure the number of batches does not exceed the requested concurrency
while len(batches) > batch_count:
batches[-2].extend(batches[-1])
batches.pop()
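# Worked example: 10 guidelines with batch_count=4 gives guidelines_per_batch=2, i.e. five batches of two;
# the loop above folds the trailing batch into its predecessor, leaving four batches sized 2, 2, 2 and 4.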
# Create all tasks
tasks = []
# Add the guideline batch tasks
for batch in batches:
task = process_guideline_batch(
guidelines_batch=batch,
chat_history=chat_history,
terms=terms_analysis,
model_name=model_name,
api_key=api_key,
model_server=model_server
)
tasks.append(task)
# Add the agent creation task
agent_task = agent_manager.get_or_create_agent(
bot_id=bot_id,
project_dir=project_dir,
model_name=model_name,
api_key=api_key,
model_server=model_server,
generate_cfg=generate_cfg,
language=language,
system_prompt=processed_system_prompt,
mcp_settings=mcp_settings,
robot_type=robot_type,
user_identifier=user_identifier
)
tasks.append(agent_task)
# Run all tasks concurrently
all_results = await asyncio.gather(*tasks, return_exceptions=True)
# Process the results
agent = all_results[-1] # result of the agent creation task
if isinstance(agent, Exception): # with return_exceptions=True a failed creation comes back as an exception
raise agent
batch_results = all_results[:-1] # results of the guideline batches
# Merge the guideline analysis results
all_checks = []
for i, result in enumerate(batch_results):
if isinstance(result, Exception):
logger.error(f"Guideline batch {i} failed: {result}")
continue
if result and isinstance(result, dict) and 'checks' in result:
applicable_checks = [check for check in result['checks'] if check.get('applies') is True]
all_checks.extend(applicable_checks)
elif result and isinstance(result, str) and result.strip():
logger.info(f"Non-JSON result from batch {i}: {result}")
if all_checks:
guideline_analysis = "\n".join([item["rationale"] for item in all_checks])
logger.info(f"Guideline analysis completed: {len(guideline_analysis)} chars")
else:
# No guidelines: create the agent directly
agent = await agent_manager.get_or_create_agent(
bot_id=bot_id,
project_dir=project_dir,
model_name=model_name,
api_key=api_key,
model_server=model_server,
generate_cfg=generate_cfg,
language=language,
system_prompt=processed_system_prompt,
mcp_settings=mcp_settings,
robot_type=robot_type,
user_identifier=user_identifier
)
return agent, processed_system_prompt, guideline_analysis, terms_analysis
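# `_get_optimal_batch_size` is referenced above but not part of this diff. Below is a minimal sketch of what
# such a heuristic could look like, assuming it merely caps concurrency and scales with the guideline count;
# the thresholds are illustrative, not the repository's actual values.
def _get_optimal_batch_size(guidelines_count: int) -> int:
    """Pick a concurrency level (number of guideline batches) to run in parallel."""
    MAX_CONCURRENT_BATCHES = 8  # hypothetical upper bound on parallel LLM calls
    if guidelines_count <= 3:
        return 1  # too few guidelines to be worth splitting
    # roughly three guidelines per batch, capped at the concurrency limit
    return min(MAX_CONCURRENT_BATCHES, max(1, guidelines_count // 3))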
async def enhanced_generate_stream_response(
agent_manager,
bot_id: str,
api_key: str,
messages: list,
tool_response: bool,
model_name: str,
model_server: str,
language: str,
system_prompt: str,
mcp_settings: Optional[list],
robot_type: str,
project_dir: Optional[str],
generate_cfg: Optional[dict],
user_identifier: Optional[str],
pre_message_list: Optional[list]
):
"""增强的渐进式流式响应生成器"""
try:
# 第一阶段立即传输preamble_text
if pre_message_list:
chunk_data = {
"id": f"chatcmpl-preamble",
"object": "chat.completion.chunk",
"created": int(__import__('time').time()),
"model": model_name,
"choices": [{
"index": 0,
"delta": {
"content": get_content_from_messages(pre_message_list, tool_response=tool_response)
},
"finish_reason": None
}]
}
yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
# Stage 2: use the shared helper to run the guideline analysis and create the agent
agent, _, guideline_analysis, _ = await process_guidelines_and_terms(
bot_id=bot_id,
api_key=api_key,
model_name=model_name,
model_server=model_server,
system_prompt=system_prompt,
messages=messages,
agent_manager=agent_manager,
project_dir=project_dir,
generate_cfg=generate_cfg,
language=language,
mcp_settings=mcp_settings,
robot_type=robot_type,
user_identifier=user_identifier
)
# Send the guideline_analysis as soon as it is available
if guideline_analysis:
chunk_data = {
"id": f"chatcmpl-guideline",
"object": "chat.completion.chunk",
"created": int(__import__('time').time()),
"model": model_name,
"choices": [{
"index": 0,
"delta": {
"content": get_content_from_messages([{"role": "assistant","reasoning_content": guideline_analysis}], tool_response=tool_response)
},
"finish_reason": None
}]
}
yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
# Prepare the final messages
final_messages = messages.copy()
final_messages = append_user_last_message(final_messages, f"\n\nlanguage:{get_language_text(language)}")
if guideline_analysis:
final_messages = append_user_last_message(final_messages, f"\n\nActive Guidelines:\n{guideline_analysis}\nPlease follow these guidelines in your response.")
# Stage 3: stream the agent response
logger.info("Starting agent stream response")
accumulated_content = ""
chunk_id = 0
for response in agent.run(messages=final_messages):
previous_content = accumulated_content
accumulated_content = get_content_from_messages(response, tool_response=tool_response)
# Compute the newly added content
if accumulated_content.startswith(previous_content):
new_content = accumulated_content[len(previous_content):]
else:
new_content = accumulated_content
previous_content = ""
# Only send a chunk when there is new content
if new_content:
if chunk_id == 0:
logger.info(f"Agent首个Token已生成, 开始流式输出")
chunk_id += 1
chunk_data = {
"id": f"chatcmpl-{chunk_id}",
"object": "chat.completion.chunk",
"created": int(__import__('time').time()),
"model": model_name,
"choices": [{
"index": 0,
"delta": {
"content": new_content
},
"finish_reason": None
}]
}
yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
# Send the final completion chunk
final_chunk = {
"id": f"chatcmpl-{chunk_id + 1}",
"object": "chat.completion.chunk",
"created": int(__import__('time').time()),
"model": model_name,
"choices": [{
"index": 0,
"delta": {},
"finish_reason": "stop"
}]
}
yield f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"
# Send the end-of-stream marker
yield "data: [DONE]\n\n"
logger.info(f"Enhanced stream response completed, total chunks: {chunk_id}")
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Error in enhanced_generate_stream_response: {str(e)}")
logger.error(f"Full traceback: {error_details}")
error_data = {
"error": {
"message": f"Stream error: {str(e)}",
"type": "internal_error"
}
}
yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"
async def create_agent_and_generate_response(
bot_id: str,
api_key: str,
@@ -167,192 +461,94 @@ async def create_agent_and_generate_response(
"""创建agent并生成响应的公共逻辑"""
if generate_cfg is None:
generate_cfg = {}
pre_message_list = []
query_text = get_user_last_message_content(messages)
chat_history = format_messages_to_chat_history(messages)
preamble_text = await call_preamble_llm(chat_history, query_text, get_preamble_text(language), language, model_name, api_key, model_server)
if preamble_text != '':
# Generate preamble_text only when stream=True
preamble_text = ""
if stream:
query_text = get_user_last_message_content(messages)
chat_history = format_messages_to_chat_history(messages)
preamble_text = await call_preamble_llm(chat_history, query_text, get_preamble_text(language), language, model_name, api_key, model_server)
pre_message_list.append({"role": "assistant","content": preamble_text})
# 1. Extract the guideline and terms content from the system_prompt
system_prompt, guidelines_list, terms_list = extract_block_from_system_prompt(system_prompt)
# 2. If there is terms content, embed it first. The embeddings need to be cached; a tmp file keyed by
# {bot_id}_terms works, and the embedding implementation can live in @embedding/embedding.py. With the
# embeddings in hand, run a similarity search: use cosine similarity first, keep matches above a 0.7
# threshold, and reformat them as terms_analysis in the form 1) Name: term_name1, Description: desc, Synonyms: syn1, syn2.
terms_analysis = ""
if terms_list:
logger.info(f"terms_list: {terms_list}")
# Process the terms via embeddings
try:
from embedding.embedding import process_terms_with_embedding
terms_analysis = process_terms_with_embedding(terms_list, bot_id, query_text)
if terms_analysis:
# Substitute the terms analysis into the system prompt
system_prompt = system_prompt.replace("{terms}", terms_analysis)
logger.info(f"Generated terms analysis: {terms_analysis[:200]}...") # 只打印前200个字符
except Exception as e:
logger.error(f"Error processing terms with embedding: {e}")
terms_analysis = ""
else:
# When terms_list is empty, delete the corresponding pkl cache file
try:
import os
cache_file = f"projects/cache/{bot_id}_terms.pkl"
if os.path.exists(cache_file):
os.remove(cache_file)
logger.info(f"Removed empty terms cache file: {cache_file}")
except Exception as e:
logger.error(f"Error removing terms cache file: {e}")
# 3. If there is guideline content, process it concurrently
guideline_analysis = ""
if guidelines_list:
logger.info(f"guidelines_list: {guidelines_list}")
guidelines_count = len(guidelines_list)
if guidelines_count > 0:
# Get the optimal number of batches (the concurrency level)
batch_count = _get_optimal_batch_size(guidelines_count)
# Work out how many guidelines each batch should contain
guidelines_per_batch = max(1, guidelines_count // batch_count)
# Split the guidelines into batches - convert the list of dicts into strings for processing
batches = []
for i in range(0, guidelines_count, guidelines_per_batch):
batch_guidelines = guidelines_list[i:i + guidelines_per_batch]
# Format each guideline as a string, preserving the original layout for the LLM
batch_strings = []
for guideline in batch_guidelines:
guideline_str = f"{guideline['id']}) Condition: {guideline['condition']} Action: {guideline['action']}"
batch_strings.append(guideline_str)
batches.append(batch_strings)
# Make sure the number of batches does not exceed the requested concurrency
while len(batches) > batch_count:
# Merge the last batch into the second-to-last one
batches[-2].extend(batches[-1])
batches.pop()
logger.info(f"Processing {guidelines_count} guidelines in {len(batches)} batches with {batch_count} concurrent batches")
# Run all tasks concurrently: guideline batch processing + agent creation
tasks = []
# Add all guideline batch tasks
for batch in batches:
task = process_guideline_batch(
guidelines_batch=batch,
chat_history=chat_history,
terms=terms_analysis,
model_name=model_name,
api_key=api_key,
model_server=model_server
)
tasks.append(task)
# Add the agent creation task
agent_task = agent_manager.get_or_create_agent(
bot_id=bot_id,
project_dir=project_dir,
model_name=model_name,
api_key=api_key,
model_server=model_server,
generate_cfg=generate_cfg,
language=language,
system_prompt=system_prompt,
mcp_settings=mcp_settings,
robot_type=robot_type,
user_identifier=user_identifier
)
tasks.append(agent_task)
# Wait for all tasks to complete
all_results = await asyncio.gather(*tasks, return_exceptions=True)
# Process the results: the last element is the agent, the preceding ones are the guideline batch results
agent = all_results[-1] # result of the agent creation task
batch_results = all_results[:-1] # results of the guideline batches
logger.info(f"batch_results:{batch_results}")
# Merge the guideline analysis results using the JSON checks array
all_checks = []
for i, result in enumerate(batch_results):
if isinstance(result, Exception):
logger.error(f"Guideline batch {i} failed: {result}")
continue
if result and isinstance(result, dict) and 'checks' in result:
# For a JSON object with a checks array, keep only the checks whose applies flag is true
applicable_checks = [check for check in result['checks'] if check.get('applies') is True]
all_checks.extend(applicable_checks)
elif result and isinstance(result, str) and result.strip():
# For plain text, keep the original logic
logger.info(f"Non-JSON result from batch {i}: {result}")
if all_checks:
# Join the rationale of each applicable check
guideline_analysis = "\n".join([item["rationale"] for item in all_checks])
# guideline_analysis = json.dumps({"checks": all_checks}, ensure_ascii=False)
logger.info(f"Merged guideline analysis result: {guideline_analysis}")
# Append the analysis result to the last user message
if guideline_analysis:
messages = append_user_last_message(messages, f"\n\nActive Guidelines:\n{guideline_analysis}\nPlease follow these guidelines in your response.")
else:
# 3. Get or create the assistant instance from the global manager
agent = await agent_manager.get_or_create_agent(
bot_id=bot_id,
project_dir=project_dir,
model_name=model_name,
api_key=api_key,
model_server=model_server,
generate_cfg=generate_cfg,
language=language,
system_prompt=system_prompt,
mcp_settings=mcp_settings,
robot_type=robot_type,
user_identifier=user_identifier
)
messages = append_user_last_message(messages, f"\n\nlanguage:{get_language_text(language)}")
logger.info(f"Stream mode: Generated preamble text ({len(preamble_text)} chars)")
if guideline_analysis != '':
pre_message_list.append({"role": "assistant","reasoning_content": guideline_analysis})
# Decide between a streaming and a non-streaming response based on the stream parameter
# In streaming mode, use the enhanced streaming response generator
if stream:
return StreamingResponse(
generate_stream_response(agent, messages, pre_message_list, tool_response, model_name),
enhanced_generate_stream_response(
agent_manager=agent_manager,
bot_id=bot_id,
api_key=api_key,
messages=messages,
tool_response=tool_response,
model_name=model_name,
model_server=model_server,
language=language,
system_prompt=system_prompt or "",
mcp_settings=mcp_settings,
robot_type=robot_type,
project_dir=project_dir,
generate_cfg=generate_cfg,
user_identifier=user_identifier,
pre_message_list=pre_message_list
),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
)
# Use the shared helper for the whole processing pipeline
agent, _, guideline_analysis, _ = await process_guidelines_and_terms(
bot_id=bot_id,
api_key=api_key,
model_name=model_name,
model_server=model_server,
system_prompt=system_prompt or "",
messages=messages,
agent_manager=agent_manager,
project_dir=project_dir,
generate_cfg=generate_cfg,
language=language,
mcp_settings=mcp_settings,
robot_type=robot_type,
user_identifier=user_identifier
)
# Prepare the final messages
final_messages = messages.copy()
final_messages = append_user_last_message(final_messages, f"\n\nlanguage:{get_language_text(language)}")
if guideline_analysis:
final_messages = append_user_last_message(final_messages, f"\n\nActive Guidelines:\n{guideline_analysis}\nPlease follow these guidelines in your response.")
pre_message_list.append({"role": "assistant","reasoning_content": guideline_analysis})
# Non-streaming response
agent_responses = agent.run_nonstream(final_messages)
final_responses = pre_message_list + agent_responses
if final_responses:
# Process the response with get_content_from_messages, honoring the tool_response parameter
content = get_content_from_messages(final_responses, tool_response=tool_response)
# Build an OpenAI-format response
return ChatResponse(
choices=[{
"index": 0,
"message": {
"role": "assistant",
"content": content
},
"finish_reason": "stop"
}],
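# NOTE: character counts stand in for token counts below; exact usage would require the model's tokenizer.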
usage={
"prompt_tokens": sum(len(msg.get("content", "")) for msg in messages),
"completion_tokens": len(content),
"total_tokens": sum(len(msg.get("content", "")) for msg in messages) + len(content)
}
)
else:
# Non-streaming response
agent_responses = agent.run_nonstream(messages)
final_responses = pre_message_list+agent_responses
if final_responses and len(final_responses) > 0:
# Process the response with get_content_from_messages, honoring the tool_response parameter
content = get_content_from_messages(final_responses, tool_response=tool_response)
# Build an OpenAI-format response
return ChatResponse(
choices=[{
"index": 0,
"message": {
"role": "assistant",
"content": content
},
"finish_reason": "stop"
}],
usage={
"prompt_tokens": sum(len(msg.get("content", "")) for msg in messages),
"completion_tokens": len(content),
"total_tokens": sum(len(msg.get("content", "")) for msg in messages) + len(content)
}
)
else:
raise HTTPException(status_code=500, detail="No response from agent")
raise HTTPException(status_code=500, detail="No response from agent")
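# `process_terms_with_embedding` lives in embedding/embedding.py and is not shown in this diff. Below is a
# minimal sketch of the behaviour the comments above describe: a pkl cache keyed by {bot_id}_terms, cosine
# similarity against the query with a 0.7 threshold, and "1) Name: ..., Description: ..., Synonyms: ..."
# output. `embed_texts` and the term-dict keys are assumptions for illustration, not the repository's API.
import os
import pickle
import numpy as np

def process_terms_with_embedding(terms_list: list, bot_id: str, query_text: str,
                                 embed_texts=None, threshold: float = 0.7) -> str:
    cache_file = f"projects/cache/{bot_id}_terms.pkl"
    if os.path.exists(cache_file):
        with open(cache_file, "rb") as f:
            term_vectors = pickle.load(f)  # reuse cached term embeddings
    else:
        term_vectors = np.asarray(embed_texts([t["name"] for t in terms_list]))
        os.makedirs(os.path.dirname(cache_file), exist_ok=True)
        with open(cache_file, "wb") as f:
            pickle.dump(term_vectors, f)
    query_vec = np.asarray(embed_texts([query_text]))[0]
    # cosine similarity between the query and every term embedding
    sims = term_vectors @ query_vec / (
        np.linalg.norm(term_vectors, axis=1) * np.linalg.norm(query_vec) + 1e-12)
    lines = []
    for term, sim in zip(terms_list, sims):
        if sim > threshold:
            lines.append(f"{len(lines) + 1}) Name: {term['name']}, "
                         f"Description: {term['description']}, "
                         f"Synonyms: {', '.join(term.get('synonyms', []))}")
    return "\n".join(lines)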
@router.post("/api/v1/chat/completions")