Last active
November 10, 2021 21:13
-
-
Save psobot/973dacfdc8699eff7d3e496897ba19d8 to your computer and use it in GitHub Desktop.
MIDI Sample Dump Receiver
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
MIDI Sample Dump receive script | |
by Peter Sobot, Nov 6, 2021 | |
@psobot / [email protected] | |
Requirements: pip install python-rtmidi tqdm numpy pysoundfile | |
""" | |
import argparse | |
import rtmidi | |
from tqdm import tqdm | |
import numpy as np | |
import soundfile as sf | |
from typing import Tuple | |
HEADER_TAG = 0x01 | |
PACKET_TAG = 0x02 | |
def get_midi_device_named(midi_identifier: str) -> Tuple[rtmidi.MidiIn, rtmidi.MidiOut]: | |
midi_out = rtmidi.MidiOut() | |
midi_in = rtmidi.MidiIn(queue_size_limit=8192) | |
midi_in.ignore_types(sysex=False) | |
in_port_names = set(midi_in.get_ports()) | |
out_port_names = set(midi_out.get_ports()) | |
bidirectional_port_names = in_port_names & out_port_names | |
if not bidirectional_port_names: | |
raise ValueError("No bidirectional MIDI interface found. Is one connected?") | |
if midi_identifier: | |
matching_port_names = [ | |
port_name | |
for port_name in bidirectional_port_names | |
if midi_identifier.lower() in port_name.lower() | |
] | |
if not matching_port_names: | |
raise ValueError( | |
f"No bidirectional MIDI interface containing {midi_identifier}" | |
f" found (options: {bidirectional_port_names})." | |
) | |
port_name = matching_port_names[0] | |
else: | |
port_name = bidirectional_port_names.pop() | |
return midi_in.get_ports().index(port_name), midi_out.get_ports().index(port_name) | |
def chunks(lst, n): | |
"""Yield successive n-sized chunks from lst.""" | |
for i in range(0, len(lst), n): | |
yield lst[i : i + n] | |
def main(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
"--interface-name", help="The name of the MIDI interface to use for dumping.", default="Uno" | |
) | |
parser.add_argument("--output-file", required=True, help="The output file to dump WAV data to.") | |
args = parser.parse_args() | |
if not args.output_file.endswith(".wav"): | |
raise ValueError("Output filename must end in '.wav'!") | |
midi_in_port, midi_out_port = get_midi_device_named(args.interface_name) | |
midiout = rtmidi.MidiOut() | |
midiout.open_port(midi_out_port) | |
midiin = rtmidi.MidiIn() | |
midiin.open_port(midi_in_port) | |
midiin.ignore_types(sysex=False) | |
print(f"Waiting to receive dump (initiate on device side)...") | |
last_packet_received = -1 | |
total_received = 0 | |
pbar = None | |
output_file = None | |
try: | |
while True: | |
message = midiin.get_message() | |
if message: | |
message_bytes, time = message | |
message_bytes = bytes(message_bytes) | |
# Check for the Sysex Sample Dump header: | |
if message_bytes[0:2] != b'\xf0\x7e': | |
continue | |
if message_bytes[3] == HEADER_TAG: | |
sample_resolution = message_bytes[6] | |
if sample_resolution != 16: | |
raise NotImplementedError( | |
"Support for bit depths other than 16-bit is not yet implemented." | |
) | |
sample_period_ns = ( | |
message_bytes[7] | (message_bytes[8] << 7) | (message_bytes[9] << 14) | |
) | |
sample_rate_hz = int(1_000_000_000 / sample_period_ns) | |
print( | |
f"Length in words bytes: {message_bytes[10]} {message_bytes[11]}" | |
f" {message_bytes[12]}" | |
) | |
length_in_words = ( | |
message_bytes[10] | (message_bytes[11] << 7) | (message_bytes[12] << 14) | |
) | |
if length_in_words == 1: | |
# Could be roll over! | |
length_in_words = 2097152 | |
print( | |
f"Receiving a dump of {length_in_words:,} samples at {sample_rate_hz:,}Hz" | |
f" {sample_resolution}-bit." | |
) | |
output_file = sf.SoundFile( | |
args.output_file, | |
'w', | |
samplerate=sample_rate_hz, | |
channels=1, | |
subtype='PCM_16' | |
) | |
pbar = tqdm(total=length_in_words) | |
elif message_bytes[3] == PACKET_TAG: | |
packet_number = message_bytes[4] | |
if packet_number != last_packet_received + 1: | |
print( | |
f"Warning: dropped packet! Received #{packet_number}, expected" | |
f" #{last_packet_received:,}." | |
) | |
continue | |
data = message_bytes[5:125] | |
# Decode data from 3-byte format: | |
for a, b, c in chunks(data, 3): | |
sample = (a << 9 | b << 2 | c >> 5) - 32768 | |
output_file.write(np.array([sample], dtype=np.int16)) | |
total_received += 1 | |
pbar.update() | |
output_file.flush() | |
# TODO: checksum! Second-last byte contains: | |
# This is the XOR of the bytes 0x7E, cc, 0x02, kk, and all | |
# 120 bytes of waveform data (with bit 7 of result masked off to 0). | |
if last_packet_received == 126: | |
last_packet_received = -1 | |
else: | |
last_packet_received += 1 | |
except KeyboardInterrupt: | |
pass | |
if pbar: | |
pbar.close() | |
if output_file: | |
output_file.close() | |
print(f"Done! Received {total_received:,} samples of data.") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment