Last active
September 1, 2024 11:47
-
-
Save avipars/e41d76f01fd971f841ffa99f1ce110d4 to your computer and use it in GitHub Desktop.
audio_watermark_spectrogram.ipynb
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"nbformat": 4, | |
"nbformat_minor": 0, | |
"metadata": { | |
"colab": { | |
"provenance": [], | |
"toc_visible": true, | |
"authorship_tag": "ABX9TyMpNaKz5WcyWAzawV5IgwzT", | |
"include_colab_link": true | |
}, | |
"kernelspec": { | |
"name": "python3", | |
"display_name": "Python 3" | |
}, | |
"language_info": { | |
"name": "python" | |
} | |
}, | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": { | |
"id": "view-in-github", | |
"colab_type": "text" | |
}, | |
"source": [ | |
"<a href=\"https://colab.research.google.com/gist/avipars/e41d76f01fd971f841ffa99f1ce110d4/audio_watermark_spectrogram.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"# WAV Steganography / Watermarking\n", | |
"\n" | |
], | |
"metadata": { | |
"id": "ZDxE-AJ6rak9" | |
} | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"Slightly modified and adapted from this github repo: https://github.com/DrSDR/Audio-Spectrogram-\n", | |
"\n", | |
"\n", | |
"1. Includes plots/graphics\n", | |
"2. Now in jupyter notebook\n", | |
"3. See intermediary results and pictures" | |
], | |
"metadata": { | |
"id": "8tZkOw9KvbH-" | |
} | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"# Install Dependencies" | |
], | |
"metadata": { | |
"id": "1Qeu3VhPrwsx" | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"# %pip (rather than !pip) installs into the environment of the running kernel\n", | |
"%pip install -q Pillow\n", | |
"%pip install -q scipy\n", | |
"%pip install -q matplotlib" | |
], | |
"metadata": { | |
"id": "2KQA-oV-slXH" | |
}, | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"# File Upload" | |
], | |
"metadata": { | |
"id": "YQQIOPWqr6sp" | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"import os\n", | |
"from google.colab import files  # upload and download helpers\n", | |
"\n", | |
"# exist_ok keeps this cell re-runnable once the folder already exists\n", | |
"os.makedirs(\"outputs\", exist_ok=True)\n", | |
"\n", | |
"print(\"All of the files are saved temporarily on colab (till the runtime shuts down)\")\n", | |
"\n", | |
"print(\"Upload your watermark image (png preferred)\")\n", | |
"watermarked = files.upload()\n", | |
"if not watermarked:\n", | |
"    # A cancelled upload gives nothing to index into; fail with a readable message instead\n", | |
"    raise ValueError(\"No watermark image was uploaded - run this cell again and choose a file\")\n", | |
"\n", | |
"# The upload result maps each file name to its content; only the first uploaded file is used\n", | |
"watermarked_path, watermarked_val = next(iter(watermarked.items()))" | |
], | |
"metadata": { | |
"id": "PxNbGF8NsCJG" | |
}, | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"print(\"Upload original audio file (wav)\")\n", | |
"original_audio = files.upload()\n", | |
"if not original_audio:\n", | |
"    # A cancelled upload gives nothing to index into; fail with a readable message instead\n", | |
"    raise ValueError(\"No audio file was uploaded - run this cell again and choose a wav file\")\n", | |
"\n", | |
"# The upload result maps each file name to its content; only the first uploaded file is used\n", | |
"original_audio_path, original_audio_val = next(iter(original_audio.items()))" | |
], | |
"metadata": { | |
"id": "3LiDDxhRtPu3" | |
}, | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"# @title\n", | |
"# for debugging or if you already uploaded the files and don't want to do that again, uncomment the following and run it\n", | |
"\n", | |
"#watermarked_path = \"watermark.png\"\n", | |
"# original_audio_path = \"colaco_jingle_stereo.wav\"" | |
], | |
"metadata": { | |
"id": "eZTzM5M9BVn2" | |
}, | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"# Configuration\n" | |
], | |
"metadata": { | |
"id": "1JPiN5MnEyTj" | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"# @title Settings { run: \"auto\" }\n", | |
"# Image orientation options (applied when the image is converted to grayscale data)\n", | |
"print(\"to_flip = Flip the image vertically (upside down)\")\n", | |
"print(\"to_rotate90 = Rotate image 90 degrees\")\n", | |
"print(\"to_resize = Resize the Image or not (if latter, ignore the new width and new height)\")\n", | |
"to_flip = True # @param {\"type\":\"boolean\",\"placeholder\":\"Flip the image\"}\n", | |
"to_rotate90 = False # @param {\"type\":\"boolean\",\"placeholder\":\"Rotate image 90 degrees\"}\n", | |
"to_resize = True # @param {\"type\":\"boolean\",\"placeholder\":\"Resize the image or not\"}\n", | |
"\n", | |
"print(\"channel: What channel should the watermark go on\")\n", | |
"channel = \"Right\" # @param [\"Left\", \"Right\"]\n", | |
"# Column index into the (samples, channels) audio array: 0 = left, anything else = right\n", | |
"watermark_channel = 0 if channel == \"Left\" else 1\n", | |
"\n", | |
"print(\"from 0 to 1, how bright the watermark will be\")\n", | |
"watermark_strength = 0.2 # @param {type:\"slider\", min:0, max:1, step:0.1}\n" | |
], | |
"metadata": { | |
"id": "uERuqF8q1KUN" | |
}, | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"# @title Resizing\n", | |
"from PIL import Image # image magic\n", | |
"\n", | |
"# Convert to 3-channel RGB up front so grayscale, palette and alpha images all\n", | |
"# reach the later per-channel grayscale conversion with the same layout\n", | |
"img = Image.open(watermarked_path).convert(\"RGB\")\n", | |
"width, height = img.size\n", | |
"\n", | |
"print(f\"Original size W:{width}x H:{height}\")\n", | |
"\n", | |
"new_width = 400 # @param {\"type\":\"number\",\"placeholder\":\"New Width\", min:5}\n", | |
"new_height = 400 # @param {\"type\":\"number\",\"placeholder\":\"New Height\", min:5}\n", | |
"\n", | |
"if to_resize:\n", | |
"    # int() guards against a fractional value typed into the form\n", | |
"    target_size = (int(new_width), int(new_height))\n", | |
"    if min(target_size) < 1:\n", | |
"        raise ValueError(f\"New size must be at least 1x1, got {target_size}\")\n", | |
"    img = img.resize(target_size)" | |
], | |
"metadata": { | |
"id": "a9RVs4DR7IcO" | |
}, | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"# Processing image and audio" | |
], | |
"metadata": { | |
"id": "c64DfijFt-mb" | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"import numpy as np # fancy arrays\n", | |
"import matplotlib.pyplot as plt #plotting stuff\n", | |
"def load_and_process_image(img, to_flip, to_rotate90):\n", | |
"    \"\"\"\n", | |
"    Convert an image into a 2-D grayscale array scaled so its brightest pixel is 1.\n", | |
"\n", | |
"    Args:\n", | |
"      img: Anything np.array() can turn into a 2-D (already grayscale) or 3-D (height, width, channels) array.\n", | |
"      to_flip: If true, reverse the row order (vertical flip).\n", | |
"      to_rotate90: If true, rotate the result by 90 degrees with np.rot90.\n", | |
"\n", | |
"    Returns:\n", | |
"      2-D float array; an all-zero image is returned as zeros rather than NaNs.\n", | |
"    \"\"\"\n", | |
"    pixels = np.array(img, dtype=float)\n", | |
"\n", | |
"    if pixels.ndim == 3 and pixels.shape[2] >= 3:\n", | |
"        # Weighted sum of R, G, B (the common 0.299/0.587/0.114 luma weights); extra channels are ignored\n", | |
"        data = 0.2989*pixels[:, :, 0] + 0.5870*pixels[:, :, 1] + 0.1140*pixels[:, :, 2]\n", | |
"    elif pixels.ndim == 3:\n", | |
"        # One or two channels: the first one is already the gray level\n", | |
"        data = pixels[:, :, 0]\n", | |
"    else:\n", | |
"        data = pixels\n", | |
"\n", | |
"    peak = np.max(data) if data.size else 0\n", | |
"    if peak > 0:  # skip the division for an all-black image, which would produce NaNs\n", | |
"        data = data / peak\n", | |
"\n", | |
"    if to_flip:\n", | |
"        data = np.flip(data, axis=0)\n", | |
"\n", | |
"    if to_rotate90:\n", | |
"        data = np.rot90(data, k=1, axes=(0, 1))\n", | |
"    return data\n", | |
"\n", | |
"# Build the grayscale watermark data, then preview it inline\n", | |
"image_data = load_and_process_image(img, to_flip=to_flip, to_rotate90=to_rotate90)\n", | |
"preview_fig, preview_ax = plt.subplots()\n", | |
"preview_ax.imshow(image_data, cmap=\"gray\")\n", | |
"plt.show()" | |
], | |
"metadata": { | |
"id": "3ZrOX3rquFZ9" | |
}, | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"from scipy.io import wavfile\n", | |
"def create_watermark_signal(image_data, fs, og_fs=24000, rng=None):\n", | |
"    \"\"\"\n", | |
"    Turn a 2-D grayscale image into a 1-D signal whose spectrum carries the image.\n", | |
"\n", | |
"    Each image row is used as the magnitudes of a spectrum, given random phase,\n", | |
"    inverse-FFT'd, and the resulting segments are laid end to end.\n", | |
"\n", | |
"    Args:\n", | |
"      image_data: 2-D array of pixel intensities.\n", | |
"      fs: Sample rate the signal should be stretched to.\n", | |
"      og_fs: Sample rate the signal is generated at; the length is scaled by fs / og_fs.\n", | |
"      rng: Optional numpy Generator for reproducible phase noise; when None the\n", | |
"        global np.random state is used.\n", | |
"\n", | |
"    Returns:\n", | |
"      1-D float array with values within the int16 range (+/-32767).\n", | |
"    \"\"\"\n", | |
"    h, w = image_data.shape\n", | |
"    if rng is None:\n", | |
"        noise = np.random.randn(h, w)\n", | |
"    else:\n", | |
"        noise = rng.standard_normal((h, w))\n", | |
"    # Unit-magnitude factor with random phase for every pixel\n", | |
"    spectrum = image_data * np.exp(1j * 23 * noise)\n", | |
"\n", | |
"    # Put the conjugated mirror image (minus the duplicated first column) in front of\n", | |
"    # each row so that, after the shift, the inverse FFT is close to real-valued\n", | |
"    mirrored = np.conjugate(np.flip(spectrum, axis=1)[:, :-1])\n", | |
"    spectrum = np.concatenate((mirrored, spectrum), axis=1)\n", | |
"    spectrum = np.fft.ifftshift(spectrum, axes=1)\n", | |
"    samples = np.real(np.fft.ifft(spectrum, axis=1)).flatten()\n", | |
"\n", | |
"    # Normalise by the largest magnitude, not the largest value: a negative peak\n", | |
"    # bigger than the positive one would otherwise wrap around in the int16 cast\n", | |
"    peak = np.max(np.abs(samples))\n", | |
"    if peak > 0:\n", | |
"        samples = samples / peak\n", | |
"    samples = (samples * 32767).astype(np.int16)  # 16 bit integer bound\n", | |
"\n", | |
"    # Stretch or shrink the signal by fs / og_fs using linear interpolation\n", | |
"    target_length = int(len(samples) * (fs / og_fs))\n", | |
"    # The last valid index is len - 1; sampling up to len would read past the data\n", | |
"    positions = np.linspace(0, len(samples) - 1, target_length)\n", | |
"    return np.interp(positions, np.arange(len(samples)), samples)\n", | |
"\n", | |
"def embed_watermark(input_wav, output_wav, watermark_signal, watermark_channel=1, watermark_strength=0.1):\n", | |
"    \"\"\"\n", | |
"    Add a watermark signal to one channel of a WAV file and save the result.\n", | |
"\n", | |
"    Args:\n", | |
"      input_wav: Path of the WAV file to read.\n", | |
"      output_wav: Path the watermarked WAV is written to.\n", | |
"      watermark_signal: 1-D watermark samples on an int16 scale (+/-32767).\n", | |
"      watermark_channel: 0 = left, 1 = right.\n", | |
"      watermark_strength: Factor the watermark is multiplied by before mixing.\n", | |
"\n", | |
"    Returns:\n", | |
"      The watermarked audio as a (samples, channels) array.\n", | |
"    \"\"\"\n", | |
"    fs, audio = wavfile.read(input_wav)  # fs = sample rate of the WAV file\n", | |
"\n", | |
"    # Ensure the audio is stereo; a mono track is duplicated onto both channels\n", | |
"    if audio.ndim == 1:\n", | |
"        print(\"Audio is mono, converting to stereo\")\n", | |
"        audio = np.column_stack((audio, audio))\n", | |
"\n", | |
"    # Make the watermark exactly as long as the audio\n", | |
"    n_frames = len(audio)\n", | |
"    watermark_signal = np.asarray(watermark_signal, dtype=float)\n", | |
"    if len(watermark_signal) > n_frames:\n", | |
"        print(\"Warning: Watermark length is longer than audio length. Truncating the watermark.\")\n", | |
"        watermark_signal = watermark_signal[:n_frames]\n", | |
"    else:\n", | |
"        watermark_signal = np.pad(watermark_signal, (0, n_frames - len(watermark_signal)))\n", | |
"\n", | |
"    delta = watermark_signal * watermark_strength\n", | |
"    if np.issubdtype(audio.dtype, np.integer):\n", | |
"        # Mix in a wider type and clip, so loud passages saturate instead of wrapping around\n", | |
"        limits = np.iinfo(audio.dtype)\n", | |
"        mixed = audio[:, watermark_channel].astype(np.int64) + delta.astype(np.int64)\n", | |
"        audio[:, watermark_channel] = np.clip(mixed, limits.min, limits.max).astype(audio.dtype)\n", | |
"    else:\n", | |
"        # Floating-point WAV data uses a -1..1 range, so bring the int16-scaled watermark down to it\n", | |
"        audio[:, watermark_channel] = audio[:, watermark_channel] + delta / 32767.0\n", | |
"\n", | |
"    # Use the rate read from this file rather than a notebook-level variable\n", | |
"    wavfile.write(output_wav, fs, audio)\n", | |
"    return audio\n" | |
], | |
"metadata": { | |
"id": "babXmVSWuLwc" | |
}, | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"Now we can save the new output" | |
], | |
"metadata": { | |
"id": "srBZ_lXK0jas" | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"import time  # supplies the digits used in the output file name\n", | |
"print(\"Processing the audio and image...\")\n", | |
"\n", | |
"# wavfile.read gives back the sample rate first, then the samples\n", | |
"original_fs, original_audio = wavfile.read(original_audio_path)\n", | |
"print(f\"Sampling rate {original_fs}\")\n", | |
"\n", | |
"# NOTE(review): fs is passed as a fixed 24000 (equal to og_fs), so the watermark length is not\n", | |
"# scaled to original_fs - confirm whether original_fs was meant here\n", | |
"watermark_signal = create_watermark_signal(image_data, 24000)\n", | |
"timestamp_suffix = str(int(time.time()))[-5:]  # last five digits of the epoch seconds\n", | |
"output_wav = f'outputs/watermarked_output{timestamp_suffix}.wav'\n", | |
"watermarked_audio = embed_watermark(\n", | |
"    original_audio_path,\n", | |
"    output_wav,\n", | |
"    watermark_signal,\n", | |
"    watermark_channel,\n", | |
"    watermark_strength,\n", | |
")" | |
], | |
"metadata": { | |
"id": "BHaW2i7IuRmQ" | |
}, | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"# Make a 30 second preview first. It gets its own name so the full-length\n", | |
"# watermarked_audio stays intact and this cell can be re-run safely.\n", | |
"short_watermarked_audio = watermarked_audio[:original_fs * 30]\n", | |
"short_output_wav = 'outputs/short_watermarked_output{}.wav'.format(str(int(time.time()))[-5:])\n", | |
"\n", | |
"# Write before announcing, so the message is only shown for a file that exists\n", | |
"wavfile.write(short_output_wav, original_fs, short_watermarked_audio)\n", | |
"print(f\"short Watermarked audio saved as {short_output_wav}\")\n", | |
"files.download(short_output_wav)" | |
], | |
"metadata": { | |
"id": "C9UhKS26PL_N" | |
}, | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"# Report where the full-length file was written, then offer it as a browser download\n", | |
"print(\"Watermarked audio saved as {}\".format(output_wav))\n", | |
"files.download(output_wav)" | |
], | |
"metadata": { | |
"id": "rbFpNExwVhOU" | |
}, | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"# Spectrogram and other Graphs" | |
], | |
"metadata": { | |
"id": "NjHkctTvyv5M" | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"# plots\n", | |
"from scipy.signal import spectrogram # for graphs\n", | |
"\n", | |
"\n", | |
"def plot_spectrogram(audio_data, sample_rate, title=\"Spectrogram\", duration=None):\n", | |
"    \"\"\"\n", | |
"    Plots the spectrogram of the provided audio data.\n", | |
"\n", | |
"    Args:\n", | |
"      audio_data: The audio data as a NumPy array (one channel).\n", | |
"      sample_rate: The sample rate of the audio data.\n", | |
"      title: The title for the spectrogram plot (default: \"Spectrogram\").\n", | |
"      duration: Optional number of seconds, counted from the start, to plot.\n", | |
"        None (the default) plots everything that was passed in.\n", | |
"    \"\"\"\n", | |
"    if duration is not None:\n", | |
"        audio_data = audio_data[:int(duration * sample_rate)]\n", | |
"\n", | |
"    frequencies, times, Sxx = spectrogram(audio_data, sample_rate, nperseg=1024)\n", | |
"    # Decibels are on a logarithmic scale. Silent stretches have zero power, whose log is -inf;\n", | |
"    # the divide warning is silenced here (matplotlib is expected to skip non-finite cells - TODO confirm)\n", | |
"    with np.errstate(divide='ignore'):\n", | |
"        Sxx_dB = 10 * np.log10(Sxx)\n", | |
"\n", | |
"    plt.figure(figsize=(10, 6))\n", | |
"    plt.pcolormesh(times, frequencies, Sxx_dB, shading='gouraud', cmap='inferno')\n", | |
"    plt.ylabel('Hz')\n", | |
"    plt.xlabel('Time [sec]')\n", | |
"    plt.colorbar(label='Intensity [dB]')\n", | |
"    plt.title(title)\n", | |
"    plt.tight_layout()  # run after the title is set so the layout accounts for it\n", | |
"    plt.show()\n", | |
"\n", | |
"\n", | |
"def channels(audio_data, sample_rate=None):\n", | |
"    \"\"\"\n", | |
"    Plot one spectrogram per channel of the given audio.\n", | |
"\n", | |
"    Args:\n", | |
"      audio_data: 1-D array (mono) or 2-D (samples, channels) array.\n", | |
"      sample_rate: Sample rate of audio_data; defaults to the notebook-level original_fs.\n", | |
"    \"\"\"\n", | |
"    if sample_rate is None:\n", | |
"        sample_rate = original_fs\n", | |
"\n", | |
"    if len(audio_data.shape) == 2:\n", | |
"        n_channels = audio_data.shape[1]\n", | |
"        stereo_names = (\"Left\", \"Right\")\n", | |
"        # Iterating over the transpose yields one channel at a time, for any channel count\n", | |
"        for index, channel_samples in enumerate(audio_data.T):\n", | |
"            if n_channels == 2:\n", | |
"                plot_title = f\"{stereo_names[index]} Channel Spectrogram\"\n", | |
"            else:\n", | |
"                plot_title = f\"Channel {index} Spectrogram\"\n", | |
"            plot_spectrogram(channel_samples, sample_rate, title=plot_title)\n", | |
"    else:\n", | |
"        plot_spectrogram(audio_data, sample_rate)\n", | |
"\n", | |
"# Load the WAV file first, so the slices below come from this file and its sample rate\n", | |
"# rather than from whatever an earlier cell left in original_audio\n", | |
"original_fs, original_audio = wavfile.read(original_audio_path)\n", | |
"\n", | |
"# Extract the first 10 seconds of audio data\n", | |
"num_samples_10sec = original_fs * 10\n", | |
"first_10sec_audio_og = original_audio[:num_samples_10sec]\n", | |
"first_10sec_audio_watermarked = watermarked_audio[:num_samples_10sec]\n", | |
"\n", | |
"print(\"original\")\n", | |
"channels(first_10sec_audio_og)\n", | |
"print(\"watermarked\")\n", | |
"channels(first_10sec_audio_watermarked)" | |
], | |
"metadata": { | |
"id": "pumVF_bCy0Hu" | |
}, | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"def plot_waveform(original_audio, watermarked_audio, fs):\n", | |
"    \"\"\"\n", | |
"    Plot the original and watermarked waveforms in two stacked panels.\n", | |
"\n", | |
"    This won't reveal the watermark but is kinda cool to see if you dont have audio editing software.\n", | |
"\n", | |
"    Args:\n", | |
"      original_audio: 1-D (mono) or 2-D (samples, channels) array of the source audio.\n", | |
"      watermarked_audio: Same layout, for the watermarked audio.\n", | |
"      fs: Sample rate, used to convert sample indices into seconds on the x axis.\n", | |
"    \"\"\"\n", | |
"    fig, axs = plt.subplots(2, 1, figsize=(12, 20))\n", | |
"    panels = (\n", | |
"        (axs[0], original_audio, 'Original'),\n", | |
"        (axs[1], watermarked_audio, 'Watermarked'),\n", | |
"    )\n", | |
"    for ax, audio, name in panels:\n", | |
"        samples = np.asarray(audio)\n", | |
"        if samples.ndim == 1:\n", | |
"            # Mono input: treat it as a single-column array instead of failing on [:, 0]\n", | |
"            samples = samples[:, np.newaxis]\n", | |
"        seconds = np.arange(len(samples)) / fs\n", | |
"        for ch in range(samples.shape[1]):\n", | |
"            ax.plot(seconds, samples[:, ch], label=f'{name} Ch{ch}')\n", | |
"        ax.set_title(f'{name} Audio Waveform')\n", | |
"        ax.set_xlabel('Time [sec]')\n", | |
"        ax.set_ylabel('Amplitude')  # a waveform shows sample values, not frequency\n", | |
"        ax.legend()\n", | |
"\n", | |
"    plt.tight_layout()\n", | |
"    plt.show()\n", | |
"\n", | |
"plot_waveform(original_audio, watermarked_audio, original_fs)\n" | |
], | |
"metadata": { | |
"id": "ATCf9vdP-BLU" | |
}, | |
"execution_count": null, | |
"outputs": [] | |
} | |
] | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment