
Python Voice Recording and Recognition

Original
Author: dandelion1990
Published: 2024-03-22 18:48:24

This article introduces some commonly used Python packages for working with audio, and shows how to perform speech recognition by calling cloud providers' APIs.

Recording

We mainly use the pyaudio package, which can record and play audio as a byte stream.

Installation:

```bash
pip install pyaudio
```
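Note that pyaudio is a binding to the native PortAudio library, so on some systems the pip install fails until PortAudio itself is present. The package names below are the usual ones for each platform, but check your distribution's package index if they differ:

```shell
# Debian/Ubuntu
sudo apt-get install portaudio19-dev

# macOS (Homebrew)
brew install portaudio
```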

List the devices available for recording:

```python
import pyaudio

p = pyaudio.PyAudio()

# Get the number of audio I/O devices
devices = p.get_device_count()

for i in range(devices):
    device_info = p.get_device_info_by_index(i)
    # Only devices with input channels can record
    if device_info.get('maxInputChannels') > 0:
        print(f"{device_info.get('index')}: {device_info.get('name')}")

p.terminate()
```

Record for 5 seconds; here the captured audio is written into an io.BytesIO object.

```python
import io

import pyaudio

FORMAT = pyaudio.paInt16  # format of audio samples
CHANNELS = 1              # audio channels (1: mono, 2: stereo)
RATE = 44100              # sample rate
CHUNK = 1024              # number of frames per buffer
RECORD_SECONDS = 5

p = pyaudio.PyAudio()
stream = p.open(
    format=FORMAT,
    channels=CHANNELS,
    rate=RATE,
    frames_per_buffer=CHUNK,
    input=True,
)

print("Recording...")
buffer = io.BytesIO()
for _ in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
    data = stream.read(CHUNK)
    buffer.write(data)

stream.stop_stream()
stream.close()
p.terminate()
```
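The loop count follows from the stream parameters: each `stream.read(CHUNK)` returns CHUNK frames, so RATE / CHUNK reads make up one second of audio. A quick sanity check of the arithmetic (paInt16 means 2 bytes per sample), noting that the `int()` truncation means the capture is slightly shorter than exactly 5 seconds:

```python
RATE = 44100          # frames per second
CHUNK = 1024          # frames per stream.read()
RECORD_SECONDS = 5
SAMPLE_WIDTH = 2      # bytes per sample for paInt16
CHANNELS = 1

n_reads = int(RATE / CHUNK * RECORD_SECONDS)   # number of stream.read() calls
n_frames = n_reads * CHUNK                     # total frames captured
n_bytes = n_frames * SAMPLE_WIDTH * CHANNELS   # size of the raw byte stream

print(n_reads, n_frames, n_bytes)  # 215 220160 440320
```

220160 frames is a touch under the 220500 frames of a full 5 seconds; for exact durations you would round the read count up and trim.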

Saving the audio file

Use the wave package from the standard library to save the audio bytes to a .wav file; it writes the WAV format header at the start of the file. See the docs: The Python Standard Library - wave

```python
import wave

output = 'output.wav'
data = buffer.getvalue()  # the raw bytes recorded above

with wave.open(output, 'wb') as wf:
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
    wf.setframerate(RATE)
    wf.writeframes(data)
print(f"Recording saved as {output}")
```

If you don't need to write to a file and only want the WAV header prepended to the byte stream, you can do the following:

```python
output = io.BytesIO()
with wave.open(output, 'wb') as wf:
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
    wf.setframerate(RATE)
    wf.writeframes(data)  # the raw recorded bytes
wav_bytes = output.getvalue()  # the same bytes with a WAV header prepended
```
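The bytes produced this way are a complete in-memory WAV file, so they can be read back with the same wave module to confirm the parameters. The sketch below uses a second of silence instead of real microphone data, and hard-codes the 2-byte sample width that `pyaudio.get_sample_size(pyaudio.paInt16)` would return:

```python
import io
import wave

CHANNELS = 1
RATE = 16000
SAMPLE_WIDTH = 2  # bytes per 16-bit sample

data = b'\x00\x00' * RATE  # one second of silence, 16-bit mono

output = io.BytesIO()
with wave.open(output, 'wb') as wf:
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(SAMPLE_WIDTH)
    wf.setframerate(RATE)
    wf.writeframes(data)
wav_bytes = output.getvalue()

# Read the header back to check what was written
with wave.open(io.BytesIO(wav_bytes)) as rf:
    print(rf.getnchannels(), rf.getframerate(), rf.getnframes())  # 1 16000 16000
```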

Speech Recognition

Tencent Cloud

Tencent Cloud offers several speech recognition services; here I only tried Sentence Recognition (一句话识别) and Flash Recording File Recognition (录音文件识别极速版).

Sentence Recognition

Following the documentation, base64-encode the recorded bytes and send them through the API to get the recognition result.

Note that the byte data must include the file header for the corresponding format. That is, the byte stream obtained from pyaudio above must first have a header added by the wave module; otherwise the Tencent Cloud API returns a format-recognition error.
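A cheap way to catch this before calling the API is to check for the RIFF/WAVE magic bytes that the wave module writes at the start of the stream. This is a local sanity check of my own, not part of the Tencent Cloud SDK:

```python
def looks_like_wav(data: bytes) -> bool:
    # A WAV file starts with 'RIFF', a 4-byte chunk size, then 'WAVE'
    return len(data) >= 12 and data[:4] == b'RIFF' and data[8:12] == b'WAVE'

print(looks_like_wav(b'RIFF\x24\x08\x00\x00WAVEfmt '))  # True: header present
print(looks_like_wav(b'\x00\x00\x00\x00'))              # False: raw PCM, no header
```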

```python
import base64
import json

from tencentcloud.common.common_client import CommonClient
from tencentcloud.common import credential
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile

try:
    # SECRET_ID / SECRET_KEY are your Tencent Cloud API credentials
    cred = credential.Credential(SECRET_ID, SECRET_KEY)

    httpProfile = HttpProfile()
    httpProfile.endpoint = "asr.tencentcloudapi.com"
    clientProfile = ClientProfile()
    clientProfile.httpProfile = httpProfile

    # sentence_bytes must be a complete WAV file, header included
    encoded = base64.b64encode(sentence_bytes).decode()

    data = {
        'EngSerViceType': '16k_zh',
        'SourceType': 1,
        'VoiceFormat': 'wav',
        'Data': encoded,
        'DataLen': len(sentence_bytes)
    }
    common_client = CommonClient("asr", "2019-06-14", cred, "", profile=clientProfile)
    print(common_client.call_json("SentenceRecognition", data))
except TencentCloudSDKException as err:
    print(err)
```
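One detail worth noting in the request above: `Data` carries the base64 text, while `DataLen` is the length of the raw, pre-encoding bytes (hence `len(sentence_bytes)`, not `len(encoded)`). Base64 inflates the payload by a factor of 4/3, so the two lengths differ:

```python
import base64

raw = b'\x00\x01' * 300          # 600 raw bytes, standing in for audio
encoded = base64.b64encode(raw)  # what goes into the 'Data' field

print(len(raw), len(encoded))    # 600 800: DataLen vs. transmitted size
```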
Flash Recording File Recognition

Based on the official sample code:

```python
import requests
import hmac
import hashlib
import base64
import time
import json

class FlashRecognitionRequest:
    def __init__(self, engine_type):
        self.engine_type = engine_type
        self.speaker_diarization = 0
        self.hotword_id = ""
        self.customization_id = ""
        self.filter_dirty = 0
        self.filter_modal = 0
        self.filter_punc = 0
        self.convert_num_mode = 1
        self.word_info = 0
        self.voice_format = ""
        self.first_channel_only = 1
        self.reinforce_hotword = 0
        self.sentence_max_length = 0

    def set_first_channel_only(self, first_channel_only):
        self.first_channel_only = first_channel_only

    def set_speaker_diarization(self, speaker_diarization):
        self.speaker_diarization = speaker_diarization

    def set_filter_dirty(self, filter_dirty):
        self.filter_dirty = filter_dirty

    def set_filter_modal(self, filter_modal):
        self.filter_modal = filter_modal

    def set_filter_punc(self, filter_punc):
        self.filter_punc = filter_punc

    def set_convert_num_mode(self, convert_num_mode):
        self.convert_num_mode = convert_num_mode

    def set_word_info(self, word_info):
        self.word_info = word_info

    def set_hotword_id(self, hotword_id):
        self.hotword_id = hotword_id

    def set_customization_id(self, customization_id):
        self.customization_id = customization_id

    def set_voice_format(self, voice_format):
        self.voice_format = voice_format

    def set_sentence_max_length(self, sentence_max_length):
        self.sentence_max_length = sentence_max_length

    def set_reinforce_hotword(self, reinforce_hotword):
        self.reinforce_hotword = reinforce_hotword

class FlashRecognizer:
    def __init__(self):
        pass

    def _format_sign_string(self, param):
        signstr = "POSTasr.cloud.tencent.com/asr/flash/v1/"
        for t in param:
            if 'appid' in t:
                signstr += str(t[1])
                break
        signstr += "?"
        for x in param:
            tmp = x
            if 'appid' in x:
                continue
            for t in tmp:
                signstr += str(t)
                signstr += "="
            signstr = signstr[:-1]
            signstr += "&"
        signstr = signstr[:-1]
        return signstr

    def _build_header(self):
        header = dict()
        header["Host"] = "asr.cloud.tencent.com"
        return header

    def _sign(self, signstr, secret_key):
        hmacstr = hmac.new(secret_key.encode('utf-8'),
                           signstr.encode('utf-8'), hashlib.sha1).digest()
        s = base64.b64encode(hmacstr)
        s = s.decode('utf-8')
        return s

    def _build_req_with_signature(self, secret_key, params, header):
        query = sorted(params.items(), key=lambda d: d[0])
        signstr = self._format_sign_string(query)
        signature = self._sign(signstr, secret_key)
        header["Authorization"] = signature
        requrl = "https://"
        requrl += signstr[4::]
        return requrl

    def _create_query_arr(self, req):
        # APP_ID / SECRET_ID are your Tencent Cloud credentials
        query_arr = dict()
        query_arr['appid'] = APP_ID
        query_arr['secretid'] = SECRET_ID
        query_arr['timestamp'] = str(int(time.time()))
        query_arr['engine_type'] = req.engine_type
        query_arr['voice_format'] = req.voice_format
        query_arr['speaker_diarization'] = req.speaker_diarization
        query_arr['hotword_id'] = req.hotword_id
        query_arr['customization_id'] = req.customization_id
        query_arr['filter_dirty'] = req.filter_dirty
        query_arr['filter_modal'] = req.filter_modal
        query_arr['filter_punc'] = req.filter_punc
        query_arr['convert_num_mode'] = req.convert_num_mode
        query_arr['word_info'] = req.word_info
        query_arr['first_channel_only'] = req.first_channel_only
        query_arr['reinforce_hotword'] = req.reinforce_hotword
        query_arr['sentence_max_length'] = req.sentence_max_length
        return query_arr

    def recognize(self, req, data):
        header = self._build_header()
        query_arr = self._create_query_arr(req)
        req_url = self._build_req_with_signature(SECRET_KEY, query_arr, header)
        r = requests.post(req_url, headers=header, data=data)
        return r.text


recognizer = FlashRecognizer()

# Build the recognition request
req = FlashRecognitionRequest('16k_zh')
req.set_filter_modal(0)
req.set_filter_punc(0)
req.set_filter_dirty(0)
req.set_voice_format("wav")
req.set_word_info(0)
req.set_convert_num_mode(1)

# Run recognition
resultData = recognizer.recognize(req, audio_bytes)
resp = json.loads(resultData)
request_id = resp["request_id"]
code = resp["code"]
if code != 0:
    print("recognize failed! request_id: ", request_id, " code: ", code, ", message: ", resp["message"])
else:
    # Each entry in flash_result corresponds to one audio channel;
    # most audio is mono, so there is a single entry
    try:
        result = resp["flash_result"][0]['text']
    except Exception as e:
        print(f'parse error: {e}')
```
As with Sentence Recognition, the uploaded data must include the file header for its format.

iFLYTEK

Here I tried iFLYTEK's real-time speech transcription API: audio bytes are pushed to a websocket server, and recognition results are received over the same connection.

This is based on the official sample, rewritten in async/await style.

```python
from datetime import datetime
import time
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import asyncio
import base64
import hashlib
import hmac
import websockets
import json

STATUS_FIRST_FRAME = 0     # marks the first audio frame
STATUS_CONTINUE_FRAME = 1  # marks intermediate frames
STATUS_LAST_FRAME = 2      # marks the last frame

def get_url(app_key: str, app_secret: str) -> str:
    url = 'wss://ws-api.xfyun.cn/v2/iat'
    # Generate an RFC 1123 timestamp
    now = datetime.now()
    date = format_date_time(time.mktime(now.timetuple()))

    # Build the string to sign
    signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
    signature_origin += "date: " + date + "\n"
    signature_origin += "GET " + "/v2/iat " + "HTTP/1.1"
    # Sign it with HMAC-SHA256
    signature_sha = hmac.new(app_secret.encode('utf-8'), signature_origin.encode('utf-8'),
                             digestmod=hashlib.sha256).digest()
    signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')

    authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % (
        app_key, "hmac-sha256", "host date request-line", signature_sha)
    authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
    # Collect the auth parameters into a dict
    v = {
        "authorization": authorization,
        "date": date,
        "host": "ws-api.xfyun.cn"
    }
    # Append the auth parameters to build the final URL
    url = url + '?' + urlencode(v)
    return url

async def iflytek_recognition(data: bytes):
    # APP_ID / APP_KEY / APP_SECRET are your iFLYTEK credentials
    url = get_url(app_key=APP_KEY, app_secret=APP_SECRET)
    async with websockets.connect(url) as ws:
        frame_size = 8000  # audio bytes per frame
        interval = 0.04    # delay between frames (seconds)
        status = STATUS_FIRST_FRAME  # whether the current frame is first, intermediate, or last

        common_args = {"app_id": APP_ID}
        business_args = {"domain": "iat", "language": "zh_cn", "accent": "mandarin", "vinfo": 1, "vad_eos": 10000}

        i = 0
        while True:
            buf = data[i*frame_size:(i+1)*frame_size]
            i += 1
            # End of data
            if not buf:
                status = STATUS_LAST_FRAME
            # First frame: carries the business parameters;
            # app_id is required and is sent only with this frame
            if status == STATUS_FIRST_FRAME:
                d = {"common": common_args,
                     "business": business_args,
                     "data": {"status": 0, "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}
                await ws.send(json.dumps(d))
                status = STATUS_CONTINUE_FRAME
            # Intermediate frames
            elif status == STATUS_CONTINUE_FRAME:
                d = {"data": {"status": 1, "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}
                await ws.send(json.dumps(d))
            # Last frame
            elif status == STATUS_LAST_FRAME:
                d = {"data": {"status": 2, "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}
                await ws.send(json.dumps(d))
                break
            # Simulate the audio sampling interval without blocking the event loop
            await asyncio.sleep(interval)
        message = await ws.recv()
    result = ''
    try:
        code = json.loads(message)["code"]
        sid = json.loads(message)["sid"]
        if code != 0:
            errMsg = json.loads(message)["message"]
            print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
        else:
            data = json.loads(message)["data"]["result"]["ws"]
            for i in data:
                for w in i["cw"]:
                    result += w["w"]
            print("sid:%s call success!,data is:%s" % (sid, json.dumps(data, ensure_ascii=False)))
    except Exception as e:
        print("receive msg,but parse exception:", e)
    return result

async def main():
    with open('sample.wav', 'rb') as f:
        await iflytek_recognition(f.read())

if __name__ == '__main__':
    asyncio.run(main())
```
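The send loop above implements a simple three-state framing protocol: status 0 on the first frame (which also carries the app and business parameters), 1 on intermediate frames, and 2 on the final, empty frame. The chunking logic can be checked without any network connection; the helper below mirrors the loop's control flow, recording only the status attached to each message:

```python
STATUS_FIRST_FRAME = 0
STATUS_CONTINUE_FRAME = 1
STATUS_LAST_FRAME = 2

def frame_statuses(data: bytes, frame_size: int):
    """Return the status value attached to each websocket message for `data`."""
    statuses = []
    status = STATUS_FIRST_FRAME
    i = 0
    while True:
        buf = data[i * frame_size:(i + 1) * frame_size]
        i += 1
        if not buf:
            status = STATUS_LAST_FRAME
        if status == STATUS_FIRST_FRAME:
            statuses.append(0)
            status = STATUS_CONTINUE_FRAME
        elif status == STATUS_CONTINUE_FRAME:
            statuses.append(1)
        else:  # STATUS_LAST_FRAME: an empty frame closes the session
            statuses.append(2)
            break
    return statuses

print(frame_statuses(b'x' * 20000, 8000))  # [0, 1, 1, 2]
```

Note that the last frame is always empty: 20000 bytes split into 8000-byte frames yields one first frame, two intermediate frames, and the empty terminator.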

A Simple Demo

Here is a demo that toggles recording on and off with the space bar and prints the speech recognition result.

Keyboard monitoring uses the pynput package, which listens for and responds to keyboard events on a dedicated thread.

Installation:

```bash
pip install pynput
```

The full code is as follows:

```python
import pyaudio
import wave
from pynput import keyboard
import threading
import io
import asyncio

# Local modules wrapping the snippets from the earlier sections
from list_devices import list_devices
from tencent import sentence_recognition, flash_recognition
from iflytek import iflytek_recognition

FORMAT = pyaudio.paInt16  # format of audio samples
CHANNELS = 1              # audio channels (1: mono, 2: stereo)
RATE = 16000              # sample rate
CHUNK = 1024              # number of frames per buffer

is_recording = False
device_index = 0

def save(data: bytes, output: str):
    with wave.open(output, 'wb') as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
        wf.setframerate(RATE)
        wf.writeframes(data)
    print(f"Recording saved as {output}")


def to_wav_bytes(data: bytes) -> bytes:
    output = io.BytesIO()
    with wave.open(output, 'wb') as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
        wf.setframerate(RATE)
        wf.writeframes(data)
    return output.getvalue()

def do_record(device_index: int, output: str):
    global is_recording
    print("Recording...")
    p = pyaudio.PyAudio()
    buffer = io.BytesIO()
    stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        frames_per_buffer=CHUNK,
        input=True,
        input_device_index=device_index,
    )

    while is_recording:
        data = stream.read(CHUNK)
        buffer.write(data)
    stream.stop_stream()
    stream.close()

    print("Finished.")
    p.terminate()

    data = buffer.getvalue()
    # result = sentence_recognition(to_wav_bytes(data))
    result = flash_recognition(to_wav_bytes(data))
    # result = asyncio.run(iflytek_recognition(data))
    print(result)
    save(data, output)


def on_release(key):
    global is_recording, device_index
    try:
        if key == keyboard.Key.space:
            if not is_recording:
                is_recording = True
                threading.Thread(target=do_record, args=(device_index, 'output.wav')).start()
            else:
                is_recording = False
        elif key == keyboard.Key.esc:
            is_recording = False
            # Stop listener
            return False
    except AttributeError:
        print('special key {0} pressed'.format(key))

def main():
    global device_index
    list_devices()
    device_index = int(input("Please select input device: "))
    with keyboard.Listener(on_release=on_release) as listener:
        listener.join()


if __name__ == '__main__':
    main()
```

In my tests, Tencent Cloud's Flash Recording File Recognition is very fast, and is a good fit as the voice entry point for simple everyday applications.

Original statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

For infringement concerns, contact cloudcommunity@tencent.com for removal.

