From 25b4d9a87dc215fd674243feb58e900f5b5f21a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Sat, 13 Dec 2025 03:09:25 +0800 Subject: [PATCH] add default transport --- routes/chat.py | 229 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 148 insertions(+), 81 deletions(-) diff --git a/routes/chat.py b/routes/chat.py index c1fcd9e..5fb843a 100644 --- a/routes/chat.py +++ b/routes/chat.py @@ -87,94 +87,161 @@ async def enhanced_generate_stream_response( generate_cfg: Optional[dict], user_identifier: Optional[str] ): - """增强的渐进式流式响应生成器""" + """增强的渐进式流式响应生成器 - 并发优化版本""" try: - # 第一阶段:并行启动preamble_text生成和第二阶段处理 + # 准备参数 query_text = get_user_last_message_content(messages) chat_history = format_messages_to_chat_history(messages) - - # 创建preamble_text生成任务 preamble_text, system_prompt = get_preamble_text(language, system_prompt) + + # 创建输出队列和控制事件 + output_queue = asyncio.Queue() + preamble_completed = asyncio.Event() + + # Preamble 任务 + async def preamble_task(): + try: + preamble_result = await call_preamble_llm(chat_history, query_text, preamble_text, language, model_name, api_key, model_server) + # 只有当preamble_text不为空且不为""时才输出 + if preamble_result and preamble_result.strip() and preamble_result != "": + preamble_content = f"[PREAMBLE]\n{preamble_result}\n" + chunk_data = create_stream_chunk(f"chatcmpl-preamble", model_name, preamble_content) + await output_queue.put(("preamble", f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n")) + logger.info(f"Stream mode: Generated preamble text ({len(preamble_result)} chars)") + else: + logger.info("Stream mode: Skipped empty preamble text") + + # 标记 preamble 完成 + preamble_completed.set() + await output_queue.put(("preamble_done", None)) + + except Exception as e: + logger.error(f"Error generating preamble text: {e}") + # 即使出错也要标记完成,避免阻塞 + preamble_completed.set() + await output_queue.put(("preamble_done", None)) + + # Agent 任务(准备 + 流式处理) + async def agent_task(): + try: + # 准备 agent + agent = await agent_manager.get_or_create_agent( + bot_id=bot_id, + project_dir=project_dir, + model_name=model_name, + api_key=api_key, + model_server=model_server, + generate_cfg=generate_cfg, + language=language, + system_prompt=system_prompt, + mcp_settings=mcp_settings, + robot_type=robot_type, + user_identifier=user_identifier + ) + + # 开始流式处理 + logger.info(f"Starting agent stream response") + chunk_id = 0 + message_tag = "" + function_name = "" + tool_args = "" + + async for msg, metadata in agent.astream({"messages": messages}, stream_mode="messages"): + new_content = "" + if isinstance(msg, AIMessageChunk): + # 判断是否有工具调用 + if msg.tool_call_chunks: # 检查工具调用块 + if message_tag != "TOOL_CALL": + message_tag = "TOOL_CALL" + if msg.tool_call_chunks[0]["name"]: + function_name = msg.tool_call_chunks[0]["name"] + if msg.tool_call_chunks[0]["args"]: + tool_args += msg.tool_call_chunks[0]["args"] + elif len(msg.content) > 0: + preamble_completed.set() + await output_queue.put(("preamble_done", None)) + if message_tag != "ANSWER": + message_tag = "ANSWER" + new_content = f"[{message_tag}]\n{msg.text}" + elif message_tag == "ANSWER": + new_content = msg.text + elif message_tag == "TOOL_CALL" and \ + ( + ("finish_reason" in msg.response_metadata and msg.response_metadata["finish_reason"] == "tool_calls") or \ + ("stop_reason" in msg.response_metadata and msg.response_metadata["stop_reason"] == "tool_use") + ): + new_content = f"[{message_tag}] {function_name}\n{tool_args}" + message_tag = "TOOL_CALL" + elif isinstance(msg, ToolMessage) and len(msg.content) > 0: + message_tag = "TOOL_RESPONSE" + new_content = f"[{message_tag}] {msg.name}\n{msg.text}" + elif isinstance(msg, AIMessage) and msg.additional_kwargs and "thinking" in msg.additional_kwargs: + new_content = "[THINK]\n" + msg.additional_kwargs["thinking"] + "\n" - # 等待preamble_text任务完成 - try: - preamble_text = await call_preamble_llm(chat_history, query_text, preamble_text, language, model_name, api_key, model_server) - # 只有当preamble_text不为空且不为""时才输出 - if preamble_text and preamble_text.strip() and preamble_text != "": - preamble_content = f"[PREAMBLE]\n{preamble_text}\n" - chunk_data = create_stream_chunk(f"chatcmpl-preamble", model_name, preamble_content) - yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n" - logger.info(f"Stream mode: Generated preamble text ({len(preamble_text)} chars)") - else: - logger.info("Stream mode: Skipped empty preamble text") - except Exception as e: - logger.error(f"Error generating preamble text: {e}") - - # 等待guideline分析任务完成 - agent = await agent_manager.get_or_create_agent( - bot_id=bot_id, - project_dir=project_dir, - model_name=model_name, - api_key=api_key, - model_server=model_server, - generate_cfg=generate_cfg, - language=language, - system_prompt=system_prompt, - mcp_settings=mcp_settings, - robot_type=robot_type, - user_identifier=user_identifier - ) - - # 第三阶段:agent响应流式传输 - logger.info(f"Starting agent stream response") - chunk_id = 0 - message_tag = "" - function_name = "" - tool_args = "" - async for msg,metadata in agent.astream({"messages": messages}, stream_mode="messages"): - new_content = "" - if isinstance(msg, AIMessageChunk): - # 判断是否有工具调用 - if msg.tool_call_chunks: # 检查工具调用块 - if message_tag != "TOOL_CALL": - message_tag = "TOOL_CALL" - if msg.tool_call_chunks[0]["name"]: - function_name = msg.tool_call_chunks[0]["name"] - if msg.tool_call_chunks[0]["args"]: - tool_args += msg.tool_call_chunks[0]["args"] - elif len(msg.content)>0: - if message_tag != "ANSWER": - message_tag = "ANSWER" - new_content = f"[{message_tag}]\n{msg.text}" - elif message_tag == "ANSWER": - new_content = msg.text - elif message_tag == "TOOL_CALL" and \ - ( - ("finish_reason" in msg.response_metadata and msg.response_metadata["finish_reason"] == "tool_calls") or \ - ("stop_reason" in msg.response_metadata and msg.response_metadata["stop_reason"] == "tool_use") - ): - new_content = f"[{message_tag}] {function_name}\n{tool_args}" - message_tag = "TOOL_CALL" - elif isinstance(msg, ToolMessage) and len(msg.content)>0: - message_tag = "TOOL_RESPONSE" - new_content = f"[{message_tag}] {msg.name}\n{msg.text}" - elif isinstance(msg, AIMessage) and msg.additional_kwargs and "thinking" in msg.additional_kwargs: - new_content = "[THINK]\n"+msg.additional_kwargs["thinking"]+ "\n" - - # 只有当有新内容时才发送chunk - if new_content: - if chunk_id == 0: - logger.info(f"Agent首个Token已生成, 开始流式输出") - chunk_id += 1 - chunk_data = create_stream_chunk(f"chatcmpl-{chunk_id}", model_name, new_content) - yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n" - - final_chunk = create_stream_chunk(f"chatcmpl-{chunk_id + 1}", model_name, finish_reason="stop") - yield f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n" - + # 只有当有新内容时才发送chunk + if new_content: + if chunk_id == 0: + logger.info(f"Agent首个Token已生成, 开始流式输出") + chunk_id += 1 + chunk_data = create_stream_chunk(f"chatcmpl-{chunk_id}", model_name, new_content) + await output_queue.put(("agent", f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n")) + + # 发送最终chunk + final_chunk = create_stream_chunk(f"chatcmpl-{chunk_id + 1}", model_name, finish_reason="stop") + await output_queue.put(("agent", f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n")) + await output_queue.put(("agent_done", None)) + + except Exception as e: + logger.error(f"Error in agent task: {e}") + await output_queue.put(("agent_done", None)) + + # 并发执行任务 + preamble_task_handle = asyncio.create_task(preamble_task()) + agent_task_handle = asyncio.create_task(agent_task()) + + # 输出控制器:确保 preamble 先输出,然后是 agent stream + preamble_output_done = False + + while True: + try: + # 设置超时避免无限等待 + item_type, item_data = await asyncio.wait_for(output_queue.get(), timeout=1.0) + + if item_type == "preamble": + # 立即输出 preamble 内容 + if item_data: + yield item_data + preamble_output_done = True + + elif item_type == "preamble_done": + # Preamble 已完成,标记并继续处理 + preamble_output_done = True + + elif item_type == "agent": + # Agent stream 内容,需要等待 preamble 输出完成 + if preamble_output_done: + yield item_data + else: + # preamble 还没输出,先放回队列 + await output_queue.put((item_type, item_data)) + # 等待 preamble 完成 + await preamble_completed.wait() + preamble_output_done = True + + elif item_type == "agent_done": + # Agent stream 完成,结束循环 + break + + except asyncio.TimeoutError: + # 检查是否还有任务在运行 + if all(task.done() for task in [preamble_task_handle, agent_task_handle]): + # 所有任务都完成了,退出循环 + break + continue + # 发送结束标记 yield "data: [DONE]\n\n" - logger.info(f"Enhanced stream response completed, total chunks: {chunk_id}") + logger.info(f"Enhanced stream response completed") except Exception as e: import traceback