Compare commits

...

5 Commits

Author SHA1 Message Date
朱潮
2d2e1dbcdf vad 2026-03-22 00:52:11 +08:00
朱潮
7a547322e3 语义分割 2026-03-22 00:42:57 +08:00
朱潮
f9e9c3c26d 默认音色 2026-03-21 23:56:58 +08:00
朱潮
d68a87dba8 支持语音合成和语音识别api 2026-03-21 23:51:02 +08:00
朱潮
99273a91d3 支持语音合成和语音识别api 2026-03-21 23:50:51 +08:00
11 changed files with 1238 additions and 210 deletions

36
poetry.lock generated
View File

@ -5302,25 +5302,19 @@ train = ["accelerate (>=0.20.3)", "datasets"]
[[package]] [[package]]
name = "setuptools" name = "setuptools"
version = "82.0.1" version = "70.3.0"
description = "Most extensible Python build backend with support for C/C++ extension modules" description = "Easily download, build, install, upgrade, and uninstall Python packages"
optional = false optional = false
python-versions = ">=3.9" python-versions = ">=3.8"
groups = ["main"] groups = ["main"]
markers = "python_version >= \"3.13\""
files = [ files = [
{file = "setuptools-82.0.1-py3-none-any.whl", hash = "sha256:a59e362652f08dcd477c78bb6e7bd9d80a7995bc73ce773050228a348ce2e5bb"}, {file = "setuptools-70.3.0-py3-none-any.whl", hash = "sha256:fe384da74336c398e0d956d1cae0669bc02eed936cdb1d49b57de1990dc11ffc"},
{file = "setuptools-82.0.1.tar.gz", hash = "sha256:7d872682c5d01cfde07da7bccc7b65469d3dca203318515ada1de5eda35efbf9"}, {file = "setuptools-70.3.0.tar.gz", hash = "sha256:f171bab1dfbc86b132997f26a119f6056a57950d058587841a0082e8830f9dc5"},
] ]
[package.extras] [package.extras]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "ruff (>=0.13.0) ; sys_platform != \"cygwin\""] doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
core = ["importlib_metadata (>=6) ; python_version < \"3.10\"", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "tomli (>=2.0.1) ; python_version < \"3.11\"", "wheel (>=0.43.0)"] test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "jaraco.test", "mypy (==1.10.0)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-ruff (>=0.3.2) ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
cover = ["pytest-cov"]
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"]
enabler = ["pytest-enabler (>=2.2)"]
test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"]
type = ["importlib_metadata (>=7.0.2) ; python_version < \"3.10\"", "jaraco.develop (>=7.21) ; sys_platform != \"cygwin\"", "mypy (==1.18.*)", "pytest-mypy"]
[[package]] [[package]]
name = "shellingham" name = "shellingham"
@ -6347,6 +6341,20 @@ files = [
{file = "wcwidth-0.6.0.tar.gz", hash = "sha256:cdc4e4262d6ef9a1a57e018384cbeb1208d8abbc64176027e2c2455c81313159"}, {file = "wcwidth-0.6.0.tar.gz", hash = "sha256:cdc4e4262d6ef9a1a57e018384cbeb1208d8abbc64176027e2c2455c81313159"},
] ]
[[package]]
name = "webrtcvad"
version = "2.0.10"
description = "Python interface to the Google WebRTC Voice Activity Detector (VAD)"
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "webrtcvad-2.0.10.tar.gz", hash = "sha256:f1bed2fb25b63fb7b1a55d64090c993c9c9167b28485ae0bcdd81cf6ede96aea"},
]
[package.extras]
dev = ["check-manifest", "memory_profiler", "nose", "psutil", "unittest2", "zest.releaser"]
[[package]] [[package]]
name = "websockets" name = "websockets"
version = "15.0.1" version = "15.0.1"
@ -6983,4 +6991,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt
[metadata] [metadata]
lock-version = "2.1" lock-version = "2.1"
python-versions = ">=3.12,<3.15" python-versions = ">=3.12,<3.15"
content-hash = "1461514ed1f9639f41f43ebb28f2a3fcd2d5a5dde954cd509c0ea7bf181e9bb6" content-hash = "c9c4f80cdbf7d6bce20f65f40b9adce05c5f4a830299de148fcd8482937bddb0"

View File

@ -39,7 +39,9 @@ dependencies = [
"ragflow-sdk (>=0.23.0,<0.24.0)", "ragflow-sdk (>=0.23.0,<0.24.0)",
"httpx (>=0.28.1,<0.29.0)", "httpx (>=0.28.1,<0.29.0)",
"wsgidav (>=4.3.3,<5.0.0)", "wsgidav (>=4.3.3,<5.0.0)",
"websockets (>=15.0.0,<16.0.0)" "websockets (>=15.0.0,<16.0.0)",
"setuptools (<71)",
"webrtcvad (>=2.0.10,<3.0.0)",
] ]
[tool.poetry.requires-plugins] [tool.poetry.requires-plugins]

View File

@ -165,7 +165,7 @@ safetensors==0.7.0 ; python_version >= "3.12" and python_version < "3.15"
scikit-learn==1.8.0 ; python_version >= "3.12" and python_version < "3.15" scikit-learn==1.8.0 ; python_version >= "3.12" and python_version < "3.15"
scipy==1.17.1 ; python_version >= "3.12" and python_version < "3.15" scipy==1.17.1 ; python_version >= "3.12" and python_version < "3.15"
sentence-transformers==3.4.1 ; python_version >= "3.12" and python_version < "3.15" sentence-transformers==3.4.1 ; python_version >= "3.12" and python_version < "3.15"
setuptools==82.0.1 ; python_version >= "3.13" and python_version < "3.15" setuptools==70.3.0 ; python_version >= "3.12" and python_version < "3.15"
shellingham==1.5.4 ; python_version >= "3.12" and python_version < "3.15" shellingham==1.5.4 ; python_version >= "3.12" and python_version < "3.15"
six==1.17.0 ; python_version >= "3.12" and python_version < "3.15" six==1.17.0 ; python_version >= "3.12" and python_version < "3.15"
sniffio==1.3.1 ; python_version >= "3.12" and python_version < "3.15" sniffio==1.3.1 ; python_version >= "3.12" and python_version < "3.15"
@ -203,6 +203,7 @@ uvloop==0.22.1 ; python_version >= "3.12" and python_version < "3.15"
watchfiles==1.1.1 ; python_version >= "3.12" and python_version < "3.15" watchfiles==1.1.1 ; python_version >= "3.12" and python_version < "3.15"
wcmatch==10.1 ; python_version >= "3.12" and python_version < "3.15" wcmatch==10.1 ; python_version >= "3.12" and python_version < "3.15"
wcwidth==0.6.0 ; python_version >= "3.12" and python_version < "3.15" wcwidth==0.6.0 ; python_version >= "3.12" and python_version < "3.15"
webrtcvad==2.0.10 ; python_version >= "3.12" and python_version < "3.15"
websockets==15.0.1 ; python_version >= "3.12" and python_version < "3.15" websockets==15.0.1 ; python_version >= "3.12" and python_version < "3.15"
wrapt==1.17.3 ; python_version >= "3.12" and python_version < "3.15" wrapt==1.17.3 ; python_version >= "3.12" and python_version < "3.15"
wsgidav==4.3.3 ; python_version >= "3.12" and python_version < "3.15" wsgidav==4.3.3 ; python_version >= "3.12" and python_version < "3.15"

View File

@ -7,6 +7,7 @@ from typing import Optional
from fastapi import APIRouter, WebSocket, WebSocketDisconnect from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from services.voice_session_manager import VoiceSession from services.voice_session_manager import VoiceSession
from utils.settings import VOICE_DEFAULT_MODE
logger = logging.getLogger('app') logger = logging.getLogger('app')
@ -35,7 +36,7 @@ async def voice_realtime(websocket: WebSocket):
""" """
await websocket.accept() await websocket.accept()
session: Optional[VoiceSession] = None session = None
async def send_json(data: dict): async def send_json(data: dict):
try: try:
@ -91,7 +92,9 @@ async def voice_realtime(websocket: WebSocket):
await send_json({"type": "error", "message": "bot_id is required"}) await send_json({"type": "error", "message": "bot_id is required"})
continue continue
session = VoiceSession( voice_mode = msg.get("voice_mode") or VOICE_DEFAULT_MODE
session_kwargs = dict(
bot_id=bot_id, bot_id=bot_id,
session_id=msg.get("session_id"), session_id=msg.get("session_id"),
user_identifier=msg.get("user_identifier"), user_identifier=msg.get("user_identifier"),
@ -104,6 +107,13 @@ async def voice_realtime(websocket: WebSocket):
on_error=on_error, on_error=on_error,
) )
if voice_mode == "lite":
from services.voice_lite_session import VoiceLiteSession
session = VoiceLiteSession(**session_kwargs)
logger.info(f"[Voice] Using lite mode for bot_id={bot_id}")
else:
session = VoiceSession(**session_kwargs)
try: try:
await session.start() await session.start()
except Exception as e: except Exception as e:

View File

@ -8,15 +8,13 @@ import websockets
from services import realtime_voice_protocol as protocol from services import realtime_voice_protocol as protocol
from utils.settings import ( from utils.settings import (
VOLCENGINE_REALTIME_URL,
VOLCENGINE_APP_ID, VOLCENGINE_APP_ID,
VOLCENGINE_ACCESS_KEY, VOLCENGINE_ACCESS_KEY,
VOLCENGINE_RESOURCE_ID,
VOLCENGINE_APP_KEY,
VOLCENGINE_DEFAULT_SPEAKER, VOLCENGINE_DEFAULT_SPEAKER,
VOLCENGINE_TTS_SAMPLE_RATE, VOLCENGINE_TTS_SAMPLE_RATE,
) )
VOLCENGINE_REALTIME_URL = "wss://openspeech.bytedance.com/api/v3/realtime/dialogue"
logger = logging.getLogger('app') logger = logging.getLogger('app')
@ -48,8 +46,8 @@ class RealtimeDialogClient:
return { return {
"X-Api-App-ID": VOLCENGINE_APP_ID, "X-Api-App-ID": VOLCENGINE_APP_ID,
"X-Api-Access-Key": VOLCENGINE_ACCESS_KEY, "X-Api-Access-Key": VOLCENGINE_ACCESS_KEY,
"X-Api-Resource-Id": VOLCENGINE_RESOURCE_ID, "X-Api-Resource-Id": "volc.speech.dialog",
"X-Api-App-Key": VOLCENGINE_APP_KEY, "X-Api-App-Key": "PlgvMymc7f3tQnJ6",
"X-Api-Connect-Id": self._connect_id, "X-Api-Connect-Id": self._connect_id,
} }

View File

@ -0,0 +1,250 @@
import gzip
import json
import struct
import uuid
import logging
from typing import AsyncGenerator, Tuple
import websockets
from utils.settings import (
VOLCENGINE_ACCESS_KEY,
VOLCENGINE_APP_ID,
)
VOLCENGINE_ASR_URL = "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async"
logger = logging.getLogger('app')
# Protocol constants (v3/sauc)
PROTOCOL_VERSION = 0b0001
HEADER_SIZE = 0b0001
# Message types
FULL_CLIENT_REQUEST = 0b0001
AUDIO_ONLY_REQUEST = 0b0010
FULL_SERVER_RESPONSE = 0b1001
SERVER_ERROR_RESPONSE = 0b1111
# Flags
POS_SEQUENCE = 0b0001
NEG_SEQUENCE = 0b0010
NEG_WITH_SEQUENCE = 0b0011
# Serialization / Compression
JSON_SERIAL = 0b0001
GZIP_COMPRESS = 0b0001
def _build_header(msg_type: int, flags: int = POS_SEQUENCE,
serial: int = JSON_SERIAL, compress: int = GZIP_COMPRESS) -> bytearray:
header = bytearray(4)
header[0] = (PROTOCOL_VERSION << 4) | HEADER_SIZE
header[1] = (msg_type << 4) | flags
header[2] = (serial << 4) | compress
header[3] = 0x00
return header
class StreamingASRClient:
    """Volcengine v3/sauc/bigmodel streaming ASR client.

    Frame layout (big-endian): the 4-byte header from _build_header, then a
    signed int32 sequence number, an unsigned int32 payload size, then the
    gzip-compressed payload (JSON config, or raw PCM for audio frames).
    """

    def __init__(self, uid: str = "voice_lite"):
        self._uid = uid  # user id reported to the service in the config frame
        self._ws = None  # websocket; None until connect() succeeds
        self._seq = 1    # outgoing frame sequence counter, starts at 1

    def _build_config(self) -> dict:
        """Build the initial full_client_request payload (audio format + options)."""
        return {
            "user": {
                "uid": self._uid,
            },
            "audio": {
                # Must match the PCM produced upstream: raw 24 kHz 16-bit mono.
                "format": "pcm",
                "codec": "raw",
                "rate": 24000,
                "bits": 16,
                "channel": 1,
            },
            "request": {
                "model_name": "bigmodel",
                "enable_itn": True,
                "enable_punc": True,
                "enable_ddc": True,
                "show_utterances": True,
                "enable_nonstream": False,
            },
        }

    def _build_auth_headers(self) -> dict:
        """Auth headers for the websocket handshake.

        NOTE(review): X-Api-App-Key is populated from VOLCENGINE_APP_ID here —
        confirm this pairing is intended for the sauc endpoint.
        """
        return {
            "X-Api-Resource-Id": "volc.seedasr.sauc.duration",
            "X-Api-Connect-Id": str(uuid.uuid4()),
            "X-Api-Access-Key": VOLCENGINE_ACCESS_KEY,
            "X-Api-App-Key": VOLCENGINE_APP_ID,
        }

    async def connect(self) -> None:
        """Connect to ASR WebSocket and send initial full_client_request.

        Raises:
            ConnectionError: if the server acks the config with a non-zero code.
        """
        headers = self._build_auth_headers()
        logger.info(f"[ASR] Connecting to {VOLCENGINE_ASR_URL} with headers: {headers}")
        self._ws = await websockets.connect(
            VOLCENGINE_ASR_URL,
            additional_headers=headers,
            ping_interval=None,  # disable client-side pings
            proxy=None
        )
        logger.info(f"[ASR] Connected to {VOLCENGINE_ASR_URL}")
        # Send full_client_request with config (seq=1)
        self._seq = 1
        config = self._build_config()
        config_bytes = gzip.compress(json.dumps(config).encode())
        frame = bytearray(_build_header(FULL_CLIENT_REQUEST, POS_SEQUENCE, JSON_SERIAL, GZIP_COMPRESS))
        frame.extend(struct.pack('>i', self._seq))
        frame.extend(struct.pack('>I', len(config_bytes)))
        frame.extend(config_bytes)
        self._seq += 1
        await self._ws.send(bytes(frame))
        # Wait for server ack before any audio is sent
        resp = await self._ws.recv()
        parsed = self._parse_response(resp)
        if parsed and parsed.get("code", 0) != 0:
            raise ConnectionError(f"[ASR] Server rejected config: {parsed}")
        logger.info(f"[ASR] Config accepted, ready for audio")

    async def send_audio(self, chunk: bytes) -> None:
        """Send an audio chunk to ASR with sequence number. No-op if not connected."""
        if not self._ws:
            return
        compressed = gzip.compress(chunk)
        frame = bytearray(_build_header(AUDIO_ONLY_REQUEST, POS_SEQUENCE, JSON_SERIAL, GZIP_COMPRESS))
        frame.extend(struct.pack('>i', self._seq))
        frame.extend(struct.pack('>I', len(compressed)))
        frame.extend(compressed)
        self._seq += 1
        await self._ws.send(bytes(frame))

    async def send_finish(self) -> None:
        """Send last audio frame with negative sequence to signal end."""
        if not self._ws:
            return
        payload = gzip.compress(b'')
        frame = bytearray(_build_header(AUDIO_ONLY_REQUEST, NEG_WITH_SEQUENCE, JSON_SERIAL, GZIP_COMPRESS))
        # Negative sequence marks the final frame of the utterance.
        frame.extend(struct.pack('>i', -self._seq))
        frame.extend(struct.pack('>I', len(payload)))
        frame.extend(payload)
        await self._ws.send(bytes(frame))

    async def receive_results(self) -> AsyncGenerator[Tuple[str, bool], None]:
        """Yield (text, is_last) tuples from ASR responses.

        Terminates on a server error code, after the final (is_last) package,
        or when the connection closes.
        """
        if not self._ws:
            return
        try:
            async for message in self._ws:
                if isinstance(message, str):
                    # Text frames are not part of the binary protocol; log and skip.
                    logger.info(f"[ASR] Received text message: {message[:200]}")
                    continue
                parsed = self._parse_response(message)
                logger.info(f"[ASR] Received binary ({len(message)} bytes), parsed: {parsed}")
                if parsed is None:
                    continue
                code = parsed.get("code", 0)
                if code != 0:
                    logger.warning(f"[ASR] Server error: {parsed}")
                    return
                is_last = parsed.get("is_last", False)
                payload_msg = parsed.get("payload_msg")
                if payload_msg and isinstance(payload_msg, dict):
                    text = self._extract_text(payload_msg)
                    if text:
                        yield (text, is_last)
                if is_last:
                    return
        except websockets.exceptions.ConnectionClosed:
            logger.info("[ASR] Connection closed")

    @staticmethod
    def _extract_text(payload: dict) -> str:
        """Extract recognized text from a payload dict; "" if none found."""
        result = payload.get("result")
        if not result or not isinstance(result, dict):
            return ""
        # Try utterances first (show_utterances=True); concatenate their text.
        utterances = result.get("utterances", [])
        if utterances:
            parts = []
            for utt in utterances:
                text = utt.get("text", "")
                if text:
                    parts.append(text)
            return "".join(parts)
        # Fallback to result.text
        text = result.get("text", "")
        if isinstance(text, str):
            return text
        return ""

    def _parse_response(self, data: bytes) -> "dict | None":
        """Parse binary ASR response into a dict.

        Returns None for frames shorter than one header; otherwise a dict with
        "code", "is_last", and optionally "sequence" and decoded "payload_msg".
        """
        if len(data) < 4:
            return None
        msg_type = data[1] >> 4
        msg_flags = data[1] & 0x0f
        serial_method = data[2] >> 4
        compression = data[2] & 0x0f
        header_size = data[0] & 0x0f  # in 4-byte words
        payload = data[header_size * 4:]
        result = {"code": 0, "is_last": False}
        # Parse sequence and last flag
        if msg_flags & 0x01:  # has sequence
            result["sequence"] = struct.unpack('>i', payload[:4])[0]
            payload = payload[4:]
        if msg_flags & 0x02:  # is last package
            result["is_last"] = True
        if msg_type == SERVER_ERROR_RESPONSE:
            result["code"] = struct.unpack('>i', payload[:4])[0]
            payload_size = struct.unpack('>I', payload[4:8])[0]  # declared size; unused
            payload = payload[8:]
        elif msg_type == FULL_SERVER_RESPONSE:
            payload_size = struct.unpack('>I', payload[:4])[0]  # declared size; unused
            payload = payload[4:]
        else:
            # Unknown message type — return header-level info only.
            return result
        if not payload:
            return result
        if compression == GZIP_COMPRESS:
            try:
                payload = gzip.decompress(payload)
            except Exception:
                # Undecodable payload; return what was parsed so far.
                return result
        if serial_method == JSON_SERIAL:
            try:
                result["payload_msg"] = json.loads(payload.decode('utf-8'))
            except Exception:
                pass
        return result

    async def close(self) -> None:
        """Close the websocket if open (idempotent)."""
        if self._ws:
            logger.info("[ASR] Closing connection")
            await self._ws.close()
            self._ws = None

View File

@ -0,0 +1,158 @@
import base64
import json
import logging
import re
import uuid
import httpx
import numpy as np
from utils.settings import (
VOLCENGINE_APP_ID,
VOLCENGINE_ACCESS_KEY,
VOLCENGINE_DEFAULT_SPEAKER,
)
VOLCENGINE_TTS_URL = "https://openspeech.bytedance.com/api/v3/tts/unidirectional/sse"

logger = logging.getLogger('app')

# Regex to detect text that is only emoji/whitespace (no speakable content)
_EMOJI_ONLY_RE = re.compile(
    r'^[\s\U00002600-\U000027BF\U0001F300-\U0001FAFF\U0000FE00-\U0000FE0F\U0000200D]*$'
)


def convert_pcm_s16_to_f32(pcm_data: bytes) -> bytes:
    """Convert signed 16-bit PCM audio to float32 PCM for frontend playback.

    Args:
        pcm_data: raw int16 sample bytes (native byte order assumed).

    Returns:
        The same samples as contiguous float32 bytes scaled by 1/32768
        (range [-1.0, 1.0)); b'' for empty input.
    """
    # Early-out before allocating anything for empty input.
    if not pcm_data:
        return b''
    # int16 -> float32, then scale; the division preserves float32 dtype,
    # so no second astype() is needed before serializing.
    samples = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0
    return samples.tobytes()
class StreamingTTSClient:
    """Volcengine v3/tts/unidirectional/sse streaming TTS client.

    synthesize() POSTs one utterance and parses the SSE response stream,
    yielding float32 PCM chunks (24 kHz, via convert_pcm_s16_to_f32) as
    they arrive.
    """

    def __init__(self, speaker: str = ""):
        # Fall back to the configured default voice when none is given.
        self._speaker = speaker or VOLCENGINE_DEFAULT_SPEAKER

    async def synthesize(self, text: str):
        """
        Synthesize text to audio via SSE streaming.
        Yields 24kHz float32 PCM audio chunks.

        Yields nothing for blank or emoji-only input, and swallows all
        transport errors (best-effort: failure just ends the stream).
        """
        if not text.strip():
            return
        # Skip pure emoji text
        if _EMOJI_ONLY_RE.match(text):
            logger.info(f"[TTS] Skipping emoji-only text: '{text}'")
            return
        headers = {
            "X-Api-App-Id": VOLCENGINE_APP_ID,
            "X-Api-Access-Key": VOLCENGINE_ACCESS_KEY,
            "X-Api-Resource-Id": "seed-tts-2.0",
            "Content-Type": "application/json",
            "Connection": "keep-alive",
        }
        body = {
            "user": {
                "uid": str(uuid.uuid4()),
            },
            "req_params": {
                "text": text,
                "speaker": self._speaker,
                "audio_params": {
                    # Raw 16-bit PCM at 24 kHz; converted to float32 below.
                    "format": "pcm",
                    "sample_rate": 24000,
                },
            },
        }
        try:
            logger.info(f"[TTS] Requesting: speaker={self._speaker}, text='{text[:50]}'")
            async with httpx.AsyncClient(timeout=httpx.Timeout(30.0, read=60.0)) as client:
                async with client.stream("POST", VOLCENGINE_TTS_URL, headers=headers, json=body) as response:
                    logger.info(f"[TTS] Response status: {response.status_code}")
                    if response.status_code != 200:
                        # In stream mode the body must be read explicitly.
                        error_body = await response.aread()
                        logger.error(f"[TTS] HTTP {response.status_code}: {error_body.decode('utf-8', errors='replace')}")
                        return
                    chunk_count = 0
                    # Parse SSE format: lines prefixed with "event:", "data:", separated by blank lines
                    current_event = ""  # NOTE(review): captured but never consumed
                    current_data = ""
                    raw_logged = False
                    async for raw_line in response.aiter_lines():
                        if not raw_logged:
                            # Log the first line once for protocol debugging.
                            logger.info(f"[TTS] First SSE line: {raw_line[:200]}")
                            raw_logged = True
                        line = raw_line.strip()
                        if line == "":
                            # Blank line = end of one SSE event
                            if current_data:
                                async for audio in self._process_sse_data(current_data):
                                    chunk_count += 1
                                    yield audio
                            current_event = ""
                            current_data = ""
                            continue
                        if line.startswith(":"):
                            # SSE comment, skip
                            continue
                        if ":" in line:
                            field, value = line.split(":", 1)
                            value = value.lstrip()
                            if field == "event":
                                current_event = value
                            elif field == "data":
                                # Multi-line data fields are newline-joined per SSE spec.
                                current_data += value + "\n"
                    # Handle remaining data without trailing blank line
                    if current_data:
                        async for audio in self._process_sse_data(current_data):
                            chunk_count += 1
                            yield audio
                    logger.info(f"[TTS] Stream done, yielded {chunk_count} audio chunks")
        except Exception as e:
            logger.error(f"[TTS] Error: {e}", exc_info=True)

    async def _process_sse_data(self, data_str: str):
        """Parse one SSE data field and yield decoded audio chunks if present."""
        data_str = data_str.rstrip("\n")
        if not data_str:
            return
        try:
            data = json.loads(data_str)
        except json.JSONDecodeError:
            logger.warning(f"[TTS] Non-JSON SSE data: {data_str[:100]}")
            return
        code = data.get("code", 0)
        if code == 0 and data.get("data"):
            # Audio data chunk: base64-wrapped int16 PCM.
            pcm_raw = base64.b64decode(data["data"])
            pcm_f32 = convert_pcm_s16_to_f32(pcm_raw)
            if pcm_f32:
                yield pcm_f32
        elif code == 20000000:
            # End of stream
            logger.info(f"[TTS] End signal received")
            return
        elif code > 0:
            error_msg = data.get("message", "Unknown TTS error")
            logger.error(f"[TTS] Error code={code}: {error_msg}")
            return

View File

@ -0,0 +1,448 @@
import asyncio
import logging
import struct
import uuid
from typing import Optional, Callable, Awaitable
import webrtcvad
from services.streaming_asr_client import StreamingASRClient
from services.streaming_tts_client import StreamingTTSClient
from services.voice_utils import (
StreamTagFilter,
clean_markdown,
stream_v3_agent,
TTSSentenceSplitter,
)
from utils.settings import VOICE_LITE_SILENCE_TIMEOUT
logger = logging.getLogger('app')
class VoiceLiteSession:
    """Voice Lite session: ASR -> Agent -> TTS pipeline (cheaper alternative).

    Client-side VAD (webrtcvad) gates incoming audio into a lazily-connected
    streaming ASR session; stabilized transcripts are sent to the agent, whose
    streamed reply is split into sentences and synthesized via streaming TTS.
    """

    def __init__(
        self,
        bot_id: str,
        session_id: Optional[str] = None,
        user_identifier: Optional[str] = None,
        on_audio: Optional[Callable[[bytes], Awaitable[None]]] = None,
        on_asr_text: Optional[Callable[[str], Awaitable[None]]] = None,
        on_agent_result: Optional[Callable[[str], Awaitable[None]]] = None,
        on_agent_stream: Optional[Callable[[str], Awaitable[None]]] = None,
        on_llm_text: Optional[Callable[[str], Awaitable[None]]] = None,
        on_status: Optional[Callable[[str], Awaitable[None]]] = None,
        on_error: Optional[Callable[[str], Awaitable[None]]] = None,
    ):
        self.bot_id = bot_id
        # Generate a session id when the caller does not supply one.
        self.session_id = session_id or str(uuid.uuid4())
        self.user_identifier = user_identifier or ""
        self._bot_config: dict = {}  # populated by start()
        self._speaker: str = ""      # TTS voice from bot config; "" -> default

        # Async callbacks toward the websocket layer (all optional).
        self._on_audio = on_audio
        self._on_asr_text = on_asr_text
        self._on_agent_result = on_agent_result
        self._on_agent_stream = on_agent_stream
        self._on_llm_text = on_llm_text
        self._on_status = on_status
        self._on_error = on_error

        self._running = False
        self._asr_client: Optional[StreamingASRClient] = None
        self._asr_receive_task: Optional[asyncio.Task] = None
        self._agent_task: Optional[asyncio.Task] = None

        # Silence timeout tracking (transcript-stability detection)
        self._last_asr_time: float = 0
        self._silence_timer_task: Optional[asyncio.Task] = None
        self._current_asr_text: str = ""
        self._last_text_change_time: float = 0
        self._last_changed_text: str = ""
        self._last_asr_emit_time: float = 0
        self._utterance_lock = asyncio.Lock()

        # VAD (Voice Activity Detection) via webrtcvad
        self._vad = webrtcvad.Vad(2)  # aggressiveness 0-3 (2 = balanced)
        self._vad_speaking = False  # Whether user is currently speaking
        self._vad_silence_start: float = 0  # When silence started
        self._vad_finish_task: Optional[asyncio.Task] = None
        self._pre_buffer: list = []  # Buffer audio before VAD triggers
        self._vad_voice_streak: int = 0  # Consecutive voiced chunks count
        self._vad_silence_streak: int = 0  # Consecutive silent chunks count
    async def start(self) -> None:
        """Fetch bot config, mark session as running.

        Emits status "ready" once the bot configuration has been loaded.
        """
        # Function-scope import — presumably to avoid a circular dependency
        # at module load time; TODO confirm.
        from utils.fastapi_utils import fetch_bot_config_from_db

        self._bot_config = await fetch_bot_config_from_db(self.bot_id, self.user_identifier)
        self._speaker = self._bot_config.get("voice_speaker", "")
        self._running = True
        await self._emit_status("ready")
    async def stop(self) -> None:
        """Gracefully stop the session: cancel timers/tasks and close ASR."""
        self._running = False
        if self._vad_finish_task and not self._vad_finish_task.done():
            self._vad_finish_task.cancel()
        if self._silence_timer_task and not self._silence_timer_task.done():
            self._silence_timer_task.cancel()
        if self._agent_task and not self._agent_task.done():
            # Signal cooperative cancellation to the agent pipeline first,
            # then cancel the task itself.
            from utils.cancel_manager import trigger_cancel
            trigger_cancel(self.session_id)
            self._agent_task.cancel()
        if self._asr_receive_task and not self._asr_receive_task.done():
            self._asr_receive_task.cancel()
        if self._asr_client:
            try:
                # Best-effort: tell the server the audio stream ended.
                await self._asr_client.send_finish()
            except Exception:
                pass
            await self._asr_client.close()
# VAD configuration
VAD_SILENCE_DURATION = 1.5 # Seconds of silence before sending finish
VAD_PRE_BUFFER_SIZE = 5 # Number of audio chunks to buffer before VAD triggers
VAD_SOURCE_RATE = 24000 # Input audio sample rate
VAD_TARGET_RATE = 16000 # webrtcvad supported sample rate
VAD_FRAME_DURATION_MS = 30 # Frame duration for webrtcvad (10, 20, or 30 ms)
VAD_SPEECH_CHUNKS = 3 # Consecutive voiced chunks required to start speech
VAD_SILENCE_CHUNKS = 5 # Consecutive silent chunks required to confirm silence
_audio_chunk_count = 0
@staticmethod
def _resample_24k_to_16k(pcm_data: bytes) -> bytes:
"""Downsample 16-bit PCM from 24kHz to 16kHz (ratio 3:2).
Takes every 2 out of 3 samples (simple decimation).
"""
n_samples = len(pcm_data) // 2
if n_samples == 0:
return b''
samples = struct.unpack(f'<{n_samples}h', pcm_data[:n_samples * 2])
# Pick samples at indices 0, 1.5, 3, 4.5, ... -> floor(i * 3/2) for output index i
out_len = (n_samples * 2) // 3
resampled = []
for i in range(out_len):
src_idx = (i * 3) // 2
if src_idx < n_samples:
resampled.append(samples[src_idx])
return struct.pack(f'<{len(resampled)}h', *resampled)
    def _webrtcvad_detect(self, pcm_data: bytes) -> bool:
        """Run webrtcvad on one audio chunk; True if any frame contains voice.

        The 24 kHz input is first decimated to 16 kHz (a webrtcvad-supported
        rate), then scanned in fixed-duration frames. Chunks shorter than one
        frame are treated as silence.
        """
        resampled = self._resample_24k_to_16k(pcm_data)
        frame_size = (self.VAD_TARGET_RATE * self.VAD_FRAME_DURATION_MS // 1000) * 2  # bytes per frame
        if len(resampled) < frame_size:
            return False
        # Check frames; return True if any frame has voice
        voice_frames = 0
        total_frames = 0
        for offset in range(0, len(resampled) - frame_size + 1, frame_size):
            frame = resampled[offset:offset + frame_size]
            total_frames += 1
            try:
                # webrtcvad raises on malformed frames; treat those as non-voice.
                if self._vad.is_speech(frame, self.VAD_TARGET_RATE):
                    voice_frames += 1
            except Exception:
                pass
        # Consider voice detected if at least one frame has speech
        return voice_frames > 0
    async def handle_audio(self, audio_data: bytes) -> None:
        """Forward user audio to ASR with VAD gating. Lazy-connect on speech start.

        State machine:
        - not speaking: keep a small rolling pre-buffer; after
          VAD_SPEECH_CHUNKS consecutive voiced chunks, connect ASR and flush
          the pre-buffer so the utterance onset is not lost.
        - speaking: keep streaming audio (including briefly silent chunks, for
          ASR context); once silence persists by both chunk count and
          wall-clock duration, schedule a finish frame via _vad_send_finish.
        """
        if not self._running:
            return
        self._audio_chunk_count += 1
        has_voice = self._webrtcvad_detect(audio_data)
        now = asyncio.get_event_loop().time()
        # Update consecutive streaks
        if has_voice:
            self._vad_voice_streak += 1
            self._vad_silence_streak = 0
        else:
            self._vad_silence_streak += 1
            self._vad_voice_streak = 0
        if has_voice:
            # Cancel any pending finish
            if self._vad_finish_task and not self._vad_finish_task.done():
                self._vad_finish_task.cancel()
                self._vad_finish_task = None
            if not self._vad_speaking and self._vad_voice_streak >= self.VAD_SPEECH_CHUNKS:
                # Speech just started — connect ASR
                self._vad_speaking = True
                logger.info(f"[VoiceLite] VAD: speech started (webrtcvad), connecting ASR...")
                try:
                    await self._connect_asr()
                    # Send buffered pre-speech audio
                    for buffered in self._pre_buffer:
                        await self._asr_client.send_audio(buffered)
                    self._pre_buffer.clear()
                except Exception as e:
                    logger.error(f"[VoiceLite] VAD: ASR connect failed: {e}", exc_info=True)
                    self._vad_speaking = False
                    return
            # Send current chunk
            if self._asr_client:
                try:
                    await self._asr_client.send_audio(audio_data)
                except Exception:
                    pass
            self._vad_silence_start = 0
        else:
            if self._vad_speaking:
                # Brief silence while speaking — keep sending for ASR context
                if self._asr_client:
                    try:
                        await self._asr_client.send_audio(audio_data)
                    except Exception:
                        pass
                if self._vad_silence_start == 0:
                    self._vad_silence_start = now
                # Require both consecutive silent chunks AND time threshold
                if (self._vad_silence_streak >= self.VAD_SILENCE_CHUNKS
                        and (now - self._vad_silence_start) >= self.VAD_SILENCE_DURATION):
                    if not self._vad_finish_task or self._vad_finish_task.done():
                        self._vad_finish_task = asyncio.create_task(self._vad_send_finish())
            else:
                # Not speaking — buffer recent audio for pre-speech context
                self._pre_buffer.append(audio_data)
                if len(self._pre_buffer) > self.VAD_PRE_BUFFER_SIZE:
                    self._pre_buffer.pop(0)
    async def _vad_send_finish(self) -> None:
        """Send finish signal to ASR after silence detected."""
        logger.info(f"[VoiceLite] VAD: silence detected, sending finish to ASR")
        # Reset the VAD state machine for the next utterance.
        self._vad_speaking = False
        self._vad_silence_start = 0
        self._vad_voice_streak = 0
        self._vad_silence_streak = 0
        if self._asr_client:
            try:
                await self._asr_client.send_finish()
            except Exception as e:
                # Non-fatal: the receive loop will still wind the session down.
                logger.warning(f"[VoiceLite] VAD: send_finish failed: {e}")
    async def handle_text(self, text: str) -> None:
        """Handle direct text input - bypass ASR and go straight to agent."""
        if not self._running:
            return
        # Typed input preempts any in-flight reply, same as a new utterance.
        await self._interrupt_current()
        self._agent_task = asyncio.create_task(self._process_utterance(text))
    async def _connect_asr(self) -> None:
        """Create and connect a new ASR client, start receive loop."""
        # Tear down any previous ASR session first (client + receive loop).
        if self._asr_client:
            try:
                await self._asr_client.close()
            except Exception:
                pass
        if self._asr_receive_task and not self._asr_receive_task.done():
            self._asr_receive_task.cancel()
        self._asr_client = StreamingASRClient(uid=self.user_identifier or "voice_lite")
        await self._asr_client.connect()
        logger.info(f"[VoiceLite] ASR client connected")
        # Start receive loop for this ASR session
        self._asr_receive_task = asyncio.create_task(self._asr_receive_loop())
    async def _asr_receive_loop(self) -> None:
        """Receive ASR results from the current ASR session.

        Each partial transcript restarts the stability timer; the utterance
        is only declared complete by _silence_timeout, not here.
        """
        try:
            _listening_emitted = False
            async for text, is_final in self._asr_client.receive_results():
                if not self._running:
                    return
                if not _listening_emitted:
                    # First result: tell the frontend we are actively listening.
                    await self._emit_status("listening")
                    _listening_emitted = True
                now = asyncio.get_event_loop().time()
                self._last_asr_time = now
                self._current_asr_text = text
                # Track text changes for stability detection
                if text != self._last_changed_text:
                    self._last_changed_text = text
                    self._last_text_change_time = now
                # Reset stability timer on every result
                self._reset_silence_timer()
        except asyncio.CancelledError:
            pass
        except Exception as e:
            if self._running:
                logger.warning(f"[VoiceLite] ASR session ended: {e}")
        finally:
            logger.info(f"[VoiceLite] ASR session done")
            # Clean up ASR client after session ends
            if self._asr_client:
                try:
                    await self._asr_client.close()
                except Exception:
                    pass
                self._asr_client = None
    def _reset_silence_timer(self) -> None:
        """Reset the silence timeout timer (cancel pending, start a fresh one)."""
        if self._silence_timer_task and not self._silence_timer_task.done():
            self._silence_timer_task.cancel()
        self._silence_timer_task = asyncio.create_task(self._silence_timeout())
    async def _silence_timeout(self) -> None:
        """Wait for silence timeout, then check if text has been stable.

        Cancelled (and restarted) by _reset_silence_timer whenever a new ASR
        result arrives, so it only fires after a quiet period.
        """
        try:
            await asyncio.sleep(VOICE_LITE_SILENCE_TIMEOUT)
            if not self._running:
                return
            # Check if text has been stable (unchanged) for the timeout period
            now = asyncio.get_event_loop().time()
            if (self._current_asr_text
                    and (now - self._last_text_change_time) >= VOICE_LITE_SILENCE_TIMEOUT):
                logger.info(f"[VoiceLite] Text stable for {VOICE_LITE_SILENCE_TIMEOUT}s, processing: '{self._current_asr_text}'")
                await self._on_utterance_complete(self._current_asr_text)
        except asyncio.CancelledError:
            pass
    async def _on_utterance_complete(self, text: str) -> None:
        """Called when a complete utterance is detected."""
        if not text.strip():
            return
        # Serialize utterance handling so a new utterance cannot race the
        # interruption/cleanup of the previous one.
        async with self._utterance_lock:
            # Cancel silence timer
            if self._silence_timer_task and not self._silence_timer_task.done():
                self._silence_timer_task.cancel()
            # Interrupt any in-progress agent+TTS
            await self._interrupt_current()
            # Send final ASR text to frontend
            if self._on_asr_text:
                await self._on_asr_text(text)
            self._current_asr_text = ""
            self._last_changed_text = ""
            self._agent_task = asyncio.create_task(self._process_utterance(text))
    async def _interrupt_current(self) -> None:
        """Cancel current agent+TTS task if running, then await its exit."""
        if self._agent_task and not self._agent_task.done():
            logger.info(f"[VoiceLite] Interrupting previous agent task")
            from utils.cancel_manager import trigger_cancel
            # Cooperative cancel signal first, then hard task cancellation.
            trigger_cancel(self.session_id)
            self._agent_task.cancel()
            try:
                await self._agent_task
            # CancelledError is a BaseException (3.8+), hence listed explicitly.
            except (asyncio.CancelledError, Exception):
                pass
        self._agent_task = None
async def _process_utterance(self, text: str) -> None:
    """
    Run one complete utterance through the agent -> TTS pipeline.

    Streams the v3 agent response, forwards raw chunks to the frontend
    stream callback, filters out non-[ANSWER] tag blocks, splits the
    remainder into TTS-sized sentences, and synthesizes them as they become
    ready. Emits "thinking" before the agent call, "speaking" on the first
    synthesized sentence, and "idle" when done.

    Args:
        text: Final ASR transcript for this utterance.

    Raises:
        asyncio.CancelledError: re-raised so user interruption propagates.
    """
    try:
        logger.info(f"[VoiceLite] Processing utterance: '{text}'")
        await self._emit_status("thinking")
        accumulated_text = []
        tag_filter = StreamTagFilter()
        splitter = TTSSentenceSplitter()
        tts_client = StreamingTTSClient(speaker=self._speaker)
        speaking = False

        async def speak(raw: str) -> None:
            # Single emission path (was triplicated inline): clean markdown,
            # flip status to "speaking" on first audio, then synthesize.
            nonlocal speaking
            sentence = clean_markdown(raw)
            if not sentence:
                return
            if not speaking:
                await self._emit_status("speaking")
                speaking = True
            await self._send_tts(tts_client, sentence)

        async for chunk in stream_v3_agent(
            user_text=text,
            bot_id=self.bot_id,
            bot_config=self._bot_config,
            session_id=self.session_id,
            user_identifier=self.user_identifier,
        ):
            accumulated_text.append(chunk)
            if self._on_agent_stream:
                await self._on_agent_stream(chunk)
            passthrough = tag_filter.feed(chunk)
            if not passthrough:
                # Nothing speakable in this chunk; if the [ANSWER] block just
                # closed (e.g. a tool call started), flush buffered sentences.
                if tag_filter.answer_ended:
                    for pending in splitter.flush():
                        await speak(pending)
                continue
            # Feed raw passthrough to splitter (preserve newlines for splitting);
            # markdown cleanup happens per-sentence inside speak().
            for ready in splitter.feed(passthrough):
                await speak(ready)

        # End of stream: flush whatever the splitter is still holding.
        for rest in splitter.flush():
            await speak(rest)

        # Full agent text is not re-sent to the frontend (already streamed);
        # log its size and signal stream completion.
        full_result = "".join(accumulated_text)
        logger.info(f"[VoiceLite] Agent done ({len(full_result)} chars)")
        if self._on_agent_result:
            await self._on_agent_result(full_result)
        await self._emit_status("idle")
    except asyncio.CancelledError:
        logger.info(f"[VoiceLite] Agent task cancelled (user interrupted)")
        raise
    except Exception as e:
        logger.error(f"[VoiceLite] Error processing utterance: {e}", exc_info=True)
        await self._emit_error(f"Processing failed: {str(e)}")
async def _send_tts(self, tts_client: StreamingTTSClient, sentence: str) -> None:
    """Synthesize one sentence and forward each audio chunk to the audio callback."""
    logger.info(f"[VoiceLite] TTS sentence: '{sentence[:80]}'")
    async for piece in tts_client.synthesize(sentence):
        # Callback is re-checked per chunk; it may be detached mid-stream.
        if self._on_audio:
            await self._on_audio(piece)
async def _emit_status(self, status: str) -> None:
if self._on_status:
await self._on_status(status)
async def _emit_error(self, message: str) -> None:
if self._on_error:
await self._on_error(message)

View File

@ -1,96 +1,14 @@
import asyncio import asyncio
import json
import logging import logging
import re
import uuid import uuid
from typing import Optional, Callable, Awaitable, AsyncGenerator from typing import Optional, Callable, Awaitable
from services.realtime_voice_client import RealtimeDialogClient from services.realtime_voice_client import RealtimeDialogClient
from services.voice_utils import StreamTagFilter, clean_markdown, stream_v3_agent, SENTENCE_END_RE
logger = logging.getLogger('app') logger = logging.getLogger('app')
class _StreamTagFilter:
"""
Filters streaming text based on tag blocks.
Only passes through content inside [ANSWER] blocks.
If no tags are found at all, passes through everything (fallback).
Skips content inside [TOOL_CALL], [TOOL_RESPONSE], [THINK], [SOURCE], etc.
"""
SKIP_TAGS = {"TOOL_CALL", "TOOL_RESPONSE", "THINK", "SOURCE", "REFERENCE"}
KNOWN_TAGS = {"ANSWER", "TOOL_CALL", "TOOL_RESPONSE", "THINK", "SOURCE", "REFERENCE", "PREAMBLE", "SUMMARY"}
def __init__(self):
self.state = "idle" # idle, answer, skip
self.found_any_tag = False
self._pending = "" # buffer for partial tag like "[TOO..."
self.answer_ended = False # True when ANSWER block ends (e.g. hit [TOOL_CALL])
def feed(self, chunk: str) -> str:
"""Feed a chunk, return text that should be passed to TTS."""
self.answer_ended = False
self._pending += chunk
output = []
while self._pending:
if self.state in ("idle", "answer"):
bracket_pos = self._pending.find("[")
if bracket_pos == -1:
if self.state == "answer" or not self.found_any_tag:
output.append(self._pending)
self._pending = ""
else:
before = self._pending[:bracket_pos]
if before and (self.state == "answer" or not self.found_any_tag):
output.append(before)
close_pos = self._pending.find("]", bracket_pos)
if close_pos == -1:
# Incomplete tag — wait for next chunk
self._pending = self._pending[bracket_pos:]
break
tag_name = self._pending[bracket_pos + 1:close_pos]
self._pending = self._pending[close_pos + 1:]
if tag_name not in self.KNOWN_TAGS:
if self.state == "answer" or not self.found_any_tag:
output.append(f"[{tag_name}]")
continue
self.found_any_tag = True
if tag_name == "ANSWER":
self.state = "answer"
else:
if self.state == "answer":
self.answer_ended = True
self.state = "skip"
elif self.state == "skip":
bracket_pos = self._pending.find("[")
if bracket_pos == -1:
self._pending = ""
else:
close_pos = self._pending.find("]", bracket_pos)
if close_pos == -1:
self._pending = self._pending[bracket_pos:]
break
tag_name = self._pending[bracket_pos + 1:close_pos]
self._pending = self._pending[close_pos + 1:]
if tag_name not in self.KNOWN_TAGS:
continue
if tag_name == "ANSWER":
self.state = "answer"
else:
self.state = "skip"
return "".join(output)
class VoiceSession: class VoiceSession:
"""Manages a single voice dialogue session lifecycle""" """Manages a single voice dialogue session lifecycle"""
@ -288,30 +206,6 @@ class VoiceSession:
logger.error(f"[Voice] Server error: {error_msg}") logger.error(f"[Voice] Server error: {error_msg}")
await self._emit_error(error_msg) await self._emit_error(error_msg)
# Sentence-ending punctuation pattern for splitting TTS
_SENTENCE_END_RE = re.compile(r'[。!?;\n.!?;]')
# Markdown syntax to strip before TTS
_MD_CLEAN_RE = re.compile(r'#{1,6}\s*|(?<!\w)\*{1,3}|(?<!\w)_{1,3}|\*{1,3}(?!\w)|_{1,3}(?!\w)|~~|`{1,3}|^>\s*|^\s*[-*+]\s+|^\s*\d+\.\s+|\[([^\]]*)\]\([^)]*\)|!\[([^\]]*)\]\([^)]*\)', re.MULTILINE)
@staticmethod
def _clean_markdown(text: str) -> str:
"""Strip Markdown formatting characters for TTS readability."""
# Replace links/images with their display text
text = re.sub(r'!\[([^\]]*)\]\([^)]*\)', r'\1', text)
text = re.sub(r'\[([^\]]*)\]\([^)]*\)', r'\1', text)
# Remove headings, bold, italic, strikethrough, code marks, blockquote
text = re.sub(r'#{1,6}\s*', '', text)
text = re.sub(r'\*{1,3}|_{1,3}|~~|`{1,3}', '', text)
text = re.sub(r'^>\s*', '', text, flags=re.MULTILINE)
# Remove list markers
text = re.sub(r'^\s*[-*+]\s+', '', text, flags=re.MULTILINE)
text = re.sub(r'^\s*\d+\.\s+', '', text, flags=re.MULTILINE)
# Remove horizontal rules
text = re.sub(r'^[\s]*[-*_]{3,}[\s]*$', '', text, flags=re.MULTILINE)
# Collapse extra whitespace
text = re.sub(r'\n{2,}', '\n', text)
return text.strip()
async def _on_asr_text_received(self, text: str) -> None: async def _on_asr_text_received(self, text: str) -> None:
"""Called when ASR text is received — stream agent output, send TTS sentence by sentence""" """Called when ASR text is received — stream agent output, send TTS sentence by sentence"""
if not text.strip(): if not text.strip():
@ -324,9 +218,15 @@ class VoiceSession:
accumulated_text = [] # full agent output for on_agent_result callback accumulated_text = [] # full agent output for on_agent_result callback
sentence_buf = "" # buffer for accumulating until sentence boundary sentence_buf = "" # buffer for accumulating until sentence boundary
tts_started = False # whether we've sent the first TTS chunk tts_started = False # whether we've sent the first TTS chunk
tag_filter = _StreamTagFilter() tag_filter = StreamTagFilter()
async for chunk in self._stream_v3_agent(text): async for chunk in stream_v3_agent(
user_text=text,
bot_id=self.bot_id,
bot_config=self._bot_config,
session_id=self.session_id,
user_identifier=self.user_identifier,
):
accumulated_text.append(chunk) accumulated_text.append(chunk)
if self._on_agent_stream: if self._on_agent_stream:
@ -341,7 +241,7 @@ class VoiceSession:
flush = sentence_buf.strip() flush = sentence_buf.strip()
sentence_buf = "" sentence_buf = ""
if flush: if flush:
flush = self._clean_markdown(flush) flush = clean_markdown(flush)
if flush: if flush:
if tts_started and self._tts_segment_done: if tts_started and self._tts_segment_done:
logger.info(f"[Voice] TTS segment done, closing session and waiting for delivery (answer ended)") logger.info(f"[Voice] TTS segment done, closing session and waiting for delivery (answer ended)")
@ -371,7 +271,7 @@ class VoiceSession:
# Check for sentence boundaries and send complete sentences to TTS # Check for sentence boundaries and send complete sentences to TTS
while True: while True:
match = self._SENTENCE_END_RE.search(sentence_buf) match = SENTENCE_END_RE.search(sentence_buf)
if not match: if not match:
break break
# Split at sentence boundary (include the punctuation) # Split at sentence boundary (include the punctuation)
@ -380,7 +280,7 @@ class VoiceSession:
sentence_buf = sentence_buf[end_pos:] sentence_buf = sentence_buf[end_pos:]
if sentence: if sentence:
sentence = self._clean_markdown(sentence) sentence = clean_markdown(sentence)
if sentence: if sentence:
# If previous TTS segment completed (e.g. gap during tool call), # If previous TTS segment completed (e.g. gap during tool call),
# close old session, wait for TTS delivery to finish, then restart # close old session, wait for TTS delivery to finish, then restart
@ -410,7 +310,7 @@ class VoiceSession:
# Handle remaining text in buffer (last sentence without ending punctuation) # Handle remaining text in buffer (last sentence without ending punctuation)
remaining = sentence_buf.strip() remaining = sentence_buf.strip()
if remaining: if remaining:
remaining = self._clean_markdown(remaining) remaining = clean_markdown(remaining)
if remaining: if remaining:
# If previous TTS segment completed, close and wait before restart # If previous TTS segment completed, close and wait before restart
if tts_started and self._tts_segment_done: if tts_started and self._tts_segment_done:
@ -464,70 +364,6 @@ class VoiceSession:
self._is_sending_chat_tts_text = False self._is_sending_chat_tts_text = False
await self._emit_error(f"Agent call failed: {str(e)}") await self._emit_error(f"Agent call failed: {str(e)}")
async def _stream_v3_agent(self, user_text: str) -> AsyncGenerator[str, None]:
"""Call v3 agent API in streaming mode, yield text chunks as they arrive"""
try:
from utils.api_models import ChatRequestV3, Message
from utils.fastapi_utils import (
process_messages,
create_project_directory,
)
from agent.agent_config import AgentConfig
from routes.chat import enhanced_generate_stream_response
bot_config = self._bot_config
language = bot_config.get("language", "zh")
messages_obj = [Message(role="user", content=user_text)]
request = ChatRequestV3(
messages=messages_obj,
bot_id=self.bot_id,
stream=True,
session_id=self.session_id,
user_identifier=self.user_identifier,
)
project_dir = create_project_directory(
bot_config.get("dataset_ids", []),
self.bot_id,
bot_config.get("skills", []),
)
processed_messages = process_messages(messages_obj, language)
config = await AgentConfig.from_v3_request(
request,
bot_config,
project_dir,
processed_messages,
language,
)
config.stream = True
async for sse_line in enhanced_generate_stream_response(config):
if not sse_line or not sse_line.startswith("data: "):
continue
data_str = sse_line.strip().removeprefix("data: ")
if data_str == "[DONE]":
break
try:
data = json.loads(data_str)
choices = data.get("choices", [])
if choices:
delta = choices[0].get("delta", {})
content = delta.get("content", "")
if content:
yield content
except (json.JSONDecodeError, KeyError):
continue
except asyncio.CancelledError:
logger.info(f"[Voice] v3 agent call cancelled")
raise
except Exception as e:
logger.error(f"[Voice] Error calling v3 agent: {e}", exc_info=True)
@staticmethod @staticmethod
def _extract_answer(agent_result: str) -> str: def _extract_answer(agent_result: str) -> str:
"""Extract the answer portion from agent result, stripping tags like [ANSWER], [THINK] etc.""" """Extract the answer portion from agent result, stripping tags like [ANSWER], [THINK] etc."""

316
services/voice_utils.py Normal file
View File

@ -0,0 +1,316 @@
import json
import re
import logging
from typing import Optional, AsyncGenerator
logger = logging.getLogger('app')
# Sentence-ending punctuation (CJK and ASCII forms, plus newline) used as
# hard split points for streaming TTS.
SENTENCE_END_RE = re.compile(r'[。!?;\n.!?;]')

# Emoji pattern: matches Unicode emoji without touching CJK characters
_EMOJI_RE = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map
    "\U0001F1E0-\U0001F1FF"  # flags
    "\U0001F900-\U0001F9FF"  # supplemental symbols
    "\U0001FA00-\U0001FA6F"  # chess symbols
    "\U0001FA70-\U0001FAFF"  # symbols extended-A
    "\U00002702-\U000027B0"  # dingbats
    "\U00002600-\U000026FF"  # misc symbols
    "\U0000FE00-\U0000FE0F"  # variation selectors
    "\U0000200D"  # zero width joiner
    "\U000024C2"  # Ⓜ enclosed letter
    "\U00002B50\U00002B55"  # star, circle
    "\U000023CF\U000023E9-\U000023F3\U000023F8-\U000023FA"  # media controls
    "\U0001F170-\U0001F251"  # enclosed alphanumeric supplement
    "]+",
    flags=re.UNICODE,
)

