From db71607ff18450d75a74950b89ce982f1a9b5bcd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?=
Date: Fri, 28 Nov 2025 00:17:48 +0800
Subject: [PATCH] preamble

---
 routes/chat.py | 548 +++++++++++++++++++++++++++++++++----------------
 1 file changed, 372 insertions(+), 176 deletions(-)

diff --git a/routes/chat.py b/routes/chat.py
index 479df05..0c96fe8 100644
--- a/routes/chat.py
+++ b/routes/chat.py
@@ -148,6 +148,300 @@ async def generate_stream_response(agent, messages, pre_message_list, tool_respo
         yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"


+async def process_guidelines_and_terms(
+    bot_id: str,
+    api_key: str,
+    model_name: str,
+    model_server: str,
+    system_prompt: str,
+    messages: list,
+    agent_manager,
+    project_dir: Optional[str],
+    generate_cfg: Optional[dict],
+    language: str,
+    mcp_settings: Optional[list],
+    robot_type: str,
+    user_identifier: Optional[str]
+) -> tuple:
+    """
+    Shared helper: run guideline analysis and terms processing, returning the agent and analysis results.
+
+    Returns:
+        tuple: (agent, processed_system_prompt, guideline_analysis, terms_analysis)
+    """
+    # Extract the guidelines and terms from the system prompt
+    processed_system_prompt, guidelines_list, terms_list = extract_block_from_system_prompt(system_prompt)
+
+    # Process terms
+    terms_analysis = ""
+    if terms_list:
+        logger.info(f"Processing terms: {len(terms_list)} terms")
+        try:
+            from embedding.embedding import process_terms_with_embedding
+            query_text = get_user_last_message_content(messages)
+            terms_analysis = process_terms_with_embedding(terms_list, bot_id, query_text)
+            if terms_analysis:
+                processed_system_prompt = processed_system_prompt.replace("{terms}", terms_analysis)
+                logger.info(f"Terms analysis completed: {len(terms_analysis)} chars")
+        except Exception as e:
+            logger.error(f"Error processing terms with embedding: {e}")
+            terms_analysis = ""
+    else:
+        # When terms_list is empty, remove the corresponding pkl cache file
+        try:
+            import os
+            cache_file = f"projects/cache/{bot_id}_terms.pkl"
+            if os.path.exists(cache_file):
+                os.remove(cache_file)
+                logger.info(f"Removed empty terms cache file: {cache_file}")
+        except Exception as e:
+            logger.error(f"Error removing terms cache file: {e}")
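+
+    # Note: process_terms_with_embedding (see embedding/embedding.py) is expected
+    # to cache term embeddings in the pkl file above and retrieve matches against
+    # the user query by cosine similarity (threshold > 0.7), producing lines like
+    # "1) Name: term_name1, Description: desc, Synonyms: syn1, syn2".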
+
+    # Process guidelines
+    guideline_analysis = ""
+    agent = None
+
+    if guidelines_list:
+        logger.info(f"Processing guidelines: {len(guidelines_list)} guidelines")
+        chat_history = format_messages_to_chat_history(messages)
+
+        guidelines_count = len(guidelines_list)
+        batch_count = _get_optimal_batch_size(guidelines_count)
+        guidelines_per_batch = max(1, guidelines_count // batch_count)
+
+        # Split the guidelines into batches
+        batches = []
+        for i in range(0, guidelines_count, guidelines_per_batch):
+            batch_guidelines = guidelines_list[i:i + guidelines_per_batch]
+            batch_strings = []
+            for guideline in batch_guidelines:
+                guideline_str = f"{guideline['id']}) Condition: {guideline['condition']} Action: {guideline['action']}"
+                batch_strings.append(guideline_str)
+            batches.append(batch_strings)
+
+        # Make sure the number of batches does not exceed the requested concurrency
+        while len(batches) > batch_count:
+            batches[-2].extend(batches[-1])
+            batches.pop()
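+
+        # Worked example (assuming _get_optimal_batch_size(10) == 3):
+        # guidelines_per_batch = max(1, 10 // 3) = 3, giving batch sizes [3, 3, 3, 1];
+        # the merge loop above then folds the trailing batch in, leaving [3, 3, 4].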
+
+        # Create all tasks
+        tasks = []
+
+        # Add the guideline batch tasks
+        for batch in batches:
+            task = process_guideline_batch(
+                guidelines_batch=batch,
+                chat_history=chat_history,
+                terms=terms_analysis,
+                model_name=model_name,
+                api_key=api_key,
+                model_server=model_server
+            )
+            tasks.append(task)
+
+        # Add the agent creation task
+        agent_task = agent_manager.get_or_create_agent(
+            bot_id=bot_id,
+            project_dir=project_dir,
+            model_name=model_name,
+            api_key=api_key,
+            model_server=model_server,
+            generate_cfg=generate_cfg,
+            language=language,
+            system_prompt=processed_system_prompt,
+            mcp_settings=mcp_settings,
+            robot_type=robot_type,
+            user_identifier=user_identifier
+        )
+        tasks.append(agent_task)
+
+        # Run all tasks concurrently
+        all_results = await asyncio.gather(*tasks, return_exceptions=True)
+
+        # Process the results
+        agent = all_results[-1]  # result of the agent creation task
+        batch_results = all_results[:-1]  # results of the guideline batches
+
+        # Merge the guideline analysis results
+        all_checks = []
+        for i, result in enumerate(batch_results):
+            if isinstance(result, Exception):
+                logger.error(f"Guideline batch {i} failed: {result}")
+                continue
+            if result and isinstance(result, dict) and 'checks' in result:
+                applicable_checks = [check for check in result['checks'] if check.get('applies') is True]
+                all_checks.extend(applicable_checks)
+            elif result and isinstance(result, str) and result.strip():
+                logger.info(f"Non-JSON result from batch {i}: {result}")
+
+        if all_checks:
+            guideline_analysis = "\n".join([item["rationale"] for item in all_checks])
+            logger.info(f"Guideline analysis completed: {len(guideline_analysis)} chars")
+
+    else:
+        # No guidelines; create the agent directly
+        agent = await agent_manager.get_or_create_agent(
+            bot_id=bot_id,
+            project_dir=project_dir,
+            model_name=model_name,
+            api_key=api_key,
+            model_server=model_server,
+            generate_cfg=generate_cfg,
+            language=language,
+            system_prompt=processed_system_prompt,
+            mcp_settings=mcp_settings,
+            robot_type=robot_type,
+            user_identifier=user_identifier
+        )
+
+    return agent, processed_system_prompt, guideline_analysis, terms_analysis
+
+
+async def enhanced_generate_stream_response(
+    agent_manager,
+    bot_id: str,
+    api_key: str,
+    messages: list,
+    tool_response: bool,
+    model_name: str,
+    model_server: str,
+    language: str,
+    system_prompt: str,
+    mcp_settings: Optional[list],
+    robot_type: str,
+    project_dir: Optional[str],
+    generate_cfg: Optional[dict],
+    user_identifier: Optional[str],
+    pre_message_list: Optional[list]
+):
+    """Enhanced progressive streaming response generator."""
+    try:
+        # Stage 1: stream the preamble text immediately
+        if pre_message_list:
+            chunk_data = {
+                "id": "chatcmpl-preamble",
+                "object": "chat.completion.chunk",
+                "created": int(__import__('time').time()),
+                "model": model_name,
+                "choices": [{
+                    "index": 0,
+                    "delta": {
+                        "content": get_content_from_messages(pre_message_list, tool_response=tool_response)
+                    },
+                    "finish_reason": None
+                }]
+            }
+            yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
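+
+            # Each yield is one OpenAI-style SSE event; illustrative wire format:
+            # data: {"id": "chatcmpl-preamble", "object": "chat.completion.chunk",
+            #        "created": 1764260268, "model": "...",
+            #        "choices": [{"index": 0, "delta": {"content": "..."}, "finish_reason": null}]}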
+
+        # Stage 2: use the shared helper for guideline analysis and agent creation
+        agent, _, guideline_analysis, _ = await process_guidelines_and_terms(
+            bot_id=bot_id,
+            api_key=api_key,
+            model_name=model_name,
+            model_server=model_server,
+            system_prompt=system_prompt,
+            messages=messages,
+            agent_manager=agent_manager,
+            project_dir=project_dir,
+            generate_cfg=generate_cfg,
+            language=language,
+            mcp_settings=mcp_settings,
+            robot_type=robot_type,
+            user_identifier=user_identifier
+        )
+
+        # Send the guideline analysis immediately
+        if guideline_analysis:
+            chunk_data = {
+                "id": "chatcmpl-guideline",
+                "object": "chat.completion.chunk",
+                "created": int(__import__('time').time()),
+                "model": model_name,
+                "choices": [{
+                    "index": 0,
+                    "delta": {
+                        "content": get_content_from_messages([{"role": "assistant","reasoning_content": guideline_analysis}], tool_response=tool_response)
+                    },
+                    "finish_reason": None
+                }]
+            }
+            yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
+
+        # Prepare the final messages
+        final_messages = messages.copy()
+        final_messages = append_user_last_message(final_messages, f"\n\nlanguage:{get_language_text(language)}")
+
+        if guideline_analysis:
+            final_messages = append_user_last_message(final_messages, f"\n\nActive Guidelines:\n{guideline_analysis}\nPlease follow these guidelines in your response.")
+
+        # Stage 3: stream the agent response
+        logger.info("Starting agent stream response")
+        accumulated_content = ""
+        chunk_id = 0
+        for response in agent.run(messages=final_messages):
+            previous_content = accumulated_content
+            accumulated_content = get_content_from_messages(response, tool_response=tool_response)
+
+            # Compute the newly added content
+            if accumulated_content.startswith(previous_content):
+                new_content = accumulated_content[len(previous_content):]
+            else:
+                new_content = accumulated_content
+                previous_content = ""
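+
+            # Incremental diffing: agent.run yields cumulative snapshots, so when a
+            # snapshot extends the previous one ("Hel" -> "Hello") only the suffix
+            # ("lo") is emitted; otherwise the full snapshot is re-sent.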
+
+            # Only send a chunk when there is new content
+            if new_content:
+                if chunk_id == 0:
+                    logger.info("First agent token generated, starting streaming output")
+                chunk_id += 1
+                chunk_data = {
+                    "id": f"chatcmpl-{chunk_id}",
+                    "object": "chat.completion.chunk",
+                    "created": int(__import__('time').time()),
+                    "model": model_name,
+                    "choices": [{
+                        "index": 0,
+                        "delta": {
+                            "content": new_content
+                        },
+                        "finish_reason": None
+                    }]
+                }
+                yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
+
+        # Send the final completion chunk
+        final_chunk = {
+            "id": f"chatcmpl-{chunk_id + 1}",
+            "object": "chat.completion.chunk",
+            "created": int(__import__('time').time()),
+            "model": model_name,
+            "choices": [{
+                "index": 0,
+                "delta": {},
+                "finish_reason": "stop"
+            }]
+        }
+        yield f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"
+
+        # Send the end-of-stream marker
+        yield "data: [DONE]\n\n"
+        logger.info(f"Enhanced stream response completed, total chunks: {chunk_id}")
+
+    except Exception as e:
+        import traceback
+        error_details = traceback.format_exc()
+        logger.error(f"Error in enhanced_generate_stream_response: {str(e)}")
+        logger.error(f"Full traceback: {error_details}")
+
+        error_data = {
+            "error": {
+                "message": f"Stream error: {str(e)}",
+                "type": "internal_error"
+            }
+        }
+        yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"
+
+
 async def create_agent_and_generate_response(
     bot_id: str,
     api_key: str,
@@ -167,192 +461,94 @@ async def create_agent_and_generate_response(
     """Shared logic for creating the agent and generating a response."""
     if generate_cfg is None:
         generate_cfg = {}
+    pre_message_list = []

-    query_text = get_user_last_message_content(messages)
-    chat_history = format_messages_to_chat_history(messages)
-    preamble_text = await call_preamble_llm(chat_history, query_text, get_preamble_text(language), language, model_name, api_key, model_server)
-
-    if preamble_text != '':
+    # Only generate preamble_text when stream=True
+    preamble_text = ""
+    if stream:
+        query_text = get_user_last_message_content(messages)
+        chat_history = format_messages_to_chat_history(messages)
+        preamble_text = await call_preamble_llm(chat_history, query_text, get_preamble_text(language), language, model_name, api_key, model_server)
         pre_message_list.append({"role": "assistant","content": preamble_text})
-
-    # 1. Extract the guideline and terms content from system_prompt
-    system_prompt, guidelines_list, terms_list = extract_block_from_system_prompt(system_prompt)
-
-    # 2. If there are terms, run embedding first (the embeddings need to be cached; a tmp-file cache keyed by {bot_id}_terms will do). For the embedding implementation please see @embedding/embedding.py; it can live there. Once the embeddings are available, run similarity retrieval, using cosine similarity first; keep matches with similarity > 0.7 and reassemble them into terms_analysis in the format: 1) Name: term_name1, Description: desc, Synonyms: syn1, syn2.
-    terms_analysis = ""
-    if terms_list:
-        logger.info(f"terms_list: {terms_list}")
-        # Process terms using embeddings
-        try:
-            from embedding.embedding import process_terms_with_embedding
-            terms_analysis = process_terms_with_embedding(terms_list, bot_id, query_text)
-            if terms_analysis:
-                # Also add the terms analysis result to the messages
-                system_prompt = system_prompt.replace("{terms}", terms_analysis)
-                logger.info(f"Generated terms analysis: {terms_analysis[:200]}...")  # only log the first 200 characters
-        except Exception as e:
-            logger.error(f"Error processing terms with embedding: {e}")
-            terms_analysis = ""
-    else:
-        # When terms_list is empty, remove the corresponding pkl cache file
-        try:
-            import os
-            cache_file = f"projects/cache/{bot_id}_terms.pkl"
-            if os.path.exists(cache_file):
-                os.remove(cache_file)
-                logger.info(f"Removed empty terms cache file: {cache_file}")
-        except Exception as e:
-            logger.error(f"Error removing terms cache file: {e}")
-
-    # 3. If there are guidelines, process them concurrently
-    guideline_analysis = ""
-    if guidelines_list:
-        logger.info(f"guidelines_list: {guidelines_list}")
-        guidelines_count = len(guidelines_list)
-
-        if guidelines_count > 0:
-            # Get the optimal number of batches (the concurrency)
-            batch_count = _get_optimal_batch_size(guidelines_count)
-
-            # Work out how many guidelines each batch should contain
-            guidelines_per_batch = max(1, guidelines_count // batch_count)
-
-            # Process guidelines in batches - convert the list of dicts to a list of strings
-            batches = []
-            for i in range(0, guidelines_count, guidelines_per_batch):
-                batch_guidelines = guidelines_list[i:i + guidelines_per_batch]
-                # Format as strings, keeping the original format for the LLM
-                batch_strings = []
-                for guideline in batch_guidelines:
-                    guideline_str = f"{guideline['id']}) Condition: {guideline['condition']} Action: {guideline['action']}"
-                    batch_strings.append(guideline_str)
-                batches.append(batch_strings)
-
-            # Make sure the number of batches does not exceed the requested concurrency
-            while len(batches) > batch_count:
-                # Merge the last batch into the second-to-last one
-                batches[-2].extend(batches[-1])
-                batches.pop()
-
-            logger.info(f"Processing {guidelines_count} guidelines in {len(batches)} batches with {batch_count} concurrent batches")
-
-            # Run all tasks concurrently: guideline batch processing + agent creation
-            tasks = []
-
-            # Add all guideline batch tasks
-            for batch in batches:
-                task = process_guideline_batch(
-                    guidelines_batch=batch,
-                    chat_history=chat_history,
-                    terms=terms_analysis,
-                    model_name=model_name,
-                    api_key=api_key,
-                    model_server=model_server
-                )
-                tasks.append(task)
-
-            # Add the agent creation task
-            agent_task = agent_manager.get_or_create_agent(
-                bot_id=bot_id,
-                project_dir=project_dir,
-                model_name=model_name,
-                api_key=api_key,
-                model_server=model_server,
-                generate_cfg=generate_cfg,
-                language=language,
-                system_prompt=system_prompt,
-                mcp_settings=mcp_settings,
-                robot_type=robot_type,
-                user_identifier=user_identifier
-            )
-            tasks.append(agent_task)
-
-            # Wait for all tasks to finish
-            all_results = await asyncio.gather(*tasks, return_exceptions=True)
-
-            # Process results: the last one is the agent, the rest are guideline batch results
-            agent = all_results[-1]  # result of the agent creation task
-            batch_results = all_results[:-1]  # results of the guideline batches
-            logger.info(f"batch_results:{batch_results}")
-
-            # Merge the guideline analysis results, using the JSON-format checks array
-            all_checks = []
-            for i, result in enumerate(batch_results):
-                if isinstance(result, Exception):
-                    logger.error(f"Guideline batch {i} failed: {result}")
-                    continue
-                if result and isinstance(result, dict) and 'checks' in result:
-                    # For a JSON object with a checks array, keep only the checks where applies is true
-                    applicable_checks = [check for check in result['checks'] if check.get('applies') is True]
-                    all_checks.extend(applicable_checks)
-                elif result and isinstance(result, str) and result.strip():
-                    # For plain text, keep the original logic
-                    logger.info(f"Non-JSON result from batch {i}: {result}")
-
-            if all_checks:
-                # Join the checks' rationales into a single string
-                guideline_analysis = "\n".join([item["rationale"] for item in all_checks])
-                # guideline_analysis = json.dumps({"checks": all_checks}, ensure_ascii=False)
-                logger.info(f"Merged guideline analysis result: {guideline_analysis}")
-
-                # Append the analysis result to the content of the last message
-                if guideline_analysis:
-                    messages = append_user_last_message(messages, f"\n\nActive Guidelines:\n{guideline_analysis}\nPlease follow these guidelines in your response.")
-
-    else:
-        # 3. Get or create the assistant instance from the global manager
-        agent = await agent_manager.get_or_create_agent(
-            bot_id=bot_id,
-            project_dir=project_dir,
-            model_name=model_name,
-            api_key=api_key,
-            model_server=model_server,
-            generate_cfg=generate_cfg,
-            language=language,
-            system_prompt=system_prompt,
-            mcp_settings=mcp_settings,
-            robot_type=robot_type,
-            user_identifier=user_identifier
-        )
-
-    messages = append_user_last_message(messages, f"\n\nlanguage:{get_language_text(language)}")
+        logger.info(f"Stream mode: Generated preamble text ({len(preamble_text)} chars)")

-    if guideline_analysis != '':
-        pre_message_list.append({"role": "assistant","reasoning_content": guideline_analysis})
-    # Return a streaming or non-streaming response depending on the stream parameter
+    # In streaming mode, use the enhanced streaming response generator
     if stream:
         return StreamingResponse(
-            generate_stream_response(agent, messages, pre_message_list, tool_response, model_name),
+            enhanced_generate_stream_response(
+                agent_manager=agent_manager,
+                bot_id=bot_id,
+                api_key=api_key,
+                messages=messages,
+                tool_response=tool_response,
+                model_name=model_name,
+                model_server=model_server,
+                language=language,
+                system_prompt=system_prompt or "",
+                mcp_settings=mcp_settings,
+                robot_type=robot_type,
+                project_dir=project_dir,
+                generate_cfg=generate_cfg,
+                user_identifier=user_identifier,
+                pre_message_list=pre_message_list
+            ),
             media_type="text/event-stream",
             headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
         )
+
+    # Use the shared helper to run the whole pipeline
+    agent, _, guideline_analysis, _ = await process_guidelines_and_terms(
+        bot_id=bot_id,
+        api_key=api_key,
+        model_name=model_name,
+        model_server=model_server,
+        system_prompt=system_prompt or "",
+        messages=messages,
+        agent_manager=agent_manager,
+        project_dir=project_dir,
+        generate_cfg=generate_cfg,
+        language=language,
+        mcp_settings=mcp_settings,
+        robot_type=robot_type,
+        user_identifier=user_identifier
+    )
+
+    # Prepare the final messages
+    final_messages = messages.copy()
+    final_messages = append_user_last_message(final_messages, f"\n\nlanguage:{get_language_text(language)}")
+
+    if guideline_analysis:
+        final_messages = append_user_last_message(final_messages, f"\n\nActive Guidelines:\n{guideline_analysis}\nPlease follow these guidelines in your response.")
+        pre_message_list.append({"role": "assistant","reasoning_content": guideline_analysis})
+
+    # Non-streaming response
+    agent_responses = agent.run_nonstream(final_messages)
+    final_responses = pre_message_list + agent_responses
+    if final_responses and len(final_responses) > 0:
+        # Use get_content_from_messages to process the response, honoring the tool_response parameter
+        content = get_content_from_messages(final_responses, tool_response=tool_response)
+
+        # Construct an OpenAI-format response
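+        # Note: the usage numbers below are character counts used as a rough proxy
+        # for token usage, not tokenizer-accurate token counts.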
+        return ChatResponse(
+            choices=[{
+                "index": 0,
+                "message": {
+                    "role": "assistant",
+                    "content": content
+                },
+                "finish_reason": "stop"
+            }],
+            usage={
+                "prompt_tokens": sum(len(msg.get("content", "")) for msg in messages),
+                "completion_tokens": len(content),
+                "total_tokens": sum(len(msg.get("content", "")) for msg in messages) + len(content)
+            }
+        )
     else:
-        # Non-streaming response
-        agent_responses = agent.run_nonstream(messages)
-        final_responses = pre_message_list+agent_responses
-        if final_responses and len(final_responses) > 0:
-            # Use get_content_from_messages to process the response, honoring the tool_response parameter
-            content = get_content_from_messages(final_responses, tool_response=tool_response)
-
-            # Construct an OpenAI-format response
-            return ChatResponse(
-                choices=[{
-                    "index": 0,
-                    "message": {
-                        "role": "assistant",
-                        "content": content
-                    },
-                    "finish_reason": "stop"
-                }],
-                usage={
-                    "prompt_tokens": sum(len(msg.get("content", "")) for msg in messages),
-                    "completion_tokens": len(content),
-                    "total_tokens": sum(len(msg.get("content", "")) for msg in messages) + len(content)
-                }
-            )
-        else:
-            raise HTTPException(status_code=500, detail="No response from agent")
+        raise HTTPException(status_code=500, detail="No response from agent")


 @router.post("/api/v1/chat/completions")