{ "nbformat": 4, "nbformat_minor": 0, "metadata": { "colab": { "provenance": [], "toc_visible": true, "authorship_tag": "ABX9TyMujeFaLZIcasVWse4WeRoD" }, "kernelspec": { "name": "python3", "display_name": "Python 3" }, "language_info": { "name": "python" } }, "cells": [
{ "cell_type": "markdown", "source": [ "# WAV Steganography / Watermarking\n", "\n" ], "metadata": { "id": "ZDxE-AJ6rak9" } },
{ "cell_type": "markdown", "source": [ "# Install Dependencies" ], "metadata": { "id": "1Qeu3VhPrwsx" } },
{ "cell_type": "code", "source": [ "!pip install Pillow\n", "!pip install scipy\n", "!pip install matplotlib" ], "metadata": { "id": "2KQA-oV-slXH" }, "execution_count": null, "outputs": [] },
{ "cell_type": "markdown", "source": [ "# File Upload" ], "metadata": { "id": "YQQIOPWqr6sp" } },
{ "cell_type": "code", "source": [ "!mkdir -p outputs\n", "from google.colab import files # upload and download\n", "\n", "print(\"All of the files are saved temporarily on Colab (until the runtime shuts down)\")\n", "\n", "print(\"Upload your watermark image (PNG preferred)\")\n", "watermarked = files.upload()\n", "\n", "watermarked_val = list(watermarked.values())[0]\n", "watermarked_path = list(watermarked.keys())[0]" ], "metadata": { "id": "PxNbGF8NsCJG" }, "execution_count": null, "outputs": [] },
{ "cell_type": "code", "source": [ "print(\"Upload the original audio file (WAV)\")\n", "original_audio = files.upload()\n", "\n", "original_audio_val = list(original_audio.values())[0]\n", "original_audio_path = list(original_audio.keys())[0]" ], "metadata": { "id": "3LiDDxhRtPu3" }, "execution_count": null, "outputs": [] },
{ "cell_type": "code", "source": [ "# @title\n", "# For debugging, or if you already uploaded the files and don't want to do that again, uncomment the following and run it\n", "\n", "# watermarked_path = \"watermark.png\"\n", "# original_audio_path = \"colaco_jingle_stereo.wav\"" ], "metadata": { "id": "eZTzM5M9BVn2" }, "execution_count": null, "outputs": [] },
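{ "cell_type": "markdown", "source": [ "Optional sanity check: read the uploaded WAV back and print its sample rate, channel count, sample type, and duration, so you can confirm the file looks right before embedding anything into it. This assumes `original_audio_path` was set by the upload (or debug) cell above." ], "metadata": {} },
{ "cell_type": "code", "source": [ "# Optional sanity check -- assumes original_audio_path points at the uploaded WAV\n", "from scipy.io import wavfile\n", "import numpy as np\n", "\n", "check_fs, check_audio = wavfile.read(original_audio_path)\n", "check_channels = 1 if check_audio.ndim == 1 else check_audio.shape[1]\n", "print(f\"Sample rate: {check_fs} Hz\")\n", "print(f\"Channels: {check_channels}\")\n", "print(f\"Sample type: {check_audio.dtype}\")\n", "print(f\"Duration: {check_audio.shape[0] / check_fs:.1f} s\")" ], "metadata": {}, "execution_count": null, "outputs": [] },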
{ "cell_type": "markdown", "source": [ "# Configuration\n" ], "metadata": { "id": "1JPiN5MnEyTj" } },
{ "cell_type": "code", "source": [ "# @title Settings { run: \"auto\" }\n", "print(\"to_flip = Mirror the image (flip it top to bottom)\")\n", "print(\"to_rotate90 = Rotate the image 90 degrees\")\n", "print(\"to_resize = Resize the image (if False, the new width and height below are ignored)\")\n", "to_flip = True # @param {\"type\":\"boolean\",\"placeholder\":\"Flip the image\"}\n", "to_rotate90 = False # @param {\"type\":\"boolean\",\"placeholder\":\"Rotate image 90 degrees\"}\n", "to_resize = True # @param {\"type\":\"boolean\",\"placeholder\":\"Resize the image or not\"}\n", "\n", "print(\"channel: which channel the watermark goes on\")\n", "channel = \"Right\" # @param [\"Left\", \"Right\"]\n", "if channel == \"Left\":\n", "    watermark_channel = 0\n", "else:\n", "    watermark_channel = 1\n", "\n", "print(\"watermark_strength: from 0 to 1, how bright (and audible) the watermark will be\")\n", "watermark_strength = 0.2 # @param {type:\"slider\", min:0, max:1, step:0.1}\n" ], "metadata": { "id": "uERuqF8q1KUN" }, "execution_count": null, "outputs": [] },
{ "cell_type": "code", "source": [ "# @title Resizing\n", "from PIL import Image # image magic\n", "\n", "img = Image.open(watermarked_path).convert(\"RGB\") # force 3 channels so the grayscale conversion later always works\n", "width, height = img.size\n", "\n", "print(f\"Original size W:{width} x H:{height}\")\n", "\n", "new_width = 400 # @param {\"type\":\"number\",\"placeholder\":\"New Width\", min:5}\n", "new_height = 400 # @param {\"type\":\"number\",\"placeholder\":\"New Height\", min:5}\n", "\n", "if to_resize:\n", "    img = img.resize((new_width, new_height))" ], "metadata": { "id": "a9RVs4DR7IcO" }, "execution_count": null, "outputs": [] },
{ "cell_type": "markdown", "source": [ "# Processing image and audio" ], "metadata": { "id": "c64DfijFt-mb" } },
{ "cell_type": "code", "source": [ "import numpy as np # fancy arrays\n", "import matplotlib.pyplot as plt # plotting stuff\n", "\n", "def load_and_process_image(img, to_flip, to_rotate90):\n", "    data = np.array(img, dtype='float')\n", "    data = 0.2989*data[:,:,0] + 0.5870*data[:,:,1] + 0.1140*data[:,:,2] # convert to grayscale the old-fashioned way (BT.601 luma weights)\n", "    data = data / np.max(data) # normalize to 0..1\n", "\n", "    if to_flip:\n", "        data = np.flip(data, axis=0) # flip top to bottom\n", "\n", "    if to_rotate90:\n", "        data = np.rot90(data, k=1, axes=(0,1)) # rotate 90 degrees\n", "    return data\n", "\n", "image_data = load_and_process_image(img, to_flip, to_rotate90)\n", "plt.imshow(image_data, cmap=\"gray\") # show the processed image in Colab\n", "plt.show()" ], "metadata": { "id": "3ZrOX3rquFZ9" }, "execution_count": null, "outputs": [] },
{ "cell_type": "code", "source": [ "from scipy.io import wavfile\n", "\n", "def create_watermark_signal(image_data, fs, og_fs=24000):\n", "    h, w = image_data.shape\n", "    phdata = np.random.randn(h, w)\n", "    phdata = 23 * phdata\n", "    phdata = np.exp(1j * phdata) # random phase, so only the magnitudes (the image) carry information\n", "    data = image_data * phdata\n", "\n", "    d1 = np.flip(data, axis=1) # mirror each row...\n", "    d1 = d1[:, 0:-1]\n", "    d1 = np.conjugate(d1) # ...and conjugate it, so every row's spectrum is conjugate-symmetric\n", "    data = np.concatenate((d1, data), axis=1)\n", "    data = np.fft.ifftshift(data, axes=1) # undo the centering\n", "    data = np.fft.ifft(data, axis=1) # inverse FFT: each image row becomes one time-domain frame\n", "\n", "    data = data.flatten()\n", "    data = np.real(data) # the symmetric spectrum makes this (almost) purely real\n", "    data = data / np.max(data)\n", "    data = np.multiply(data, 32767) # scale to the 16-bit integer range\n", "    data = data.astype(np.int16)\n", "\n", "    # Stretch the watermark so it keeps its shape at the target sample rate\n", "    # (og_fs = 24000 is the rate the signal is designed at)\n", "    target_length = int(len(data) * (fs / og_fs))\n", "    data = np.interp(np.linspace(0, len(data), target_length), np.arange(len(data)), data)\n", "    return data\n", "\n", "def embed_watermark(input_wav, output_wav, watermark_signal, watermark_channel=1, watermark_strength=0.1):\n", "    \"\"\"\n", "    watermark_channel 0 = left\n", "    watermark_channel 1 = right\n", "    \"\"\"\n", "\n", "    # Load the input WAV file\n", "    fs, audio = wavfile.read(input_wav) # fs = sample rate of the WAV file\n", "\n", "    # Ensure the audio is stereo; if not, send the same track to both channels\n", "    if len(audio.shape) == 1:\n", "        print(\"Audio is mono, converting to stereo\")\n", "        audio = np.column_stack((audio, audio))\n", "\n", "    # Adjust the watermark length to match the audio length\n", "    if len(watermark_signal) > len(audio):\n", "        print(\"Warning: the watermark is longer than the audio. Truncating the watermark.\")\n", "        watermark_signal = watermark_signal[:len(audio)]\n", "    else:\n", "        watermark_signal = np.pad(watermark_signal, (0, len(audio) - len(watermark_signal)))\n", "\n", "    # Embed the watermark in the specified channel, starting at the beginning\n", "    audio[:, watermark_channel] = audio[:, watermark_channel] + (watermark_signal * watermark_strength).astype(np.int16)\n", "    wavfile.write(output_wav, fs, audio) # save the watermarked audio\n", "    return audio\n" ], "metadata": { "id": "babXmVSWuLwc" }, "execution_count": null, "outputs": [] },
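{ "cell_type": "markdown", "source": [ "Optional preview: before mixing anything into the music, you can generate the watermark signal on its own and look at its spectrogram, which should already show the image. A minimal sketch, assuming `image_data` and `create_watermark_signal` from the cells above; it previews at an arbitrary 24 kHz." ], "metadata": {} },
{ "cell_type": "code", "source": [ "# Optional preview (assumes image_data and create_watermark_signal are defined above):\n", "# generate the bare watermark signal and plot its spectrogram -- the image should be visible.\n", "from scipy.signal import spectrogram\n", "\n", "preview_fs = 24000 # arbitrary preview rate; the real run below resamples to the WAV's rate\n", "preview_signal = create_watermark_signal(image_data, preview_fs)\n", "\n", "f, t, Sxx = spectrogram(preview_signal, preview_fs, nperseg=1024)\n", "plt.figure(figsize=(10, 4))\n", "plt.pcolormesh(t, f, 10 * np.log10(Sxx + 1e-12), shading='gouraud', cmap='inferno') # small offset avoids log(0)\n", "plt.xlabel('Time [sec]')\n", "plt.ylabel('Frequency [Hz]')\n", "plt.title('Watermark signal only')\n", "plt.colorbar(label='Intensity [dB]')\n", "plt.show()" ], "metadata": {}, "execution_count": null, "outputs": [] },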
{ "cell_type": "markdown", "source": [ "Now we can save the new output" ], "metadata": { "id": "srBZ_lXK0jas" } },
{ "cell_type": "code", "source": [ "import time # used for unique output filenames { run: \"auto\" }\n", "print(\"Processing the audio and image...\")\n", "\n", "original_fs, original_audio = wavfile.read(original_audio_path) # original_fs = sample rate of the WAV file\n", "print(f\"Sampling rate: {original_fs} Hz\")\n", "\n", "watermark_signal = create_watermark_signal(image_data, original_fs) # stretch the watermark to the file's sample rate\n", "output_wav = 'outputs/watermarked_output{}.wav'.format(str(int(time.time()))[-5:])\n", "watermarked_audio = embed_watermark(original_audio_path, output_wav, watermark_signal, watermark_channel, watermark_strength)" ], "metadata": { "id": "BHaW2i7IuRmQ" }, "execution_count": null, "outputs": [] },
{ "cell_type": "code", "source": [ "# Make a 30-second preview first; if it sounds right, download the full version below\n", "short_audio = watermarked_audio[:original_fs * 30]\n", "short_output_wav = 'outputs/short_watermarked_output{}.wav'.format(str(int(time.time()))[-5:])\n", "print(f\"Short watermarked audio saved as {short_output_wav}\")\n", "\n", "wavfile.write(short_output_wav, original_fs, short_audio)\n", "files.download(short_output_wav)" ], "metadata": { "id": "C9UhKS26PL_N" }, "execution_count": null, "outputs": [] },
{ "cell_type": "code", "source": [ "print(f\"Watermarked audio saved as {output_wav}\")\n", "\n", "files.download(output_wav)" ], "metadata": { "id": "rbFpNExwVhOU" }, "execution_count": null, "outputs": [] },
{ "cell_type": "markdown", "source": [ "# Spectrogram and other Graphs" ], "metadata": { "id": "NjHkctTvyv5M" } },
{ "cell_type": "code", "source": [ "# plots\n", "from scipy.signal import spectrogram # for the graphs\n", "\n", "\n", "def plot_spectrogram(audio_data, sample_rate, title=\"Spectrogram\"):\n", "    \"\"\"\n", "    Plots the spectrogram of the provided audio data.\n", "\n", "    Args:\n", "        audio_data: The audio data as a NumPy array.\n", "        sample_rate: The sample rate of the audio data.\n", "        title: The title for the spectrogram plot (default: \"Spectrogram\").\n", "    \"\"\"\n", "    frequencies, times, Sxx = spectrogram(audio_data, sample_rate, nperseg=1024)\n", "    Sxx_dB = 10 * np.log10(Sxx + 1e-12) # decibels are on a log scale; the small offset avoids log(0)\n", "\n", "    plt.figure(figsize=(10, 6))\n", "    plt.pcolormesh(times, frequencies, Sxx_dB, shading='gouraud', cmap='inferno')\n", "    plt.ylabel('Frequency [Hz]')\n", "    plt.xlabel('Time [sec]')\n", "    plt.colorbar(label='Intensity [dB]')\n", "    plt.tight_layout()\n", "\n", "    plt.title(title)\n", "    plt.show()\n", "\n", "\n", "def channels(audio_data):\n", "    if len(audio_data.shape) == 2: # stereo: plot each channel separately\n", "        left_channel, right_channel = audio_data.T\n", "        plot_spectrogram(left_channel, original_fs, title=\"Left Channel Spectrogram\")\n", "        plot_spectrogram(right_channel, original_fs, title=\"Right Channel Spectrogram\")\n", "    else:\n", "        plot_spectrogram(audio_data, original_fs)\n", "\n",
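"# A rough extraction sketch (assumes original_audio and watermarked_audio above are sample-aligned):\n", "# subtracting the original channel from the watermarked channel isolates what was added,\n", "# and its spectrogram should show the watermark image on its own.\n", "n_common = min(len(original_audio), len(watermarked_audio))\n", "og_channel = original_audio[:n_common, watermark_channel] if original_audio.ndim == 2 else original_audio[:n_common]\n", "watermark_only = watermarked_audio[:n_common, watermark_channel].astype(np.int32) - og_channel.astype(np.int32) # int32 avoids int16 wraparound\n", "plot_spectrogram(watermark_only, original_fs, title=\"Extracted Watermark (watermarked minus original)\")\n", "\n",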
"# Extract the first 10 seconds of audio data\n", "num_samples_10sec = original_fs * 10\n", "first_10sec_audio_og = original_audio[:num_samples_10sec]\n", "first_10sec_audio_watermarked = watermarked_audio[:num_samples_10sec]\n", "# Load the WAV file\n", "original_fs, original_audio = wavfile.read(original_audio_path) #fs = Sample rate of WAV file.\n", "print(\"original\")\n", "channels(first_10sec_audio_og)\n", "print(\"watermarked\")\n", "channels(first_10sec_audio_watermarked)" ], "metadata": { "id": "pumVF_bCy0Hu" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "def plot_waveform(original_audio, watermarked_audio, fs):\n", " \"\"\"\n", " This won't reveal the watermark but is kinda cool to see if you dont have audio editing software\n", " \"\"\"\n", " fig, axs = plt.subplots(2, 1, figsize=(12, 20))\n", " # Plot waveforms\n", " axs[0].plot(original_audio[:, 0], label='Original Ch0')\n", " axs[0].plot(original_audio[:, 1], label='Original Ch1')\n", " axs[0].set_title('Original Audio Waveform')\n", " axs[0].set_xlabel('Time')\n", " axs[0].set_ylabel('Hz')\n", " axs[0].legend()\n", "\n", " axs[1].plot(watermarked_audio[:, 0], label='Watermarked Ch0')\n", " axs[1].plot(watermarked_audio[:, 1], label='Watermarked Ch1')\n", " axs[1].set_title('Watermarked Audio Waveform')\n", " axs[1].set_xlabel('Time')\n", " axs[1].set_ylabel('Hz')\n", " axs[1].legend()\n", "\n", " plt.tight_layout()\n", " plt.show()\n", "\n", "plot_waveform(original_audio, watermarked_audio, original_fs)\n" ], "metadata": { "id": "ATCf9vdP-BLU" }, "execution_count": null, "outputs": [] } ] }