# Strong sentence-ending punctuation (excluding \n which is handled separately)
_STRONG_PUNCT_RE = re.compile(r'[。!?;.!?;~]')
# Soft punctuation (usable as split points when buffer is getting long)
# NOTE(review): some full-width characters (e.g. ",", ":") may have been lost
# from this character class in transit — verify against the intended soft stops.
_SOFT_PUNCT_RE = re.compile(r'[,:、)) \t]')
class TTSSentenceSplitter:
    """
    Intelligent sentence splitter for TTS streaming.

    Accumulates streamed text in an internal buffer and emits chunks sized
    for natural-sounding TTS. Rules (in priority order):
    1. Split on newlines unconditionally (LLM paragraph boundaries)
    2. Split on strong punctuation only if accumulated >= MIN_LENGTH
    3. If buffer reaches SOFT_THRESHOLD, also split on soft punctuation
    4. If buffer reaches MAX_LENGTH, force split at best available position
    - Strip emoji from output (TTS cannot pronounce them)
    - On flush(), return any remaining text regardless of length
    """
    MIN_LENGTH = 10  # Don't send sentences shorter than this
    SOFT_THRESHOLD = 30  # Start considering soft punctuation splits
    MAX_LENGTH = 80  # Force split even without punctuation

    def __init__(self):
        # Unemitted tail of the stream; grows in feed(), shrinks at each split.
        self._buf = ""

    def _clean_for_tts(self, text: str) -> str:
        """Remove emoji and collapse whitespace."""
        text = _EMOJI_RE.sub("", text)
        text = re.sub(r'[ \t]+', ' ', text)
        return text.strip()

    def feed(self, chunk: str) -> list[str]:
        """Feed a text chunk, return list of ready sentences (may be empty)."""
        self._buf += chunk
        results = []
        while self._buf:
            buf_len = len(self._buf)
            # 0. Newline split — highest priority
            nl_pos = self._buf.find('\n')
            if nl_pos >= 0:
                before = self._buf[:nl_pos]
                rest = self._buf[nl_pos:].lstrip('\n')
                cleaned = self._clean_for_tts(before)
                if len(cleaned) >= self.MIN_LENGTH:
                    # Long enough, emit as a sentence
                    self._buf = rest
                    results.append(cleaned)
                    continue
                elif not rest:
                    # No more text after newline, keep buffer and wait
                    break
                else:
                    # Too short — merge with next paragraph (the newline is dropped,
                    # so the next find('\n') advances and the loop terminates)
                    self._buf = before + rest
                    continue
            # 1. Try strong punctuation split — scan for the best split point
            best_end = -1
            for match in _STRONG_PUNCT_RE.finditer(self._buf):
                end_pos = match.end()
                candidate = self._buf[:end_pos]
                if len(candidate.strip()) >= self.MIN_LENGTH:
                    best_end = end_pos
                    break  # Take the first valid (long enough) split
                # Short segment before this punct — skip and keep scanning
            if best_end > 0:
                sentence = self._clean_for_tts(self._buf[:best_end])
                self._buf = self._buf[best_end:]
                if sentence:
                    results.append(sentence)
                continue
            # 2. Buffer getting long: try soft punctuation split
            if buf_len >= self.SOFT_THRESHOLD:
                best_soft = -1
                # Keep the LAST qualifying soft stop up to SOFT_THRESHOLD.
                for m in _SOFT_PUNCT_RE.finditer(self._buf):
                    pos = m.end()
                    if pos >= self.MIN_LENGTH:
                        best_soft = pos
                    if pos >= self.SOFT_THRESHOLD:
                        break
                if best_soft >= self.MIN_LENGTH:
                    sentence = self._clean_for_tts(self._buf[:best_soft])
                    self._buf = self._buf[best_soft:]
                    if sentence:
                        results.append(sentence)
                    continue
            # 3. Buffer too long: force split at MAX_LENGTH
            if buf_len >= self.MAX_LENGTH:
                split_at = self.MAX_LENGTH
                search_region = self._buf[self.MIN_LENGTH:self.MAX_LENGTH]
                # NOTE(review): rfind('') always returns len(search_region), so
                # last_space can never be -1 here and split_at becomes
                # MIN_LENGTH + len(region) + 1. A full-width space/comma literal
                # was likely lost from this line — confirm against the original.
                last_space = max(search_region.rfind(' '), search_region.rfind(''),
                                 search_region.rfind(','), search_region.rfind(''))
                if last_space >= 0:
                    split_at = self.MIN_LENGTH + last_space + 1
                sentence = self._clean_for_tts(self._buf[:split_at])
                self._buf = self._buf[split_at:]
                if sentence:
                    results.append(sentence)
                continue
            # Not enough text yet, wait for more
            break
        return results

    def flush(self) -> list[str]:
        """Flush remaining buffer. Call at end of stream."""
        results = []
        if self._buf.strip():
            sentence = self._clean_for_tts(self._buf)
            if sentence:
                results.append(sentence)
        self._buf = ""
        return results
class StreamTagFilter:
    """
    Stateful filter for tag-structured streaming text.

    Passes through only the content inside [ANSWER] blocks; content inside
    [TOOL_CALL], [TOOL_RESPONSE], [THINK], [SOURCE], etc. is swallowed.
    If the stream never contains a known tag, everything is passed through
    (fallback for untagged responses).
    """

    SKIP_TAGS = {"TOOL_CALL", "TOOL_RESPONSE", "THINK", "SOURCE", "REFERENCE"}
    KNOWN_TAGS = {"ANSWER", "TOOL_CALL", "TOOL_RESPONSE", "THINK", "SOURCE", "REFERENCE", "PREAMBLE", "SUMMARY"}

    def __init__(self):
        self.state = "idle"          # one of: idle, answer, skip
        self.found_any_tag = False   # True once any KNOWN tag has appeared
        self._pending = ""           # tail buffer holding a possibly-partial tag
        self.answer_ended = False    # set when an ANSWER block is closed by another tag

    def feed(self, chunk: str) -> str:
        """Feed a chunk, return text that should be passed to TTS."""
        self.answer_ended = False
        self._pending += chunk
        emitted = []

        def may_emit() -> bool:
            # Emit while inside [ANSWER], or before any known tag has shown up.
            return self.state == "answer" or (self.state != "skip" and not self.found_any_tag)

        while self._pending:
            open_at = self._pending.find("[")
            if open_at == -1:
                # No tag opener — the entire buffer is plain text.
                if may_emit():
                    emitted.append(self._pending)
                self._pending = ""
                break
            prefix = self._pending[:open_at]
            if prefix and may_emit():
                emitted.append(prefix)
            close_at = self._pending.find("]", open_at)
            if close_at == -1:
                # Possibly a split tag ("[TOO…") — keep the tail until more arrives.
                self._pending = self._pending[open_at:]
                break
            tag = self._pending[open_at + 1:close_at]
            self._pending = self._pending[close_at + 1:]
            if tag not in self.KNOWN_TAGS:
                # Not a control tag: restore it verbatim when emitting.
                if may_emit():
                    emitted.append(f"[{tag}]")
                continue
            self.found_any_tag = True
            if tag == "ANSWER":
                self.state = "answer"
            else:
                if self.state == "answer":
                    self.answer_ended = True
                self.state = "skip"
        return "".join(emitted)
def clean_markdown(text: str) -> str:
    """Strip Markdown formatting characters for TTS readability."""
    # Ordered rewrite rules. Links/images come first so their display text
    # survives the symbol-stripping passes that follow.
    rules = [
        (r'!\[([^\]]*)\]\([^)]*\)', r'\1', 0),          # images -> alt text
        (r'\[([^\]]*)\]\([^)]*\)', r'\1', 0),           # links -> link text
        (r'#{1,6}\s*', '', 0),                          # headings
        (r'\*{1,3}|_{1,3}|~~|`{1,3}', '', 0),           # bold/italic/strike/code
        (r'^>\s*', '', re.MULTILINE),                   # blockquotes
        (r'^\s*[-*+]\s+', '', re.MULTILINE),            # bullet list markers
        (r'^\s*\d+\.\s+', '', re.MULTILINE),            # ordered list markers
        (r'^[\s]*[-*_]{3,}[\s]*$', '', re.MULTILINE),   # horizontal rules
        (r'\n{2,}', '\n', 0),                           # collapse blank lines
    ]
    for pattern, replacement, flags in rules:
        text = re.sub(pattern, replacement, text, flags=flags)
    return text.strip()
async def stream_v3_agent(
    user_text: str,
    bot_id: str,
    bot_config: dict,
    session_id: str,
    user_identifier: str,
) -> AsyncGenerator[str, None]:
    """
    Call the v3 agent API in streaming mode and yield text chunks as they arrive.

    Builds a single-message ChatRequestV3 from ``user_text``, consumes the
    agent's SSE stream, and yields each delta's ``content`` string.
    CancelledError is re-raised; any other exception is logged and ends the
    stream silently (no error reaches the consumer).

    Args:
        user_text: The user's utterance, sent as a single "user" message.
        bot_id: Bot identifier used for the request and project directory.
        bot_config: Bot configuration dict; ``language``, ``dataset_ids`` and
            ``skills`` keys are read (with defaults).
        session_id: Conversation session identifier.
        user_identifier: End-user identifier forwarded to the agent.

    Yields:
        Non-empty content fragments from the agent's SSE "delta" payloads.
    """
    import asyncio  # local: only needed for CancelledError handling below
    try:
        # Imported inside the function — presumably to avoid circular imports
        # at module load time; confirm before hoisting to module level.
        from utils.api_models import ChatRequestV3, Message
        from utils.fastapi_utils import (
            process_messages,
            create_project_directory,
        )
        from agent.agent_config import AgentConfig
        from routes.chat import enhanced_generate_stream_response
        language = bot_config.get("language", "zh")
        messages_obj = [Message(role="user", content=user_text)]
        request = ChatRequestV3(
            messages=messages_obj,
            bot_id=bot_id,
            stream=True,
            session_id=session_id,
            user_identifier=user_identifier,
        )
        project_dir = create_project_directory(
            bot_config.get("dataset_ids", []),
            bot_id,
            bot_config.get("skills", []),
        )
        processed_messages = process_messages(messages_obj, language)
        config = await AgentConfig.from_v3_request(
            request,
            bot_config,
            project_dir,
            processed_messages,
            language,
        )
        config.stream = True
        # SSE lines look like "data: {json}" or the terminator "data: [DONE]".
        async for sse_line in enhanced_generate_stream_response(config):
            if not sse_line or not sse_line.startswith("data: "):
                continue
            data_str = sse_line.strip().removeprefix("data: ")
            if data_str == "[DONE]":
                break
            try:
                data = json.loads(data_str)
                choices = data.get("choices", [])
                if choices:
                    delta = choices[0].get("delta", {})
                    content = delta.get("content", "")
                    if content:
                        yield content
            except (json.JSONDecodeError, KeyError):
                # Tolerate malformed SSE payloads: skip and keep streaming.
                continue
    except asyncio.CancelledError:
        logger.info(f"[Voice] v3 agent call cancelled")
        raise
    except Exception as e:
        logger.error(f"[Voice] Error calling v3 agent: {e}", exc_info=True)

View File

@ -121,19 +121,20 @@ NEW_API_ADMIN_KEY = os.getenv("NEW_API_ADMIN_KEY", "")
# ============================================================ # ============================================================
# Volcengine Realtime Dialogue Configuration # Volcengine Realtime Dialogue Configuration
# ============================================================ # ============================================================
VOLCENGINE_REALTIME_URL = os.getenv( VOLCENGINE_APP_ID = os.getenv("VOLCENGINE_APP_ID", "2511880162")
"VOLCENGINE_REALTIME_URL", VOLCENGINE_ACCESS_KEY = os.getenv("VOLCENGINE_ACCESS_KEY", "pjLbaqR1lHFfkv1xcJAYnvKV0HAvsBvt")
"wss://openspeech.bytedance.com/api/v3/realtime/dialogue"
)
VOLCENGINE_APP_ID = os.getenv("VOLCENGINE_APP_ID", "8718217928")
VOLCENGINE_ACCESS_KEY = os.getenv("VOLCENGINE_ACCESS_KEY", "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc")
VOLCENGINE_RESOURCE_ID = os.getenv("VOLCENGINE_RESOURCE_ID", "volc.speech.dialog")
VOLCENGINE_APP_KEY = os.getenv("VOLCENGINE_APP_KEY", "PlgvMymc7f3tQnJ6")
VOLCENGINE_DEFAULT_SPEAKER = os.getenv( VOLCENGINE_DEFAULT_SPEAKER = os.getenv(
"VOLCENGINE_DEFAULT_SPEAKER", "zh_male_yunzhou_jupiter_bigtts" "VOLCENGINE_DEFAULT_SPEAKER", "zh_female_xiaohe_uranus_bigtts"
) )
VOLCENGINE_TTS_SAMPLE_RATE = int(os.getenv("VOLCENGINE_TTS_SAMPLE_RATE", "24000")) VOLCENGINE_TTS_SAMPLE_RATE = int(os.getenv("VOLCENGINE_TTS_SAMPLE_RATE", "24000"))
# ============================================================
# Voice Lite Configuration (ASR + Agent + TTS pipeline)
# ============================================================
VOICE_DEFAULT_MODE = os.getenv("VOICE_DEFAULT_MODE", "lite") # "realtime" | "lite"
# Silence timeout (seconds) - ASR considers user done speaking after this
VOICE_LITE_SILENCE_TIMEOUT = float(os.getenv("VOICE_LITE_SILENCE_TIMEOUT", "1.5"))
# ============================================================ # ============================================================
# Single Agent Mode Configuration # Single Agent Mode Configuration
# ============================================================ # ============================================================