Skip to content

Instantly share code, notes, and snippets.

@avipars
Last active September 1, 2024 11:47
Show Gist options
  • Save avipars/e41d76f01fd971f841ffa99f1ce110d4 to your computer and use it in GitHub Desktop.
Save avipars/e41d76f01fd971f841ffa99f1ce110d4 to your computer and use it in GitHub Desktop.
audio_watermark_spectrogram.ipynb
Display the source blob
Display the rendered blob
Raw
{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"provenance": [],
"toc_visible": true,
"authorship_tag": "ABX9TyMpNaKz5WcyWAzawV5IgwzT",
"include_colab_link": true
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
},
"language_info": {
"name": "python"
}
},
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "view-in-github",
"colab_type": "text"
},
"source": [
"<a href=\"https://colab.research.google.com/gist/avipars/e41d76f01fd971f841ffa99f1ce110d4/audio_watermark_spectrogram.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
]
},
{
"cell_type": "markdown",
"source": [
"# WAV Stegoanography / Watermarking\n",
"\n"
],
"metadata": {
"id": "ZDxE-AJ6rak9"
}
},
{
"cell_type": "markdown",
"source": [
"Slightly modified and adapted from this github repo: https://github.com/DrSDR/Audio-Spectrogram-\n",
"\n",
"\n",
"1. Includes plots/graphics\n",
"2. Now in jupyter notebook\n",
"3. See intermediary results and pictures"
],
"metadata": {
"id": "8tZkOw9KvbH-"
}
},
{
"cell_type": "markdown",
"source": [
"# Install Dependencies"
],
"metadata": {
"id": "1Qeu3VhPrwsx"
}
},
{
"cell_type": "code",
"source": [
"!pip install Pillow\n",
"!pip install scipy\n",
"!pip install matplotlib"
],
"metadata": {
"id": "2KQA-oV-slXH"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"# File Upload"
],
"metadata": {
"id": "YQQIOPWqr6sp"
}
},
{
"cell_type": "code",
"source": [
"!mkdir outputs\n",
"from google.colab import files #upload and download\n",
"\n",
"print(\"All of the files are saved temporarily on colab (till the runtime shuts down)\")\n",
"\n",
"print(\"Upload your watermark image (png preferred)\")\n",
"watermarked = files.upload()\n",
"\n",
"watermarked_val = list(watermarked.values())[0]\n",
"watermarked_path = list(watermarked.keys())[0]"
],
"metadata": {
"id": "PxNbGF8NsCJG"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"print(\"Upload original audio file (wav)\")\n",
"original_audio = files.upload()\n",
"\n",
"original_audio_val = list(original_audio.values())[0]\n",
"original_audio_path = list(original_audio.keys())[0]"
],
"metadata": {
"id": "3LiDDxhRtPu3"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"# @title\n",
"# for debugging or if you already uploaded the files and don't want to do that again, uncomment the following and run it\n",
"\n",
"#watermarked_path = \"watermark.png\"\n",
"# original_audio_path = \"colaco_jingle_stereo.wav\""
],
"metadata": {
"id": "eZTzM5M9BVn2"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"# Configuration\n"
],
"metadata": {
"id": "1JPiN5MnEyTj"
}
},
{
"cell_type": "code",
"source": [
"# @title Settings { run: \"auto\" }\n",
"print(\"to_flip = Mirror the Image\")\n",
"print(\"to_rotate90 = Rotate image 90 degrees\")\n",
"print(\"to_resize = Resize the Image or not (if latter, ignore the new width and new height)\")\n",
"to_flip = True # @param {\"type\":\"boolean\",\"placeholder\":\"Flip the image\"}\n",
"to_rotate90 = False # @param {\"type\":\"boolean\",\"placeholder\":\"Rotate image 90 degrees\"}\n",
"to_resize = True # @param {\"type\":\"boolean\",\"placeholder\":\"Resize the image or not\"}\n",
"\n",
"print(\"channel: What channel should the watermark go on\")\n",
"channel = \"Right\" # @param [\"Left\", \"Right\"]\n",
"if channel == \"Left\":\n",
" watermark_channel = 0\n",
"else:\n",
" watermark_channel = 1\n",
"\n",
"print(\"from 0 to 1, how birght the watermark will be\")\n",
"watermark_strength = 0.2 # @param {type:\"slider\", min:0, max:1, step:0.1}\n"
],
"metadata": {
"id": "uERuqF8q1KUN"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"# @title Resizing\n",
"from PIL import Image # image magic\n",
"\n",
"img = Image.open(watermarked_path)\n",
"width, height = img.size\n",
"\n",
"print(f\"Original size W:{width}x H:{height}\")\n",
"\n",
"new_width = 400 # @param {\"type\":\"number\",\"placeholder\":\"New Width\", min:5}\n",
"new_height = 400 # @param {\"type\":\"number\",\"placeholder\":\"New Height\", min:5}\n",
"\n",
"if to_resize:\n",
" img = img.resize((new_width, new_height))"
],
"metadata": {
"id": "a9RVs4DR7IcO"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"#Processing image and audio"
],
"metadata": {
"id": "c64DfijFt-mb"
}
},
{
"cell_type": "code",
"source": [
"import numpy as np # fancy arrays\n",
"import matplotlib.pyplot as plt #plotting stuff\n",
"def load_and_process_image(img, to_flip, to_rotate90):\n",
" data = np.array(img, dtype='float')\n",
" data = 0.2989*data[:,:,0] + 0.5870*data[:,:,1] + 0.1140*data[:,:,2] # convert to grayscale old fashioned way\n",
" data = data / np.max(data) # normalize it\n",
"\n",
" if to_flip:\n",
" data = np.flip(data, axis=0) # flip it\n",
"\n",
" if to_rotate90:\n",
" data = np.rot90(data, k=1, axes=(0,1)) # rotate 90 degrees\n",
" return data\n",
"\n",
"image_data = load_and_process_image(img, to_flip, to_rotate90)\n",
"plt.imshow(image_data,cmap=\"gray\") # show image in colab\n",
"plt.show()"
],
"metadata": {
"id": "3ZrOX3rquFZ9"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"from scipy.io import wavfile\n",
"def create_watermark_signal(image_data, fs, og_fs=24000):\n",
" h, w = image_data.shape\n",
" phdata = np.random.randn(h, w)\n",
" phdata = 23 * phdata\n",
" phdata = np.exp(1j * phdata)\n",
" data = image_data * phdata\n",
"\n",
" d2 = data\n",
" d1 = np.flip(data, axis=1) # flip again\n",
" d1 = d1[:, 0:-1]\n",
" d1 = np.conjugate(d1)\n",
" data = np.concatenate((d1, data), axis=1)\n",
" data = np.fft.ifftshift(data, axes=1) # inverse fast fourier transform\n",
" data = np.fft.ifft(data, axis=1)\n",
"\n",
" data = data.flatten()\n",
" data = np.real(data)\n",
" data = data / np.max(data)\n",
" data = np.multiply(data, 32767) #16 bit integer bound\n",
" data = data.astype(np.int16)\n",
"\n",
" # Adjust the length of the watermark signal to match the input audio\n",
" target_length = int(len(data) * (fs / og_fs)) # og_fs = 24000 is the original fs in the provided code\n",
" data = np.interp(np.linspace(0, len(data), target_length), np.arange(len(data)), data)\n",
" return data\n",
"\n",
"def embed_watermark(input_wav, output_wav, watermark_signal, watermark_channel=1, watermark_strength=0.1):\n",
" \"\"\"\n",
" watermark_channel 0 = left\n",
" watermark_channel 1 = right\n",
" \"\"\"\n",
"\n",
" # Load the input WAV file\n",
" fs, audio = wavfile.read(input_wav) #fs = Sample rate of WAV file.\n",
"\n",
" # ensure the audio is stereo, if not then have the same audio track go to both\n",
" if len(audio.shape) == 1:\n",
" print(\"Audio is mono, converting to stereo\")\n",
" audio = np.column_stack((audio, audio))\n",
"\n",
" # Adjust watermark length to match audio length\n",
" if len(watermark_signal) > len(audio):\n",
" print(\"Warning: Watermark length is longer than audio length. Padding with zeros.\")\n",
" watermark_signal = watermark_signal[:len(audio)]\n",
" else:\n",
"\n",
" watermark_signal = np.pad(watermark_signal, (0, len(audio) - len(watermark_signal)))\n",
"\n",
" # Embed the watermark in the specified channel from start\n",
" audio[:, watermark_channel] = audio[:, watermark_channel] + (watermark_signal * watermark_strength).astype(np.int16)\n",
" wavfile.write(output_wav, original_fs, audio) # Save the watermarked audio\n",
" return audio\n"
],
"metadata": {
"id": "babXmVSWuLwc"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Now we can save the new output"
],
"metadata": {
"id": "srBZ_lXK0jas"
}
},
{
"cell_type": "code",
"source": [
"import time # filename { run: \"auto\" }\n",
"print(\"Processing the audio and image...\")\n",
"\n",
"original_fs, original_audio = wavfile.read(original_audio_path) #fs = Sample rate of WAV file.\n",
"print(f\"Sampling rate {original_fs}\")\n",
"\n",
"watermark_signal = create_watermark_signal(image_data, 24000)\n",
"output_wav = 'outputs/watermarked_output{}.wav'.format(str(int(time.time()))[-5:])\n",
"watermarked_audio = embed_watermark(original_audio_path, output_wav,watermark_signal, watermark_channel,watermark_strength)"
],
"metadata": {
"id": "BHaW2i7IuRmQ"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"# make a 30 second one first\n",
"watermarked_audio = watermarked_audio[:original_fs * 30]\n",
"# save that and then if i like it, ill wait for the full\n",
"short_output_wav = 'outputs/short_watermarked_output{}.wav'.format(str(int(time.time()))[-5:])\n",
"print(f\"short Watermarked audio saved as {short_output_wav}\")\n",
"\n",
"wavfile.write(short_output_wav, original_fs, watermarked_audio)\n",
"files.download(short_output_wav)"
],
"metadata": {
"id": "C9UhKS26PL_N"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"print(f\"Watermarked audio saved as {output_wav}\")\n",
"\n",
"files.download(output_wav)"
],
"metadata": {
"id": "rbFpNExwVhOU"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"#Spectrogram and other Graphs"
],
"metadata": {
"id": "NjHkctTvyv5M"
}
},
{
"cell_type": "code",
"source": [
"# plots\n",
"from scipy.signal import spectrogram # for graphs\n",
"\n",
"\n",
"def plot_spectrogram(audio_data, sample_rate, title=\"Spectrogram\", duration=None):\n",
" \"\"\"\n",
" Plots the spectrogram of the provided audio data.\n",
"\n",
" Args:\n",
" audio_data: The audio data as a NumPy array.\n",
" sample_rate: The sample rate of the audio data.\n",
" title: The title for the spectrogram plot (default: \"Spectrogram\").\n",
" \"\"\"\n",
" frequencies, times, Sxx = spectrogram(audio_data, sample_rate, nperseg=1024)\n",
" Sxx_dB = 10 * np.log10(Sxx) # decibels are on the logarithmic scale\n",
"\n",
" plt.figure(figsize=(10, 6))\n",
" plt.pcolormesh(times, frequencies, Sxx_dB, shading='gouraud', cmap='inferno')\n",
" plt.ylabel('Hz')\n",
" plt.xlabel('Time [sec]')\n",
" plt.colorbar(label='Intensity [dB]')\n",
" plt.tight_layout()\n",
"\n",
" plt.title(title)\n",
" plt.show()\n",
"\n",
"\n",
"def channels(audio_data):\n",
" if len(audio_data.shape) == 2: # Check if stereo and plot accordingly\n",
" left_channel, right_channel = audio_data.T\n",
" plot_spectrogram(left_channel, original_fs, title=\"Left Channel Spectrogram\")\n",
" plot_spectrogram(right_channel, original_fs, title=\"Right Channel Spectrogram\")\n",
" else:\n",
" plot_spectrogram(audio_data, original_fs)\n",
"\n",
"# Extract the first 10 seconds of audio data\n",
"num_samples_10sec = original_fs * 10\n",
"first_10sec_audio_og = original_audio[:num_samples_10sec]\n",
"first_10sec_audio_watermarked = watermarked_audio[:num_samples_10sec]\n",
"# Load the WAV file\n",
"original_fs, original_audio = wavfile.read(original_audio_path) #fs = Sample rate of WAV file.\n",
"print(\"original\")\n",
"channels(first_10sec_audio_og)\n",
"print(\"watermarked\")\n",
"channels(first_10sec_audio_watermarked)"
],
"metadata": {
"id": "pumVF_bCy0Hu"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"def plot_waveform(original_audio, watermarked_audio, fs):\n",
" \"\"\"\n",
" This won't reveal the watermark but is kinda cool to see if you dont have audio editing software\n",
" \"\"\"\n",
" fig, axs = plt.subplots(2, 1, figsize=(12, 20))\n",
" # Plot waveforms\n",
" axs[0].plot(original_audio[:, 0], label='Original Ch0')\n",
" axs[0].plot(original_audio[:, 1], label='Original Ch1')\n",
" axs[0].set_title('Original Audio Waveform')\n",
" axs[0].set_xlabel('Time')\n",
" axs[0].set_ylabel('Hz')\n",
" axs[0].legend()\n",
"\n",
" axs[1].plot(watermarked_audio[:, 0], label='Watermarked Ch0')\n",
" axs[1].plot(watermarked_audio[:, 1], label='Watermarked Ch1')\n",
" axs[1].set_title('Watermarked Audio Waveform')\n",
" axs[1].set_xlabel('Time')\n",
" axs[1].set_ylabel('Hz')\n",
" axs[1].legend()\n",
"\n",
" plt.tight_layout()\n",
" plt.show()\n",
"\n",
"plot_waveform(original_audio, watermarked_audio, original_fs)\n"
],
"metadata": {
"id": "ATCf9vdP-BLU"
},
"execution_count": null,
"outputs": []
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment