inc0 / cracken

Voice mixer
12 stars 6 forks source link

How to do continuous FFT #1

Open joem5636 opened 2 years ago

joem5636 commented 2 years ago

After many hours of research I finally saw that a fairly simple approach to Overlap/Add can be used to implement continuous FFT filtering. Here is my version of the original SoundDevice wire.py that has continuous FFT with no glitches (I think):

!/usr/bin/env python3

"""Pass input directly to output. https://app.assembla.com/spaces/portaudio/git/source/master/test/patest_wire.c """ import argparse

import sounddevice as sd import numpy # Make sure NumPy is loaded before it is used in the callback assert numpy # avoid "imported but unused" message (W0611)

def int_or_str(text): """Helper function for argument parsing.""" try: return int(text) except ValueError: return text

parser = argparse.ArgumentParser(add_help=False) parser.add_argument( '-l', '--list-devices', action='store_true', help='show list of audio devices and exit') args, remaining = parser.parse_known_args() if args.list_devices: print(sd.query_devices()) parser.exit(0) parser = argparse.ArgumentParser( description=doc, formatter_class=argparse.RawDescriptionHelpFormatter, parents=[parser]) parser.add_argument( '-i', '--input-device', type=int_or_str, help='input device (numeric ID or substring)') parser.add_argument( '-o', '--output-device', type=int_or_str, help='output device (numeric ID or substring)') parser.add_argument( '-c', '--channels', type=int, default=2, help='number of channels') parser.add_argument('--dtype', help='audio data type') parser.add_argument('--samplerate', type=float, help='sampling rate') parser.add_argument('--blocksize', type=int, help='block size', default=1024) parser.add_argument('--latency', type=float, help='latency in seconds') args = parser.parse_args(remaining)

first = True parser.parse_args() # needed to prevent blocksize from being 'none' in overlap computation blocksize = args.blocksize overlap = blocksize // 2

create global areas for buffering and processing of channel data

inbuffer = numpy.zeros([blocksize 3, 2]) outbuffer = numpy.zeros([blocksize + overlap 2, 2]) timedata = numpy.zeros([blocksize + overlap 2, 2]) channel = numpy.zeros([blocksize + overlap 2]) # needs to be power of 2 mask = numpy.hanning(blocksize + overlap * 2) # blackman is another possibility

def callback(indata, outdata, frames, time, status): global first, blocksize, overlap, inbuffer, outbuffer, mask, timedata, channel if status: print(status)

# because of buffering, we introduce a delay of 3 reads before output
# is clean.
inbuffer[blocksize * 2:] = indata  # append to inbuffer
timedata = inbuffer[blocksize - overlap:blocksize * 2 + overlap, :]

# expand with loop to do all channels
# replace timedata with inbuffer[blocksize - overlap:blocksize * 2 + overlap, :]?
channel[:] = timedata[:, 0] * mask
# do fft
freqdata = numpy.fft.rfft(channel)
# do filtering here
channel = numpy.fft.irfft(freqdata)

outbuffer[:, 0] += channel

end of loop for channels

inbuffer[:-blocksize] = inbuffer[blocksize:]  # left shift inbuffer
inbuffer[-blocksize:] = 0  # not really needed for input buffer as replacement is done
outbuffer[:-overlap] = outbuffer[overlap:]  # left shift outbuffer
outbuffer[-overlap:] = 0  # here, we do a add to the overlap and this zeroed area

outdata[:] = outbuffer[:blocksize]
first = False

try: with sd.Stream(device=(args.input_device, args.output_device), samplerate=args.samplerate, blocksize=args.blocksize, dtype=args.dtype, latency=args.latency, channels=args.channels, callback=callback): print('#' 80) print('press Return to quit') print('#' 80) input() except KeyboardInterrupt: parser.exit('') except Exception as e: parser.exit(type(e).name + ': ' + str(e))

joem5636 commented 2 years ago

wireFFT.zip

Since the formatting got messed up in the email, here is the code file and a readme for my own project.