Skip to content

Instantly share code, notes, and snippets.

@tb1402
Created May 30, 2021 13:59
Show Gist options
  • Save tb1402/2ae9b169f467a8492e3871779161c254 to your computer and use it in GitHub Desktop.
Save tb1402/2ae9b169f467a8492e3871779161c254 to your computer and use it in GitHub Desktop.
Record pulseaudio stream
import subprocess
import os
import time
'''
This script does the following:
- creates a pulseaudio null sink
- prompt the user with a selection of all available audio streams
- moves the selected stream to the created null sink
- starts parec to record audio from the null sink and pipes the audio to sox so it can be written to a file
- creates a loopback stream so that the stream being recorded is also audible
'''
# Name of the null sink that the selected stream is moved to; its ".monitor" source is what gets recorded
null_sink_name = "recorder"
# Module id used to unload the null sink at the end of the script; -1 until it has been created or found
null_sink_id = -1
# When True, a loopback module is loaded before recording starts and unloaded again afterwards
playback_during_recording = True
# Module id of the loopback module; -1 until setup_loopback() has stored it
loopback_id = -1
def create_null_sink(sink_name):
    """Load a PulseAudio null sink module and remember its module id.

    Runs ``pactl load-module module-null-sink`` and, on success, stores the
    module id printed by pactl in the module-level ``null_sink_id``.

    Args:
        sink_name: Name to give the new sink.

    Returns:
        True when the module was loaded and its id could be parsed; False when
        pactl could not be started, exited with a non-zero status, wrote to
        stderr, or printed something that is not an integer.
    """
    global null_sink_id
    try:
        result = subprocess.run(
            ['pactl', 'load-module', 'module-null-sink', f'sink_name="{sink_name}"'],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    except OSError:
        # pactl is missing or not executable: report failure instead of crashing
        return False
    if result.returncode != 0 or result.stderr != "":
        return False
    try:
        null_sink_id = int(result.stdout.strip())
    except ValueError:
        # pactl succeeded but did not print a module id we can use later
        return False
    return True
def setup_loopback(sink=0):
    """Load a loopback module that plays the null sink's monitor on a real sink.

    Runs ``pactl load-module module-loopback`` with the monitor source of the
    sink named by the module-level ``null_sink_name`` and, on success, stores
    the module id printed by pactl in the module-level ``loopback_id``.

    Args:
        sink: Sink (as accepted by module-loopback's ``sink=`` argument) that
            the audio is played back on. Defaults to 0, the value that was
            previously hard-coded.

    Returns:
        True when the module was loaded and its id could be parsed; False when
        pactl could not be started, exited with a non-zero status, wrote to
        stderr, or printed something that is not an integer.
    """
    global loopback_id
    try:
        result = subprocess.run(
            ['pactl', 'load-module', 'module-loopback',
             f"source=\"{null_sink_name}.monitor\"", f'sink={sink}'],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    except OSError:
        # pactl is missing or not executable: report failure instead of crashing
        return False
    if result.returncode != 0 or result.stderr != "":
        return False
    try:
        loopback_id = int(result.stdout.strip())
    except ValueError:
        # pactl succeeded but did not print a module id we can use later
        return False
    return True
def _unquote(value):
    """Remove one pair of enclosing double quotes from *value*, if present."""
    if len(value) >= 2 and value.startswith('"') and value.endswith('"'):
        return value[1:-1]
    return value


def _parse_sink_input(lines):
    """Turn the lines of one ``Sink Input #<id>`` section into a stream dict."""
    stream_id = -1
    try:
        stream_id = int(lines[0][len("Sink Input #"):].strip())
    except ValueError:
        print("Couldn't parse id")

    labels = ("media.name = ", "application.name = ", "Mute: ", "Sample Specification: ")
    found = {}
    for line in lines[1:]:
        text = line.strip()
        for label in labels:
            if text.startswith(label):
                # only the first occurrence of each field counts
                found.setdefault(label, text[len(label):])
                break

    # the display name combines media.name and application.name when both exist
    name_parts = [_unquote(found[label]) for label in labels[:2] if label in found]
    return {
        "id": stream_id,
        "name": " - ".join(name_parts) if name_parts else None,
        "muted": found.get("Mute: ", "no") != "no",
        "info": found.get("Sample Specification: "),
    }


def parse_streams():
    """List the current playback streams (sink inputs) reported by pactl.

    pactl is run with ``LC_ALL=C`` so that its field labels are the
    untranslated English ones, which makes the parsing independent of the
    language configured on the system.

    Returns:
        A list with one dict per sink input, each with the keys ``"id"``
        (int, -1 if it could not be parsed), ``"name"`` (media name and
        application name joined by " - ", or None), ``"muted"`` (bool) and
        ``"info"`` (the sample specification string, or None).

    Raises:
        SystemExit: With exit code 1 when pactl cannot be started, exits with
            a non-zero status, or writes to stderr.
    """
    try:
        result = subprocess.run(
            ['pactl', 'list', 'sink-inputs'],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
            env=dict(os.environ, LC_ALL="C"))
    except OSError:
        print("Error while parsing streams")
        raise SystemExit(1)
    if result.returncode != 0 or result.stderr != "":
        print("Error while parsing streams")
        raise SystemExit(1)

    # Group lines into sections; only an unindented "Sink Input #" line starts a new
    # one, so the same words inside a (tab-indented) property value cannot split it.
    sections = []
    for line in result.stdout.splitlines():
        if line.startswith("Sink Input #"):
            sections.append([line])
        elif sections:
            sections[-1].append(line)
    return [_parse_sink_input(section) for section in sections]
# Check whether a sink called null_sink_name already exists; create it if not,
# otherwise reuse it and remember the id of the module that owns it.
# pactl runs with LC_ALL=C so the field labels searched for below are the
# untranslated English ones regardless of the system language.
try:
    pa_ctl = subprocess.run(['pactl', 'list', 'sinks'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
                            env=dict(os.environ, LC_ALL="C"))
except OSError:
    print("List sinks failed")
    raise SystemExit(1)
if pa_ctl.returncode != 0 or pa_ctl.stderr != "":
    print("List sinks failed")
    raise SystemExit(1)

# Walk the listing section by section so that the owner module is taken from
# the matching sink only, and so that "recorder2" is not mistaken for "recorder".
wanted_name_line = f"Name: {null_sink_name}"
sink_found = False
inside_wanted_sink = False
owner_module = None
for line in pa_ctl.stdout.splitlines():
    field = line.strip()
    if line.startswith("Sink #"):
        inside_wanted_sink = False  # a new sink section begins
    elif field == wanted_name_line:
        sink_found = inside_wanted_sink = True
    elif inside_wanted_sink and owner_module is None and field.startswith("Owner Module: "):
        # take the whole rest of the line: module ids can have any number of digits
        owner_module = field[len("Owner Module: "):]

if not sink_found:
    if not create_null_sink(null_sink_name):
        print("Failure while creating null sink")
        raise SystemExit(1)
else:  # sink is already there, reuse it
    try:
        null_sink_id = int(owner_module)
    except (TypeError, ValueError):
        # no "Owner Module" line was found, or it does not hold a number
        print("There already is a sink with given name, but the id couldn't be extracted!")
        raise SystemExit(1)
    print(str(null_sink_id))
# List the playback streams and ask which one should be recorded
streams = parse_streams()
print("---- Available streams ----")
for position, stream in enumerate(streams):
    print(f"{position}: {stream['name']}; Muted: {stream['muted']}; {stream['info']}")

raw = input("Enter index for stream that should be moved: ")
try:
    index = int(raw)
except ValueError:
    index = None
# Check the range explicitly: a negative number would otherwise be accepted
# and silently select a stream counted from the end of the list.
if index is None or not 0 <= index < len(streams):
    print("Entered index wasn't found.")
    raise SystemExit(1)
print(f"Selected stream {streams[index]['name']}")

# Move the chosen stream to the null sink
print("Moving")
pa_ctl = subprocess.run(['pactl', 'move-sink-input', str(streams[index]["id"]), null_sink_name],
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if pa_ctl.returncode != 0 or pa_ctl.stderr != "":
    print("Move stream failed" + pa_ctl.stderr)
    raise SystemExit(1)
print("Moved")
# Optionally make the recorded stream audible while it is being recorded
if playback_during_recording:
    print("Setting up loopback module...")
    if not setup_loopback():
        print("Failure on setting up loopback")
        raise SystemExit(1)

print("Starting recording")
# parec captures raw samples from the null sink's monitor; its stdout is handed
# to sox as stdin, and sox writes recording.wav. Output that is never read is
# sent to DEVNULL so that an unread pipe cannot fill up and stall a child.
pa_rec = subprocess.Popen(['parec', '-d', f"{null_sink_name}.monitor", '--rate', '44100'],
                          stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
sox = subprocess.Popen(['sox', '-c', '2', '-r', '44100', '-b', '16', '-e', 'signed', '--endian', 'little', '-S', '-t', 'raw', '-', 'recording.wav'],
                       stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, stdin=pa_rec.stdout, text=True)
# sox owns the read end now; closing our copy lets parec notice when sox exits
pa_rec.stdout.close()

print("Ctrl+C to stop recording")
try:
    while True:
        os.system('clear')
        # readline() blocks until sox reports progress, so no extra sleep is
        # needed; sleeping here would only let the status pipe back up
        out = sox.stderr.readline()
        if out == "" or sox.poll() is not None:
            break  # end of output or sox has exited
        print(out.strip())
except KeyboardInterrupt:
    print("Stopping")
finally:
    # Always stop both children, including when sox ended on its own, and wait
    # for them so recording.wav is complete before the sink is removed.
    pa_rec.terminate()
    sox.terminate()
    pa_rec.wait()
    sox.wait()
# Unload the modules this script relies on. Both removals are always attempted,
# so a failure to remove the loopback does not leave the null sink behind.
cleanup_failed = False
if playback_during_recording:
    print("Removing loopback")
    pa_ctl = subprocess.run(['pactl', 'unload-module', str(loopback_id)],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if pa_ctl.returncode != 0 or pa_ctl.stderr != "":
        print("Failure on removing loopback " + pa_ctl.stderr)
        cleanup_failed = True

print("Deleting null sink")
pa_ctl = subprocess.run(['pactl', 'unload-module', str(null_sink_id)],
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if pa_ctl.returncode != 0 or pa_ctl.stderr != "":
    print("Failure on removing null sink " + pa_ctl.stderr)
    cleanup_failed = True

# Report failure through the exit status only after every step has been tried
if cleanup_failed:
    raise SystemExit(1)
@tb1402
Copy link
Author

tb1402 commented May 30, 2021

This script relies on the German version of the pactl command output. If it is used on a system whose language is set to something other than German, the strings it searches for will need to be adjusted, or the script will not work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment