Created
July 31, 2024 12:16
-
-
Save Ian-Gabaraev/03c1ff63648d44a843a179ead0e0715a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import pyaudio | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from celeryapps import ( | |
analyze, | |
display_device_name, | |
kill_screen, | |
) | |
from aws import upload_file_to_s3 | |
FORMAT = pyaudio.paInt16 # Audio format (16-bit PCM) | |
CHANNELS = 1 # Mono audio | |
RATE = 48000 # Sampling rate (48 kHz) | |
CHUNK = 2048 # Buffer size (increased for better averaging) | |
DURATION = None | |
DEVICE_INDEX = None | |
def convert_to_db(rms_value, reference=32767): | |
"""Convert an RMS value to decibels.""" | |
if rms_value == 0: | |
return -np.inf # Logarithm of zero is undefined, represents silence. | |
return 20 * np.log10(rms_value / reference) | |
def plot(rms_values, upload=False, show=True): | |
plt.figure(figsize=(10, 6)) | |
plt.plot(rms_values) | |
plt.xlabel("Time (chunks)") | |
plt.ylabel("RMS") | |
plt.title("Root Median Square (RMS) Values Over Time") | |
plt.grid(True) | |
if upload: | |
plt.savefig("rms_values.png") | |
upload_file_to_s3("rms_values.png", f"rms-{datetime.datetime.now()}.png") | |
if show: | |
plt.show() | |
def get_input_device_options(p: pyaudio.PyAudio): | |
for i in range(p.get_device_count()): | |
name = p.get_device_info_by_index(i)["name"] | |
channels = p.get_device_info_by_index(i)["maxInputChannels"] | |
print(f"Index: {i}, Name: {name}, Channels: {channels}") | |
def get_device_index(p): | |
if DEVICE_INDEX: | |
return DEVICE_INDEX | |
print("Choose the device index for recording") | |
get_input_device_options(p) | |
return int(input("Enter the device index: ")) | |
def get_duration(p): | |
if DURATION: | |
return DURATION | |
print("Choose duration of recording") | |
return int(input("Duration: ")) | |
def get_device_info(p, device_index): | |
return [ | |
p.get_device_info_by_index(device_index)["name"], | |
p.get_device_info_by_index(device_index)["maxInputChannels"], | |
] | |
def get_audio_stream(p, device_index): | |
print("Using ", get_device_info(p, device_index)) | |
stream = p.open( | |
format=FORMAT, | |
channels=CHANNELS, | |
rate=RATE, | |
input=True, | |
frames_per_buffer=CHUNK, | |
input_device_index=device_index, | |
) | |
print("Stream opened") | |
return stream | |
def launch(): | |
p = pyaudio.PyAudio() | |
device_index = get_device_index(p) | |
duration = get_duration(p) | |
stream = get_audio_stream(p, device_index) | |
relay_audio_to_celery(p, stream, duration) | |
def order_device_info_display(p, device_index): | |
device_name = p.get_device_info_by_index(device_index)["name"] | |
display_device_name.delay(device_name) | |
def relay_audio_to_celery(p, stream, duration): | |
rms_values = [] | |
rms_for_analysis = [] | |
for i in range(0, int(RATE / CHUNK * duration)): | |
data = stream.read(CHUNK) | |
audio_data = np.frombuffer(data, dtype=np.int16) | |
audio_data = audio_data.astype(np.int32) | |
rms = np.sqrt(np.mean(audio_data**2)) | |
rms_values.append(rms) | |
rms_for_analysis.append(float(rms)) | |
if len(rms_for_analysis) == 20: | |
print("Collected a batch of 20 RMS values") | |
analyze.delay(rms_for_analysis) | |
rms_for_analysis.clear() | |
stream.stop_stream() | |
stream.close() | |
p.terminate() | |
kill_screen.delay() | |
return rms_values | |
if __name__ == "__main__": | |
launch() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment