首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Smart Turn v3.1 实战入门

Smart Turn v3.1 实战入门

原创
作者头像
buzzfrog
发布2025-12-07 13:45:42
发布2025-12-07 13:45:42
650
举报
文章被收录于专栏:云上修行云上修行

一、问题背景:从 VAD 到“智能轮次结束”

在语音对话系统中,“什么时候开始说话”和“什么时候停止说话”决定了交互的流畅度。传统系统大量依赖 VAD(Voice Activity Detection,语音活动检测)来区分“有声 / 无声”,然后在静音处粗略认为“用户说完了”。这种做法存在两个典型问题:

  • 用户尚未说完就被打断(过早响应)
  • 用户说完后长时间静音,系统才开始回复(响应滞后)

Smart Turn v3.1 针对的是“轮次结束检测(turn end detection)”这一更高层次问题:在已有 VAD 分段的基础上,判断当前这一段语音是否已经表达完整、是否应触发系统应答。相较于“仅有 VAD”,Smart Turn 利用了语音的时序模式和内容线索,使得轮次结束判断更加接近人的直觉。

在本项目中,Smart Turn v3.1 已以 ONNX 形式提供,并通过脚本 record_and_predict.py (原始的升级后的版本,详见附录)实现了一个从麦克风实时采集、VAD 分段、模型推理到日志打印的端到端 Demo,非常适合作为实战入门。


二、整体架构与数据流概览

