Open KabaTubare opened 6 months ago
The screenshots show that the message embedded at watermarking time is not recovered on the detection side. I have documented that this worked flawlessly before, with no code changes. Curious to understand more.
Hey, could you paste here the meat of your code to make your issue self-sufficient so we can help?
Can you also confirm that this working example still extracts the message correctly? (It works for me.)
from audioseal import AudioSeal
# model name corresponds to the YAML card file name found in audioseal/cards
model = AudioSeal.load_generator("audioseal_wm_16bits")
# Other way is to load directly from the checkpoint
# model = Watermarker.from_pretrained(checkpoint_path, device = wav.device)
# wav: a torch tensor of shape (batch, channels, samples), plus a sample rate.
# It is important to resample the audio to the sample rate the model
# expects. In our case, we support 16 kHz audio.
wav, sr = ..., 16000
watermark = model.get_watermark(wav, sr)
# Optional: you can add a 16-bit message to embed in the watermark
# msg = torch.randint(0, 2, (wav.shape[0], model.msg_processor.nbits), device=wav.device)
# watermark = model.get_watermark(wav, message = msg)
watermarked_audio = wav + watermark
detector = AudioSeal.load_detector("audioseal_detector_16bits")
# To detect the message at the high level:
result, message = detector.detect_watermark(watermarked_audio, sr)
print(result) # result is a float indicating the probability that the audio is watermarked
print(message) # message is a binary vector of 16 bits
# To detect the message at the low level:
result, message = detector(watermarked_audio, sr)
# result is a tensor of size batch x 2 x frames, indicating the probability (positive and negative) of watermarking for each frame
# A watermarked audio should have result[:, 1, :] > 0.5
print(result[:, 1, :])
# message is a tensor of size batch x 16, indicating the probability of each bit being 1.
# message will be a random tensor if the detector detects no watermarking from the audio
print(message)
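When debugging a mismatch like the one reported in this issue, it can help to compare the embedded and detected bit vectors directly, rather than comparing derived hex strings. Below is a minimal sketch with a hypothetical `bit_match_fraction` helper (plain Python, not part of the AudioSeal API); in a real check, the two lists would come from the `msg` tensor passed to `get_watermark` and the `message` tensor returned by `detect_watermark` (e.g. via `.squeeze().tolist()`):

```python
def bit_match_fraction(embedded, detected):
    """Return the fraction of positions where two equal-length bit lists agree."""
    if len(embedded) != len(detected):
        raise ValueError("bit vectors must have the same length")
    return sum(a == b for a, b in zip(embedded, detected)) / len(embedded)

# A perfect round trip scores 1.0; random 16-bit noise scores ~0.5 on average.
embedded = [1, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 1, 1]
flipped = embedded[:8] + [1 - b for b in embedded[8:]]
print(bit_match_fraction(embedded, embedded))  # 1.0
print(bit_match_fraction(embedded, flipped))   # 0.5
```

A score near 0.5 suggests the detector is returning essentially random bits (i.e. no watermark was found), while a score near 1.0 with a hex mismatch points to a bug in the bits-to-hex conversion instead.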
Hello Hady
This is also the HF Space that my application was based on. This was the initial code, which worked fine until the change, after which it no longer worked. I attached videos corresponding to this code to demonstrate that the model seems to have shifted in its capabilities despite absolutely no changes in the code base. I made an update based on what you shared, but there is still no correspondence between the embedded watermark message and the detected message, represented as a hex identifier: https://huggingface.co/spaces/Kabatubare/audioseal_watermarking_and_audioseal_detection_CRYPTOGRAPHIC/settings
Screen Recording 2024-05-09 at 4.11.09.mov https://drive.google.com/file/d/1qv2VuDaZ7z9hCJy2a-g_zd18anFSEKlv/view?usp=drive_web Screen Recording 2024-05-09 at 4.14.21.mov https://drive.google.com/file/d/18Rn7v0c6diIpoqfqTmd4X8bW5ef7xhgn/view?usp=drive_web
-- Kind regards,
Troy Woodson
import gradio as gr
import torch
import torchaudio
import tempfile
import logging
from audioseal import AudioSeal
import random
import string
from pathlib import Path
from datetime import datetime
import json
import os
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.backends import default_backend
from cryptography.exceptions import InvalidSignature

def generate_keys():
    private_key = rsa.generate_private_key(backend=default_backend(), public_exponent=65537, key_size=2048)
    public_key = private_key.public_key()
    return private_key, public_key

def sign_message(private_key, message):
    signature = private_key.sign(
        message.encode(),
        padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
        hashes.SHA256(),
    )
    return signature

def verify_signature(public_key, message, signature):
    try:
        public_key.verify(
            signature,
            message.encode(),
            padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
            hashes.SHA256(),
        )
        return True
    except InvalidSignature:
        return False
    except Exception as e:
        logger.error(f"Unexpected exception in verify_signature: {e}")
        return False

logging.basicConfig(level=logging.DEBUG, filename='app.log', filemode='w',
                    format='%(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

metadata_file = 'audio_metadata.json'
if not os.path.exists(metadata_file):
    with open(metadata_file, 'w') as f:
        json.dump({}, f)

def generate_unique_message(length=16):
    characters = string.ascii_letters + string.digits
    return ''.join(random.choice(characters) for _ in range(length))

def message_to_binary(message, bit_length=16):
    binary_message = ''.join(format(ord(c), '08b') for c in message)
    return binary_message[:bit_length].ljust(bit_length, '0')

def binary_to_hex(binary_str):
    return hex(int(binary_str, 2))[2:].zfill(4)

def load_and_resample_audio(audio_file_path, target_sample_rate=16000):
    waveform, sample_rate = torchaudio.load(audio_file_path)
    if sample_rate != target_sample_rate:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=target_sample_rate)
        waveform = resampler(waveform)
    return waveform, target_sample_rate

def generate_enhanced_identifier():
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S%f')
    sequential_number = str(get_next_sequential_number()).zfill(6)
    return f"{timestamp}-{sequential_number}"

def get_next_sequential_number():
    with open(metadata_file, 'r+') as f:
        data = json.load(f)
        next_number = data.get('next_sequential_number', 1)
        data['next_sequential_number'] = next_number + 1
        f.seek(0)
        json.dump(data, f, indent=4)
        f.truncate()
    return next_number

def save_audio_metadata(unique_id, original_hex, enhanced_id, signature_hex):
    with open(metadata_file, 'r+') as f:
        data = json.load(f)
        data['audio_files'] = data.get('audio_files', {})
        data['audio_files'][unique_id] = {'original_hex': original_hex, 'enhanced_id': enhanced_id, 'signature': signature_hex}
        f.seek(0)
        json.dump(data, f, indent=4)
        f.truncate()

private_key, public_key = generate_keys()

def watermark_audio(audio_file_path, unique_message):
    waveform, sample_rate = load_and_resample_audio(audio_file_path, target_sample_rate=16000)
    waveform = torch.clamp(waveform, min=-1.0, max=1.0)
    if len(waveform.shape) == 2:
        waveform = waveform.unsqueeze(0)
    model = AudioSeal.load_generator("audioseal_wm_16bits")
    binary_message = message_to_binary(unique_message, bit_length=16)
    hex_message = binary_to_hex(binary_message)
    message_tensor = torch.tensor([int(bit) for bit in binary_message], dtype=torch.int32).unsqueeze(0)
    # note: message_tensor is constructed above but is not passed to get_watermark here
    watermark = model.get_watermark(waveform, sample_rate)
    watermarked_audio = waveform + watermark
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
    torchaudio.save(temp_file.name, watermarked_audio.squeeze(0), sample_rate)
    enhanced_id = generate_enhanced_identifier()
    signature = sign_message(private_key, unique_message)
    signature_hex = signature.hex()
    save_audio_metadata(unique_message, hex_message, enhanced_id, signature_hex)
    return temp_file.name, hex_message, enhanced_id, signature_hex

def detect_watermark(audio_file_path, original_hex_message=None, signature_hex=None):
    waveform, sample_rate = load_and_resample_audio(audio_file_path, target_sample_rate=16000)
    if len(waveform.shape) == 2:
        waveform = waveform.unsqueeze(0)
    detector = AudioSeal.load_detector("audioseal_detector_16bits")
    result, message_tensor = detector.detect_watermark(waveform, sample_rate=sample_rate)
    binary_message = ''.join(str(bit) for bit in message_tensor[0].tolist())
    detected_hex_message = binary_to_hex(binary_message)
    low_level_result, low_level_message_tensor = detector(waveform, sample_rate=sample_rate)
    match_result = "Not compared"
    signature_verified = "Signature verification not performed"
    if original_hex_message:
        match_result = "Match" if detected_hex_message.upper() == original_hex_message.upper() else "No Match"
        if signature_hex:
            original_message_binary = format(int(original_hex_message, 16), f'0{len(original_hex_message)*4}b')
            signature_verified = "Verified" if verify_signature(
                public_key, original_message_binary, bytes.fromhex(signature_hex)
            ) else "Verification Failed"
    return result, detected_hex_message, match_result, signature_verified, low_level_result, low_level_message_tensor
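One thing worth noting about the helper pair in the app above: with `bit_length=16`, `message_to_binary` keeps only the first 16 bits of the 8-bits-per-character encoding, i.e. only the first two characters of the 16-character unique message, so many distinct IDs collapse to the same 16-bit payload and the same hex string. A self-contained copy of the two helpers (same logic as in the app) demonstrates this:

```python
def message_to_binary(message, bit_length=16):
    # 8 bits per character, truncated (or zero-padded) to bit_length bits.
    binary_message = ''.join(format(ord(c), '08b') for c in message)
    return binary_message[:bit_length].ljust(bit_length, '0')

def binary_to_hex(binary_str):
    return hex(int(binary_str, 2))[2:].zfill(4)

# Only the first two characters matter when bit_length=16:
print(binary_to_hex(message_to_binary("ABcdefgh12345678")))  # 4142
print(binary_to_hex(message_to_binary("ABzzzzzzzzzzzzzz")))  # 4142
```

This does not by itself explain a failure to detect, but it does mean the hex identifier being compared carries far less information than the full 16-character message suggests.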
I had several projects where the code was able to accurately detect the watermark's unique message embedding, but that accuracy literally evaporated. I made a video documenting the code working in a Spaces app on HF, because it had failed me in a full-stack application I built for this on GCS, and I wanted to draw a comparison. How could detection that worked flawlessly for weeks suddenly fail to accurately detect the unique embedding? It baffled me until I considered that perhaps there was a change in the model's capabilities. It worked on HF, and I have the video to prove it. But when I rebuilt the app (from a cold start) on HF Spaces, the watermarking/detection function failed to accurately embed the unique message (despite embedding an identifier of some sort) or to detect an accurate message/identifier. The code was not changed in any way. So I wanted to see what changed on the AudioSeal end of things. https://huggingface.co/spaces/Kabatubare/audioseal_watermarking_and_audioseal_detection_CRYPTOGRAPHIC