当前位置: 首页>大数据>正文

基于 Python 的声音检测:PyAudio

前言

来啦老铁!

这两天在研究怎么检测、录制电脑的声音,预计对未来的自动化测试场景有帮助,因此记录一下学习过程。初次研究,仅具参考意义,不具指导意义哈~

仍然以 Python 语言来练手,抛出本文关键字:

  • PyAudio

同时,顺带练手一下前几期学的工具:Python 命令行工具库:Fire

本文代码仓库供参考:

  • https://github.com/dylanz666/pyaudio-learning.git

学习路径

  1. PyAudio 模块简介;
  2. PyAudio 模块安装;
  3. 使用 PyAudio 模块操作音频;
  4. 使用 PyAudio 模块进行声音自动检测与录制;

1. PyAudio 模块简介;

2. PyAudio 模块安装;

(以 mac os 为例)

1. 安装 portaudio;

  • 命令行执行以下命令;
brew install portaudio
基于 Python 的声音检测:PyAudio,第1张
安装 portaudio
  • 否则安装 PyAudio 时会有如下报错:
基于 Python 的声音检测:PyAudio,第2张
安装 PyAudio 时的报错

2. 安装 PyAudio 模块;

  • 命令行执行以下命令;
pip3 install PyAudio
  • 安装成功后如:
基于 Python 的声音检测:PyAudio,第3张
安装 PyAudio 模块

3. 使用 PyAudio 模块操作音频;

在使用 PyAudio 模块操作音频之前,先贴一个可免费下载 wav 文件的网站 ,在网上,如果你想下载素材,大部分都要钱的,这个网站是我搜索的时候无意中发现的:

  • https://www.aigei.com/sound/class/-wav/

接下来正式进入使用 PyAudio 模块操作音频的学习;

  1. 播放音频文件;
  • 创建一个 python 文件用于演示播放音频文件,如 player.py ;
import pyaudio
import wave
import sys

CHUNK = 1024

if len(sys.argv) < 2:
    print("Plays a wave file.\n\nUsage: %s filename.wav" % sys.argv[0])
    sys.exit(-1)

wf = wave.open(sys.argv[1], 'rb')

p = pyaudio.PyAudio()

stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True)

data = wf.readframes(CHUNK)

while data != b"":
    stream.write(data)
    data = wf.readframes(CHUNK)

stream.stop_stream()
stream.close()

p.terminate()

  • 使用以下命令播放音频文件;
python3 player.py test.wav
  1. 录音;
  • 创建一个 python 文件用于演示录制音频,如 recorder.py ;
import pyaudio
import wave

CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
RECORD_SECONDS = 5
WAVE_OUTPUT_FILENAME = "output.wav"

p = pyaudio.PyAudio()

stream = p.open(format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                input=True,
                frames_per_buffer=CHUNK)

print("* recording")

frames = []

for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
    data = stream.read(CHUNK)
    frames.append(data)

print("* done recording")

stream.stop_stream()
stream.close()
p.terminate()

wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(p.get_sample_size(FORMAT))
wf.setframerate(RATE)
wf.writeframes(b''.join(frames))
wf.close()

  • 使用以下命令录制电脑系统音频,包括麦克风;
python3 recorder.py
  1. 录制并马上播放;
  • 创建一个 python 文件用于演示录制并马上播放,如 recordAndPlayImmediately.py;
import pyaudio
import wave
import time
import sys

if len(sys.argv) < 2:
    print("Plays a wave file.\n\nUsage: %s filename.wav" % sys.argv[0])
    sys.exit(-1)

wf = wave.open(sys.argv[1], 'rb')

p = pyaudio.PyAudio()


def callback(in_data, frame_count, time_info, status):
    data = wf.readframes(frame_count)
    return data, pyaudio.paContinue


stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True,
                stream_callback=callback)

stream.start_stream()

while stream.is_active():
    time.sleep(0.1)

stream.stop_stream()
stream.close()
wf.close()

p.terminate()
  • 使用以下命令演示录制并马上播放;
python3 recordAndPlayImmediately.py
  1. 播放音频文件(callback 方式);
  • 创建一个 python 文件用于演示播放音频文件的 callback 方式,如 playerCallbackVersion.py;
import pyaudio
import wave
import time
import sys

if len(sys.argv) < 2:
    print("Plays a wave file.\n\nUsage: %s filename.wav" % sys.argv[0])
    sys.exit(-1)

wf = wave.open(sys.argv[1], 'rb')

p = pyaudio.PyAudio()


def callback(in_data, frame_count, time_info, status):
    data = wf.readframes(frame_count)
    return data, pyaudio.paContinue


stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True,
                stream_callback=callback)

stream.start_stream()

while stream.is_active():
    time.sleep(0.1)

stream.stop_stream()
stream.close()
wf.close()

p.terminate()

  • 使用以下命令播放音频文件(callback 方式);
python3 playerCallbackVersion.py test.wav
  1. 录制并马上播放(callback 方式);
  • 创建一个 python 文件用于演示录制并马上播放(callback 方式),如 recordAndPlayImmediately.py;
import pyaudio
import time

WIDTH = 2
CHANNELS = 1
RATE = 44100

p = pyaudio.PyAudio()


def callback(in_data, frame_count, time_info, status):
    return in_data, pyaudio.paContinue


stream = p.open(format=p.get_format_from_width(WIDTH),
                channels=CHANNELS,
                rate=RATE,
                input=True,
                output=True,
                stream_callback=callback)

stream.start_stream()

while stream.is_active():
    time.sleep(0.1)

stream.stop_stream()
stream.close()

p.terminate()

  • 使用以下命令播放音频文件(callback 方式);
python3 python3 recordAndPlayImmediatelyCallbackVersion.py

(以上几个示例的代码来源:https://docs.python.org/zh-cn/3/library/audioop.html)

当然,这个命令会一直运行去采集声音,因为 stream.is_active() 一直都是 True,即电脑系统、麦克风一直有声音,因此一直能够采集到声音。

想要能够自动停止采集,接下来我们来探索声音的自动采集与录制~

4. 使用 PyAudio 模块进行声音自动检测与录制;

接下来我会以这样的一个场景进行声音的采集、录制、自动根据音量大小停止采集,即:

  • 在系统播放音频的时候,当有从系统采集到音频,则录制,直到没有从系统采集到音频;

这里的“有从系统采集到音频”我们可以理解为从系统采集到的音频具有一定的音响或音量,反之,“没有从系统采集到音频”则可以理解为从系统采集到的音频音响或音量低到一定程度;

为了规避麦克风的干扰,使我们能够实现自动停止音频采集,我们可以:
  • 关闭电脑的麦克风;
  • 设置一个音响或音量阈值,当采集到的音频的音响或音量低于这个阈值,并持续某一段时间,则认为“没有从系统采集到音频”;

关闭电脑的麦克风我就不做研究了,这是电脑设置。我们来研究第 2 个,要对 “音响或音量” 大小进行实时评估,这时候我们需要用到对声音片段进行数学处理的模块:

  • audioop

这是 python 自带的一个模块,相关文档:https://docs.python.org/zh-cn/3/library/audioop.html

我们可以用 audioop 的声音片段均方根值 rms 来评估声音片段的“音响或音量”:

基于 Python 的声音检测:PyAudio,第4张
rms

然后根据电脑音量设置、测试时周边噪音大小情况(很明显,我们当然不能在噪音非常大的情况下进行声音采集,并且这种情况下采集的音频也没有意义),制定一个合适的 rms 阈值。

例如,当我电脑的音量设置为 50%,周边没有特别的噪音的情况下,rms 阈值可以设置为 100,雨天可以设置为 500,当持续采集到的声音的 rms 值均低于该阈值,则自动停止声音的采集;

其他细节,如为了边播放边录制,我们用到多进程 multiprocessing 模块,为了练习前几期学的工具:Python 命令行工具库:Fire,使用了 Python 命令行工具库 Fire。

这是我目前能想到的方案,代码位于 main.py 文件内,仅供参考:

import audioop
from multiprocessing import Process

import fire
import pyaudio
import wave

stream_format = pyaudio.paInt16
pyaudio_instance = pyaudio.PyAudio()
sample_width = pyaudio_instance.get_sample_size(stream_format)
global audio_frames


class Detector(object):
    def __init__(self):
        self.source_file = ""
        self.channels = None
        self.rate = None
        self.chunk = None
        self.audio_min_rms = None
        self.max_low_audio_flag = None
        self.recording = False
        self.recording_file = ""
        self.audio_frames = []

    def __str__(self):
        return ""

    def play(self, source_file="", chunk=None):
        source_file = source_file if not self.source_file else self.source_file
        chunk = chunk if not self.chunk else self.chunk

        f = wave.open(source_file, "rb")
        p = pyaudio.PyAudio()

        file_format = p.get_format_from_width(f.getsampwidth())
        stream = p.open(format=file_format, channels=f.getnchannels(), rate=f.getframerate(), output=True)

        data = f.readframes(chunk)

        while data != b"":
            stream.write(data)
            data = f.readframes(chunk)

        stream.stop_stream()
        stream.close()
        p.terminate()
        return self

    def detect_audio(self, channels=None, rate=None, chunk=None, audio_min_rms=None, max_low_audio_flag=None,
                     recording=False, recording_file=""):
        channels = channels if not self.channels else self.channels
        rate = rate if not self.rate else self.rate
        chunk = chunk if not self.chunk else self.chunk
        audio_min_rms = audio_min_rms if not self.audio_min_rms else self.audio_min_rms
        max_low_audio_flag = max_low_audio_flag if not self.max_low_audio_flag else self.max_low_audio_flag
        recording = recording if not self.recording else self.recording
        recording_file = recording_file if not self.recording_file else self.recording_file
        self.channels = channels
        self.rate = rate
        self.chunk = chunk
        self.audio_min_rms = audio_min_rms
        self.max_low_audio_flag = max_low_audio_flag
        self.recording = recording
        self.recording_file = recording_file

        print("* start detecting audio ~")
        self.channels = channels
        self.rate = rate

        stream = pyaudio_instance.open(format=stream_format,
                                       channels=channels,
                                       rate=rate,
                                       input=True,
                                       frames_per_buffer=chunk)
        low_audio_flag = 0
        detect_count = 0
        while True:
            detect_count += 1

            stream_data = stream.read(chunk)

            rms = audioop.rms(stream_data, 2)
            print(f"the {detect_count} time detecting:", rms)

            low_audio_flag = 0 if rms > audio_min_rms else low_audio_flag + 1

            # 100 为经验值,即连续 100 次采样都是小音量,则可以认为没有音频,根据实际情况设置
            if low_audio_flag > max_low_audio_flag:
                print("* no audio detected, stop detecting ~")
                break
            self.audio_frames.append(stream_data)
        stream.stop_stream()
        stream.close()
        pyaudio_instance.terminate()
        if recording:
            self.record()
        return self

    def record(self, recording_file=""):
        recording_file = recording_file if not self.recording_file else self.recording_file
        self.recording_file = recording_file

        wf = wave.open(recording_file, 'wb')
        wf.setnchannels(self.channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(self.rate)
        wf.writeframes(b''.join(self.audio_frames))
        wf.close()
        return self

    def play_and_detect(self, source_file, channels, rate, chunk, audio_min_rms, max_low_audio_flag, recording,
                        recording_file):
        self.source_file = source_file
        self.channels = channels
        self.rate = rate
        self.chunk = chunk
        self.audio_min_rms = audio_min_rms
        self.max_low_audio_flag = max_low_audio_flag
        self.recording = recording
        self.recording_file = recording_file

        play_process = Process(target=self.play)
        detect_process = Process(target=self.detect_audio)
        play_process.start()
        detect_process.start()

        play_process.join()
        detect_process.join()
        return self


if __name__ == '__main__':
    fire.Fire(Detector)

测试:

1. 播放音频文件;
python3 main.py - play --source_file=test.wav --chunk=1024
2. 单纯检测音频;
python3 main.py - detect_audio --channels=1 --rate=44100 --chunk=1024 --audio_min_rms=500 -max_low_audio_flag=100
3. 检测并录制音频;
python3 main.py - detect_audio --channels=1 --rate=44100 --chunk=1024 --audio_min_rms=500 -max_low_audio_flag=100 - record --recording_file=recording.wav
4. 播放音频的同时录制音频;
python3 main.py - play_and_detect --source_file=test.wav --channels=1 --rate=44100 --chunk=1024 --audio_min_rms=500 -max_low_audio_flag=100 --recording=True --recording_file=recording.wav

对于录制到的音频文件,如果要进行与原音频的比对,可能还需要用到降噪能力,简单的降噪模块如 noisereduce 等,复杂的咱也还不会呀,后续有机会咱们再继续研究这方面的知识;

好了,今天先玩到这里吧,我们改日再战~

非常感谢!


https://www.xamrdz.com/bigdata/79z1869681.html

相关文章: