Compare commits

..

5 Commits

Author SHA1 Message Date
朱潮
2d2e1dbcdf vad 2026-03-22 00:52:11 +08:00
朱潮
7a547322e3 语义分割 2026-03-22 00:42:57 +08:00
朱潮
f9e9c3c26d 默认音色 2026-03-21 23:56:58 +08:00
朱潮
d68a87dba8 支持语音合成和语音识别api 2026-03-21 23:51:02 +08:00
朱潮
99273a91d3 支持语音合成和语音识别api 2026-03-21 23:50:51 +08:00
11 changed files with 1238 additions and 210 deletions

36
poetry.lock generated
View File

@ -5302,25 +5302,19 @@ train = ["accelerate (>=0.20.3)", "datasets"]
[[package]]
name = "setuptools"
version = "82.0.1"
description = "Most extensible Python build backend with support for C/C++ extension modules"
version = "70.3.0"
description = "Easily download, build, install, upgrade, and uninstall Python packages"
optional = false
python-versions = ">=3.9"
python-versions = ">=3.8"
groups = ["main"]
markers = "python_version >= \"3.13\""
files = [
{file = "setuptools-82.0.1-py3-none-any.whl", hash = "sha256:a59e362652f08dcd477c78bb6e7bd9d80a7995bc73ce773050228a348ce2e5bb"},
{file = "setuptools-82.0.1.tar.gz", hash = "sha256:7d872682c5d01cfde07da7bccc7b65469d3dca203318515ada1de5eda35efbf9"},
{file = "setuptools-70.3.0-py3-none-any.whl", hash = "sha256:fe384da74336c398e0d956d1cae0669bc02eed936cdb1d49b57de1990dc11ffc"},
{file = "setuptools-70.3.0.tar.gz", hash = "sha256:f171bab1dfbc86b132997f26a119f6056a57950d058587841a0082e8830f9dc5"},
]
[package.extras]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "ruff (>=0.13.0) ; sys_platform != \"cygwin\""]
core = ["importlib_metadata (>=6) ; python_version < \"3.10\"", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "tomli (>=2.0.1) ; python_version < \"3.11\"", "wheel (>=0.43.0)"]
cover = ["pytest-cov"]
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"]
enabler = ["pytest-enabler (>=2.2)"]
test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"]
type = ["importlib_metadata (>=7.0.2) ; python_version < \"3.10\"", "jaraco.develop (>=7.21) ; sys_platform != \"cygwin\"", "mypy (==1.18.*)", "pytest-mypy"]
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "jaraco.test", "mypy (==1.10.0)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-ruff (>=0.3.2) ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
[[package]]
name = "shellingham"
@ -6347,6 +6341,20 @@ files = [
{file = "wcwidth-0.6.0.tar.gz", hash = "sha256:cdc4e4262d6ef9a1a57e018384cbeb1208d8abbc64176027e2c2455c81313159"},
]
[[package]]
name = "webrtcvad"
version = "2.0.10"
description = "Python interface to the Google WebRTC Voice Activity Detector (VAD)"
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "webrtcvad-2.0.10.tar.gz", hash = "sha256:f1bed2fb25b63fb7b1a55d64090c993c9c9167b28485ae0bcdd81cf6ede96aea"},
]
[package.extras]
dev = ["check-manifest", "memory_profiler", "nose", "psutil", "unittest2", "zest.releaser"]
[[package]]
name = "websockets"
version = "15.0.1"
@ -6983,4 +6991,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt
[metadata]
lock-version = "2.1"
python-versions = ">=3.12,<3.15"
content-hash = "1461514ed1f9639f41f43ebb28f2a3fcd2d5a5dde954cd509c0ea7bf181e9bb6"
content-hash = "c9c4f80cdbf7d6bce20f65f40b9adce05c5f4a830299de148fcd8482937bddb0"

View File

@ -39,7 +39,9 @@ dependencies = [
"ragflow-sdk (>=0.23.0,<0.24.0)",
"httpx (>=0.28.1,<0.29.0)",
"wsgidav (>=4.3.3,<5.0.0)",
"websockets (>=15.0.0,<16.0.0)"
"websockets (>=15.0.0,<16.0.0)",
"setuptools (<71)",
"webrtcvad (>=2.0.10,<3.0.0)",
]
[tool.poetry.requires-plugins]

View File

@ -165,7 +165,7 @@ safetensors==0.7.0 ; python_version >= "3.12" and python_version < "3.15"
scikit-learn==1.8.0 ; python_version >= "3.12" and python_version < "3.15"
scipy==1.17.1 ; python_version >= "3.12" and python_version < "3.15"
sentence-transformers==3.4.1 ; python_version >= "3.12" and python_version < "3.15"
setuptools==82.0.1 ; python_version >= "3.13" and python_version < "3.15"
setuptools==70.3.0 ; python_version >= "3.12" and python_version < "3.15"
shellingham==1.5.4 ; python_version >= "3.12" and python_version < "3.15"
six==1.17.0 ; python_version >= "3.12" and python_version < "3.15"
sniffio==1.3.1 ; python_version >= "3.12" and python_version < "3.15"
@ -203,6 +203,7 @@ uvloop==0.22.1 ; python_version >= "3.12" and python_version < "3.15"
watchfiles==1.1.1 ; python_version >= "3.12" and python_version < "3.15"
wcmatch==10.1 ; python_version >= "3.12" and python_version < "3.15"
wcwidth==0.6.0 ; python_version >= "3.12" and python_version < "3.15"
webrtcvad==2.0.10 ; python_version >= "3.12" and python_version < "3.15"
websockets==15.0.1 ; python_version >= "3.12" and python_version < "3.15"
wrapt==1.17.3 ; python_version >= "3.12" and python_version < "3.15"
wsgidav==4.3.3 ; python_version >= "3.12" and python_version < "3.15"

View File

@ -7,6 +7,7 @@ from typing import Optional
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from services.voice_session_manager import VoiceSession
from utils.settings import VOICE_DEFAULT_MODE
logger = logging.getLogger('app')
@ -35,7 +36,7 @@ async def voice_realtime(websocket: WebSocket):
"""
await websocket.accept()
session: Optional[VoiceSession] = None
session = None
async def send_json(data: dict):
try:
@ -91,7 +92,9 @@ async def voice_realtime(websocket: WebSocket):
await send_json({"type": "error", "message": "bot_id is required"})
continue
session = VoiceSession(
voice_mode = msg.get("voice_mode") or VOICE_DEFAULT_MODE
session_kwargs = dict(
bot_id=bot_id,
session_id=msg.get("session_id"),
user_identifier=msg.get("user_identifier"),
@ -104,6 +107,13 @@ async def voice_realtime(websocket: WebSocket):
on_error=on_error,
)
if voice_mode == "lite":
from services.voice_lite_session import VoiceLiteSession
session = VoiceLiteSession(**session_kwargs)
logger.info(f"[Voice] Using lite mode for bot_id={bot_id}")
else:
session = VoiceSession(**session_kwargs)
try:
await session.start()
except Exception as e:

View File

@ -8,15 +8,13 @@ import websockets
from services import realtime_voice_protocol as protocol
from utils.settings import (
VOLCENGINE_REALTIME_URL,
VOLCENGINE_APP_ID,
VOLCENGINE_ACCESS_KEY,
VOLCENGINE_RESOURCE_ID,
VOLCENGINE_APP_KEY,
VOLCENGINE_DEFAULT_SPEAKER,
VOLCENGINE_TTS_SAMPLE_RATE,
)
VOLCENGINE_REALTIME_URL = "wss://openspeech.bytedance.com/api/v3/realtime/dialogue"
logger = logging.getLogger('app')
@ -48,8 +46,8 @@ class RealtimeDialogClient:
return {
"X-Api-App-ID": VOLCENGINE_APP_ID,
"X-Api-Access-Key": VOLCENGINE_ACCESS_KEY,
"X-Api-Resource-Id": VOLCENGINE_RESOURCE_ID,
"X-Api-App-Key": VOLCENGINE_APP_KEY,
"X-Api-Resource-Id": "volc.speech.dialog",
"X-Api-App-Key": "PlgvMymc7f3tQnJ6",
"X-Api-Connect-Id": self._connect_id,
}

View File

@ -0,0 +1,250 @@
import gzip
import json
import struct
import uuid
import logging
from typing import AsyncGenerator, Tuple
import websockets
from utils.settings import (
VOLCENGINE_ACCESS_KEY,
VOLCENGINE_APP_ID,
)
VOLCENGINE_ASR_URL = "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async"
logger = logging.getLogger('app')
# Protocol constants (v3/sauc)
PROTOCOL_VERSION = 0b0001
HEADER_SIZE = 0b0001
# Message types
FULL_CLIENT_REQUEST = 0b0001
AUDIO_ONLY_REQUEST = 0b0010
FULL_SERVER_RESPONSE = 0b1001
SERVER_ERROR_RESPONSE = 0b1111
# Flags
POS_SEQUENCE = 0b0001
NEG_SEQUENCE = 0b0010
NEG_WITH_SEQUENCE = 0b0011
# Serialization / Compression
JSON_SERIAL = 0b0001
GZIP_COMPRESS = 0b0001
def _build_header(msg_type: int, flags: int = POS_SEQUENCE,
serial: int = JSON_SERIAL, compress: int = GZIP_COMPRESS) -> bytearray:
header = bytearray(4)
header[0] = (PROTOCOL_VERSION << 4) | HEADER_SIZE
header[1] = (msg_type << 4) | flags
header[2] = (serial << 4) | compress
header[3] = 0x00
return header
class StreamingASRClient:
    """Volcengine v3/sauc/bigmodel streaming ASR client.

    Implements the v3 binary framing this module's constants describe:
    each client frame is a 4-byte header, a signed big-endian sequence
    number, a big-endian payload length, then a gzip-compressed payload
    (JSON config for the first frame, raw PCM afterwards).
    """

    def __init__(self, uid: str = "voice_lite"):
        # User identifier forwarded to the service in the config payload.
        self._uid = uid
        self._ws = None  # active websocket, None when not connected
        self._seq = 1  # positive monotonically increasing frame sequence

    def _build_config(self) -> dict:
        """Payload for the initial full_client_request: audio format + options."""
        return {
            "user": {
                "uid": self._uid,
            },
            "audio": {
                # 24 kHz mono 16-bit raw PCM — must match what handle_audio sends.
                "format": "pcm",
                "codec": "raw",
                "rate": 24000,
                "bits": 16,
                "channel": 1,
            },
            "request": {
                "model_name": "bigmodel",
                "enable_itn": True,
                "enable_punc": True,
                "enable_ddc": True,
                # show_utterances=True makes the server include per-utterance
                # segments, which _extract_text prefers over result.text.
                "show_utterances": True,
                "enable_nonstream": False,
            },
        }

    def _build_auth_headers(self) -> dict:
        """HTTP headers carrying Volcengine credentials for the WS handshake."""
        return {
            "X-Api-Resource-Id": "volc.seedasr.sauc.duration",
            "X-Api-Connect-Id": str(uuid.uuid4()),
            "X-Api-Access-Key": VOLCENGINE_ACCESS_KEY,
            "X-Api-App-Key": VOLCENGINE_APP_ID,
        }

    async def connect(self) -> None:
        """Connect to ASR WebSocket and send initial full_client_request."""
        headers = self._build_auth_headers()
        # NOTE(review): this logs the access key in plaintext — consider
        # redacting credentials before shipping.
        logger.info(f"[ASR] Connecting to {VOLCENGINE_ASR_URL} with headers: {headers}")
        self._ws = await websockets.connect(
            VOLCENGINE_ASR_URL,
            additional_headers=headers,
            ping_interval=None,
            proxy=None
        )
        logger.info(f"[ASR] Connected to {VOLCENGINE_ASR_URL}")
        # Send full_client_request with config (seq=1)
        self._seq = 1
        config = self._build_config()
        config_bytes = gzip.compress(json.dumps(config).encode())
        frame = bytearray(_build_header(FULL_CLIENT_REQUEST, POS_SEQUENCE, JSON_SERIAL, GZIP_COMPRESS))
        frame.extend(struct.pack('>i', self._seq))
        frame.extend(struct.pack('>I', len(config_bytes)))
        frame.extend(config_bytes)
        self._seq += 1
        await self._ws.send(bytes(frame))
        # Wait for server ack; a non-zero code means the config was rejected.
        resp = await self._ws.recv()
        parsed = self._parse_response(resp)
        if parsed and parsed.get("code", 0) != 0:
            raise ConnectionError(f"[ASR] Server rejected config: {parsed}")
        logger.info(f"[ASR] Config accepted, ready for audio")

    async def send_audio(self, chunk: bytes) -> None:
        """Send an audio chunk to ASR with sequence number.

        Silently drops the chunk when not connected.
        """
        if not self._ws:
            return
        compressed = gzip.compress(chunk)
        frame = bytearray(_build_header(AUDIO_ONLY_REQUEST, POS_SEQUENCE, JSON_SERIAL, GZIP_COMPRESS))
        frame.extend(struct.pack('>i', self._seq))
        frame.extend(struct.pack('>I', len(compressed)))
        frame.extend(compressed)
        self._seq += 1
        await self._ws.send(bytes(frame))

    async def send_finish(self) -> None:
        """Send last audio frame with negative sequence to signal end."""
        if not self._ws:
            return
        payload = gzip.compress(b'')
        frame = bytearray(_build_header(AUDIO_ONLY_REQUEST, NEG_WITH_SEQUENCE, JSON_SERIAL, GZIP_COMPRESS))
        # A negated sequence number marks the final frame of the utterance.
        frame.extend(struct.pack('>i', -self._seq))
        frame.extend(struct.pack('>I', len(payload)))
        frame.extend(payload)
        await self._ws.send(bytes(frame))

    async def receive_results(self) -> AsyncGenerator[Tuple[str, bool], None]:
        """Yield (text, is_last) tuples from ASR responses.

        Terminates on the first server error, on the last-package flag, or
        when the connection closes. Non-binary (text) frames are logged
        and skipped.
        """
        if not self._ws:
            return
        try:
            async for message in self._ws:
                if isinstance(message, str):
                    logger.info(f"[ASR] Received text message: {message[:200]}")
                    continue
                parsed = self._parse_response(message)
                logger.info(f"[ASR] Received binary ({len(message)} bytes), parsed: {parsed}")
                if parsed is None:
                    continue
                code = parsed.get("code", 0)
                if code != 0:
                    logger.warning(f"[ASR] Server error: {parsed}")
                    return
                is_last = parsed.get("is_last", False)
                payload_msg = parsed.get("payload_msg")
                if payload_msg and isinstance(payload_msg, dict):
                    text = self._extract_text(payload_msg)
                    if text:
                        yield (text, is_last)
                if is_last:
                    return
        except websockets.exceptions.ConnectionClosed:
            logger.info("[ASR] Connection closed")

    @staticmethod
    def _extract_text(payload: dict) -> str:
        """Extract recognized text from payload.

        Prefers concatenating result.utterances[*].text (populated because
        show_utterances=True); falls back to result.text. Returns "" when
        no text is present.
        """
        result = payload.get("result")
        if not result or not isinstance(result, dict):
            return ""
        # Try utterances first (show_utterances=True)
        utterances = result.get("utterances", [])
        if utterances:
            parts = []
            for utt in utterances:
                text = utt.get("text", "")
                if text:
                    parts.append(text)
            return "".join(parts)
        # Fallback to result.text
        text = result.get("text", "")
        if isinstance(text, str):
            return text
        return ""

    def _parse_response(self, data: bytes) -> dict | None:
        """Parse binary ASR response into a dict.

        Returns None for frames shorter than one header; otherwise a dict
        with "code" (0 unless SERVER_ERROR_RESPONSE), "is_last", and
        optionally "sequence" and the decoded "payload_msg".
        """
        if len(data) < 4:
            return None
        msg_type = data[1] >> 4
        msg_flags = data[1] & 0x0f
        serial_method = data[2] >> 4
        compression = data[2] & 0x0f
        header_size = data[0] & 0x0f  # header size in 4-byte words
        payload = data[header_size * 4:]
        result = {"code": 0, "is_last": False}
        # Parse sequence and last flag
        if msg_flags & 0x01:  # has sequence
            result["sequence"] = struct.unpack('>i', payload[:4])[0]
            payload = payload[4:]
        if msg_flags & 0x02:  # is last package
            result["is_last"] = True
        if msg_type == SERVER_ERROR_RESPONSE:
            # Error frames carry: error code, payload size, then payload.
            result["code"] = struct.unpack('>i', payload[:4])[0]
            payload_size = struct.unpack('>I', payload[4:8])[0]
            payload = payload[8:]
        elif msg_type == FULL_SERVER_RESPONSE:
            payload_size = struct.unpack('>I', payload[:4])[0]
            payload = payload[4:]
        else:
            # Unknown message type — return header-level info only.
            return result
        if not payload:
            return result
        if compression == GZIP_COMPRESS:
            try:
                payload = gzip.decompress(payload)
            except Exception:
                # Corrupt payload — surface header-level info, drop the body.
                return result
        if serial_method == JSON_SERIAL:
            try:
                result["payload_msg"] = json.loads(payload.decode('utf-8'))
            except Exception:
                pass
        return result

    async def close(self) -> None:
        """Close the websocket if open; safe to call repeatedly."""
        if self._ws:
            logger.info("[ASR] Closing connection")
            await self._ws.close()
            self._ws = None

View File

@ -0,0 +1,158 @@
import base64
import json
import logging
import re
import uuid
import httpx
import numpy as np
from utils.settings import (
VOLCENGINE_APP_ID,
VOLCENGINE_ACCESS_KEY,
VOLCENGINE_DEFAULT_SPEAKER,
)
VOLCENGINE_TTS_URL= "https://openspeech.bytedance.com/api/v3/tts/unidirectional/sse"
logger = logging.getLogger('app')
# Regex to detect text that is only emoji/whitespace (no speakable content)
_EMOJI_ONLY_RE = re.compile(
    r'^[\s\U00002600-\U000027BF\U0001F300-\U0001FAFF\U0000FE00-\U0000FE0F\U0000200D]*$'
)


def convert_pcm_s16_to_f32(pcm_data: bytes) -> bytes:
    """Convert little-endian signed 16-bit PCM to float32 PCM for frontend playback.

    Args:
        pcm_data: Raw int16 sample bytes. A trailing odd byte (torn sample
            from a stream boundary) is dropped rather than raising.

    Returns:
        The same samples as float32 bytes scaled into [-1.0, 1.0), or
        ``b''`` when no complete sample is present.
    """
    # np.frombuffer raises ValueError when the buffer length is not a
    # multiple of the item size, so truncate to whole int16 samples first.
    usable = len(pcm_data) - (len(pcm_data) % 2)
    if usable == 0:
        return b''
    samples = np.frombuffer(pcm_data[:usable], dtype=np.int16)
    # Divide by 32768 so int16 full scale maps onto [-1.0, 1.0).
    return (samples.astype(np.float32) / np.float32(32768.0)).tobytes()
class StreamingTTSClient:
    """Volcengine v3/tts/unidirectional/sse streaming TTS client.

    One HTTP POST per sentence; the response is a Server-Sent-Events
    stream whose data fields carry base64 int16 PCM chunks.
    """

    def __init__(self, speaker: str = ""):
        # Fall back to the configured default speaker when none is given.
        self._speaker = speaker or VOLCENGINE_DEFAULT_SPEAKER

    async def synthesize(self, text: str):
        """
        Synthesize text to audio via SSE streaming.
        Yields 24kHz float32 PCM audio chunks.

        Blank or emoji-only text yields nothing. All transport errors are
        logged and swallowed — callers see a shorter (possibly empty)
        stream rather than an exception.
        """
        if not text.strip():
            return
        # Skip pure emoji text
        if _EMOJI_ONLY_RE.match(text):
            logger.info(f"[TTS] Skipping emoji-only text: '{text}'")
            return
        headers = {
            "X-Api-App-Id": VOLCENGINE_APP_ID,
            "X-Api-Access-Key": VOLCENGINE_ACCESS_KEY,
            "X-Api-Resource-Id": "seed-tts-2.0",
            "Content-Type": "application/json",
            "Connection": "keep-alive",
        }
        body = {
            "user": {
                "uid": str(uuid.uuid4()),
            },
            "req_params": {
                "text": text,
                "speaker": self._speaker,
                "audio_params": {
                    # Must match the 24 kHz float32 playback path downstream.
                    "format": "pcm",
                    "sample_rate": 24000,
                },
            },
        }
        try:
            logger.info(f"[TTS] Requesting: speaker={self._speaker}, text='{text[:50]}'")
            async with httpx.AsyncClient(timeout=httpx.Timeout(30.0, read=60.0)) as client:
                async with client.stream("POST", VOLCENGINE_TTS_URL, headers=headers, json=body) as response:
                    logger.info(f"[TTS] Response status: {response.status_code}")
                    if response.status_code != 200:
                        error_body = await response.aread()
                        logger.error(f"[TTS] HTTP {response.status_code}: {error_body.decode('utf-8', errors='replace')}")
                        return
                    chunk_count = 0
                    # Parse SSE format: lines prefixed with "event:", "data:", separated by blank lines
                    # NOTE(review): current_event is tracked but never consumed;
                    # only the data field drives behavior here.
                    current_event = ""
                    current_data = ""
                    raw_logged = False
                    async for raw_line in response.aiter_lines():
                        if not raw_logged:
                            # Log the first raw line once, to diagnose format drift.
                            logger.info(f"[TTS] First SSE line: {raw_line[:200]}")
                            raw_logged = True
                        line = raw_line.strip()
                        if line == "":
                            # Blank line = end of one SSE event
                            if current_data:
                                async for audio in self._process_sse_data(current_data):
                                    chunk_count += 1
                                    yield audio
                            current_event = ""
                            current_data = ""
                            continue
                        if line.startswith(":"):
                            # SSE comment, skip
                            continue
                        if ":" in line:
                            field, value = line.split(":", 1)
                            value = value.lstrip()
                            if field == "event":
                                current_event = value
                            elif field == "data":
                                # Multi-line data fields are rejoined with \n per SSE.
                                current_data += value + "\n"
                    # Handle remaining data without trailing blank line
                    if current_data:
                        async for audio in self._process_sse_data(current_data):
                            chunk_count += 1
                            yield audio
                    logger.info(f"[TTS] Stream done, yielded {chunk_count} audio chunks")
        except Exception as e:
            logger.error(f"[TTS] Error: {e}", exc_info=True)

    async def _process_sse_data(self, data_str: str):
        """Parse SSE data field and yield audio chunks if present.

        code == 0 with a "data" field is an audio chunk; code == 20000000
        is the end-of-stream marker; any other positive code is an error.
        """
        data_str = data_str.rstrip("\n")
        if not data_str:
            return
        try:
            data = json.loads(data_str)
        except json.JSONDecodeError:
            logger.warning(f"[TTS] Non-JSON SSE data: {data_str[:100]}")
            return
        code = data.get("code", 0)
        if code == 0 and data.get("data"):
            # Audio data chunk: base64 int16 PCM -> float32 PCM
            pcm_raw = base64.b64decode(data["data"])
            pcm_f32 = convert_pcm_s16_to_f32(pcm_raw)
            if pcm_f32:
                yield pcm_f32
        elif code == 20000000:
            # End of stream
            logger.info(f"[TTS] End signal received")
            return
        elif code > 0:
            error_msg = data.get("message", "Unknown TTS error")
            logger.error(f"[TTS] Error code={code}: {error_msg}")
            return

View File

@ -0,0 +1,448 @@
import asyncio
import logging
import struct
import uuid
from typing import Optional, Callable, Awaitable
import webrtcvad
from services.streaming_asr_client import StreamingASRClient
from services.streaming_tts_client import StreamingTTSClient
from services.voice_utils import (
StreamTagFilter,
clean_markdown,
stream_v3_agent,
TTSSentenceSplitter,
)
from utils.settings import VOICE_LITE_SILENCE_TIMEOUT
logger = logging.getLogger('app')
class VoiceLiteSession:
    """Voice Lite session: ASR -> Agent -> TTS pipeline (cheaper alternative).

    Audio flow: webrtcvad gates incoming 24 kHz PCM; on speech start an ASR
    websocket is lazily opened; on sustained silence the utterance is
    finalized, streamed through the agent, and sentence-split output is
    synthesized back to the caller via the on_audio callback.
    """

    def __init__(
        self,
        bot_id: str,
        session_id: Optional[str] = None,
        user_identifier: Optional[str] = None,
        on_audio: Optional[Callable[[bytes], Awaitable[None]]] = None,
        on_asr_text: Optional[Callable[[str], Awaitable[None]]] = None,
        on_agent_result: Optional[Callable[[str], Awaitable[None]]] = None,
        on_agent_stream: Optional[Callable[[str], Awaitable[None]]] = None,
        on_llm_text: Optional[Callable[[str], Awaitable[None]]] = None,
        on_status: Optional[Callable[[str], Awaitable[None]]] = None,
        on_error: Optional[Callable[[str], Awaitable[None]]] = None,
    ):
        self.bot_id = bot_id
        self.session_id = session_id or str(uuid.uuid4())
        self.user_identifier = user_identifier or ""
        self._bot_config: dict = {}  # populated from DB in start()
        self._speaker: str = ""  # TTS voice, from bot config
        # Callbacks
        self._on_audio = on_audio
        self._on_asr_text = on_asr_text
        self._on_agent_result = on_agent_result
        self._on_agent_stream = on_agent_stream
        self._on_llm_text = on_llm_text
        self._on_status = on_status
        self._on_error = on_error
        self._running = False
        self._asr_client: Optional[StreamingASRClient] = None
        self._asr_receive_task: Optional[asyncio.Task] = None
        self._agent_task: Optional[asyncio.Task] = None
        # Silence timeout tracking
        self._last_asr_time: float = 0
        self._silence_timer_task: Optional[asyncio.Task] = None
        self._current_asr_text: str = ""
        self._last_text_change_time: float = 0
        self._last_changed_text: str = ""
        self._last_asr_emit_time: float = 0
        self._utterance_lock = asyncio.Lock()
        # VAD (Voice Activity Detection) via webrtcvad
        self._vad = webrtcvad.Vad(2)  # aggressiveness 0-3 (2 = balanced)
        self._vad_speaking = False  # Whether user is currently speaking
        self._vad_silence_start: float = 0  # When silence started
        self._vad_finish_task: Optional[asyncio.Task] = None
        self._pre_buffer: list = []  # Buffer audio before VAD triggers
        self._vad_voice_streak: int = 0  # Consecutive voiced chunks count
        self._vad_silence_streak: int = 0  # Consecutive silent chunks count

    async def start(self) -> None:
        """Fetch bot config, mark session as running."""
        from utils.fastapi_utils import fetch_bot_config_from_db
        self._bot_config = await fetch_bot_config_from_db(self.bot_id, self.user_identifier)
        self._speaker = self._bot_config.get("voice_speaker", "")
        self._running = True
        await self._emit_status("ready")

    async def stop(self) -> None:
        """Gracefully stop the session.

        Cancels pending tasks, signals agent cancellation via the session's
        cancel manager, and closes the ASR connection (best-effort finish
        frame first).
        """
        self._running = False
        if self._vad_finish_task and not self._vad_finish_task.done():
            self._vad_finish_task.cancel()
        if self._silence_timer_task and not self._silence_timer_task.done():
            self._silence_timer_task.cancel()
        if self._agent_task and not self._agent_task.done():
            from utils.cancel_manager import trigger_cancel
            trigger_cancel(self.session_id)
            self._agent_task.cancel()
        if self._asr_receive_task and not self._asr_receive_task.done():
            self._asr_receive_task.cancel()
        if self._asr_client:
            try:
                await self._asr_client.send_finish()
            except Exception:
                # Best-effort: the socket may already be gone.
                pass
            await self._asr_client.close()

    # VAD configuration
    VAD_SILENCE_DURATION = 1.5  # Seconds of silence before sending finish
    VAD_PRE_BUFFER_SIZE = 5  # Number of audio chunks to buffer before VAD triggers
    VAD_SOURCE_RATE = 24000  # Input audio sample rate
    VAD_TARGET_RATE = 16000  # webrtcvad supported sample rate
    VAD_FRAME_DURATION_MS = 30  # Frame duration for webrtcvad (10, 20, or 30 ms)
    VAD_SPEECH_CHUNKS = 3  # Consecutive voiced chunks required to start speech
    VAD_SILENCE_CHUNKS = 5  # Consecutive silent chunks required to confirm silence
    # Class-level default; becomes a per-instance counter on first increment.
    _audio_chunk_count = 0

    @staticmethod
    def _resample_24k_to_16k(pcm_data: bytes) -> bytes:
        """Downsample 16-bit PCM from 24kHz to 16kHz (ratio 3:2).
        Takes every 2 out of 3 samples (simple decimation).
        """
        n_samples = len(pcm_data) // 2
        if n_samples == 0:
            return b''
        samples = struct.unpack(f'<{n_samples}h', pcm_data[:n_samples * 2])
        # Pick samples at indices 0, 1.5, 3, 4.5, ... -> floor(i * 3/2) for output index i
        out_len = (n_samples * 2) // 3
        resampled = []
        for i in range(out_len):
            src_idx = (i * 3) // 2
            if src_idx < n_samples:
                resampled.append(samples[src_idx])
        return struct.pack(f'<{len(resampled)}h', *resampled)

    def _webrtcvad_detect(self, pcm_data: bytes) -> bool:
        """Run webrtcvad on audio data. Returns True if voice is detected in any frame."""
        resampled = self._resample_24k_to_16k(pcm_data)
        frame_size = (self.VAD_TARGET_RATE * self.VAD_FRAME_DURATION_MS // 1000) * 2  # bytes per frame
        if len(resampled) < frame_size:
            # Too short for even one VAD frame — treat as silence.
            return False
        # Check frames; return True if any frame has voice
        voice_frames = 0
        total_frames = 0
        for offset in range(0, len(resampled) - frame_size + 1, frame_size):
            frame = resampled[offset:offset + frame_size]
            total_frames += 1
            try:
                if self._vad.is_speech(frame, self.VAD_TARGET_RATE):
                    voice_frames += 1
            except Exception:
                # webrtcvad rejects malformed frames — count as non-voiced.
                pass
        # Consider voice detected if at least one frame has speech
        return voice_frames > 0

    async def handle_audio(self, audio_data: bytes) -> None:
        """Forward user audio to ASR with VAD gating. Lazy-connect on speech start."""
        if not self._running:
            return
        self._audio_chunk_count += 1
        has_voice = self._webrtcvad_detect(audio_data)
        now = asyncio.get_event_loop().time()
        # Update consecutive streaks
        if has_voice:
            self._vad_voice_streak += 1
            self._vad_silence_streak = 0
        else:
            self._vad_silence_streak += 1
            self._vad_voice_streak = 0
        if has_voice:
            # Cancel any pending finish
            if self._vad_finish_task and not self._vad_finish_task.done():
                self._vad_finish_task.cancel()
                self._vad_finish_task = None
            if not self._vad_speaking and self._vad_voice_streak >= self.VAD_SPEECH_CHUNKS:
                # Speech just started — connect ASR
                self._vad_speaking = True
                logger.info(f"[VoiceLite] VAD: speech started (webrtcvad), connecting ASR...")
                try:
                    await self._connect_asr()
                    # Send buffered pre-speech audio
                    for buffered in self._pre_buffer:
                        await self._asr_client.send_audio(buffered)
                    self._pre_buffer.clear()
                except Exception as e:
                    logger.error(f"[VoiceLite] VAD: ASR connect failed: {e}", exc_info=True)
                    self._vad_speaking = False
                    return
            # Send current chunk
            if self._asr_client:
                try:
                    await self._asr_client.send_audio(audio_data)
                except Exception:
                    pass
            self._vad_silence_start = 0
        else:
            if self._vad_speaking:
                # Brief silence while speaking — keep sending for ASR context
                if self._asr_client:
                    try:
                        await self._asr_client.send_audio(audio_data)
                    except Exception:
                        pass
                if self._vad_silence_start == 0:
                    self._vad_silence_start = now
                # Require both consecutive silent chunks AND time threshold
                if (self._vad_silence_streak >= self.VAD_SILENCE_CHUNKS
                        and (now - self._vad_silence_start) >= self.VAD_SILENCE_DURATION):
                    if not self._vad_finish_task or self._vad_finish_task.done():
                        self._vad_finish_task = asyncio.create_task(self._vad_send_finish())
            else:
                # Not speaking — buffer recent audio for pre-speech context
                self._pre_buffer.append(audio_data)
                if len(self._pre_buffer) > self.VAD_PRE_BUFFER_SIZE:
                    self._pre_buffer.pop(0)

    async def _vad_send_finish(self) -> None:
        """Send finish signal to ASR after silence detected."""
        logger.info(f"[VoiceLite] VAD: silence detected, sending finish to ASR")
        self._vad_speaking = False
        self._vad_silence_start = 0
        self._vad_voice_streak = 0
        self._vad_silence_streak = 0
        if self._asr_client:
            try:
                await self._asr_client.send_finish()
            except Exception as e:
                logger.warning(f"[VoiceLite] VAD: send_finish failed: {e}")

    async def handle_text(self, text: str) -> None:
        """Handle direct text input - bypass ASR and go straight to agent."""
        if not self._running:
            return
        await self._interrupt_current()
        self._agent_task = asyncio.create_task(self._process_utterance(text))

    async def _connect_asr(self) -> None:
        """Create and connect a new ASR client, start receive loop."""
        if self._asr_client:
            try:
                await self._asr_client.close()
            except Exception:
                pass
        if self._asr_receive_task and not self._asr_receive_task.done():
            self._asr_receive_task.cancel()
        self._asr_client = StreamingASRClient(uid=self.user_identifier or "voice_lite")
        await self._asr_client.connect()
        logger.info(f"[VoiceLite] ASR client connected")
        # Start receive loop for this ASR session
        self._asr_receive_task = asyncio.create_task(self._asr_receive_loop())

    async def _asr_receive_loop(self) -> None:
        """Receive ASR results from the current ASR session."""
        try:
            _listening_emitted = False
            async for text, is_final in self._asr_client.receive_results():
                if not self._running:
                    return
                if not _listening_emitted:
                    # Emit "listening" once, on the first recognized result.
                    await self._emit_status("listening")
                    _listening_emitted = True
                now = asyncio.get_event_loop().time()
                self._last_asr_time = now
                self._current_asr_text = text
                # Track text changes for stability detection
                if text != self._last_changed_text:
                    self._last_changed_text = text
                    self._last_text_change_time = now
                # Reset stability timer on every result
                self._reset_silence_timer()
        except asyncio.CancelledError:
            pass
        except Exception as e:
            if self._running:
                logger.warning(f"[VoiceLite] ASR session ended: {e}")
        finally:
            logger.info(f"[VoiceLite] ASR session done")
            # Clean up ASR client after session ends
            if self._asr_client:
                try:
                    await self._asr_client.close()
                except Exception:
                    pass
                self._asr_client = None

    def _reset_silence_timer(self) -> None:
        """Reset the silence timeout timer."""
        if self._silence_timer_task and not self._silence_timer_task.done():
            self._silence_timer_task.cancel()
        self._silence_timer_task = asyncio.create_task(self._silence_timeout())

    async def _silence_timeout(self) -> None:
        """Wait for silence timeout, then check if text has been stable."""
        try:
            await asyncio.sleep(VOICE_LITE_SILENCE_TIMEOUT)
            if not self._running:
                return
            # Check if text has been stable (unchanged) for the timeout period
            now = asyncio.get_event_loop().time()
            if (self._current_asr_text
                    and (now - self._last_text_change_time) >= VOICE_LITE_SILENCE_TIMEOUT):
                logger.info(f"[VoiceLite] Text stable for {VOICE_LITE_SILENCE_TIMEOUT}s, processing: '{self._current_asr_text}'")
                await self._on_utterance_complete(self._current_asr_text)
        except asyncio.CancelledError:
            pass

    async def _on_utterance_complete(self, text: str) -> None:
        """Called when a complete utterance is detected.

        Serialized via the utterance lock so overlapping completions cannot
        interleave interrupt/start of agent tasks.
        """
        if not text.strip():
            return
        async with self._utterance_lock:
            # Cancel silence timer
            if self._silence_timer_task and not self._silence_timer_task.done():
                self._silence_timer_task.cancel()
            # Interrupt any in-progress agent+TTS
            await self._interrupt_current()
            # Send final ASR text to frontend
            if self._on_asr_text:
                await self._on_asr_text(text)
            self._current_asr_text = ""
            self._last_changed_text = ""
            self._agent_task = asyncio.create_task(self._process_utterance(text))

    async def _interrupt_current(self) -> None:
        """Cancel current agent+TTS task if running."""
        if self._agent_task and not self._agent_task.done():
            logger.info(f"[VoiceLite] Interrupting previous agent task")
            from utils.cancel_manager import trigger_cancel
            trigger_cancel(self.session_id)
            self._agent_task.cancel()
            try:
                await self._agent_task
            except (asyncio.CancelledError, Exception):
                pass
            self._agent_task = None

    async def _process_utterance(self, text: str) -> None:
        """Process a complete utterance: agent -> TTS pipeline.

        Streams agent output through the tag filter (ANSWER-block gating)
        and sentence splitter, synthesizing each complete sentence as it
        becomes available.
        """
        try:
            logger.info(f"[VoiceLite] Processing utterance: '{text}'")
            await self._emit_status("thinking")
            accumulated_text = []
            tag_filter = StreamTagFilter()
            splitter = TTSSentenceSplitter()
            tts_client = StreamingTTSClient(speaker=self._speaker)
            speaking = False
            async for chunk in stream_v3_agent(
                user_text=text,
                bot_id=self.bot_id,
                bot_config=self._bot_config,
                session_id=self.session_id,
                user_identifier=self.user_identifier,
            ):
                accumulated_text.append(chunk)
                if self._on_agent_stream:
                    await self._on_agent_stream(chunk)
                passthrough = tag_filter.feed(chunk)
                if not passthrough:
                    if tag_filter.answer_ended:
                        # ANSWER block just closed — flush buffered sentences.
                        for sentence in splitter.flush():
                            sentence = clean_markdown(sentence)
                            if sentence:
                                if not speaking:
                                    await self._emit_status("speaking")
                                    speaking = True
                                await self._send_tts(tts_client, sentence)
                    continue
                # Feed raw passthrough to splitter (preserve newlines for splitting),
                # apply clean_markdown on output sentences
                for sentence in splitter.feed(passthrough):
                    sentence = clean_markdown(sentence)
                    if sentence:
                        if not speaking:
                            await self._emit_status("speaking")
                            speaking = True
                        await self._send_tts(tts_client, sentence)
            # Handle remaining text
            for sentence in splitter.flush():
                sentence = clean_markdown(sentence)
                if sentence:
                    if not speaking:
                        await self._emit_status("speaking")
                        speaking = True
                    await self._send_tts(tts_client, sentence)
            # Log full agent result (not sent to frontend, already streamed)
            full_result = "".join(accumulated_text)
            logger.info(f"[VoiceLite] Agent done ({len(full_result)} chars)")
            # Notify frontend that agent text stream is complete
            if self._on_agent_result:
                await self._on_agent_result(full_result)
            await self._emit_status("idle")
        except asyncio.CancelledError:
            logger.info(f"[VoiceLite] Agent task cancelled (user interrupted)")
            raise
        except Exception as e:
            logger.error(f"[VoiceLite] Error processing utterance: {e}", exc_info=True)
            await self._emit_error(f"Processing failed: {str(e)}")

    async def _send_tts(self, tts_client: StreamingTTSClient, sentence: str) -> None:
        """Synthesize a sentence and emit audio chunks."""
        logger.info(f"[VoiceLite] TTS sentence: '{sentence[:80]}'")
        async for audio_chunk in tts_client.synthesize(sentence):
            if self._on_audio:
                await self._on_audio(audio_chunk)

    async def _emit_status(self, status: str) -> None:
        # No-op when no status callback was provided.
        if self._on_status:
            await self._on_status(status)

    async def _emit_error(self, message: str) -> None:
        # No-op when no error callback was provided.
        if self._on_error:
            await self._on_error(message)

View File

@ -1,96 +1,14 @@
import asyncio
import json
import logging
import re
import uuid
from typing import Optional, Callable, Awaitable, AsyncGenerator
from typing import Optional, Callable, Awaitable
from services.realtime_voice_client import RealtimeDialogClient
from services.voice_utils import StreamTagFilter, clean_markdown, stream_v3_agent, SENTENCE_END_RE
logger = logging.getLogger('app')
class _StreamTagFilter:
    """
    Filters streaming text based on tag blocks.
    Only passes through content inside [ANSWER] blocks.
    If no tags are found at all, passes through everything (fallback).
    Skips content inside [TOOL_CALL], [TOOL_RESPONSE], [THINK], [SOURCE], etc.

    NOTE(review): an identical class named StreamTagFilter exists in
    services/voice_utils.py — this looks like a superseded copy; confirm
    before relying on either one.
    """
    # Tags whose content must never reach TTS.
    SKIP_TAGS = {"TOOL_CALL", "TOOL_RESPONSE", "THINK", "SOURCE", "REFERENCE"}
    # All tags the filter recognizes; any other bracketed word is treated
    # as literal text rather than a control tag.
    KNOWN_TAGS = {"ANSWER", "TOOL_CALL", "TOOL_RESPONSE", "THINK", "SOURCE", "REFERENCE", "PREAMBLE", "SUMMARY"}

    def __init__(self):
        self.state = "idle"  # idle, answer, skip
        self.found_any_tag = False  # True once any KNOWN_TAGS tag has been seen
        self._pending = ""  # buffer for partial tag like "[TOO..."
        self.answer_ended = False  # True when ANSWER block ends (e.g. hit [TOOL_CALL])

    def feed(self, chunk: str) -> str:
        """Feed a chunk, return text that should be passed to TTS."""
        # answer_ended reports only on transitions seen during this call.
        self.answer_ended = False
        self._pending += chunk
        output = []
        while self._pending:
            if self.state in ("idle", "answer"):
                bracket_pos = self._pending.find("[")
                if bracket_pos == -1:
                    # No tag in sight: emit everything while inside ANSWER,
                    # or while no tag has been seen yet (fallback mode).
                    if self.state == "answer" or not self.found_any_tag:
                        output.append(self._pending)
                    self._pending = ""
                else:
                    before = self._pending[:bracket_pos]
                    if before and (self.state == "answer" or not self.found_any_tag):
                        output.append(before)
                    close_pos = self._pending.find("]", bracket_pos)
                    if close_pos == -1:
                        # Incomplete tag — wait for next chunk
                        self._pending = self._pending[bracket_pos:]
                        break
                    tag_name = self._pending[bracket_pos + 1:close_pos]
                    self._pending = self._pending[close_pos + 1:]
                    if tag_name not in self.KNOWN_TAGS:
                        # Unrecognized bracketed word: re-emit it as literal text.
                        if self.state == "answer" or not self.found_any_tag:
                            output.append(f"[{tag_name}]")
                        continue
                    self.found_any_tag = True
                    if tag_name == "ANSWER":
                        self.state = "answer"
                    else:
                        if self.state == "answer":
                            # Leaving an ANSWER block (e.g. a tool call started).
                            self.answer_ended = True
                        self.state = "skip"
            elif self.state == "skip":
                # Inside a skipped block: drop text, only scan for the next tag.
                bracket_pos = self._pending.find("[")
                if bracket_pos == -1:
                    self._pending = ""
                else:
                    close_pos = self._pending.find("]", bracket_pos)
                    if close_pos == -1:
                        # Possible partial tag — keep it for the next chunk.
                        self._pending = self._pending[bracket_pos:]
                        break
                    tag_name = self._pending[bracket_pos + 1:close_pos]
                    self._pending = self._pending[close_pos + 1:]
                    if tag_name not in self.KNOWN_TAGS:
                        continue
                    if tag_name == "ANSWER":
                        self.state = "answer"
                    else:
                        self.state = "skip"
        return "".join(output)
class VoiceSession:
"""Manages a single voice dialogue session lifecycle"""
@ -288,30 +206,6 @@ class VoiceSession:
logger.error(f"[Voice] Server error: {error_msg}")
await self._emit_error(error_msg)
# Sentence-ending punctuation pattern for splitting TTS
_SENTENCE_END_RE = re.compile(r'[。!?;\n.!?;]')
# Markdown syntax to strip before TTS
_MD_CLEAN_RE = re.compile(r'#{1,6}\s*|(?<!\w)\*{1,3}|(?<!\w)_{1,3}|\*{1,3}(?!\w)|_{1,3}(?!\w)|~~|`{1,3}|^>\s*|^\s*[-*+]\s+|^\s*\d+\.\s+|\[([^\]]*)\]\([^)]*\)|!\[([^\]]*)\]\([^)]*\)', re.MULTILINE)
@staticmethod
def _clean_markdown(text: str) -> str:
"""Strip Markdown formatting characters for TTS readability."""
# Replace links/images with their display text
text = re.sub(r'!\[([^\]]*)\]\([^)]*\)', r'\1', text)
text = re.sub(r'\[([^\]]*)\]\([^)]*\)', r'\1', text)
# Remove headings, bold, italic, strikethrough, code marks, blockquote
text = re.sub(r'#{1,6}\s*', '', text)
text = re.sub(r'\*{1,3}|_{1,3}|~~|`{1,3}', '', text)
text = re.sub(r'^>\s*', '', text, flags=re.MULTILINE)
# Remove list markers
text = re.sub(r'^\s*[-*+]\s+', '', text, flags=re.MULTILINE)
text = re.sub(r'^\s*\d+\.\s+', '', text, flags=re.MULTILINE)
# Remove horizontal rules
text = re.sub(r'^[\s]*[-*_]{3,}[\s]*$', '', text, flags=re.MULTILINE)
# Collapse extra whitespace
text = re.sub(r'\n{2,}', '\n', text)
return text.strip()
async def _on_asr_text_received(self, text: str) -> None:
"""Called when ASR text is received — stream agent output, send TTS sentence by sentence"""
if not text.strip():
@ -324,9 +218,15 @@ class VoiceSession:
accumulated_text = [] # full agent output for on_agent_result callback
sentence_buf = "" # buffer for accumulating until sentence boundary
tts_started = False # whether we've sent the first TTS chunk
tag_filter = _StreamTagFilter()
tag_filter = StreamTagFilter()
async for chunk in self._stream_v3_agent(text):
async for chunk in stream_v3_agent(
user_text=text,
bot_id=self.bot_id,
bot_config=self._bot_config,
session_id=self.session_id,
user_identifier=self.user_identifier,
):
accumulated_text.append(chunk)
if self._on_agent_stream:
@ -341,7 +241,7 @@ class VoiceSession:
flush = sentence_buf.strip()
sentence_buf = ""
if flush:
flush = self._clean_markdown(flush)
flush = clean_markdown(flush)
if flush:
if tts_started and self._tts_segment_done:
logger.info(f"[Voice] TTS segment done, closing session and waiting for delivery (answer ended)")
@ -371,7 +271,7 @@ class VoiceSession:
# Check for sentence boundaries and send complete sentences to TTS
while True:
match = self._SENTENCE_END_RE.search(sentence_buf)
match = SENTENCE_END_RE.search(sentence_buf)
if not match:
break
# Split at sentence boundary (include the punctuation)
@ -380,7 +280,7 @@ class VoiceSession:
sentence_buf = sentence_buf[end_pos:]
if sentence:
sentence = self._clean_markdown(sentence)
sentence = clean_markdown(sentence)
if sentence:
# If previous TTS segment completed (e.g. gap during tool call),
# close old session, wait for TTS delivery to finish, then restart
@ -410,7 +310,7 @@ class VoiceSession:
# Handle remaining text in buffer (last sentence without ending punctuation)
remaining = sentence_buf.strip()
if remaining:
remaining = self._clean_markdown(remaining)
remaining = clean_markdown(remaining)
if remaining:
# If previous TTS segment completed, close and wait before restart
if tts_started and self._tts_segment_done:
@ -464,70 +364,6 @@ class VoiceSession:
self._is_sending_chat_tts_text = False
await self._emit_error(f"Agent call failed: {str(e)}")
async def _stream_v3_agent(self, user_text: str) -> AsyncGenerator[str, None]:
    """Call v3 agent API in streaming mode, yield text chunks as they arrive.

    Builds a single-turn ChatRequestV3 from ``user_text`` plus the session's
    bot configuration, then consumes the SSE stream produced by
    ``enhanced_generate_stream_response`` and yields only the
    ``choices[0].delta.content`` text of each event. Cancellation is
    re-raised; any other error is logged and silently ends the stream.
    """
    try:
        # Local imports — NOTE(review): presumably to avoid import cycles
        # with the routes layer; confirm before hoisting to module level.
        from utils.api_models import ChatRequestV3, Message
        from utils.fastapi_utils import (
            process_messages,
            create_project_directory,
        )
        from agent.agent_config import AgentConfig
        from routes.chat import enhanced_generate_stream_response

        bot_config = self._bot_config
        language = bot_config.get("language", "zh")
        # Single user turn; no prior conversation history is attached here.
        messages_obj = [Message(role="user", content=user_text)]
        request = ChatRequestV3(
            messages=messages_obj,
            bot_id=self.bot_id,
            stream=True,
            session_id=self.session_id,
            user_identifier=self.user_identifier,
        )
        project_dir = create_project_directory(
            bot_config.get("dataset_ids", []),
            self.bot_id,
            bot_config.get("skills", []),
        )
        processed_messages = process_messages(messages_obj, language)
        config = await AgentConfig.from_v3_request(
            request,
            bot_config,
            project_dir,
            processed_messages,
            language,
        )
        config.stream = True
        # Parse OpenAI-style SSE lines: "data: {json}" terminated by "data: [DONE]".
        async for sse_line in enhanced_generate_stream_response(config):
            if not sse_line or not sse_line.startswith("data: "):
                continue
            data_str = sse_line.strip().removeprefix("data: ")
            if data_str == "[DONE]":
                break
            try:
                data = json.loads(data_str)
                choices = data.get("choices", [])
                if choices:
                    delta = choices[0].get("delta", {})
                    content = delta.get("content", "")
                    if content:
                        yield content
            except (json.JSONDecodeError, KeyError):
                # Malformed event — skip it rather than abort the stream.
                continue
    except asyncio.CancelledError:
        logger.info(f"[Voice] v3 agent call cancelled")
        raise
    except Exception as e:
        # NOTE(review): the exception is swallowed and the generator simply
        # ends; callers cannot distinguish failure from normal completion —
        # confirm this is intentional.
        logger.error(f"[Voice] Error calling v3 agent: {e}", exc_info=True)
@staticmethod
def _extract_answer(agent_result: str) -> str:
"""Extract the answer portion from agent result, stripping tags like [ANSWER], [THINK] etc."""

316
services/voice_utils.py Normal file
View File

@ -0,0 +1,316 @@
import json
import re
import logging
from typing import Optional, AsyncGenerator
logger = logging.getLogger('app')
# Sentence-ending punctuation (full-width and ASCII) used for coarse splitting.
SENTENCE_END_RE = re.compile(r'[。!?;\n.!?;]')

# Emoji pattern: matches Unicode emoji without touching CJK characters
_EMOJI_RE = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map
    "\U0001F1E0-\U0001F1FF"  # flags
    "\U0001F900-\U0001F9FF"  # supplemental symbols
    "\U0001FA00-\U0001FA6F"  # chess symbols
    "\U0001FA70-\U0001FAFF"  # symbols extended-A
    "\U00002702-\U000027B0"  # dingbats
    "\U00002600-\U000026FF"  # misc symbols
    "\U0000FE00-\U0000FE0F"  # variation selectors
    "\U0000200D"  # zero width joiner
    "\U000024C2"  # enclosed M
    "\U00002B50\U00002B55"  # star, circle
    "\U000023CF\U000023E9-\U000023F3\U000023F8-\U000023FA"  # media controls
    "\U0001F170-\U0001F251"  # enclosed alphanumeric supplement
    "]+",
    flags=re.UNICODE,
)

# Strong sentence-ending punctuation (excluding \n which is handled separately)
_STRONG_PUNCT_RE = re.compile(r'[。!?;.!?;~]')

# Soft punctuation (usable as split points when buffer is getting long)
_SOFT_PUNCT_RE = re.compile(r'[,:、)) \t]')
class TTSSentenceSplitter:
    """
    Intelligent sentence splitter for TTS streaming.

    Rules (in priority order):
    1. Split on newlines unconditionally (LLM paragraph boundaries)
    2. Split on strong punctuation (。!? etc.) only if accumulated >= MIN_LENGTH
    3. If buffer reaches SOFT_THRESHOLD, also split on soft punctuation (commas etc.)
    4. If buffer reaches MAX_LENGTH, force split at best available position

    - Strips emoji from output (TTS cannot pronounce them)
    - On flush(), returns any remaining text regardless of length
    """

    MIN_LENGTH = 10       # Don't send sentences shorter than this
    SOFT_THRESHOLD = 30   # Start considering soft punctuation splits
    MAX_LENGTH = 80       # Force split even without punctuation

    # Patterns are compiled on the class so the splitter is self-contained.
    # Emoji pattern: matches Unicode emoji without touching CJK characters.
    _EMOJI = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map
        "\U0001F1E0-\U0001F1FF"  # flags
        "\U0001F900-\U0001F9FF"  # supplemental symbols
        "\U0001FA00-\U0001FA6F"  # chess symbols
        "\U0001FA70-\U0001FAFF"  # symbols extended-A
        "\U00002702-\U000027B0"  # dingbats
        "\U00002600-\U000026FF"  # misc symbols
        "\U0000FE00-\U0000FE0F"  # variation selectors
        "\U0000200D"             # zero width joiner
        "\U000024C2"             # enclosed M
        "\U00002B50\U00002B55"   # star, circle
        "\U000023CF\U000023E9-\U000023F3\U000023F8-\U000023FA"  # media controls
        "\U0001F170-\U0001F251"  # enclosed alphanumeric supplement
        "]+",
        flags=re.UNICODE,
    )
    # Strong sentence-ending punctuation (\n is handled separately).
    _STRONG = re.compile(r'[。!?;.!?;~]')
    # Soft punctuation usable as split points when the buffer grows long.
    _SOFT = re.compile(r'[,:、)) \t]')
    # Preferred break characters when an over-long buffer must be cut
    # (half/full-width spaces and commas).
    _FORCE_BREAK_CHARS = (' ', '\u3000', ',', ',')

    def __init__(self):
        self._buf = ""  # text accumulated since the last emitted sentence

    def _clean_for_tts(self, text: str) -> str:
        """Remove emoji and collapse whitespace."""
        text = self._EMOJI.sub("", text)
        text = re.sub(r'[ \t]+', ' ', text)
        return text.strip()

    def _force_split_pos(self) -> int:
        """Best position to cut an over-long buffer, never past MAX_LENGTH.

        BUG FIX: the previous implementation called str.rfind('') (an empty
        needle) in its max() of candidate positions; rfind('') always returns
        len(region), so the "last natural break" search degenerated and the
        cut landed at MAX_LENGTH + 1, exceeding the documented cap.
        """
        region = self._buf[self.MIN_LENGTH:self.MAX_LENGTH]
        last_break = max(region.rfind(ch) for ch in self._FORCE_BREAK_CHARS)
        if last_break >= 0:
            return self.MIN_LENGTH + last_break + 1
        return self.MAX_LENGTH

    def feed(self, chunk: str) -> list[str]:
        """Feed a text chunk, return list of ready sentences (may be empty)."""
        self._buf += chunk
        results = []
        while self._buf:
            buf_len = len(self._buf)
            # 0. Newline split — highest priority
            nl_pos = self._buf.find('\n')
            if nl_pos >= 0:
                before = self._buf[:nl_pos]
                rest = self._buf[nl_pos:].lstrip('\n')
                cleaned = self._clean_for_tts(before)
                if len(cleaned) >= self.MIN_LENGTH:
                    # Long enough, emit as a sentence
                    self._buf = rest
                    results.append(cleaned)
                    continue
                elif not rest:
                    # No more text after newline, keep buffer and wait
                    break
                else:
                    # Too short — merge with next paragraph
                    self._buf = before + rest
                    continue
            # 1. Strong punctuation: first split point yielding >= MIN_LENGTH
            best_end = -1
            for match in self._STRONG.finditer(self._buf):
                end_pos = match.end()
                if len(self._buf[:end_pos].strip()) >= self.MIN_LENGTH:
                    best_end = end_pos
                    break  # Take the first valid (long enough) split
                # Short segment before this punct — skip and keep scanning
            if best_end > 0:
                sentence = self._clean_for_tts(self._buf[:best_end])
                self._buf = self._buf[best_end:]
                if sentence:
                    results.append(sentence)
                continue
            # 2. Buffer getting long: take the last soft-punctuation split
            # at or before SOFT_THRESHOLD (but not before MIN_LENGTH).
            if buf_len >= self.SOFT_THRESHOLD:
                best_soft = -1
                for m in self._SOFT.finditer(self._buf):
                    pos = m.end()
                    if pos >= self.MIN_LENGTH:
                        best_soft = pos
                    if pos >= self.SOFT_THRESHOLD:
                        break
                if best_soft >= self.MIN_LENGTH:
                    sentence = self._clean_for_tts(self._buf[:best_soft])
                    self._buf = self._buf[best_soft:]
                    if sentence:
                        results.append(sentence)
                    continue
            # 3. Buffer too long: force split, preferring a natural break
            if buf_len >= self.MAX_LENGTH:
                split_at = self._force_split_pos()
                sentence = self._clean_for_tts(self._buf[:split_at])
                self._buf = self._buf[split_at:]
                if sentence:
                    results.append(sentence)
                continue
            # Not enough text yet, wait for more
            break
        return results

    def flush(self) -> list[str]:
        """Flush remaining buffer. Call at end of stream."""
        results = []
        if self._buf.strip():
            sentence = self._clean_for_tts(self._buf)
            if sentence:
                results.append(sentence)
        self._buf = ""
        return results
class StreamTagFilter:
    """
    Streaming filter that routes only [ANSWER] content to TTS.

    The agent stream is annotated with blocks such as [THINK], [TOOL_CALL],
    [SOURCE], etc. Text inside an [ANSWER] block is passed through; text
    inside any other known block is dropped. If the stream never contains a
    known tag, everything is passed through as a fallback. Bracketed words
    that are not known tags are re-emitted as literal text.
    """

    SKIP_TAGS = {"TOOL_CALL", "TOOL_RESPONSE", "THINK", "SOURCE", "REFERENCE"}
    KNOWN_TAGS = {"ANSWER", "TOOL_CALL", "TOOL_RESPONSE", "THINK", "SOURCE", "REFERENCE", "PREAMBLE", "SUMMARY"}

    def __init__(self):
        self.state = "idle"          # one of: idle, answer, skip
        self.found_any_tag = False   # True once any KNOWN_TAGS tag was seen
        self._carry = ""             # unconsumed text (may end in a partial tag)
        self.answer_ended = False    # set per feed(): an [ANSWER] block closed

    def feed(self, chunk: str) -> str:
        """Feed a chunk, return text that should be passed to TTS."""
        self.answer_ended = False
        self._carry += chunk
        emitted = []

        def audible() -> bool:
            # Text is speakable inside [ANSWER], or before any tag appears.
            return self.state == "answer" or not self.found_any_tag

        while self._carry:
            open_idx = self._carry.find("[")
            if self.state == "skip":
                # Drop everything until a complete tag shows up.
                if open_idx == -1:
                    self._carry = ""
                    continue
                close_idx = self._carry.find("]", open_idx)
                if close_idx == -1:
                    # Partial tag — keep it for the next chunk.
                    self._carry = self._carry[open_idx:]
                    break
                tag = self._carry[open_idx + 1:close_idx]
                self._carry = self._carry[close_idx + 1:]
                if tag in self.KNOWN_TAGS:
                    self.state = "answer" if tag == "ANSWER" else "skip"
                continue
            # idle / answer states
            if open_idx == -1:
                if audible():
                    emitted.append(self._carry)
                self._carry = ""
                continue
            head = self._carry[:open_idx]
            if head and audible():
                emitted.append(head)
            close_idx = self._carry.find("]", open_idx)
            if close_idx == -1:
                # Partial tag — keep it for the next chunk.
                self._carry = self._carry[open_idx:]
                break
            tag = self._carry[open_idx + 1:close_idx]
            self._carry = self._carry[close_idx + 1:]
            if tag not in self.KNOWN_TAGS:
                # Unknown bracketed word is literal content.
                if audible():
                    emitted.append(f"[{tag}]")
                continue
            self.found_any_tag = True
            if tag == "ANSWER":
                self.state = "answer"
            else:
                if self.state == "answer":
                    self.answer_ended = True
                self.state = "skip"
        return "".join(emitted)
def clean_markdown(text: str) -> str:
    """Strip Markdown formatting characters for TTS readability.

    Applies a fixed sequence of regex substitutions: links/images are
    replaced by their display text, then heading/emphasis/code/blockquote/
    list markers and horizontal rules are removed, and blank-line runs are
    collapsed. The order matters (images before links, markers before rules).
    """
    substitutions = (
        (r'!\[([^\]]*)\]\([^)]*\)', r'\1', 0),          # images -> alt text
        (r'\[([^\]]*)\]\([^)]*\)', r'\1', 0),            # links -> display text
        (r'#{1,6}\s*', '', 0),                           # heading marks
        (r'\*{1,3}|_{1,3}|~~|`{1,3}', '', 0),            # bold/italic/strike/code
        (r'^>\s*', '', re.MULTILINE),                    # blockquote marks
        (r'^\s*[-*+]\s+', '', re.MULTILINE),             # bullet list markers
        (r'^\s*\d+\.\s+', '', re.MULTILINE),             # numbered list markers
        (r'^[\s]*[-*_]{3,}[\s]*$', '', re.MULTILINE),    # horizontal rules
        (r'\n{2,}', '\n', 0),                            # collapse blank lines
    )
    for pattern, replacement, flags in substitutions:
        text = re.sub(pattern, replacement, text, flags=flags)
    return text.strip()
async def stream_v3_agent(
    user_text: str,
    bot_id: str,
    bot_config: dict,
    session_id: str,
    user_identifier: str,
) -> AsyncGenerator[str, None]:
    """Call v3 agent API in streaming mode, yield text chunks as they arrive.

    Args:
        user_text: The user's utterance for this single turn.
        bot_id: Target bot identifier.
        bot_config: Bot configuration dict; the "language", "dataset_ids"
            and "skills" keys are read here (defaults: "zh", [], []).
        session_id: Conversation session identifier.
        user_identifier: Identifier of the end user.

    Yields:
        Incremental ``choices[0].delta.content`` text fragments parsed from
        the agent's OpenAI-style SSE stream. Cancellation is re-raised; any
        other error is logged and silently ends the stream.
    """
    import asyncio
    try:
        # Local imports — NOTE(review): presumably to avoid import cycles
        # with the routes layer; confirm before hoisting to module level.
        from utils.api_models import ChatRequestV3, Message
        from utils.fastapi_utils import (
            process_messages,
            create_project_directory,
        )
        from agent.agent_config import AgentConfig
        from routes.chat import enhanced_generate_stream_response

        language = bot_config.get("language", "zh")
        # Single user turn; no prior conversation history is attached here.
        messages_obj = [Message(role="user", content=user_text)]
        request = ChatRequestV3(
            messages=messages_obj,
            bot_id=bot_id,
            stream=True,
            session_id=session_id,
            user_identifier=user_identifier,
        )
        project_dir = create_project_directory(
            bot_config.get("dataset_ids", []),
            bot_id,
            bot_config.get("skills", []),
        )
        processed_messages = process_messages(messages_obj, language)
        config = await AgentConfig.from_v3_request(
            request,
            bot_config,
            project_dir,
            processed_messages,
            language,
        )
        config.stream = True
        # Parse OpenAI-style SSE lines: "data: {json}" terminated by "data: [DONE]".
        async for sse_line in enhanced_generate_stream_response(config):
            if not sse_line or not sse_line.startswith("data: "):
                continue
            data_str = sse_line.strip().removeprefix("data: ")
            if data_str == "[DONE]":
                break
            try:
                data = json.loads(data_str)
                choices = data.get("choices", [])
                if choices:
                    delta = choices[0].get("delta", {})
                    content = delta.get("content", "")
                    if content:
                        yield content
            except (json.JSONDecodeError, KeyError):
                # Malformed event — skip it rather than abort the stream.
                continue
    except asyncio.CancelledError:
        logger.info(f"[Voice] v3 agent call cancelled")
        raise
    except Exception as e:
        # NOTE(review): the exception is swallowed and the generator simply
        # ends; callers cannot distinguish failure from normal completion —
        # confirm this is intentional.
        logger.error(f"[Voice] Error calling v3 agent: {e}", exc_info=True)

View File

@ -121,19 +121,20 @@ NEW_API_ADMIN_KEY = os.getenv("NEW_API_ADMIN_KEY", "")
# ============================================================
# Volcengine Realtime Dialogue Configuration
# ============================================================
VOLCENGINE_REALTIME_URL = os.getenv(
"VOLCENGINE_REALTIME_URL",
"wss://openspeech.bytedance.com/api/v3/realtime/dialogue"
)
VOLCENGINE_APP_ID = os.getenv("VOLCENGINE_APP_ID", "8718217928")
VOLCENGINE_ACCESS_KEY = os.getenv("VOLCENGINE_ACCESS_KEY", "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc")
VOLCENGINE_RESOURCE_ID = os.getenv("VOLCENGINE_RESOURCE_ID", "volc.speech.dialog")
VOLCENGINE_APP_KEY = os.getenv("VOLCENGINE_APP_KEY", "PlgvMymc7f3tQnJ6")
VOLCENGINE_APP_ID = os.getenv("VOLCENGINE_APP_ID", "2511880162")
VOLCENGINE_ACCESS_KEY = os.getenv("VOLCENGINE_ACCESS_KEY", "pjLbaqR1lHFfkv1xcJAYnvKV0HAvsBvt")
VOLCENGINE_DEFAULT_SPEAKER = os.getenv(
"VOLCENGINE_DEFAULT_SPEAKER", "zh_male_yunzhou_jupiter_bigtts"
"VOLCENGINE_DEFAULT_SPEAKER", "zh_female_xiaohe_uranus_bigtts"
)
VOLCENGINE_TTS_SAMPLE_RATE = int(os.getenv("VOLCENGINE_TTS_SAMPLE_RATE", "24000"))
# ============================================================
# Voice Lite Configuration (ASR + Agent + TTS pipeline)
# ============================================================
VOICE_DEFAULT_MODE = os.getenv("VOICE_DEFAULT_MODE", "lite") # "realtime" | "lite"
# Silence timeout (seconds) - ASR considers user done speaking after this
VOICE_LITE_SILENCE_TIMEOUT = float(os.getenv("VOICE_LITE_SILENCE_TIMEOUT", "1.5"))
# ============================================================
# Single Agent Mode Configuration
# ============================================================