From 16c50fa261baebf6d07576476ca566976e4e0f4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Sat, 21 Mar 2026 02:16:21 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AF=AD=E9=9F=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- routes/voice.py | 6 ++ services/voice_session_manager.py | 109 ++++++++++++++++++++++-------- 2 files changed, 86 insertions(+), 29 deletions(-) diff --git a/routes/voice.py b/routes/voice.py index 7031328..d224230 100644 --- a/routes/voice.py +++ b/routes/voice.py @@ -27,6 +27,7 @@ async def voice_realtime(websocket: WebSocket): Server sends: - {"type": "audio", "data": ""} - {"type": "asr_text", "text": "recognized text"} + - {"type": "agent_stream", "text": "incremental text chunk"} - {"type": "agent_result", "text": "agent answer"} - {"type": "llm_text", "text": "polished answer"} - {"type": "status", "status": "ready|listening|thinking|speaking|idle"} @@ -56,6 +57,10 @@ async def voice_realtime(websocket: WebSocket): async def on_agent_result(text: str): await send_json({"type": "agent_result", "text": text}) + async def on_agent_stream(text: str): + """Forward streaming agent text chunks to frontend""" + await send_json({"type": "agent_stream", "text": text}) + async def on_llm_text(text: str): await send_json({"type": "llm_text", "text": text}) @@ -93,6 +98,7 @@ async def voice_realtime(websocket: WebSocket): on_audio=on_audio, on_asr_text=on_asr_text, on_agent_result=on_agent_result, + on_agent_stream=on_agent_stream, on_llm_text=on_llm_text, on_status=on_status, on_error=on_error, diff --git a/services/voice_session_manager.py b/services/voice_session_manager.py index 0a2ac0c..0d9109e 100644 --- a/services/voice_session_manager.py +++ b/services/voice_session_manager.py @@ -20,6 +20,7 @@ class VoiceSession: on_audio: Optional[Callable[[bytes], Awaitable[None]]] = None, on_asr_text: Optional[Callable[[str], Awaitable[None]]] = None, on_agent_result: Optional[Callable[[str], Awaitable[None]]] = None, + on_agent_stream: Optional[Callable[[str], Awaitable[None]]] = None, on_llm_text: Optional[Callable[[str], Awaitable[None]]] = None, on_status: Optional[Callable[[str], Awaitable[None]]] = None, on_error: Optional[Callable[[str], Awaitable[None]]] = None, @@ -35,6 +36,7 @@ class VoiceSession: self._on_audio = on_audio self._on_asr_text = on_asr_text self._on_agent_result = on_agent_result + self._on_agent_stream = on_agent_stream self._on_llm_text = on_llm_text self._on_status = on_status self._on_error = on_error @@ -45,6 +47,7 @@ class VoiceSession: # When True, discard TTS audio from SERVER_ACK (comfort speech period) self._is_sending_chat_tts_text = False self._receive_task: Optional[asyncio.Task] = None + self._agent_task: Optional[asyncio.Task] = None async def start(self) -> None: """Fetch bot config, connect to Volcengine and start receiving responses""" @@ -75,6 +78,10 @@ class VoiceSession: except Exception as e: logger.warning(f"Error during session cleanup: {e}") finally: + if self._agent_task and not self._agent_task.done(): + from utils.cancel_manager import trigger_cancel + trigger_cancel(self.session_id) + self._agent_task.cancel() if self._receive_task and not self._receive_task.done(): self._receive_task.cancel() await self.realtime_client.close() @@ -141,15 +148,28 @@ class VoiceSession: self._is_user_querying = False asr_text = self._current_asr_text + # Filter out ASR during thinking/speaking (TTS echo protection) + if self._is_sending_chat_tts_text: + logger.info(f"[Voice] Discarding ASR during thinking/speaking: '{asr_text}'") + return + logger.info(f"[Voice] ASR result: '{asr_text}'") if self._on_asr_text and asr_text: await self._on_asr_text(asr_text) await self._emit_status("thinking") + # Cancel previous agent task if still running + if self._agent_task and not self._agent_task.done(): + logger.info(f"[Voice] Interrupting previous agent task") + from utils.cancel_manager import trigger_cancel + trigger_cancel(self.session_id) + self._agent_task.cancel() + self._agent_task = None + # Trigger comfort TTS + agent call self._is_sending_chat_tts_text = True - asyncio.create_task(self._on_asr_text_received(asr_text)) + self._agent_task = asyncio.create_task(self._on_asr_text_received(asr_text)) elif event == 350: # TTS segment completed @@ -159,14 +179,14 @@ class VoiceSession: logger.info(f"[Voice] TTS segment done, type={tts_type}, is_sending={self._is_sending_chat_tts_text}") # When comfort TTS or RAG TTS finishes, stop discarding audio - if self._is_sending_chat_tts_text and tts_type in ("chat_tts_text", "external_rag"): + if self._is_sending_chat_tts_text and tts_type == "chat_tts_text": self._is_sending_chat_tts_text = False logger.info(f"[Voice] Comfort/RAG TTS done, resuming audio forwarding") elif event == 359: # TTS fully completed (all segments done) logger.info(f"[Voice] TTS fully completed") - await self._emit_status("idle") + # await self._emit_status("idle") elif event in (152, 153): logger.info(f"[Voice] Session finished event: {event}") @@ -185,17 +205,17 @@ class VoiceSession: try: # 1. Send comfort TTS (real Chinese text, not "...") - logger.info(f"[Voice] Sending comfort TTS...") - await self.realtime_client.chat_tts_text( - content="请稍等,让我查一下。", - start=True, - end=False, - ) - await self.realtime_client.chat_tts_text( - content="", - start=False, - end=True, - ) + # logger.info(f"[Voice] Sending comfort TTS...") + # await self.realtime_client.chat_tts_text( + # content="请稍等,让我查一下。", + # start=True, + # end=False, + # ) + # await self.realtime_client.chat_tts_text( + # content="", + # start=False, + # end=True, + # ) # 2. Call v3 agent (this may take a while) logger.info(f"[Voice] Calling v3 agent with text: '{text}'") @@ -205,22 +225,36 @@ class VoiceSession: if self._on_agent_result and agent_result: await self._on_agent_result(agent_result) - # 3. Inject RAG result so the built-in LLM can polish and TTS it + # 3. Send agent result directly as TTS (bypass LLM) if agent_result: clean_result = self._extract_answer(agent_result) - logger.info(f"[Voice] Injecting RAG text ({len(clean_result)} chars): {clean_result[:200]}") - await self.realtime_client.chat_rag_text(clean_result) + logger.info(f"[Voice] Sending agent result as TTS ({len(clean_result)} chars)") + await self.realtime_client.chat_tts_text( + content=clean_result, + start=True, + end=False, + ) + await self.realtime_client.chat_tts_text( + content="", + start=False, + end=True, + ) else: - logger.warning(f"[Voice] Agent returned empty result, skipping RAG injection") + logger.warning(f"[Voice] Agent returned empty result") self._is_sending_chat_tts_text = False + await self._emit_status("idle") + except asyncio.CancelledError: + logger.info(f"[Voice] Agent task cancelled (user interrupted)") + self._is_sending_chat_tts_text = False + raise except Exception as e: logger.error(f"[Voice] Error in ASR text callback: {e}", exc_info=True) self._is_sending_chat_tts_text = False await self._emit_error(f"Agent call failed: {str(e)}") async def _call_v3_agent(self, user_text: str) -> str: - """Call v3 agent API internally (stream=false) to get full reasoning result""" + """Call v3 agent API in streaming mode, accumulate text and return full result""" try: from utils.api_models import ChatRequestV3, Message from utils.fastapi_utils import ( @@ -228,18 +262,17 @@ class VoiceSession: create_project_directory, ) from agent.agent_config import AgentConfig - from routes.chat import create_agent_and_generate_response + from routes.chat import enhanced_generate_stream_response bot_config = self._bot_config language = bot_config.get("language", "zh") - messages_raw = [{"role": "user", "content": user_text}] messages_obj = [Message(role="user", content=user_text)] request = ChatRequestV3( messages=messages_obj, bot_id=self.bot_id, - stream=False, + stream=True, session_id=self.session_id, user_identifier=self.user_identifier, ) @@ -259,16 +292,34 @@ class VoiceSession: processed_messages, language, ) - config.stream = False + config.stream = True - result = await create_agent_and_generate_response(config) + # Consume the async generator, parse SSE chunks to accumulate content + accumulated_text = [] + async for sse_line in enhanced_generate_stream_response(config): + if not sse_line or not sse_line.startswith("data: "): + continue + data_str = sse_line.strip().removeprefix("data: ") + if data_str == "[DONE]": + break + try: + data = json.loads(data_str) + choices = data.get("choices", []) + if choices: + delta = choices[0].get("delta", {}) + content = delta.get("content", "") + if content: + accumulated_text.append(content) + if self._on_agent_stream: + await self._on_agent_stream(content) + except (json.JSONDecodeError, KeyError): + continue - if hasattr(result, 'choices'): - choices = result.choices - if choices and len(choices) > 0: - return choices[0].get("message", {}).get("content", "") - return "" + return "".join(accumulated_text) + except asyncio.CancelledError: + logger.info(f"[Voice] v3 agent call cancelled") + raise except Exception as e: logger.error(f"[Voice] Error calling v3 agent: {e}", exc_info=True) return ""