add default transport

This commit is contained in:
朱潮 2025-12-13 03:09:25 +08:00
parent ec9558fd4c
commit 25b4d9a87d

View File

@ -87,94 +87,161 @@ async def enhanced_generate_stream_response(
generate_cfg: Optional[dict],
user_identifier: Optional[str]
):
"""增强的渐进式流式响应生成器"""
"""增强的渐进式流式响应生成器 - 并发优化版本"""
try:
# 第一阶段并行启动preamble_text生成和第二阶段处理
# 准备参数
query_text = get_user_last_message_content(messages)
chat_history = format_messages_to_chat_history(messages)
# 创建preamble_text生成任务
preamble_text, system_prompt = get_preamble_text(language, system_prompt)
# 创建输出队列和控制事件
output_queue = asyncio.Queue()
preamble_completed = asyncio.Event()
# Preamble 任务
async def preamble_task():
    """Generate the preamble text and push it onto the shared output queue.

    Emits at most one ("preamble", sse_line) item, then always a
    ("preamble_done", None) marker and sets ``preamble_completed`` —
    even on failure — so the output controller can never block on us.
    """
    try:
        result = await call_preamble_llm(
            chat_history, query_text, preamble_text,
            language, model_name, api_key, model_server,
        )
        # Emit only a non-empty preamble that is not the "<empty>" sentinel.
        if result and result.strip() and result != "<empty>":
            payload = create_stream_chunk(
                "chatcmpl-preamble", model_name, f"[PREAMBLE]\n{result}\n"
            )
            sse_line = f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
            await output_queue.put(("preamble", sse_line))
            logger.info(f"Stream mode: Generated preamble text ({len(result)} chars)")
        else:
            logger.info("Stream mode: Skipped empty preamble text")
        # Signal completion so the consumer stops holding back agent output.
        preamble_completed.set()
        await output_queue.put(("preamble_done", None))
    except Exception as e:
        logger.error(f"Error generating preamble text: {e}")
        # Mark completion even on error to avoid deadlocking the consumer.
        preamble_completed.set()
        await output_queue.put(("preamble_done", None))
# Agent 任务(准备 + 流式处理)
async def agent_task():
    """Create/fetch the agent and stream its response into the output queue.

    Emits ("agent", sse_line) items in order, then a terminal
    ("agent_done", None) marker; the marker is sent even on failure so
    the output-controller loop can always terminate.
    """
    try:
        # Prepare the agent (cached per bot by agent_manager).
        agent = await agent_manager.get_or_create_agent(
            bot_id=bot_id,
            project_dir=project_dir,
            model_name=model_name,
            api_key=api_key,
            model_server=model_server,
            generate_cfg=generate_cfg,
            language=language,
            system_prompt=system_prompt,
            mcp_settings=mcp_settings,
            robot_type=robot_type,
            user_identifier=user_identifier
        )
        # Stream the agent response.
        logger.info(f"Starting agent stream response")
        chunk_id = 0
        message_tag = ""
        function_name = ""
        tool_args = ""
        async for msg, metadata in agent.astream({"messages": messages}, stream_mode="messages"):
            new_content = ""
            if isinstance(msg, AIMessageChunk):
                if msg.tool_call_chunks:  # streamed tool-call fragments
                    if message_tag != "TOOL_CALL":
                        message_tag = "TOOL_CALL"
                    # Accumulate the (partial) tool name and argument text.
                    if msg.tool_call_chunks[0]["name"]:
                        function_name = msg.tool_call_chunks[0]["name"]
                    if msg.tool_call_chunks[0]["args"]:
                        tool_args += msg.tool_call_chunks[0]["args"]
                elif len(msg.content) > 0:
                    # First answer content makes the preamble moot: unblock the
                    # consumer even if the preamble LLM is still running.
                    preamble_completed.set()
                    await output_queue.put(("preamble_done", None))
                    if message_tag != "ANSWER":
                        message_tag = "ANSWER"
                        new_content = f"[{message_tag}]\n{msg.text}"
                    elif message_tag == "ANSWER":
                        new_content = msg.text
                elif message_tag == "TOOL_CALL" and \
                    (
                        ("finish_reason" in msg.response_metadata and msg.response_metadata["finish_reason"] == "tool_calls") or \
                        ("stop_reason" in msg.response_metadata and msg.response_metadata["stop_reason"] == "tool_use")
                    ):
                    # Tool call finished streaming: emit the assembled call.
                    new_content = f"[{message_tag}] {function_name}\n{tool_args}"
                    message_tag = "TOOL_CALL"
            elif isinstance(msg, ToolMessage) and len(msg.content) > 0:
                message_tag = "TOOL_RESPONSE"
                new_content = f"[{message_tag}] {msg.name}\n{msg.text}"
            elif isinstance(msg, AIMessage) and msg.additional_kwargs and "thinking" in msg.additional_kwargs:
                new_content = "[THINK]\n" + msg.additional_kwargs["thinking"] + "\n"
            # Only enqueue a chunk when there is new content to send.
            if new_content:
                if chunk_id == 0:
                    logger.info(f"Agent首个Token已生成, 开始流式输出")
                chunk_id += 1
                chunk_data = create_stream_chunk(f"chatcmpl-{chunk_id}", model_name, new_content)
                await output_queue.put(("agent", f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"))
        # Final chunk with finish_reason, then the done marker.
        final_chunk = create_stream_chunk(f"chatcmpl-{chunk_id + 1}", model_name, finish_reason="stop")
        await output_queue.put(("agent", f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"))
        await output_queue.put(("agent_done", None))
    except Exception as e:
        logger.error(f"Error in agent task: {e}")
        await output_queue.put(("agent_done", None))
# 并发执行任务
preamble_task_handle = asyncio.create_task(preamble_task())
agent_task_handle = asyncio.create_task(agent_task())
# 输出控制器:确保 preamble 先输出,然后是 agent stream
preamble_output_done = False
while True:
try:
# 设置超时避免无限等待
item_type, item_data = await asyncio.wait_for(output_queue.get(), timeout=1.0)
if item_type == "preamble":
# 立即输出 preamble 内容
if item_data:
yield item_data
preamble_output_done = True
elif item_type == "preamble_done":
# Preamble 已完成,标记并继续处理
preamble_output_done = True
elif item_type == "agent":
# Agent stream 内容,需要等待 preamble 输出完成
if preamble_output_done:
yield item_data
else:
# preamble 还没输出,先放回队列
await output_queue.put((item_type, item_data))
# 等待 preamble 完成
await preamble_completed.wait()
preamble_output_done = True
elif item_type == "agent_done":
# Agent stream 完成,结束循环
break
except asyncio.TimeoutError:
# 检查是否还有任务在运行
if all(task.done() for task in [preamble_task_handle, agent_task_handle]):
# 所有任务都完成了,退出循环
break
continue
# 发送结束标记
yield "data: [DONE]\n\n"
logger.info(f"Enhanced stream response completed, total chunks: {chunk_id}")
logger.info(f"Enhanced stream response completed")
except Exception as e:
import traceback