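Preface
Hey folks, welcome back!
I've spent the last couple of days looking into how to detect and record a computer's audio. I expect this to be useful for future test-automation scenarios, so I'm writing down what I learned. This is a first exploration: treat it as a reference, not as guidance~
As usual I'm practicing in Python. The keyword for this post:
- PyAudio
Along the way I'll also practice a tool from a few posts back: Fire, the Python command-line library.
The code repository for this post, for reference:
- https://github.com/dylanz666/pyaudio-learning.git
Learning path
- PyAudio module overview;
- Installing the PyAudio module;
- Working with audio using the PyAudio module;
- Automatic sound detection and recording with the PyAudio module;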
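1. PyAudio module overview;
PyAudio provides Python bindings for PortAudio, a cross-platform audio I/O library, so Python programs can play and record audio on a variety of platforms.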
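2. Installing the PyAudio module;
(using macOS as the example)
1. Install portaudio;
- Run the following command in a terminal;
brew install portaudio
- Without it, installing PyAudio fails with a build error (typically a complaint that the portaudio.h header cannot be found);
2. Install the PyAudio module;
- Run the following command in a terminal;
pip3 install PyAudio
- When the installation succeeds, pip finishes with a "Successfully installed PyAudio" line followed by the version number;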
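A quick way to check that the install really works is to ask PortAudio which input devices it can see. The sketch below is only an illustration: the helper name list_input_devices and the main() wrapper are my own. It relies on PyAudio's get_device_count() and get_device_info_by_index(), and on the "maxInputChannels" key of the returned device dictionary.

```python
def list_input_devices(pa):
    """Return (index, name, max_input_channels) for every device that can record.

    `pa` is expected to be a pyaudio.PyAudio() instance (hypothetical helper,
    not part of PyAudio itself).
    """
    devices = []
    for index in range(pa.get_device_count()):
        info = pa.get_device_info_by_index(index)
        # Devices with zero input channels are output-only (e.g. speakers)
        if info.get("maxInputChannels", 0) > 0:
            devices.append((index, info["name"], int(info["maxInputChannels"])))
    return devices


def main():
    # Imported here so that the helper above does not need audio hardware
    import pyaudio

    pa = pyaudio.PyAudio()
    try:
        for index, name, channels in list_input_devices(pa):
            print(f"[{index}] {name} ({channels} input channels)")
    finally:
        pa.terminate()
```

Calling main() prints one line per recording-capable device. If the import inside main() fails, the installation did not succeed.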
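3. Working with audio using the PyAudio module;
Before using PyAudio to work with audio, here is a site where wav files can be downloaded for free. Most material you find online costs money; I stumbled on this one while searching:
- https://www.aigei.com/sound/class/-wav/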
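If you would rather not download anything, a test file can also be generated locally. The following is a minimal sketch using only the standard library. The function name write_sine_wav and its default values (440 Hz, 1 second, 44100 Hz, amplitude 0.3) are my own choices for illustration, not something from the repository.

```python
import math
import struct
import wave


def write_sine_wav(path, freq_hz=440.0, seconds=1.0, rate=44100, amplitude=0.3):
    """Write a mono 16-bit PCM sine tone to `path` and return the frame count."""
    n_frames = int(rate * seconds)
    peak = int(amplitude * 32767)  # 32767 is the largest signed 16-bit value
    payload = b"".join(
        # "<h" packs one signed 16-bit little-endian sample, as WAV expects
        struct.pack("<h", int(peak * math.sin(2 * math.pi * freq_hz * i / rate)))
        for i in range(n_frames)
    )
    with wave.open(path, "wb") as wf:
        wf.setnchannels(1)   # mono
        wf.setsampwidth(2)   # 2 bytes per sample = 16 bit
        wf.setframerate(rate)
        wf.writeframes(payload)
    return n_frames
```

For example, write_sine_wav("test.wav", seconds=3) produces a three-second tone that can be used as test.wav in the examples of this section.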
Now let's get into working with audio using the PyAudio module;
- Play an audio file;
- Create a Python file to demonstrate playing an audio file, e.g. player.py;
import pyaudio
import wave
import sys

CHUNK = 1024  # frames read from the file per write

if len(sys.argv) < 2:
    print("Plays a wave file.\n\nUsage: %s filename.wav" % sys.argv[0])
    sys.exit(-1)

wf = wave.open(sys.argv[1], 'rb')
p = pyaudio.PyAudio()

# Open an output stream matching the file's sample width, channels and rate
stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True)

# Feed the file to the stream chunk by chunk until it is exhausted
data = wf.readframes(CHUNK)
while data != b"":
    stream.write(data)
    data = wf.readframes(CHUNK)

stream.stop_stream()
stream.close()
p.terminate()
- Use the following command to play an audio file;
python3 player.py test.wav
- Record;
- Create a Python file to demonstrate recording audio, e.g. recorder.py;
import pyaudio
import wave

CHUNK = 1024              # frames per read
FORMAT = pyaudio.paInt16  # 16-bit samples
CHANNELS = 1
RATE = 44100              # sampling rate in Hz
RECORD_SECONDS = 5
WAVE_OUTPUT_FILENAME = "output.wav"

p = pyaudio.PyAudio()

stream = p.open(format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                input=True,
                frames_per_buffer=CHUNK)

print("* recording")

# Read fixed-size chunks until roughly RECORD_SECONDS of audio is collected
frames = []
for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
    data = stream.read(CHUNK)
    frames.append(data)

print("* done recording")

stream.stop_stream()
stream.close()
p.terminate()

# Save the collected chunks as a wav file with matching parameters
wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(p.get_sample_size(FORMAT))
wf.setframerate(RATE)
wf.writeframes(b''.join(frames))
wf.close()
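- Use the following command to record the computer's audio through the default input device (normally the microphone, which also picks up whatever the speakers are playing);
python3 recorder.py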
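The loop bound int(RATE / CHUNK * RECORD_SECONDS) in recorder.py is worth working through once, because it explains why output.wav comes out slightly shorter than 5 seconds. The small calculation below reuses the same constants. The variable names reads, frames, duration and size_bytes are mine.

```python
CHUNK = 1024
RATE = 44100
RECORD_SECONDS = 5

reads = int(RATE / CHUNK * RECORD_SECONDS)  # number of stream.read(CHUNK) calls
frames = reads * CHUNK                      # frames actually captured
duration = frames / RATE                    # seconds of audio in output.wav
size_bytes = frames * 2 * 1                 # 2 bytes per paInt16 sample, 1 channel

print(reads, frames, round(duration, 3), size_bytes)
# prints: 215 220160 4.992 440320
```

Because the number of reads is truncated to a whole number, the recording is always a little shorter than RECORD_SECONDS, never longer.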
- Record and play back immediately;
- Create a Python file to demonstrate recording and immediate playback, e.g. recordAndPlayImmediately.py;
import pyaudio

CHUNK = 1024
WIDTH = 2            # bytes per sample (16 bit)
CHANNELS = 1
RATE = 44100
RECORD_SECONDS = 5

p = pyaudio.PyAudio()

# One stream opened for both input and output acts as a "wire"
stream = p.open(format=p.get_format_from_width(WIDTH),
                channels=CHANNELS,
                rate=RATE,
                input=True,
                output=True,
                frames_per_buffer=CHUNK)

print("* recording")

# Read a chunk from the input and write it straight back to the output
for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
    data = stream.read(CHUNK)
    stream.write(data, CHUNK)

print("* done")

stream.stop_stream()
stream.close()
p.terminate()
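- Use the following command to demonstrate recording and immediate playback;
python3 recordAndPlayImmediately.py
- Play an audio file (callback version);
- Create a Python file to demonstrate the callback way of playing an audio file, e.g. playerCallbackVersion.py;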
import pyaudio
import wave
import time
import sys

if len(sys.argv) < 2:
    print("Plays a wave file.\n\nUsage: %s filename.wav" % sys.argv[0])
    sys.exit(-1)

wf = wave.open(sys.argv[1], 'rb')
p = pyaudio.PyAudio()


# PyAudio calls this from its own thread whenever it needs more frames to play
def callback(in_data, frame_count, time_info, status):
    data = wf.readframes(frame_count)
    return data, pyaudio.paContinue


stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True,
                stream_callback=callback)

stream.start_stream()

# The main thread only waits; playback ends once the file has no frames left
while stream.is_active():
    time.sleep(0.1)

stream.stop_stream()
stream.close()
wf.close()
p.terminate()
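- Use the following command to play an audio file (callback version);
python3 playerCallbackVersion.py test.wav
- Record and play back immediately (callback version);
- Create a Python file to demonstrate recording and immediate playback (callback version), e.g. recordAndPlayImmediatelyCallbackVersion.py;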
import pyaudio
import time

WIDTH = 2      # bytes per sample (16 bit)
CHANNELS = 1
RATE = 44100

p = pyaudio.PyAudio()


# Hand every captured buffer straight back as the buffer to play
def callback(in_data, frame_count, time_info, status):
    return in_data, pyaudio.paContinue


stream = p.open(format=p.get_format_from_width(WIDTH),
                channels=CHANNELS,
                rate=RATE,
                input=True,
                output=True,
                stream_callback=callback)

stream.start_stream()

while stream.is_active():
    time.sleep(0.1)

stream.stop_stream()
stream.close()
p.terminate()
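- Use the following command to record and play back immediately (callback version);
python3 recordAndPlayImmediatelyCallbackVersion.py
(Source of the examples above: the PyAudio documentation, https://people.csail.mit.edu/hubert/pyaudio/docs/)
Note that this command keeps running and capturing sound indefinitely: the callback always returns pyaudio.paContinue, so stream.is_active() stays True whether or not the system or the microphone is actually producing any sound.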
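The simplest way to make is_active() turn False is to have the callback return pyaudio.paComplete after a fixed amount of audio. Below is a minimal sketch of that idea, assuming the hypothetical helper name make_limited_callback. The literals 0 and 1 mirror pyaudio.paContinue and pyaudio.paComplete so the logic can be tried without audio hardware. In real code, pass the PyAudio constants instead.

```python
PA_CONTINUE = 0  # mirrors pyaudio.paContinue
PA_COMPLETE = 1  # mirrors pyaudio.paComplete


def make_limited_callback(rate, seconds,
                          continue_flag=PA_CONTINUE, complete_flag=PA_COMPLETE):
    """Build a pass-through stream callback that finishes after `seconds` of audio."""
    remaining = int(rate * seconds)  # frames still allowed through

    def callback(in_data, frame_count, time_info, status):
        nonlocal remaining
        remaining -= frame_count
        # Keep the stream running until the frame budget is used up
        flag = continue_flag if remaining > 0 else complete_flag
        return in_data, flag

    return callback
```

It would be used as stream_callback=make_limited_callback(RATE, 5, pyaudio.paContinue, pyaudio.paComplete) in the script above. This stops after a fixed time only; it knows nothing about whether there is sound.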
To make capture stop automatically, let's next explore automatic sound detection and recording~
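4. Automatic sound detection and recording with the PyAudio module;
Next I'll capture and record sound, and stop capturing automatically based on volume, in the following scenario:
- While the system is playing audio, record as long as audio is being picked up from the system, until no more audio is picked up;
Here, "audio is being picked up from the system" can be understood as the captured audio having a certain loudness or volume; conversely, "no more audio is picked up" means the loudness or volume of the captured audio has dropped below some level;
To avoid interference from the microphone and make automatic stopping possible, we can:
- Turn off the computer's microphone;
- Set a loudness or volume threshold: when the captured audio stays below this threshold for a certain length of time, we treat it as "no more audio is picked up";
Turning off the microphone is a system setting, so I won't go into it. Let's look at the second option. To evaluate loudness or volume in real time, we need a module that does math on sound fragments:
- audioop
This module ships with Python; documentation: https://docs.python.org/zh-cn/3/library/audioop.html. Note that audioop was deprecated in Python 3.11 and removed in Python 3.13, so the code below needs Python 3.12 or earlier.
We can use audioop's rms, the root mean square of a sound fragment, to evaluate the fragment's loudness or volume.
Then, based on the computer's volume setting and the ambient noise during the test (obviously we shouldn't capture sound in a very noisy environment, and audio captured that way would be meaningless anyway), pick a suitable rms threshold.
For example, with my computer's volume at 50% and no particular noise around, the rms threshold can be set to 100; on a rainy day it can be set to 500. When the rms of every chunk captured over a sustained period stays below the threshold, sound capture stops automatically;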
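To make the rms value less of a black box, here is a pure-Python sketch of the same calculation for 16-bit samples: the square root of the mean of the squared sample values. The function name rms_int16 is my own. It is meant to approximate audioop.rms(fragment, 2) with the result truncated to an integer, and it could stand in where audioop is unavailable, but I have not benchmarked it against audioop.

```python
import array
import math


def rms_int16(fragment):
    """Root mean square of a bytes fragment of signed 16-bit native-endian samples."""
    samples = array.array("h")   # "h" = signed short, 2 bytes per item
    samples.frombytes(fragment)  # raises ValueError if the length is odd
    if not samples:
        return 0
    mean_square = sum(s * s for s in samples) / len(samples)
    return int(math.sqrt(mean_square))
```

A chunk of pure silence gives 0, and a chunk that swings between +1000 and -1000 gives 1000, which is the scale the thresholds 100 and 500 above are measured on.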
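As for other details: to play and record at the same time we use the multiprocessing module, and to practice the tool from a few posts back, the command line is built with Fire, the Python command-line library.
This is the best approach I can think of for now. The code lives in main.py and is for reference only: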
import audioop
from multiprocessing import Process

import fire
import pyaudio
import wave

stream_format = pyaudio.paInt16
pyaudio_instance = pyaudio.PyAudio()
sample_width = pyaudio_instance.get_sample_size(stream_format)


class Detector(object):
    def __init__(self):
        self.source_file = ""
        self.channels = None
        self.rate = None
        self.chunk = None
        self.audio_min_rms = None
        self.max_low_audio_flag = None
        self.recording = False
        self.recording_file = ""
        self.audio_frames = []

    def __str__(self):
        return ""

    def play(self, source_file="", chunk=None):
        # An argument is used only when the attribute has not been preset
        source_file = source_file if not self.source_file else self.source_file
        chunk = chunk if not self.chunk else self.chunk
        f = wave.open(source_file, "rb")
        p = pyaudio.PyAudio()
        file_format = p.get_format_from_width(f.getsampwidth())
        stream = p.open(format=file_format, channels=f.getnchannels(), rate=f.getframerate(), output=True)
        data = f.readframes(chunk)
        while data != b"":
            stream.write(data)
            data = f.readframes(chunk)
        stream.stop_stream()
        stream.close()
        p.terminate()
        return self

    def detect_audio(self, channels=None, rate=None, chunk=None, audio_min_rms=None, max_low_audio_flag=None,
                     recording=False, recording_file=""):
        # Values preset on the instance (e.g. by play_and_detect) win over the arguments
        channels = channels if not self.channels else self.channels
        rate = rate if not self.rate else self.rate
        chunk = chunk if not self.chunk else self.chunk
        audio_min_rms = audio_min_rms if not self.audio_min_rms else self.audio_min_rms
        max_low_audio_flag = max_low_audio_flag if not self.max_low_audio_flag else self.max_low_audio_flag
        recording = recording if not self.recording else self.recording
        recording_file = recording_file if not self.recording_file else self.recording_file
        self.channels = channels
        self.rate = rate
        self.chunk = chunk
        self.audio_min_rms = audio_min_rms
        self.max_low_audio_flag = max_low_audio_flag
        self.recording = recording
        self.recording_file = recording_file
        print("* start detecting audio ~")
        stream = pyaudio_instance.open(format=stream_format,
                                       channels=channels,
                                       rate=rate,
                                       input=True,
                                       frames_per_buffer=chunk)
        low_audio_flag = 0
        detect_count = 0
        while True:
            detect_count += 1
            stream_data = stream.read(chunk)
            # Loudness of this chunk; the width must match the stream's sample format
            rms = audioop.rms(stream_data, sample_width)
            print(f"the {detect_count} time detecting:", rms)
            # Count consecutive quiet chunks; any loud chunk resets the counter
            low_audio_flag = 0 if rms > audio_min_rms else low_audio_flag + 1
            # max_low_audio_flag is an empirical value (e.g. 100): that many consecutive
            # quiet chunks are taken to mean there is no audio; tune it for your setup
            if low_audio_flag > max_low_audio_flag:
                print("* no audio detected, stop detecting ~")
                break
            self.audio_frames.append(stream_data)
        stream.stop_stream()
        stream.close()
        pyaudio_instance.terminate()
        if recording:
            self.record()
        return self

    def record(self, recording_file=""):
        recording_file = recording_file if not self.recording_file else self.recording_file
        self.recording_file = recording_file
        wf = wave.open(recording_file, 'wb')
        wf.setnchannels(self.channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(self.rate)
        wf.writeframes(b''.join(self.audio_frames))
        wf.close()
        return self

    def play_and_detect(self, source_file, channels, rate, chunk, audio_min_rms, max_low_audio_flag, recording,
                        recording_file):
        self.source_file = source_file
        self.channels = channels
        self.rate = rate
        self.chunk = chunk
        self.audio_min_rms = audio_min_rms
        self.max_low_audio_flag = max_low_audio_flag
        self.recording = recording
        self.recording_file = recording_file
        # Playback and detection run in separate processes so they happen at the same time
        play_process = Process(target=self.play)
        detect_process = Process(target=self.detect_audio)
        play_process.start()
        detect_process.start()
        play_process.join()
        detect_process.join()
        return self


if __name__ == '__main__':
    fire.Fire(Detector)
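The stopping rule inside detect_audio is easier to reason about, and to tune, when it is pulled out as a pure function that works on a list of rms values instead of a live stream. The sketch below mirrors the loop above. The function names frames_until_silence and silence_window_seconds are mine and do not exist in the repository.

```python
def frames_until_silence(rms_values, audio_min_rms, max_low_audio_flag):
    """Return how many chunks are kept before the silence rule stops detection."""
    low_audio_flag = 0
    kept = 0
    for rms in rms_values:
        # Same update as in detect_audio: a loud chunk resets the counter
        low_audio_flag = 0 if rms > audio_min_rms else low_audio_flag + 1
        if low_audio_flag > max_low_audio_flag:
            break  # the chunk that trips the rule is not kept
        kept += 1
    return kept


def silence_window_seconds(chunk, rate, max_low_audio_flag):
    """Seconds of continuous quiet needed before detection stops."""
    return (max_low_audio_flag + 1) * chunk / rate
```

With chunk=1024, rate=44100 and max_low_audio_flag=100, silence_window_seconds gives about 2.35 seconds, so the recording ends with a little over two seconds of quiet at its tail, and short pauses inside the audio do not stop the capture.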
Testing:
1. Play an audio file;
python3 main.py - play --source_file=test.wav --chunk=1024
2. Detect audio only;
python3 main.py - detect_audio --channels=1 --rate=44100 --chunk=1024 --audio_min_rms=500 --max_low_audio_flag=100
3. Detect and record audio;
python3 main.py - detect_audio --channels=1 --rate=44100 --chunk=1024 --audio_min_rms=500 --max_low_audio_flag=100 - record --recording_file=recording.wav
4. Record audio while playing audio;
python3 main.py - play_and_detect --source_file=test.wav --channels=1 --rate=44100 --chunk=1024 --audio_min_rms=500 --max_low_audio_flag=100 --recording=True --recording_file=recording.wav
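If the recorded audio file needs to be compared with the original audio, some noise reduction will probably be needed as well. Simple noise-reduction modules such as noisereduce exist; the more sophisticated approaches are beyond me for now, and I'll dig into them when I get the chance;
That's it for today. Until next time~
Thank you very much!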