diff --git a/prompt/preamble_prompt.md b/prompt/preamble_prompt.md
index 4dfbc99..1258222 100644
--- a/prompt/preamble_prompt.md
+++ b/prompt/preamble_prompt.md
@@ -9,7 +9,7 @@ FIRST, determine if this is a COMPLEX scenario that requires a preamble:
 **Complex Scenarios (preamble needed):**
 - Query scenarios: User is asking for information, searching, or looking up data
 - Action scenarios: User wants to perform an operation, execute a task
-- Knowledge retrieval scenarios: User needs to search knowledge base, documents, or databases
+- Knowledge retrieval scenarios: User needs to search knowledge base, documents, databases, or the Internet
 - Problem-solving: User is reporting issues, asking for help with problems
 - Complex requests: Multi-step tasks, detailed instructions needed
 
diff --git a/routes/chat.py b/routes/chat.py
index e1296fd..e88d8e9 100644
--- a/routes/chat.py
+++ b/routes/chat.py
@@ -16,7 +16,8 @@ from utils.api_models import ChatRequestV2
 from utils.fastapi_utils import (
     process_messages, extract_block_from_system_prompt, format_messages_to_chat_history,
     create_project_directory, extract_api_key_from_auth, generate_v2_auth_token, fetch_bot_config,
-    call_guideline_llm, _get_optimal_batch_size, process_guideline_batch, get_content_from_messages, call_preamble_llm, get_preamble_text, get_language_text
+    call_guideline_llm, _get_optimal_batch_size, process_guideline_batch, get_content_from_messages, call_preamble_llm, get_preamble_text, get_language_text,
+    create_stream_chunk
 )
 
 router = APIRouter()
@@ -56,97 +57,6 @@ def append_user_last_message(messages: list, content: str) -> bool:
 
     return messages
 
 
-async def generate_stream_response(agent, messages, pre_message_list, tool_response: bool, model: str):
-    """Generate a streaming response"""
-    accumulated_content = ""
-
-
-    chunk_id = 0
-    try:
-
-        if len(pre_message_list)>0:
-            accumulated_content = get_content_from_messages(pre_message_list, tool_response=tool_response)
-            chunk_data = {
-                "id": f"chatcmpl-thought",
-                "object": "chat.completion.chunk",
-                "created": int(__import__('time').time()),
-                "model": model,
-                "choices": [{
-                    "index": 0,
-                    "delta": {
-                        "content": accumulated_content
-                    },
-                    "finish_reason": None
-                }]
-            }
-            yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
-
-        logger.info(f"Starting agent streaming response, model: {model}")
-        for response in agent.run(messages=messages):
-            previous_content = accumulated_content
-            accumulated_content = get_content_from_messages(response, tool_response=tool_response)
-
-            # Compute the newly added content
-            if accumulated_content.startswith(previous_content):
-                new_content = accumulated_content[len(previous_content):]
-            else:
-                new_content = accumulated_content
-                previous_content = ""
-
-            # Only send a chunk when there is new content
-            if new_content:
-                if chunk_id == 0:
-                    logger.info(f"Agent generated its first token, starting streamed output")
-                chunk_id += 1
-                # Build an OpenAI-format streaming chunk
-                chunk_data = {
-                    "id": f"chatcmpl-{chunk_id}",
-                    "object": "chat.completion.chunk",
-                    "created": int(__import__('time').time()),
-                    "model": model,
-                    "choices": [{
-                        "index": 0,
-                        "delta": {
-                            "content": new_content
-                        },
-                        "finish_reason": None
-                    }]
-                }
-
-                yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
-
-        # Send the final completion marker
-        final_chunk = {
-            "id": f"chatcmpl-{chunk_id + 1}",
-            "object": "chat.completion.chunk",
-            "created": int(__import__('time').time()),
-            "model": model,
-            "choices": [{
-                "index": 0,
-                "delta": {},
-                "finish_reason": "stop"
-            }]
-        }
-        yield f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"
-
-        # Send the end-of-stream marker
-        yield "data: [DONE]\n\n"
-        logger.info(f"Agent streaming response finished, generated {chunk_id} chunks in total")
-    except Exception as e:
-        import traceback
-        error_details = traceback.format_exc()
-
-        logger.error(f"Error in generate_stream_response: {str(e)}")
-        logger.error(f"Full traceback: {error_details}")
-
-        error_data = {
-            "error": {
-                "message": f"Stream error: {str(e)}",
-                "type": "internal_error"
-            }
-        }
-        yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"
-
 async def process_guidelines_and_terms(
     bot_id: str,
@@ -275,7 +185,7 @@ async def process_guidelines_and_terms(
                 logger.info(f"Non-JSON result from batch {i}: {result}")
 
     if all_checks:
-        guideline_analysis = "\n".join([item["rationale"] for item in all_checks])
+        guideline_analysis = "\n".join([item["condition"]+":"+item["rationale"] for item in all_checks])
         logger.info(f"Guideline analysis completed: {len(guideline_analysis)} chars")
 
     else:
@@ -348,19 +258,8 @@ async def enhanced_generate_stream_response(
         preamble_text = await preamble_task
         # Only emit the preamble when preamble_text is non-empty
         if preamble_text and preamble_text.strip() and preamble_text != "":
-            chunk_data = {
-                "id": f"chatcmpl-preamble",
-                "object": "chat.completion.chunk",
-                "created": int(__import__('time').time()),
-                "model": model_name,
-                "choices": [{
-                    "index": 0,
-                    "delta": {
-                        "content": get_content_from_messages([{"role": "assistant","content": preamble_text + "\n"}], tool_response=tool_response)
-                    },
-                    "finish_reason": None
-                }]
-            }
+            preamble_content = get_content_from_messages([{"role": "assistant","content": preamble_text + "\n"}], tool_response=tool_response)
+            chunk_data = create_stream_chunk("chatcmpl-preamble", model_name, preamble_content)
             yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
             logger.info(f"Stream mode: Generated preamble text ({len(preamble_text)} chars)")
         else:
@@ -373,19 +272,8 @@
 
         # Send guideline_analysis immediately
         if guideline_analysis:
-            chunk_data = {
-                "id": f"chatcmpl-guideline",
-                "object": "chat.completion.chunk",
-                "created": int(__import__('time').time()),
-                "model": model_name,
-                "choices": [{
-                    "index": 0,
-                    "delta": {
-                        "content": get_content_from_messages([{"role": "assistant","reasoning_content": guideline_analysis+ "\n"}], tool_response=tool_response)
-                    },
-                    "finish_reason": None
-                }]
-            }
+            guideline_content = get_content_from_messages([{"role": "assistant","reasoning_content": guideline_analysis+ "\n"}], tool_response=tool_response)
+            chunk_data = create_stream_chunk("chatcmpl-guideline", model_name, guideline_content)
             yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
 
         # Prepare the final messages
@@ -415,33 +303,10 @@
                 if chunk_id == 0:
                     logger.info(f"Agent generated its first token, starting streamed output")
                 chunk_id += 1
-                chunk_data = {
-                    "id": f"chatcmpl-{chunk_id}",
-                    "object": "chat.completion.chunk",
-                    "created": int(__import__('time').time()),
-                    "model": model_name,
-                    "choices": [{
-                        "index": 0,
-                        "delta": {
-                            "content": new_content
-                        },
-                        "finish_reason": None
-                    }]
-                }
+                chunk_data = create_stream_chunk(f"chatcmpl-{chunk_id}", model_name, new_content)
                 yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
 
-        # Send the final completion marker
-        final_chunk = {
-            "id": f"chatcmpl-{chunk_id + 1}",
-            "object": "chat.completion.chunk",
-            "created": int(__import__('time').time()),
-            "model": model_name,
-            "choices": [{
-                "index": 0,
-                "delta": {},
-                "finish_reason": "stop"
-            }]
-        }
+        final_chunk = create_stream_chunk(f"chatcmpl-{chunk_id + 1}", model_name, finish_reason="stop")
         yield f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"
 
         # Send the end-of-stream marker
diff --git a/utils/fastapi_utils.py b/utils/fastapi_utils.py
index e7f7e53..23f67e0 100644
--- a/utils/fastapi_utils.py
+++ b/utils/fastapi_utils.py
@@ -78,6 +78,20 @@ def get_versioned_filename(upload_dir: str, name_without_ext: str, file_extensio
     return versioned_filename, next_version
 
 
+def create_stream_chunk(chunk_id: str, model_name: str, content: str = None, finish_reason: str = None) -> dict:
+    """Create a standardized streaming response chunk"""
+    chunk_data = {
+        "id": chunk_id,
+        "object": "chat.completion.chunk",
+        "created": int(__import__('time').time()),
+        "model": model_name,
+        "choices": [{
+            "index": 0,
+            "delta": {"content": content} if content is not None else {},
+            "finish_reason": finish_reason
+        }]
+    }
+    return chunk_data
 
 def get_content_from_messages(messages: List[dict], tool_response: bool = True) -> str:
     """Extract content from qwen-agent messages with special formatting"""
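
For reference, a minimal standalone sketch of how the new `create_stream_chunk` helper shapes the SSE stream. The helper body is inlined here (with a plain `import time` instead of `__import__('time')`) so the snippet runs outside the service; the chunk ids, model name, and content text are placeholder values.

```python
import json
import time


def create_stream_chunk(chunk_id: str, model_name: str, content: str = None, finish_reason: str = None) -> dict:
    """Same chunk shape as the helper added to utils/fastapi_utils.py."""
    return {
        "id": chunk_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model_name,
        "choices": [{
            "index": 0,
            # Content chunks carry a text delta; the closing chunk has an empty delta.
            "delta": {"content": content} if content is not None else {},
            "finish_reason": finish_reason,
        }],
    }


# One content chunk followed by the closing chunk, framed as SSE events the way
# enhanced_generate_stream_response yields them, then the [DONE] sentinel.
for chunk in (
    create_stream_chunk("chatcmpl-1", "demo-model", "Hello"),
    create_stream_chunk("chatcmpl-2", "demo-model", finish_reason="stop"),
):
    print(f"data: {json.dumps(chunk, ensure_ascii=False)}\n")
print("data: [DONE]")
```

Centralizing chunk construction this way keeps the preamble, guideline, content, and stop chunks structurally identical across the streaming code paths instead of repeating the dict literal in each branch.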