spatialaudio / python-sounddevice

:sound: Play and Record Sound with Python :snake:
https://python-sounddevice.readthedocs.io/
MIT License

Question on How to Get R and L Mic/StereoMix Volume Data #545

Open ns96 opened 1 month ago

ns96 commented 1 month ago

Hi, I am working on a small script that reads data from the Mic/StereoMix devices on Windows, and I want to get the volume data for the R and L channels separately. I suspect it should be easy, but I couldn't find any examples online. Here is the code I have so far, which gets the volume as a single value and sends it to the PeppyMeter program over HTTP.

#!/usr/bin/env python3
# script to read mic or any input device and send it to PeppyMeter for display
import numpy as np
import sounddevice as sd
import requests

# specify the input device number in this case stereo mix
input_device = 18
peppy_url = 'http://localhost:8001/vumeter'
#peppy_url = 'http://192.168.1.240:8001/vumeter'

print(sd.query_devices())

print("\nEnter 'q' or 'Q' to Exit")

# function to handle data from audio input device 
def audio_callback(indata, frames, time, status):
    volume_norm = int(np.linalg.norm(indata) * 10)
    #print(volume_norm)
    print("|" * volume_norm)

    # send json volume data
    volume_data = {"left": volume_norm, "right": volume_norm, "mono":volume_norm}
    response = requests.put(peppy_url, json=volume_data)
    #print(response)

try:
    stream = sd.InputStream(device=input_device, callback=audio_callback)
    with stream:
        while True:
            response = input()
            if response in ('', 'q', 'Q'):
                break

except KeyboardInterrupt:
    print('Interrupted by user')
except Exception as e:
    print(type(e).__name__ + ': ' + str(e))

mgeier commented 1 month ago

The variable indata is a NumPy array where each audio channel is stored as a column. You can get a single column with the usual NumPy indexing. For example, to get all values for the second (i.e. the right) channel, you can use this:

indata[:, 1]
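
Building on that indexing, here is a minimal sketch of how separate left and right levels could be computed. The helper name `channel_levels` and its `scale` parameter are made up for illustration and are not part of sounddevice. It assumes `indata` has at least two columns, i.e. the stream was opened with two channels (for example by passing `channels=2` to `sd.InputStream`).

```python
import numpy as np

def channel_levels(indata, scale=10):
    """Return (left, right, mono) integer levels for one block of audio.

    Hypothetical helper: assumes indata has shape (frames, 2), with the
    left channel in column 0 and the right channel in column 1.
    `scale` mirrors the factor of 10 used in the script above.
    """
    left = int(np.linalg.norm(indata[:, 0]) * scale)
    right = int(np.linalg.norm(indata[:, 1]) * scale)
    # Same "mono" value as the original script: norm over the whole block
    mono = int(np.linalg.norm(indata) * scale)
    return left, right, mono
```

In the callback, `left, right, mono = channel_levels(indata)` would then give the three values for the `"left"`, `"right"` and `"mono"` keys of the JSON payload.
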
abelgladstone commented 4 weeks ago

Printing in the callback should be avoided. I would suggest using a queue to pass the input data to another thread, then calculating the norm in that thread and sending the request to the Peppy URL from there. Your modified code might look something like this:

#!/usr/bin/env python3
# script to read mic or any input device and send it to PeppyMeter for display
import numpy as np
import sounddevice as sd
import requests
import queue
from threading import Thread, Event

# specify the input device number in this case stereo mix
input_device = 18
peppy_url = 'http://localhost:8001/vumeter'
#peppy_url = 'http://192.168.1.240:8001/vumeter'
data_q = queue.Queue()
exit_event = Event()

print(sd.query_devices())
print("\nEnter 'q' or 'Q' to Exit")

# consumer thread: takes audio blocks off the queue and reports their volume
def calculate_volume_norm(stop_event, in_q):
    while not stop_event.is_set():
        try:
            data = in_q.get(timeout=0.1)
        except queue.Empty:
            # We timed out; go back and check the stop event again.
            continue

        volume_norm = int(np.linalg.norm(data) * 10)
        print("|" * volume_norm)

        # send json volume data
        volume_data = {"left": volume_norm, "right": volume_norm, "mono": volume_norm}
        response = requests.put(peppy_url, json=volume_data)

# create the thread only after its target function has been defined
consumer_thread = Thread(target=calculate_volume_norm, args=(exit_event, data_q), daemon=True)

# function to handle data from audio input device
def audio_callback(indata, frames, time, status, q_in):
    if status:
        print(status)
    # the indata buffer is reused after the callback returns, so queue a copy
    q_in.put_nowait(indata.copy())

try:
    stream = sd.InputStream(device=input_device, callback=lambda x,y,z,w: audio_callback(x,y,z,w,data_q))
    exit_event.clear()
    consumer_thread.start()
    with stream:
        while True:
            response = input()
            if response in ('', 'q', 'Q'):
                break

except KeyboardInterrupt:
    print('Interrupted by user')
except Exception as e:
    print(type(e).__name__ + ': ' + str(e)) 
finally:
    exit_event.set()
    # join() raises RuntimeError on a thread that was never started
    if consumer_thread.is_alive():
        consumer_thread.join()
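
To tie the two suggestions together, here is a rough sketch of the same queue pattern with per-channel levels. It is only an illustration under a few assumptions: the names `make_callback`, `consume` and the `send` parameter are hypothetical, the blocks are assumed to have two columns (left, right), and `send` stands in for whatever delivers the payload, for example `lambda d: requests.put(peppy_url, json=d)`.

```python
import queue
import threading
import numpy as np

def make_callback(q):
    """Build a sounddevice-style callback that only copies and enqueues."""
    def callback(indata, frames, time, status):
        # Queue a copy, because the indata buffer is reused after the callback returns
        q.put_nowait(indata.copy())
    return callback

def consume(q, stop_event, send):
    """Worker loop: compute left/right/mono levels and pass them to send()."""
    while not stop_event.is_set():
        try:
            block = q.get(timeout=0.1)
        except queue.Empty:
            continue  # timed out; check the stop event again
        payload = {
            "left": int(np.linalg.norm(block[:, 0]) * 10),
            "right": int(np.linalg.norm(block[:, 1]) * 10),
            "mono": int(np.linalg.norm(block) * 10),
        }
        send(payload)
```

The callback returned by `make_callback(data_q)` could be passed directly as `callback=` to `sd.InputStream`, which would avoid the extra lambda, and `consume` would be the `target` of the consumer thread.
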