Created
May 30, 2021 13:59
-
-
Save tb1402/2ae9b169f467a8492e3871779161c254 to your computer and use it in GitHub Desktop.
Record pulseaudio stream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import subprocess | |
import os | |
import time | |
'''
This script does the following:
- creates a PulseAudio null sink
- prompts the user with a selection of all available audio streams
- moves the selected stream to the created null sink
- starts parec to record audio from the null sink's monitor and pipes the audio to sox so it can be written to a file
- creates a loopback so that the stream being recorded stays audible
'''
# Name of the PulseAudio null sink that the selected stream is moved to.
null_sink_name = "recorder"
# Module id of the null sink (as used with `pactl unload-module`);
# -1 until the sink has been created or looked up.
null_sink_id = -1
# When True, a loopback module is loaded so the recorded stream stays audible.
playback_during_recording = True
# Module id of the loopback module; -1 until setup_loopback() has loaded it.
loopback_id = -1
def create_null_sink(sink_name):
    """Load a PulseAudio ``module-null-sink`` called *sink_name*.

    On success the module id printed by ``pactl load-module`` is stored in
    the module-level ``null_sink_id`` so the module can be unloaded later.

    :param sink_name: name to give the new null sink
    :return: True if the module was loaded and its id could be read,
             False if pactl could not be started, exited with a non-zero
             status, or printed something that is not an integer
    """
    global null_sink_id
    try:
        result = subprocess.run(
            ['pactl', 'load-module', 'module-null-sink', f'sink_name="{sink_name}"'],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    except OSError:
        # pactl is missing or not executable
        return False
    # Judge success by the exit status: text on stderr may be a mere warning,
    # and a failure is not guaranteed to write anything there.
    if result.returncode != 0:
        return False
    try:
        module_id = int(result.stdout.strip())
    except ValueError:
        return False
    null_sink_id = module_id
    return True
def setup_loopback(sink="0"):
    """Load a ``module-loopback`` that plays the null sink's monitor on *sink*.

    The loopback source is ``<null_sink_name>.monitor``. On success the
    module id printed by ``pactl load-module`` is stored in the module-level
    ``loopback_id`` so the module can be unloaded later.

    :param sink: value passed as the ``sink=`` module argument; defaults to
                 "0", the value the script has always used
                 (NOTE(review): sink index 0 is not necessarily the sink the
                 user listens on - confirm or pass an explicit sink)
    :return: True if the module was loaded and its id could be read,
             False if pactl could not be started, exited with a non-zero
             status, or printed something that is not an integer
    """
    global loopback_id
    try:
        result = subprocess.run(
            ['pactl', 'load-module', 'module-loopback',
             f"source=\"{null_sink_name}.monitor\"", f'sink={sink}'],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    except OSError:
        # pactl is missing or not executable
        return False
    # Judge success by the exit status rather than by the presence of stderr text.
    if result.returncode != 0:
        return False
    try:
        module_id = int(result.stdout.strip())
    except ValueError:
        return False
    loopback_id = module_id
    return True
def _first_value(lines, labels):
    """Return the text following the first line that starts with one of *labels*, or None."""
    for line in lines:
        for label in labels:
            if line.startswith(label):
                return line[len(label):].strip()
    return None


def _unquote(value):
    """Strip one pair of surrounding double quotes from *value*, if present."""
    if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
        return value[1:-1]
    return value


def parse_streams(raw_output=None):
    """Return the PulseAudio sink inputs (playback streams) as a list of dicts.

    Each dict has the keys:
      - "id":    sink input index (int)
      - "name":  media.name and application.name joined with " - ";
                 only one of them if the other is missing, None if both are
      - "muted": True unless the mute state is reported as "no"/"nein";
                 False if no mute line is present
      - "info":  the sample specification text, or None if not present

    :param raw_output: text in the format of ``pactl list sink-inputs``;
                       when None (default) pactl is executed to obtain it
    :return: list of stream dicts in the order they appear in the output
    :raises SystemExit: with code 1 if pactl cannot be run or fails
    """
    if raw_output is None:
        # Ask pactl for untranslated messages so that parsing does not
        # depend on the language the system is set to.
        c_locale_env = dict(os.environ, LC_ALL="C")
        try:
            listing = subprocess.run(['pactl', 'list', 'sink-inputs'],
                                     stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                     text=True, env=c_locale_env)
        except OSError:
            listing = None
        if listing is None or listing.returncode != 0:
            print("Error while parsing streams")
            raise SystemExit(1)
        raw_output = listing.stdout

    # English labels plus the German ones the script originally relied on.
    header_labels = ("Sink Input #", "Ziel-Eingabe #")
    mute_labels = ("Mute: ", "Stumm: ")
    spec_labels = ("Sample Specification: ", "Abtastwert-Angabe: ")
    unmuted_words = ("no", "nein")

    # Group the (stripped) lines of the listing by the stream they belong to.
    entries = []  # list of (text after the header label, [following lines])
    for raw_line in raw_output.splitlines():
        line = raw_line.strip()
        header = next((label for label in header_labels if line.startswith(label)), None)
        if header is not None:
            entries.append((line[len(header):], []))
        elif entries:
            entries[-1][1].append(line)

    parsed_streams = []
    for id_text, lines in entries:
        try:
            stream_id = int(id_text)
        except ValueError:
            # Without an id the stream cannot be moved, so it is not offered.
            print("Couldn't parse id")
            continue

        name_parts = [
            _unquote(value)
            for value in (_first_value(lines, ("media.name = ",)),
                          _first_value(lines, ("application.name = ",)))
            if value is not None
        ]
        mute_state = _first_value(lines, mute_labels)

        parsed_streams.append({
            "id": stream_id,
            "name": " - ".join(name_parts) if name_parts else None,
            "muted": mute_state is not None and mute_state not in unmuted_words,
            "info": _first_value(lines, spec_labels),
        })
    return parsed_streams
# ---- Script body -----------------------------------------------------------
# 1. make sure the null sink exists and remember its module id
# 2. let the user pick a stream and move it to the null sink
# 3. optionally load a loopback so the stream stays audible
# 4. record the null sink's monitor with parec | sox until Ctrl+C / sox exits
# 5. always stop the recorders and unload the modules again


def _unload_module(module_id, failure_message):
    """Unload a PulseAudio module; print *failure_message* plus pactl's stderr and return False on failure."""
    result = subprocess.run(['pactl', 'unload-module', str(module_id)],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode != 0:
        print(failure_message + result.stderr)
        return False
    return True


# execute pactl and check if null sink with specified name is already present
# (LC_ALL=C requests untranslated labels, so the lookup is locale independent)
pa_ctl = subprocess.run(['pactl', 'list', 'sinks'], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        text=True, env=dict(os.environ, LC_ALL="C"))
if pa_ctl.returncode != 0:
    print("List sinks failed")
    raise SystemExit(1)

# try to find sink with given name (exact match, so "recorder2" is not mistaken for "recorder")
sink_lines = [line.strip() for line in pa_ctl.stdout.splitlines()]
name_line = f"Name: {null_sink_name}"
if name_line not in sink_lines:
    if not create_null_sink(null_sink_name):
        print("Failure while creating null sink")
        raise SystemExit(1)
else:  # sink seems to be already there, extract the id of the module owning it
    owner_module = None
    for line in sink_lines[sink_lines.index(name_line) + 1:]:
        if line.startswith(("Sink #", "Ziel #")):
            break  # reached the next sink without finding the owner
        for label in ("Owner Module: ", "Besitzer-Modul: "):
            if line.startswith(label):
                # take the whole rest of the line; module ids are not limited to two digits
                owner_module = line[len(label):]
                break
        if owner_module is not None:
            break
    try:
        null_sink_id = int(owner_module)
    except (TypeError, ValueError):
        print("There already is a sink with given name, but the id couldn't be extracted!")
        raise SystemExit(1)
    print(null_sink_id)

pa_rec = None
sox = None
cleanup_ok = True
try:
    # parse audio streams and prompt for selection
    streams = parse_streams()
    print("---- Available streams ----")
    for x, stream in enumerate(streams):
        print(f"{x}: {stream['name']}; Muted: {stream['muted']}; {stream['info']}")

    # get index for stream which should be moved
    raw = input("Enter index for stream that should be moved: ")
    try:
        index = int(raw)
    except ValueError:
        index = None
    # negative numbers would silently select from the end of the list, so reject them
    if index is None or not 0 <= index < len(streams):
        print("Entered index wasn't found.")
        raise SystemExit(1)

    print(f"Selected stream {streams[index]['name']}")
    print("Moving")

    # move stream to null sink
    pa_ctl = subprocess.run(['pactl', 'move-sink-input', str(streams[index]["id"]), null_sink_name],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if pa_ctl.returncode != 0:
        print("Move stream failed" + pa_ctl.stderr)
        raise SystemExit(1)
    print("Moved")

    if playback_during_recording:
        print("Setting up loopback module...")
        if not setup_loopback():
            print("Failure on setting up loopback")
            raise SystemExit(1)

    print("Starting recording")

    # start recording: parec writes raw samples to a pipe that sox reads from stdin
    # (parec's stderr is discarded; an unread pipe could fill up and block it)
    pa_rec = subprocess.Popen(['parec', '-d', f"{null_sink_name}.monitor", '--rate', '44100'],
                              stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    sox = subprocess.Popen(['sox', '-c', '2', '-r', '44100', '-b', '16', '-e', 'signed', '--endian', 'little',
                            '-S', '-t', 'raw', '-', 'recording.wav'],
                           stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, stdin=pa_rec.stdout, text=True)
    # sox owns the read end now; closing our copy lets parec notice if sox exits
    pa_rec.stdout.close()

    print("Ctrl+C to stop recording")
    try:
        # readline() blocks until sox reports progress, so no extra sleep is needed;
        # the loop ends when sox closes its stderr (i.e. exits)
        for out in iter(sox.stderr.readline, ""):
            status = out.strip()
            if status:
                os.system('clear')
                print(status)
    except KeyboardInterrupt:
        print("Stopping")
finally:
    # stop parec first so sox sees end-of-input and can finish the file
    if pa_rec is not None:
        if pa_rec.poll() is None:
            pa_rec.terminate()
        pa_rec.wait()
    if sox is not None:
        try:
            sox.wait(timeout=5)
        except subprocess.TimeoutExpired:
            sox.terminate()
            sox.wait()
        sox.stderr.close()

    # loopback_id stays -1 when no loopback was loaded
    if playback_during_recording and loopback_id != -1:
        print("Removing loopback")
        cleanup_ok = _unload_module(loopback_id, "Failure on removing loopback ") and cleanup_ok

    print("Deleting null sink")
    cleanup_ok = _unload_module(null_sink_id, "Failure on removing null sink ") and cleanup_ok

if not cleanup_ok:
    raise SystemExit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This script searches the output of the pactl command for German strings. If it is used on a system whose language is set to something other than German, the strings to search for will need to be adjusted, or the script will not work.