Last active
December 1, 2016 06:50
-
-
Save tgarc/5ffbc84f27b42e1aa357c8875cd83524 to your computer and use it in GitHub Desktop.
daemon for portaudio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
""" | |
Daemon for full duplex streaming with PyAudio | |
Note that recordings are always delayed by some amount and are therefore longer than the input file | |
Builds off of mpex.py: https://gist.github.com/tgarc/7120c00eb3eb5f736be93c70ade1d68a | |
""" | |
import multiprocessing, threading, Queue | |
import time | |
import sys | |
import urllib2 | |
import pyaudio as pa | |
import fnmatch | |
import numpy as np | |
import traceback | |
from cStringIO import StringIO | |
IPCMAXBLOCKFSZ = 4096 # a size that is guaranteed to be >= the IO block size | |
QSIZE = 8 # queue size needs to vary depending on | |
# the ratio of IPC block size to IO block | |
# size | |
DEBUG = 1 | |
VERBOSE = 1 | |
class PortAudio(pa.PyAudio): | |
@classmethod | |
def get_callback_status_strings(cls, status): | |
return [v for bm,v in cls._cbstatus_bitmap.iteritems() if status&bm] | |
def query(self, hostApi, devname): | |
for i in range(self.get_host_api_count()): | |
api = self.get_host_api_info_by_index(i) | |
if api['name'].lower() == hostApi.lower(): | |
hostApi = api['index'] | |
break | |
else: | |
raise IOError("Host API %s not found." % hostApi) | |
qname = devname.lower() | |
for i in range(self.get_device_count()): | |
dev = self.get_device_info_by_index(i) | |
dname = dev['name'].lower() | |
if dev['hostApi'] == hostApi and (fnmatch.fnmatch(dname, qname) or dname.startswith(qname)): | |
return dev | |
raise IOError("Match not found for device name expression %r." % devname) | |
def list_streams(self): | |
for i in range(self.get_device_count()): | |
dev = self.get_device_info_by_index(i) | |
api = self.get_host_api_info_by_index(dev['hostApi']) | |
print '%s (%s): %d in, %d out' % (dev['name'], | |
api['name'], | |
dev['maxInputChannels'], | |
dev['maxOutputChannels']) | |
class PyAudioProcess(multiprocessing.Process): | |
def __init__(self, pipe, device, host_api, rate, channels, pastreamfmt, maxframeblocksize, buffsize=10, timeout=1, **kwargs): | |
super(PyAudioProcess, self).__init__(target=self.main, args=(pipe, device, host_api, rate, channels, pastreamfmt, maxframeblocksize, buffsize), **kwargs) | |
self.exit = multiprocessing.Event() | |
self.full = multiprocessing.Condition() | |
self.ready = multiprocessing.Event() | |
self.timeout = timeout | |
self.blocksize = pa.pa.get_sample_size(pastreamfmt)*maxframeblocksize*channels | |
self.count = 0 | |
self.wridx = 0 | |
self.rdidx = 0 | |
def main(self, pipe, device, host_api, rate, channels, pastreamfmt, maxframeblocksize, buffsize=10): | |
# initialize portaudio and find device | |
painst = PortAudio() | |
padev = painst.query(host_api, device) | |
inpbuff = bytearray(self.blocksize*buffsize) | |
self.dbglog = StringIO() | |
# Wait for master to ready | |
while not self.ready.is_set(): | |
if self.exit.is_set(): break | |
time.sleep(0.1) | |
# main loop | |
while not self.exit.is_set(): | |
print >> self.dbglog, "READY!\n------" | |
self.ready.clear() | |
# we *must* write to the inp buffer before doing anything or the | |
# buffer checking logic will breakdown | |
for i in range(4): | |
self.full.acquire(); self.full.notify(); self.full.release() | |
self.wridx += pipe.recv_bytes_into(inpbuff, self.wridx) | |
# do the stream thing | |
try: | |
self.playrec(pipe, inpbuff, painst, padev['index'], rate, channels, pastreamfmt, timeout=self.timeout) | |
except KeyboardInterrupt: | |
break | |
# wait for master to ready again | |
while not self.ready.is_set(): | |
if self.exit.is_set(): break | |
time.sleep(0.1) | |
# cleanup | |
self.count = self.wridx = self.rdidx = 0 | |
with open('padaemon.log', 'w') as dbglog: | |
print >> dbglog, self.dbglog.getvalue() | |
pipe.close() | |
painst.terminate() | |
def _read(self, inpbuff, blocksize): | |
wridx = self.wridx | |
# read logic is a bit more complex than the write since we don't | |
# necessarily know the required chunk size beforehand, so we may be | |
# required to read around the edges of the receive buffer | |
nextidx = self.rdidx+blocksize | |
# we assume that once the read index catches up to the write | |
# index the stream is finished (this is technically an underflow | |
# condition, but we just assume underflow will never happen) | |
if self.rdidx <= wridx < nextidx: | |
nextidx = wridx | |
if nextidx >= len(inpbuff): # corner case: wrap around read | |
nextidx %= len(inpbuff) | |
if wridx < nextidx: # we've reached the write index | |
nextidx = wridx | |
indata = inpbuff[self.rdidx:] | |
if nextidx > 0: indata += inpbuff[:nextidx] | |
else: | |
indata = inpbuff[self.rdidx:nextidx] | |
self.rdidx = nextidx | |
return indata | |
def _receive(self, pipe, inpbuff): | |
# no more rx buffer space; wait for processing | |
if self.wridx <= self.rdidx <= self.wridx+self.blocksize: | |
return -1 | |
# Logic is a bit simplified here since we can guarantee buffer | |
# size is a multiple of the transmission size (except for the | |
# last chunk which will be smaller) | |
self.full.acquire() | |
self.full.notify() | |
self.full.release() | |
nbytes = pipe.recv_bytes_into(inpbuff, self.wridx) | |
self.wridx = (self.wridx+nbytes) % len(inpbuff) | |
if DEBUG: print >> self.dbglog, "RX:", self.count, self.wridx, self.rdidx, abs(self.wridx-self.rdidx) | |
return nbytes | |
def playrec(self, pipe, inpbuff, painst, devidx, rate, channels, pastreamfmt, timeout=1): | |
q = Queue.Queue(QSIZE) | |
itemsize = pa.pa.get_sample_size(pastreamfmt) | |
framesize = channels*itemsize | |
self.last_time = 0 | |
def pamincallback(in_data, frame_count, time_info, status): | |
pcsr = pa.paContinue | |
if DEBUG: | |
print >> self.dbglog, self.count, self.wridx, self.rdidx, abs(self.wridx-self.rdidx), q.qsize(), 1000*(time_info['current_time'] - self.last_time) | |
self.last_time = time_info['current_time'] | |
# send off input data | |
try: | |
q.put((status, in_data), block=False) | |
except Queue.Full: | |
traceback.print_exc() | |
q.queue.clear(); q.put(None) | |
return '', pa.paAbort | |
# read in the next block of frames | |
txbuff = self._read(inpbuff, frame_count*framesize) | |
out_data = np.frombuffer(txbuff, dtype='int32') | |
# This is our last callback! | |
if len(out_data) < frame_count*channels: | |
if DEBUG: | |
print >> self.dbglog, self.count, self.wridx, self.rdidx, abs(self.wridx-self.rdidx), q.qsize(), time_info['current_time'] | |
print >> self.dbglog, "----\nDone" | |
pcsr = pa.paComplete | |
q.put(None) | |
self.count += frame_count | |
return (out_data, pcsr) | |
stream = painst.open(rate=rate, channels=channels, | |
format=pastreamfmt, input=True, output=True, | |
frames_per_buffer=0, # Let backend decide buffersize | |
input_device_index=devidx, | |
output_device_index=devidx, | |
stream_callback=pamincallback, | |
start=False) | |
try: | |
stream.start_stream() | |
while stream.is_active() and not self.exit.is_set(): | |
item = q.get(timeout=timeout) | |
pipe.send(item) | |
if item is None: break | |
nbytes = self._receive(pipe, inpbuff) | |
if nbytes == -1: # buffer's full right now | |
continue | |
elif nbytes < self.blocksize: # we must be done | |
while True: | |
item = q.get() | |
pipe.send(item) | |
if item is None: break | |
break | |
finally: | |
stream.close() | |
# @classmethod | |
# def get_streamer(cls, *args, **kwargs): | |
# hostpipe, clientpipe = multiprocessing.Pipe() | |
# pap = cls(clientpipe, *args, **kwargs) | |
# pap.start(); clientpipe.close() | |
# yield functools.partial(pap.stream, hostpipe) | |
# pap.exit.set(); pap.join(); hostpipe.close() | |
def stream(self, pipe, reader=None, writer=None): | |
assert self.is_alive(), "You need to start a PyAudioProcess before you can start streaming!" | |
def pipewriter(self, pipe, reader, fill): | |
self.full.acquire() | |
while True: | |
self.full.wait() | |
if reader is None: outdata = fill | |
else: outdata = reader(self.blocksize) | |
if len(outdata): pipe.send_bytes(outdata) | |
if len(outdata) < self.blocksize: break | |
zeros = '\x00'*self.blocksize | |
prp = threading.Thread(target=pipewriter, args=(self, pipe, reader), kwargs={'fill': zeros}) | |
prp.daemon = True | |
prp.start() | |
self.ready.set() | |
while not self.exit.is_set(): | |
if pipe.poll(): | |
try: item = pipe.recv() | |
except EOFError: done=True; break | |
if item is None: done=True; break | |
stat, indata = item | |
if stat != 0: | |
raise multiprocessing.ProcessError("PortAudioError: { %s }" % ', '.join(PortAudio.get_callback_status_strings(stat))) | |
if writer is not None: writer(indata) | |
def main(argv=None): | |
if argv is None: argv=sys.argv[1:] | |
if argv[0] == '-l': | |
PortAudio().list_streams() | |
sys.exit(0) | |
inpf, outf = argv[:2] | |
try: | |
if argv[2] == '--loop': | |
loop = True | |
except IndexError: | |
loop = False | |
if inpf == '-': | |
inp_fh = sys.stdin | |
elif inpf == 'null': | |
inp_fh = None | |
elif inpf.startswith('http'): | |
inp_fh = urllib2.urlopen(inpf) | |
else: | |
inp_fh = open(inpf, 'rb') | |
if outf == '-': | |
out_fh = sys.stdout | |
elif outf == 'null': | |
out_fh = None | |
else: | |
out_fh = open(outf, 'wb+') | |
# define the device stream here | |
# dev, api, rate, channels, streamfmt = '*ALC3232*', 'alsa', 48000, 2, pa.paInt32 | |
dev, api, rate, channels, streamfmt = 'mini', 'asio', 48000, 8, pa.paInt32 | |
hostpipe, clientpipe = multiprocessing.Pipe() | |
pap = PyAudioProcess(clientpipe, dev, api, rate, channels, streamfmt, IPCMAXBLOCKFSZ, timeout=1, name='pyaudio') | |
statline = "{{{:^9d}}} {:6.3f}s {:>10s} bytes sent, {:>10s} bytes received\r" | |
pap.start() | |
clientpipe.close() | |
printout = out_fh != sys.stdout and VERBOSE | |
try: | |
while pap.is_alive(): | |
thread = threading.Thread(target=pap.stream, args=(hostpipe, inp_fh.read if inp_fh else None, out_fh.write if out_fh else None), name='streamer') | |
t1 = time.time() | |
i = 0 | |
thread.start() | |
while thread.is_alive(): | |
time.sleep(0.01) | |
if printout: | |
sys.stdout.write(statline.format(i, time.time()-t1, str(inp_fh.tell()) if inp_fh else '-', str(out_fh.tell()) if out_fh else '-')) | |
i += 1 | |
if not loop: break | |
if printout: print | |
inp_fh.seek(0) | |
except KeyboardInterrupt: | |
if DEBUG: raise | |
except: | |
pap.terminate() | |
raise | |
finally: | |
if printout: print | |
pap.exit.set() | |
hostpipe.close() | |
if out_fh is not None: out_fh.close() | |
return 0 | |
if __name__ == '__main__': | |
sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment