在语音对话系统中,“什么时候开始说话”和“什么时候停止说话”决定了交互的流畅度。传统系统大量依赖 VAD(Voice Activity Detection,语音活动检测)来区分“有声 / 无声”,然后在静音处粗略认为“用户说完了”。这种做法存在两个典型问题:
Smart Turn v3.1 针对的是“轮次结束检测(turn end detection)”这一更高层次问题:在已有 VAD 分段的基础上,判断当前这一段语音是否已经表达完整、是否应触发系统应答。相较于“仅有 VAD”,Smart Turn 利用了语音的时序模式和内容线索,使得轮次结束判断更加接近人的直觉。
在本项目中,Smart Turn v3.1 已以 ONNX 形式提供,并通过脚本 record_and_predict.py (原始的升级后的版本,详见附录)实现了一个从麦克风实时采集、VAD 分段、模型推理到日志打印的端到端 Demo,非常适合作为实战入门。
以 record_and_predict.py 为核心,Smart Turn v3.1 的本地实时推理流程可以概括为:
int16 PCM 音频,以固定块大小 CHUNK = 512 连续读取。VAD_THRESHOLD 比较,得到布尔值 is_speech。 True:认为当前块属于语音 False:认为当前块属于静音 PRE_SPEECH_MS 时间内的音频一并纳入,避免剪掉起始音。 predict_endpoint)
每当判定需要检测时:float32 数组([-1, 1]) AsyncSmartTurnDetector 在后台线程调用 predict_endpoint(audio_array) predict_endpoint 会:prediction(0/1)和 probability(表达完整的概率) _process_segment() 负责打印本段的统计信息,包括:_reset_state() 重置内部状态,等待下一轮语音段。建议使用 Python 3.10+,并在项目目录创建虚拟环境:
cd /Users/huyiyang/Workspace/smart-turn
python3.10 -m venv venv
source venv/bin/activate
pip install -r requirements.txt如只进行推理实验,也可以基于 requirements_inference.txt 安装精简依赖,但运行 record_and_predict.py 需要包含 PyAudio、ONNX Runtime、NumPy 等组件。
record_and_predict.py 依赖 PyAudio,后者在安装和运行时依赖系统级 PortAudio 库。以 macOS + Homebrew 为例:
brew install portaudio
pip install pyaudio如果使用 pip install -r requirements.txt,确保在此之前已完成 PortAudio 的系统安装。
本项目中已包含:
silero_vad.onnx:Silero VAD 模型 smart-turn-v3.1.onnx:Smart Turn v3.1 ONNX 权重 其中 VAD 模型在缺失时会由 ensure_model() 自动从 GitHub 下载;Smart Turn ONNX 则需提前放置在仓库根目录(同脚本目录)或调整 inference.py 中的模型路径。
record_and_predict.py 结构解析脚本开头设置了一系列基础常量:
RATE = 16000:采样率 16 kHz CHUNK = 512:每次从设备读取 512 个样本(约 32 ms) CHANNELS = 1:单声道 VAD_THRESHOLD = 0.5:语音概率高于 0.5 判定为“语音” PRE_SPEECH_MS = 200:在触发语音开始前保留 200 ms 的历史音频 SileroVAD 类对 VAD ONNX 模型进行了封装,主要特性包括:
silero_vad.onnx,限制线程数,避免干扰主业务。 _state 和 _context 以保证跨块连续性。 prob(chunk_f32) 接收长度为 CHUNK 的 float32 数组,返回当前块的语音概率,并在必要时重置内部状态(MODEL_RESET_STATES_TIME 控制重置间隔)。AsyncSmartTurnDetectorAsyncSmartTurnDetector 负责把 Smart Turn 推理从主循环中“解耦”出来:
_warmup(): predict_endpoint(),完成模型加载与图优化的预热。 submit_async(audio_segment): _run_inference(): predict_endpoint(audio_segment) prediction、probability、inference_time_ms 以及一个简单的 audio_hash 标识。 get_result_if_ready() 与 get_result_blocking() 分别提供了非阻塞与带超时阻塞的结果获取方式,便于主循环在不同场景下选择使用。这种设计兼顾了两个目标:
DynamicEndpointDetectorDynamicEndpointDetector 是整个脚本体现“工程策略”的关键部分,其职责是:根据静音长度与上次模型置信度,动态决定检测频率与结束条件。
其核心机制包括:
EARLY_CHECK_MS、CHECK_INTERVAL_MS、MIN_CHECK_INTERVAL_MS、MAX_STOP_MS)统一转换为“块数”,方便与采集循环对接。 last_probability 计算“下一次允许检测前需要等待的块数”(get_dynamic_interval()): HIGH_CONFIDENCE):检测间隔缩短,允许更频繁尝试结束 should_check(trailing_silence_chunks): should_force_end(trailing_silence_chunks, since_trigger_chunks, max_chunks): should_end_by_confidence(probability, trailing_silence_chunks): 通过这套机制,系统在“用户停顿较短、模型信心很高”的情况下可以快速响应;在“不确定用户是否说完”的情况下,则倾向保守,避免打断。
record_and_predict()record_and_predict() 是整个 Demo 的核心控制逻辑,其主要步骤为:
chunk_ms、pre_chunks、max_chunks 等派生量。 SileroVAD、DynamicEndpointDetector、AsyncSmartTurnDetector。 pre_buffer,用于存放触发前的若干块音频。 CHUNK 大小的 int16 数据,并转换为 float32。 is_speech。 根据当前是否处于“语音段内”分为两种情况:
pre_buffer。 is_speech 为真,创建新语音段: segment = list(pre_buffer) + [当前块] speech_active = True,重置静音与计数,重置检测器状态。 segment,递增 since_trigger_chunks。 is_speech 为真: trailing_silence 置零。 is_speech 为假: trailing_silence。 随后依次执行:
detector.should_force_end(...)。 segment 为连续数组 audio_segment; get_result_blocking() 获取 Smart Turn 结果,若失败则同步调用 predict_endpoint(audio_segment); _process_segment() 打印日志; detector.should_check(trailing_silence) 为真: on_check_started); segment 并调用 smart_turn.submit_async(audio_segment.copy())。 smart_turn.get_result_if_ready() 检查是否有新结果。 on_check_completed); DEBUG_LOG,打印检测次数、静音时长、概率、推理耗时等信息。 should_end_by_confidence() 决定是否结束: _process_segment()、重置状态并重启流。KeyboardInterrupt 友好退出,关闭音频流并释放资源。 smart_turn.shutdown() 关闭线程池。整体而言,record_and_predict.py 实现了一个完整的实时语音 → 智能轮次结束判断 → 日志展示闭环,足以作为集成 Smart Turn v3.1 到生产系统前的“实验台”。
在依赖安装完毕(包括 PortAudio / PyAudio、ONNX Runtime、NumPy、Transformers 等)后,只需运行:
python record_and_predict.py
脚本启动后会:
此时,你可以尝试以下话术来观察模型表现:
每次语音段结束后,控制台会打印:
对比你的主观感受与模型判断,可以直观评估当前阈值和策略是否适合你的目标场景。
在不同业务场景中,可以围绕以下几个参数进行调优:
VAD_THRESHOLD: EARLY_CHECK_MS 与 MAX_STOP_MS: EARLY_CHECK_MS,减少过早检测;同时控制 MAX_STOP_MS,防止长时间静音。 EARLY_CHECK_MS 和 MAX_STOP_MS,以追求更快响应。 HIGH_CONFIDENCE / MEDIUM_CONFIDENCE / LOW_CONFIDENCE: HIGH_CONFIDENCE 或增加中等置信度下所需静音时长。 建议在本地环境中多轮对话测试,通过观测日志中概率与结束原因,逐步收敛到适合你应用场景的一组参数组合。
在已有语音系统中引入 Smart Turn v3.1 时,推荐的工程模式是:
record_and_predict.py 的数据处理与 predict_endpoint 调用逻辑); probability 与业务阈值决定是否触发系统回复,或继续等待更多语音。record_and_predict.py 提供了一套从麦克风采集到策略决策的参考实现,你可以按以下层次拆分并迁移到自己的代码库中:
inference.py / predict_endpoint) record_and_predict.py 提供了一个 仅依赖麦克风输入的完整实战 Demo:包括 Silero VAD、动态端点检测策略、Smart Turn ONNX 推理与异步调度。 在掌握并跑通 record_and_predict.py 的基础上,你可以逐步将 Smart Turn v3.1 集成到自己的语音对话系统中,将“何时说话”这一能力提升到更接近人类直觉的水平。
# record_and_predict.py
import os
import time
import math
import urllib.request
import threading
from collections import deque
from concurrent.futures import ThreadPoolExecutor, Future
from typing import Optional
import numpy as np
import pyaudio
from scipy.io import wavfile
import onnxruntime as ort
from inference import predict_endpoint
# --- 基础配置(固定 16 kHz 单声道,512 样本块)---
RATE = 16000
CHUNK = 512 # Silero VAD 在 16 kHz 下需要 512 个样本
FORMAT = pyaudio.paInt16
CHANNELS = 1
# --- VAD 配置 ---
VAD_THRESHOLD = 0.5 # 语音概率阈值
PRE_SPEECH_MS = 200 # 触发前保留的毫秒数
# --- 动态端点检测配置 ---
EARLY_CHECK_MS = 80 # 静音后多久开始第一次检测
CHECK_INTERVAL_MS = 150 # 基础检测间隔
MIN_CHECK_INTERVAL_MS = 80 # 高置信度时的最小检测间隔
MAX_STOP_MS = 1500 # 最大静音等待时间(兜底)
MAX_DURATION_SECONDS = 8 # 每段音频的最大时长上限
# --- 置信度阈值 ---
HIGH_CONFIDENCE = 0.70 # 高置信度阈值,可立即结束
MEDIUM_CONFIDENCE = 0.50 # 中等置信度
LOW_CONFIDENCE = 0.30 # 低置信度,需继续等待
# --- 调试配置 ---
DEBUG_SAVE_WAV = False
TEMP_OUTPUT_WAV = "temp_output.wav"
DEBUG_LOG = True # 是否打印检测日志
# --- Silero ONNX 模型 ---
ONNX_MODEL_URL = (
"https://github.com/snakers4/silero-vad/raw/master/src/silero_vad/data/silero_vad.onnx"
)
ONNX_MODEL_PATH = "silero_vad.onnx"
MODEL_RESET_STATES_TIME = 5.0
class SileroVAD:
"""Silero VAD ONNX 封装类,适用于 16 kHz 单声道,块大小为 512。"""
def __init__(self, model_path: str):
opts = ort.SessionOptions()
opts.inter_op_num_threads = 1
opts.intra_op_num_threads = 1
self.session = ort.InferenceSession(
model_path, providers=["CPUExecutionProvider"], sess_options=opts
)
self.context_size = 64
self._state = None
self._context = None
self._last_reset_time = time.time()
self._init_states()
def _init_states(self):
self._state = np.zeros((2, 1, 128), dtype=np.float32)
self._context = np.zeros((1, self.context_size), dtype=np.float32)
def maybe_reset(self):
if (time.time() - self._last_reset_time) >= MODEL_RESET_STATES_TIME:
self._init_states()
self._last_reset_time = time.time()
def prob(self, chunk_f32: np.ndarray) -> float:
"""计算一个长度为 512 的音频块的语音概率。"""
x = np.reshape(chunk_f32, (1, -1))
if x.shape[1] != CHUNK:
raise ValueError(f"期望 {CHUNK} 个样本,实际得到 {x.shape[1]}")
x = np.concatenate((self._context, x), axis=1)
ort_inputs = {
"input": x.astype(np.float32),
"state": self._state,
"sr": np.array(16000, dtype=np.int64)
}
out, self._state = self.session.run(None, ort_inputs)
self._context = x[:, -self.context_size:]
self.maybe_reset()
return float(out[0][0])
class AsyncSmartTurnDetector:
"""异步 Smart Turn 检测器,支持预热和异步推理。"""
def __init__(self, max_workers: int = 2):
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.current_future: Optional[Future] = None
self.last_result: Optional[dict] = None
self.last_audio_hash: Optional[int] = None
self._lock = threading.Lock()
# 预热模型(首次加载较慢)
self._warmup()
def _warmup(self):
"""预热模型,减少首次推理延迟。"""
print("正在预热 Smart Turn 模型...")
dummy_audio = np.zeros(RATE, dtype=np.float32) # 1 秒静音
predict_endpoint(dummy_audio)
print("Smart Turn 模型预热完成。")
def submit_async(self, audio_segment: np.ndarray) -> Future:
"""异步提交推理任务。"""
with self._lock:
# 取消之前的任务(如果还在运行)
if self.current_future and not self.current_future.done():
self.current_future.cancel()
audio_hash = hash(audio_segment.tobytes()[-8000:]) # 只用最后 0.5 秒做哈希
self.last_audio_hash = audio_hash
self.current_future = self.executor.submit(self._run_inference, audio_segment, audio_hash)
return self.current_future
def _run_inference(self, audio_segment: np.ndarray, audio_hash: int) -> dict:
"""执行推理。"""
t0 = time.perf_counter()
result = predict_endpoint(audio_segment)
result['inference_time_ms'] = (time.perf_counter() - t0) * 1000.0
result['audio_hash'] = audio_hash
with self._lock:
self.last_result = result
return result
def get_result_if_ready(self) -> Optional[dict]:
"""非阻塞获取结果(如果已完成)。"""
with self._lock:
if self.current_future and self.current_future.done():
try:
return self.current_future.result(timeout=0)
except Exception:
return None
return None
def get_result_blocking(self, timeout: float = 0.5) -> Optional[dict]:
"""阻塞等待结果。"""
with self._lock:
future = self.current_future
if future:
try:
return future.result(timeout=timeout)
except Exception:
return None
return None
def shutdown(self):
"""关闭线程池。"""
self.executor.shutdown(wait=False)
class DynamicEndpointDetector:
"""动态端点检测器,实现智能静音判断策略。"""
def __init__(self):
self.chunk_ms = (CHUNK / RATE) * 1000.0
# 转换为 chunk 数量
self.early_check_chunks = math.ceil(EARLY_CHECK_MS / self.chunk_ms)
self.base_interval_chunks = math.ceil(CHECK_INTERVAL_MS / self.chunk_ms)
self.min_interval_chunks = math.ceil(MIN_CHECK_INTERVAL_MS / self.chunk_ms)
self.max_stop_chunks = math.ceil(MAX_STOP_MS / self.chunk_ms)
# 状态
self.last_check_silence_chunks = 0
self.last_probability = 0.0
self.check_count = 0
self.pending_inference = False
def reset(self):
"""重置检测状态。"""
self.last_check_silence_chunks = 0
self.last_probability = 0.0
self.check_count = 0
self.pending_inference = False
def get_dynamic_interval(self) -> int:
"""根据上次概率动态计算检测间隔(chunk 数量)。"""
if self.last_probability >= HIGH_CONFIDENCE:
# 高置信度:更频繁检测
return self.min_interval_chunks
elif self.last_probability >= MEDIUM_CONFIDENCE:
# 中等置信度:正常间隔
return self.base_interval_chunks
else:
# 低置信度:稍长间隔
return int(self.base_interval_chunks * 1.5)
def should_check(self, trailing_silence_chunks: int) -> bool:
"""判断是否应该进行端点检测。"""
if self.pending_inference:
return False
# 首次检测
if self.check_count == 0 and trailing_silence_chunks >= self.early_check_chunks:
return True
# 后续检测:基于动态间隔
if self.check_count > 0:
chunks_since_last = trailing_silence_chunks - self.last_check_silence_chunks
interval = self.get_dynamic_interval()
if chunks_since_last >= interval:
return True
return False
def should_force_end(self, trailing_silence_chunks: int, since_trigger_chunks: int, max_chunks: int) -> bool:
"""判断是否强制结束。"""
return (trailing_silence_chunks >= self.max_stop_chunks or
since_trigger_chunks >= max_chunks)
def on_check_started(self, trailing_silence_chunks: int):
"""记录检测开始。"""
self.pending_inference = True
def on_check_completed(self, trailing_silence_chunks: int, probability: float):
"""记录检测完成。"""
self.last_check_silence_chunks = trailing_silence_chunks
self.last_probability = probability
self.check_count += 1
self.pending_inference = False
def should_end_by_confidence(self, probability: float, trailing_silence_chunks: int) -> bool:
"""基于置信度判断是否应该结束。"""
silence_ms = trailing_silence_chunks * self.chunk_ms
if probability >= HIGH_CONFIDENCE:
# 高置信度:200ms 静音即可结束
return silence_ms >= EARLY_CHECK_MS
elif probability >= MEDIUM_CONFIDENCE:
# 中等置信度:需要更多静音确认
required_ms = EARLY_CHECK_MS + (HIGH_CONFIDENCE - probability) * 500
return silence_ms >= required_ms
else:
# 低置信度:继续等待
return False
def ensure_model(path: str = ONNX_MODEL_PATH, url: str = ONNX_MODEL_URL) -> str:
if not os.path.exists(path):
print("正在下载 Silero VAD ONNX 模型...")
urllib.request.urlretrieve(url, path)
print("ONNX 模型下载完成。")
return path
def record_and_predict():
"""主录音和预测循环。"""
# 计算派生参数
chunk_ms = (CHUNK / RATE) * 1000.0
pre_chunks = math.ceil(PRE_SPEECH_MS / chunk_ms)
max_chunks = math.ceil(MAX_DURATION_SECONDS / (CHUNK / RATE))
print(f"配置: 早期检测={EARLY_CHECK_MS}ms, 检测间隔={CHECK_INTERVAL_MS}ms, 最大静音={MAX_STOP_MS}ms")
print(f"置信度阈值: 高={HIGH_CONFIDENCE}, 中={MEDIUM_CONFIDENCE}, 低={LOW_CONFIDENCE}")
# 初始化组件
vad = SileroVAD(ensure_model())
detector = DynamicEndpointDetector()
smart_turn = AsyncSmartTurnDetector()
# 语音前环形缓冲区
pre_buffer = deque(maxlen=pre_chunks)
# 状态变量
segment = []
speech_active = False
trailing_silence = 0
since_trigger_chunks = 0
# 初始化音频流
pa = pyaudio.PyAudio()
stream = pa.open(
format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK,
)
print("正在监听语音...(按 Ctrl+C 停止)")
try:
while True:
# 读取音频块
data = stream.read(CHUNK, exception_on_overflow=False)
int16 = np.frombuffer(data, dtype=np.int16)
f32 = (int16.astype(np.float32)) / 32768.0
# VAD 检测
is_speech = vad.prob(f32) > VAD_THRESHOLD
if not speech_active:
# 等待语音开始
pre_buffer.append(f32)
if is_speech:
# 触发:开始新的音频段
segment = list(pre_buffer)
segment.append(f32)
speech_active = True
trailing_silence = 0
since_trigger_chunks = 1
detector.reset()
print("[语音] 🎤 开始语音")
else:
# 已在音频段中
segment.append(f32)
since_trigger_chunks += 1
if is_speech:
# 检测到语音:重置静音计数和检测状态
if trailing_silence > 0:
silence_duration_ms = trailing_silence * chunk_ms
print(f"[语音] 🔄 恢复语音(静音了 {silence_duration_ms:.0f} 毫秒)")
detector.reset() # 语音恢复,重置检测状态
trailing_silence = 0
else:
trailing_silence += 1
# 检查是否强制结束
if detector.should_force_end(trailing_silence, since_trigger_chunks, max_chunks):
stream.stop_stream()
audio_segment = np.concatenate(segment, dtype=np.float32)
final_silence_ms = trailing_silence * chunk_ms
# 同步获取最终结果
result = smart_turn.get_result_blocking(timeout=0.1)
if result is None:
result = predict_endpoint(audio_segment)
_process_segment(audio_segment, result, "强制结束", final_silence_ms)
_reset_state(segment, pre_buffer, detector)
speech_active = False
trailing_silence = 0
since_trigger_chunks = 0
stream.start_stream()
print("正在监听语音...")
continue
# 检查是否应该进行端点检测
if detector.should_check(trailing_silence):
detector.on_check_started(trailing_silence)
# 异步提交推理
audio_segment = np.concatenate(segment, dtype=np.float32)
smart_turn.submit_async(audio_segment.copy())
# 检查异步推理结果
result = smart_turn.get_result_if_ready()
if result and detector.pending_inference:
prob = result.get("probability", 0)
inference_time = result.get("inference_time_ms", 0)
detector.on_check_completed(trailing_silence, prob)
if DEBUG_LOG:
silence_ms = trailing_silence * chunk_ms
print(f"[检测 #{detector.check_count}] 静音={silence_ms:.0f}ms, "
f"概率={prob:.3f}, 推理={inference_time:.1f}ms")
# 基于置信度判断是否结束
if detector.should_end_by_confidence(prob, trailing_silence):
stream.stop_stream()
audio_segment = np.concatenate(segment, dtype=np.float32)
final_silence_ms = trailing_silence * chunk_ms
_process_segment(audio_segment, result, "置信度判断", final_silence_ms)
_reset_state(segment, pre_buffer, detector)
speech_active = False
trailing_silence = 0
since_trigger_chunks = 0
stream.start_stream()
print("正在监听语音...")
except KeyboardInterrupt:
print("\n正在停止...")
finally:
stream.stop_stream()
stream.close()
pa.terminate()
smart_turn.shutdown()
def _reset_state(segment: list, pre_buffer: deque, detector: DynamicEndpointDetector):
"""重置状态。"""
segment.clear()
pre_buffer.clear()
detector.reset()
def _process_segment(segment_audio_f32: np.ndarray, result: dict, end_reason: str = "", silence_ms: float = 0):
"""处理完成的音频段。"""
if segment_audio_f32.size == 0:
print("捕获到空的音频段,跳过。")
return
if DEBUG_SAVE_WAV:
wavfile.write(TEMP_OUTPUT_WAV, RATE, (segment_audio_f32 * 32767.0).astype(np.int16))
dur_sec = segment_audio_f32.size / RATE
pred = result.get("prediction", 0)
prob = result.get("probability", float("nan"))
inference_time = result.get("inference_time_ms", 0)
print("=" * 40)
print(f"音频段时长:{dur_sec:.2f} 秒")
print(f"结束原因:{end_reason}(静音 {silence_ms:.0f} 毫秒)")
print(f"预测结果:{'✅ 表达完整' if pred == 1 else '❌ 表达未完整'}")
print(f"完整概率:{prob:.4f}")
if inference_time > 0:
print(f"推理耗时:{inference_time:.2f} 毫秒")
print("=" * 40)
if __name__ == "__main__":
record_and_predict()原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。