record_and_predict.py 为核心,Smart Turn v3.1 的本地实时推理流程可以概括为:

  1. 音频采集 使用 PyAudio 从系统默认麦克风采集 16 kHz、单声道、int16 PCM 音频,以固定块大小 CHUNK = 512 连续读取。
  2. VAD 语音检测(Silero VAD) 对每个块调用 Silero VAD ONNX 模型,输出当前块为“语音”的概率,并与阈值 VAD_THRESHOLD 比较,得到布尔值 is_speech
    • True:认为当前块属于语音
    • False:认为当前块属于静音
  3. 语音段聚合与静音跟踪
    • 当检测到从静音切换到语音时,启动一个新的“语音段”,并把最近 PRE_SPEECH_MS 时间内的音频一并纳入,避免剪掉起始音。
    • 在语音段内部,持续记录音频块,并对“当前连续静音时长”(以块数换算成毫秒)进行跟踪。
  4. 动态端点检测(DynamicEndpointDetector) 使用一个专门的“动态端点检测器”根据静音时长和模型最近一次输出的概率,决定:
    • 何时发起一次 Smart Turn 推理
    • 是否已达到“应当结束本轮”的条件
    • 是否因静音或时长过长而“强制结束”
  5. Smart Turn 推理(AsyncSmartTurnDetector + predict_endpoint 每当判定需要检测时:
    • 将当前完整语音段复制出来,作为 float32 数组([-1, 1])
    • 通过 AsyncSmartTurnDetector 在后台线程调用 predict_endpoint(audio_array)
    • predict_endpoint 会:
      • 截断/填充音频到 8 秒
      • 使用 Whisper 特征抽取器生成模型输入特征
      • 调用 ONNX Runtime 执行 Smart Turn v3.1 推理
      • 返回 prediction(0/1)和 probability(表达完整的概率)
  6. 日志与重置 每一轮结束时,_process_segment() 负责打印本段的统计信息,包括:
    • 段时长、结束原因(置信度判断 / 强制结束)、预测结果及概率、推理耗时undefined然后通过 _reset_state() 重置内部状态,等待下一轮语音段。

三、环境准备与依赖安装

1. Python 虚拟环境

建议使用 Python 3.10+,并在项目目录创建虚拟环境:

代码语言:bash
复制
cd /Users/huyiyang/Workspace/smart-turn

python3.10 -m venv venv
source venv/bin/activate

pip install -r requirements.txt

如只进行推理实验,也可以基于 requirements_inference.txt 安装精简依赖,但运行 record_and_predict.py 需要包含 PyAudio、ONNX Runtime、NumPy 等组件。

2. PortAudio 与 PyAudio

record_and_predict.py 依赖 PyAudio,后者在安装和运行时依赖系统级 PortAudio 库。以 macOS + Homebrew 为例:

代码语言:bash
复制
brew install portaudio
pip install pyaudio

如果使用 pip install -r requirements.txt,确保在此之前已完成 PortAudio 的系统安装。

3. ONNX 模型文件

本项目中已包含:

  • silero_vad.onnx:Silero VAD 模型
  • smart-turn-v3.1.onnx:Smart Turn v3.1 ONNX 权重

其中 VAD 模型在缺失时会由 ensure_model() 自动从 GitHub 下载;Smart Turn ONNX 则需提前放置在仓库根目录(同脚本目录)或调整 inference.py 中的模型路径。


四、核心脚本 record_and_predict.py 结构解析

1. 基础配置与 VAD 模型封装

脚本开头设置了一系列基础常量:

  • 采样配置
    • RATE = 16000:采样率 16 kHz
    • CHUNK = 512:每次从设备读取 512 个样本(约 32 ms)
    • CHANNELS = 1:单声道
  • VAD 参数
    • VAD_THRESHOLD = 0.5:语音概率高于 0.5 判定为“语音”
    • PRE_SPEECH_MS = 200:在触发语音开始前保留 200 ms 的历史音频

SileroVAD 类对 VAD ONNX 模型进行了封装,主要特性包括:

  • 使用 ONNX Runtime 加载 silero_vad.onnx,限制线程数,避免干扰主业务。
  • 维护 _state_context 以保证跨块连续性。
  • 每次 prob(chunk_f32) 接收长度为 CHUNKfloat32 数组,返回当前块的语音概率,并在必要时重置内部状态(MODEL_RESET_STATES_TIME 控制重置间隔)。

2. 异步 Smart Turn 推理器:AsyncSmartTurnDetector

AsyncSmartTurnDetector 负责把 Smart Turn 推理从主循环中“解耦”出来:

  • 初始化时调用 _warmup()
    • 使用 1 秒静音数据调用一次 predict_endpoint(),完成模型加载与图优化的预热。
  • submit_async(audio_segment)
    • 在内部线程池中提交一个任务,执行 _run_inference()
      • 调用 predict_endpoint(audio_segment)
      • 测量推理耗时(毫秒)
      • 生成结果字典:包含 predictionprobabilityinference_time_ms 以及一个简单的 audio_hash 标识。
  • get_result_if_ready()get_result_blocking() 分别提供了非阻塞与带超时阻塞的结果获取方式,便于主循环在不同场景下选择使用。

这种设计兼顾了两个目标:

  • 首次推理延迟被预热摊薄,用户启动脚本后立即说话时不会遭遇“首轮明显卡顿”。
  • Smart Turn 的计算在后台进行,不会阻塞音频读取和静音检测逻辑。

3. 动态端点检测器:DynamicEndpointDetector

DynamicEndpointDetector 是整个脚本体现“工程策略”的关键部分,其职责是:根据静音长度与上次模型置信度,动态决定检测频率与结束条件

其核心机制包括:

  • 把若干时间参数(如 EARLY_CHECK_MSCHECK_INTERVAL_MSMIN_CHECK_INTERVAL_MSMAX_STOP_MS)统一转换为“块数”,方便与采集循环对接。
  • 根据最近一次概率 last_probability 计算“下一次允许检测前需要等待的块数”(get_dynamic_interval()):
    • 高置信度(≥ HIGH_CONFIDENCE):检测间隔缩短,允许更频繁尝试结束
    • 中等置信度:使用基础间隔
    • 低置信度:适当拉长检测间隔,避免过于频繁地调用 Smart Turn
  • should_check(trailing_silence_chunks)
    • 控制何时触发一次新检测(首次检测与后续检测逻辑略有差异)。
  • should_force_end(trailing_silence_chunks, since_trigger_chunks, max_chunks)
    • 一旦静音时长或总块数超过上限,直接“强制结束”当前轮次,避免无限拖延。
  • should_end_by_confidence(probability, trailing_silence_chunks)
    • 根据当前概率与静音时长综合判断是否可以结束。
    • 高频段:较少静音即可结束;中概率段:需要更多静音来确认;低概率段:倾向继续等待。

通过这套机制,系统在“用户停顿较短、模型信心很高”的情况下可以快速响应;在“不确定用户是否说完”的情况下,则倾向保守,避免打断。

4. 主循环:record_and_predict()

record_and_predict() 是整个 Demo 的核心控制逻辑,其主要步骤为:

  1. 参数派生与组件初始化
    • 计算 chunk_mspre_chunksmax_chunks 等派生量。
    • 初始化 SileroVADDynamicEndpointDetectorAsyncSmartTurnDetector
    • 创建前置环形缓冲区 pre_buffer,用于存放触发前的若干块音频。
    • 初始化 PyAudio,打开输入流。
  2. 主循环:逐块读取与状态机控制: 每次循环:
  • 从音频流中读取一块 CHUNK 大小的 int16 数据,并转换为 float32
  • 调用 VAD 得到 is_speech

根据当前是否处于“语音段内”分为两种情况:

  • 尚未进入语音段
    • 持续把块放入 pre_buffer
    • 一旦 is_speech 为真,创建新语音段:
      • segment = list(pre_buffer) + [当前块]
      • speech_active = True,重置静音与计数,重置检测器状态。
  • 已处于语音段内
    • 将块追加到 segment,递增 since_trigger_chunks
    • is_speech 为真:
      • 若此前存在静音,打印“恢复语音”日志,并重置检测器。
      • trailing_silence 置零。
    • is_speech 为假:
      • 递增 trailing_silence

随后依次执行:

  • 强制结束判断
    • 调用 detector.should_force_end(...)
    • 若为真:
      • 停止音频流;
      • 合并当前 segment 为连续数组 audio_segment
      • 尝试通过 get_result_blocking() 获取 Smart Turn 结果,若失败则同步调用 predict_endpoint(audio_segment)
      • 调用 _process_segment() 打印日志;
      • 重置状态并重启音频流。
  • 端点检测触发与异步推理
    • detector.should_check(trailing_silence) 为真:
      • 记录检测开始(on_check_started);
      • 合并 segment 并调用 smart_turn.submit_async(audio_segment.copy())
  • 检测异步推理结果
    • 调用 smart_turn.get_result_if_ready() 检查是否有新结果。
    • 若有且当前处于“检测中”状态:
      • 更新检测器状态(on_check_completed);
      • 若开启 DEBUG_LOG,打印检测次数、静音时长、概率、推理耗时等信息。
      • 调用 should_end_by_confidence() 决定是否结束:
        • 若需要结束:停止流、合并音频、调用 _process_segment()、重置状态并重启流。
  1. 异常与资源回收
    • 通过 KeyboardInterrupt 友好退出,关闭音频流并释放资源。
    • 调用 smart_turn.shutdown() 关闭线程池。

整体而言,record_and_predict.py 实现了一个完整的实时语音 → 智能轮次结束判断 → 日志展示闭环,足以作为集成 Smart Turn v3.1 到生产系统前的“实验台”。


五、实战:如何运行与观察效果

在依赖安装完毕(包括 PortAudio / PyAudio、ONNX Runtime、NumPy、Transformers 等)后,只需运行:

代码语言:bash
复制
python record_and_predict.py

脚本启动后会:

  • 如有必要,下载 Silero VAD ONNX 模型;
  • 预热 Smart Turn v3.1 模型;
  • 进入“正在监听语音…”状态。

此时,你可以尝试以下话术来观察模型表现:

  • 半句停顿、不说完:
    • “我想退一下这个订单,但是……”(故意不补充原因)
  • 完整表达:
    • “我想退一下这个订单,因为收到的商品和描述不一致。”

每次语音段结束后,控制台会打印:

  • 段时长(秒)
  • 结束原因(如“置信度判断”“强制结束”)
  • 预测结果(“表达完整”/“表达未完整”)
  • 完整概率及推理耗时

对比你的主观感受与模型判断,可以直观评估当前阈值和策略是否适合你的目标场景。


六、参数调优与集成建议

1. 参数调优思路

在不同业务场景中,可以围绕以下几个参数进行调优:

  • VAD_THRESHOLD
    • 较低:提高对弱语音的敏感性,但更易受噪声干扰。
    • 较高:更干净的语音段,但可能漏检小声对话。
  • EARLY_CHECK_MSMAX_STOP_MS
    • 客服/聊天场景:可以适当增加 EARLY_CHECK_MS,减少过早检测;同时控制 MAX_STOP_MS,防止长时间静音。
    • 指令式交互:可以减小 EARLY_CHECK_MSMAX_STOP_MS,以追求更快响应。
  • HIGH_CONFIDENCE / MEDIUM_CONFIDENCE / LOW_CONFIDENCE
    • 若模型经常“过早结束”,可以提高 HIGH_CONFIDENCE 或增加中等置信度下所需静音时长。
    • 若系统整体显得“响应迟缓”,则可以适当降低阈值或缩短对应静音时间。

建议在本地环境中多轮对话测试,通过观测日志中概率与结束原因,逐步收敛到适合你应用场景的一组参数组合。

2. 集成到现有系统的典型方式

在已有语音系统中引入 Smart Turn v3.1 时,推荐的工程模式是:

  • 保持现有 VAD 作为“语音段切分器”
  • 在判定“本轮可能结束”的静音节点上,对当前整段语音调用 Smart Turn(可以仿照 record_and_predict.py 的数据处理与 predict_endpoint 调用逻辑);
  • 根据 probability 与业务阈值决定是否触发系统回复,或继续等待更多语音。

record_and_predict.py 提供了一套从麦克风采集到策略决策的参考实现,你可以按以下层次拆分并迁移到自己的代码库中:

  • 底层音频与 VAD 封装(音频流 + Silero VAD)
  • Smart Turn 模型封装(inference.py / predict_endpoint
  • 策略层(静音判定 + 动态检测间隔 + 置信度驱动的结束逻辑)

七、总结

  • Smart Turn v3.1 通过对连续语音片段进行二分类预测,解决了基于 VAD 的轮次结束检测不稳定、容易打断或延迟的问题。
  • 本项目中的 record_and_predict.py 提供了一个 仅依赖麦克风输入的完整实战 Demo:包括 Silero VAD、动态端点检测策略、Smart Turn ONNX 推理与异步调度。
  • 通过适当调节 VAD 阈值、静音时长、置信度策略等参数,可以在“响应速度”和“避免打断”之间找到适合具体业务场景的平衡点。

在掌握并跑通 record_and_predict.py 的基础上,你可以逐步将 Smart Turn v3.1 集成到自己的语音对话系统中,将“何时说话”这一能力提升到更接近人类直觉的水平。


附录

代码语言:python
复制
# record_and_predict.py 
import os
import time
import math
import urllib.request
import threading
from collections import deque
from concurrent.futures import ThreadPoolExecutor, Future
from typing import Optional

import numpy as np
import pyaudio
from scipy.io import wavfile
import onnxruntime as ort

from inference import predict_endpoint

# --- 基础配置(固定 16 kHz 单声道,512 样本块)---
RATE = 16000
CHUNK = 512                     # Silero VAD 在 16 kHz 下需要 512 个样本
FORMAT = pyaudio.paInt16
CHANNELS = 1

# --- VAD 配置 ---
VAD_THRESHOLD = 0.5             # 语音概率阈值
PRE_SPEECH_MS = 200             # 触发前保留的毫秒数

# --- 动态端点检测配置 ---
EARLY_CHECK_MS = 80            # 静音后多久开始第一次检测
CHECK_INTERVAL_MS = 150         # 基础检测间隔
MIN_CHECK_INTERVAL_MS = 80      # 高置信度时的最小检测间隔
MAX_STOP_MS = 1500              # 最大静音等待时间(兜底)
MAX_DURATION_SECONDS = 8        # 每段音频的最大时长上限

# --- 置信度阈值 ---
HIGH_CONFIDENCE = 0.70          # 高置信度阈值,可立即结束
MEDIUM_CONFIDENCE = 0.50        # 中等置信度
LOW_CONFIDENCE = 0.30           # 低置信度,需继续等待

# --- 调试配置 ---
DEBUG_SAVE_WAV = False
TEMP_OUTPUT_WAV = "temp_output.wav"
DEBUG_LOG = True                # 是否打印检测日志

# --- Silero ONNX 模型 ---
ONNX_MODEL_URL = (
    "https://github.com/snakers4/silero-vad/raw/master/src/silero_vad/data/silero_vad.onnx"
)
ONNX_MODEL_PATH = "silero_vad.onnx"
MODEL_RESET_STATES_TIME = 5.0


class SileroVAD:
    """Silero VAD ONNX 封装类,适用于 16 kHz 单声道,块大小为 512。"""

    def __init__(self, model_path: str):
        opts = ort.SessionOptions()
        opts.inter_op_num_threads = 1
        opts.intra_op_num_threads = 1
        self.session = ort.InferenceSession(
            model_path, providers=["CPUExecutionProvider"], sess_options=opts
        )
        self.context_size = 64
        self._state = None
        self._context = None
        self._last_reset_time = time.time()
        self._init_states()

    def _init_states(self):
        self._state = np.zeros((2, 1, 128), dtype=np.float32)
        self._context = np.zeros((1, self.context_size), dtype=np.float32)

    def maybe_reset(self):
        if (time.time() - self._last_reset_time) >= MODEL_RESET_STATES_TIME:
            self._init_states()
            self._last_reset_time = time.time()

    def prob(self, chunk_f32: np.ndarray) -> float:
        """计算一个长度为 512 的音频块的语音概率。"""
        x = np.reshape(chunk_f32, (1, -1))
        if x.shape[1] != CHUNK:
            raise ValueError(f"期望 {CHUNK} 个样本,实际得到 {x.shape[1]}")
        x = np.concatenate((self._context, x), axis=1)

        ort_inputs = {
            "input": x.astype(np.float32),
            "state": self._state,
            "sr": np.array(16000, dtype=np.int64)
        }
        out, self._state = self.session.run(None, ort_inputs)
        self._context = x[:, -self.context_size:]
        self.maybe_reset()

        return float(out[0][0])


class AsyncSmartTurnDetector:
    """异步 Smart Turn 检测器,支持预热和异步推理。"""

    def __init__(self, max_workers: int = 2):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.current_future: Optional[Future] = None
        self.last_result: Optional[dict] = None
        self.last_audio_hash: Optional[int] = None
        self._lock = threading.Lock()

        # 预热模型(首次加载较慢)
        self._warmup()

    def _warmup(self):
        """预热模型,减少首次推理延迟。"""
        print("正在预热 Smart Turn 模型...")
        dummy_audio = np.zeros(RATE, dtype=np.float32)  # 1 秒静音
        predict_endpoint(dummy_audio)
        print("Smart Turn 模型预热完成。")

    def submit_async(self, audio_segment: np.ndarray) -> Future:
        """异步提交推理任务。"""
        with self._lock:
            # 取消之前的任务(如果还在运行)
            if self.current_future and not self.current_future.done():
                self.current_future.cancel()

            audio_hash = hash(audio_segment.tobytes()[-8000:])  # 只用最后 0.5 秒做哈希
            self.last_audio_hash = audio_hash
            self.current_future = self.executor.submit(self._run_inference, audio_segment, audio_hash)
            return self.current_future

    def _run_inference(self, audio_segment: np.ndarray, audio_hash: int) -> dict:
        """执行推理。"""
        t0 = time.perf_counter()
        result = predict_endpoint(audio_segment)
        result['inference_time_ms'] = (time.perf_counter() - t0) * 1000.0
        result['audio_hash'] = audio_hash

        with self._lock:
            self.last_result = result

        return result

    def get_result_if_ready(self) -> Optional[dict]:
        """非阻塞获取结果(如果已完成)。"""
        with self._lock:
            if self.current_future and self.current_future.done():
                try:
                    return self.current_future.result(timeout=0)
                except Exception:
                    return None
            return None

    def get_result_blocking(self, timeout: float = 0.5) -> Optional[dict]:
        """阻塞等待结果。"""
        with self._lock:
            future = self.current_future

        if future:
            try:
                return future.result(timeout=timeout)
            except Exception:
                return None
        return None

    def shutdown(self):
        """关闭线程池。"""
        self.executor.shutdown(wait=False)


class DynamicEndpointDetector:
    """动态端点检测器,实现智能静音判断策略。"""

    def __init__(self):
        self.chunk_ms = (CHUNK / RATE) * 1000.0

        # 转换为 chunk 数量
        self.early_check_chunks = math.ceil(EARLY_CHECK_MS / self.chunk_ms)
        self.base_interval_chunks = math.ceil(CHECK_INTERVAL_MS / self.chunk_ms)
        self.min_interval_chunks = math.ceil(MIN_CHECK_INTERVAL_MS / self.chunk_ms)
        self.max_stop_chunks = math.ceil(MAX_STOP_MS / self.chunk_ms)

        # 状态
        self.last_check_silence_chunks = 0
        self.last_probability = 0.0
        self.check_count = 0
        self.pending_inference = False

    def reset(self):
        """重置检测状态。"""
        self.last_check_silence_chunks = 0
        self.last_probability = 0.0
        self.check_count = 0
        self.pending_inference = False

    def get_dynamic_interval(self) -> int:
        """根据上次概率动态计算检测间隔(chunk 数量)。"""
        if self.last_probability >= HIGH_CONFIDENCE:
            # 高置信度:更频繁检测
            return self.min_interval_chunks
        elif self.last_probability >= MEDIUM_CONFIDENCE:
            # 中等置信度:正常间隔
            return self.base_interval_chunks
        else:
            # 低置信度:稍长间隔
            return int(self.base_interval_chunks * 1.5)

    def should_check(self, trailing_silence_chunks: int) -> bool:
        """判断是否应该进行端点检测。"""
        if self.pending_inference:
            return False

        # 首次检测
        if self.check_count == 0 and trailing_silence_chunks >= self.early_check_chunks:
            return True

        # 后续检测:基于动态间隔
        if self.check_count > 0:
            chunks_since_last = trailing_silence_chunks - self.last_check_silence_chunks
            interval = self.get_dynamic_interval()
            if chunks_since_last >= interval:
                return True

        return False

    def should_force_end(self, trailing_silence_chunks: int, since_trigger_chunks: int, max_chunks: int) -> bool:
        """判断是否强制结束。"""
        return (trailing_silence_chunks >= self.max_stop_chunks or
                since_trigger_chunks >= max_chunks)

    def on_check_started(self, trailing_silence_chunks: int):
        """记录检测开始。"""
        self.pending_inference = True

    def on_check_completed(self, trailing_silence_chunks: int, probability: float):
        """记录检测完成。"""
        self.last_check_silence_chunks = trailing_silence_chunks
        self.last_probability = probability
        self.check_count += 1
        self.pending_inference = False

    def should_end_by_confidence(self, probability: float, trailing_silence_chunks: int) -> bool:
        """基于置信度判断是否应该结束。"""
        silence_ms = trailing_silence_chunks * self.chunk_ms

        if probability >= HIGH_CONFIDENCE:
            # 高置信度:200ms 静音即可结束
            return silence_ms >= EARLY_CHECK_MS
        elif probability >= MEDIUM_CONFIDENCE:
            # 中等置信度:需要更多静音确认
            required_ms = EARLY_CHECK_MS + (HIGH_CONFIDENCE - probability) * 500
            return silence_ms >= required_ms
        else:
            # 低置信度:继续等待
            return False


def ensure_model(path: str = ONNX_MODEL_PATH, url: str = ONNX_MODEL_URL) -> str:
    if not os.path.exists(path):
        print("正在下载 Silero VAD ONNX 模型...")
        urllib.request.urlretrieve(url, path)
        print("ONNX 模型下载完成。")
    return path


def record_and_predict():
    """主录音和预测循环。"""
    # 计算派生参数
    chunk_ms = (CHUNK / RATE) * 1000.0
    pre_chunks = math.ceil(PRE_SPEECH_MS / chunk_ms)
    max_chunks = math.ceil(MAX_DURATION_SECONDS / (CHUNK / RATE))

    print(f"配置: 早期检测={EARLY_CHECK_MS}ms, 检测间隔={CHECK_INTERVAL_MS}ms, 最大静音={MAX_STOP_MS}ms")
    print(f"置信度阈值: 高={HIGH_CONFIDENCE}, 中={MEDIUM_CONFIDENCE}, 低={LOW_CONFIDENCE}")

    # 初始化组件
    vad = SileroVAD(ensure_model())
    detector = DynamicEndpointDetector()
    smart_turn = AsyncSmartTurnDetector()

    # 语音前环形缓冲区
    pre_buffer = deque(maxlen=pre_chunks)

    # 状态变量
    segment = []
    speech_active = False
    trailing_silence = 0
    since_trigger_chunks = 0

    # 初始化音频流
    pa = pyaudio.PyAudio()
    stream = pa.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK,
    )

    print("正在监听语音...(按 Ctrl+C 停止)")

    try:
        while True:
            # 读取音频块
            data = stream.read(CHUNK, exception_on_overflow=False)
            int16 = np.frombuffer(data, dtype=np.int16)
            f32 = (int16.astype(np.float32)) / 32768.0

            # VAD 检测
            is_speech = vad.prob(f32) > VAD_THRESHOLD

            if not speech_active:
                # 等待语音开始
                pre_buffer.append(f32)
                if is_speech:
                    # 触发:开始新的音频段
                    segment = list(pre_buffer)
                    segment.append(f32)
                    speech_active = True
                    trailing_silence = 0
                    since_trigger_chunks = 1
                    detector.reset()
                    print("[语音] 🎤 开始语音")
            else:
                # 已在音频段中
                segment.append(f32)
                since_trigger_chunks += 1

                if is_speech:
                    # 检测到语音:重置静音计数和检测状态
                    if trailing_silence > 0:
                        silence_duration_ms = trailing_silence * chunk_ms
                        print(f"[语音] 🔄 恢复语音(静音了 {silence_duration_ms:.0f} 毫秒)")
                        detector.reset()  # 语音恢复,重置检测状态
                    trailing_silence = 0
                else:
                    trailing_silence += 1

                # 检查是否强制结束
                if detector.should_force_end(trailing_silence, since_trigger_chunks, max_chunks):
                    stream.stop_stream()
                    audio_segment = np.concatenate(segment, dtype=np.float32)
                    final_silence_ms = trailing_silence * chunk_ms

                    # 同步获取最终结果
                    result = smart_turn.get_result_blocking(timeout=0.1)
                    if result is None:
                        result = predict_endpoint(audio_segment)

                    _process_segment(audio_segment, result, "强制结束", final_silence_ms)
                    _reset_state(segment, pre_buffer, detector)
                    speech_active = False
                    trailing_silence = 0
                    since_trigger_chunks = 0
                    stream.start_stream()
                    print("正在监听语音...")
                    continue

                # 检查是否应该进行端点检测
                if detector.should_check(trailing_silence):
                    detector.on_check_started(trailing_silence)

                    # 异步提交推理
                    audio_segment = np.concatenate(segment, dtype=np.float32)
                    smart_turn.submit_async(audio_segment.copy())

                # 检查异步推理结果
                result = smart_turn.get_result_if_ready()
                if result and detector.pending_inference:
                    prob = result.get("probability", 0)
                    inference_time = result.get("inference_time_ms", 0)
                    detector.on_check_completed(trailing_silence, prob)

                    if DEBUG_LOG:
                        silence_ms = trailing_silence * chunk_ms
                        print(f"[检测 #{detector.check_count}] 静音={silence_ms:.0f}ms, "
                              f"概率={prob:.3f}, 推理={inference_time:.1f}ms")

                    # 基于置信度判断是否结束
                    if detector.should_end_by_confidence(prob, trailing_silence):
                        stream.stop_stream()
                        audio_segment = np.concatenate(segment, dtype=np.float32)
                        final_silence_ms = trailing_silence * chunk_ms
                        _process_segment(audio_segment, result, "置信度判断", final_silence_ms)
                        _reset_state(segment, pre_buffer, detector)
                        speech_active = False
                        trailing_silence = 0
                        since_trigger_chunks = 0
                        stream.start_stream()
                        print("正在监听语音...")

    except KeyboardInterrupt:
        print("\n正在停止...")
    finally:
        stream.stop_stream()
        stream.close()
        pa.terminate()
        smart_turn.shutdown()


def _reset_state(segment: list, pre_buffer: deque, detector: DynamicEndpointDetector):
    """重置状态。"""
    segment.clear()
    pre_buffer.clear()
    detector.reset()


def _process_segment(segment_audio_f32: np.ndarray, result: dict, end_reason: str = "", silence_ms: float = 0):
    """处理完成的音频段。"""
    if segment_audio_f32.size == 0:
        print("捕获到空的音频段,跳过。")
        return

    if DEBUG_SAVE_WAV:
        wavfile.write(TEMP_OUTPUT_WAV, RATE, (segment_audio_f32 * 32767.0).astype(np.int16))

    dur_sec = segment_audio_f32.size / RATE
    pred = result.get("prediction", 0)
    prob = result.get("probability", float("nan"))
    inference_time = result.get("inference_time_ms", 0)

    print("=" * 40)
    print(f"音频段时长:{dur_sec:.2f} 秒")
    print(f"结束原因:{end_reason}(静音 {silence_ms:.0f} 毫秒)")
    print(f"预测结果:{'✅ 表达完整' if pred == 1 else '❌ 表达未完整'}")
    print(f"完整概率:{prob:.4f}")
    if inference_time > 0:
        print(f"推理耗时:{inference_time:.2f} 毫秒")
    print("=" * 40)


if __name__ == "__main__":
    record_and_predict()

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、问题背景:从 VAD 到“智能轮次结束”
  • 二、整体架构与数据流概览
  • 三、环境准备与依赖安装
    • 1. Python 虚拟环境
    • 2. PortAudio 与 PyAudio
    • 3. ONNX 模型文件
  • 四、核心脚本 record_and_predict.py 结构解析
    • 1. 基础配置与 VAD 模型封装
    • 2. 异步 Smart Turn 推理器:AsyncSmartTurnDetector
    • 3. 动态端点检测器:DynamicEndpointDetector
    • 4. 主循环:record_and_predict()
  • 五、实战:如何运行与观察效果
  • 六、参数调优与集成建议
    • 1. 参数调优思路
    • 2. 集成到现有系统的典型方式
  • 七、总结
  • 附录
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档