Open jamieden opened 1 year ago
I've put together a script that sits in between rtl_fm and multimon-ng to handle the recording, then uses UDP packets to control recording. This may not be a great solution for this package in general, but sharing for possible inspiration. I intend on using this for other pipelines too so went with something more generic.
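For anyone who wants to drive a script like this from another program, here is a minimal sketch of a control sender. It assumes the one-byte layout used by the modified script posted later in this thread (bit 0x01 for record, bit 0x10 for monitor) and that script's default port of 12345. The names `build_control_byte` and `send_control` are hypothetical, not part of the original script.

```python
import socket

RECORD_BIT = 0x01   # assumed to match `data[0] & 0x01` in the listener
MONITOR_BIT = 0x10  # assumed to match `data[0] & 0x10` in the listener


def build_control_byte(record: bool, monitor: bool) -> bytes:
    """Pack the two flags into the single byte the listener reads."""
    value = (RECORD_BIT if record else 0) | (MONITOR_BIT if monitor else 0)
    return bytes([value])


def send_control(record: bool, monitor: bool,
                 host: str = "127.0.0.1", port: int = 12345) -> None:
    """Send one control datagram; UDP gives no delivery confirmation."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(build_control_byte(record, monitor), (host, port))
```

Calling `send_control(True, False)` would ask the listener to start recording, and `send_control(False, False)` would stop both modes.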
Sounds like a great idea! Do you mind if I implement your code into mine? Credit given, of course.
Feel free! No credit needed even, I'm just happy if it makes a better product that I get to use
Also side note: I have only tested this on a Linux Mint system. With the subprocesses I'm sure it may be different for other OSes
@tonyborries Would you be able to help me out? I can't seem to find a way to implement your code into mine. I see the basic concept, I just can't implement it.
This is what I have so far:
if args.source == 'rtl':
    try:
        rtl_fm_cmd = ['rtl_fm', '-f', str(args.frequency[0]) + 'M', '-M', 'fm', '-s', '22050', '-E', 'dc', '-p',
                      str(args.ppm[0]), '-']
        multimon_ng_cmd = ['multimon-ng', '-t', 'raw', '-a', 'EAS', '-']
        rtl_fm_process = subprocess.Popen(rtl_fm_cmd, stdout=subprocess.PIPE, shell=True)
        multimon_ng_process = subprocess.Popen(multimon_ng_cmd, stdin=rtl_fm_process.stdout,
                                               stdout=subprocess.PIPE, shell=True)
        # sox_process = subprocess.Popen(['C:\\Program Files (x86)\\sox-14-4-2\\sox.exe', '-V1', '-b', '16',
        #                                 '-c', '1', '-e', 'signed-integer', '-r', '48k', '-t', 'raw', '-',
        #                                 '-t', 'waveaudio', 'default'], stdin=rtl_fm_process.stdout)
        source_process = multimon_ng_process
    except Exception as detail:
        logging.error(detail)
        return
And as far as I know, this works fine. I just need a way to record and listen to the audio. I also can't seem to get your code to work. Is it updated to work with Python 3?
EDIT: I made changes to the code above to make it look like this:
if args.source == 'rtl':
    try:
        rtl_fm_cmd = ['rtl_fm', '-f', str(args.frequency[0]) + 'M', '-M', 'fm', '-s', '22050', '-E', 'dc', '-p',
                      str(args.ppm[0]), '-']
        multimon_ng_cmd = ['multimon-ng', '-t', 'raw', '-a', 'EAS', '-']
        sox_cmd = ['C:\\Program Files (x86)\\sox-14-4-2\\sox.exe', '-V1', '-b', '16',
                   '-c', '1', '-e', 'signed-integer', '-r', '22050', '-t', 'raw', '-',
                   '-t', 'waveaudio', 'default']
        rtl_fm_process = subprocess.Popen(rtl_fm_cmd, stdout=subprocess.PIPE, shell=True)
        multimon_ng_process = subprocess.Popen(multimon_ng_cmd, stdin=rtl_fm_process.stdout,
                                               stdout=subprocess.PIPE, shell=True)
        # noinspection PyUnusedLocal
        sox_process = subprocess.Popen(sox_cmd, stdin=rtl_fm_process.stdout)
        source_process = multimon_ng_process
    except Exception as detail:
        logging.error(detail)
        return
and SoX picks the data stream up, it's just too laggy to do anything with. I think I'm splitting the subprocess output pipe too much for it to handle, and I can't use multiprocessing to give each program their own process split.
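One likely contributor to the lag: multimon-ng and SoX are both handed the same `rtl_fm_process.stdout`, and a pipe delivers each byte to only one reader, so the two programs may be competing for the audio instead of each receiving a full copy. A minimal sketch of the alternative, reading each chunk once in Python and writing it to every consumer, is below. `tee_stream` is a hypothetical helper, not part of either script in this thread, and in-memory streams stand in for the real pipes.

```python
import io
from typing import BinaryIO, Iterable


def tee_stream(source: BinaryIO, sinks: Iterable[BinaryIO],
               chunk_size: int = 4096) -> int:
    """Copy `source` to every sink until EOF; return total bytes read."""
    sinks = list(sinks)
    total = 0
    while True:
        chunk = source.read(chunk_size)
        if not chunk:          # empty bytes means EOF
            break
        for sink in sinks:
            sink.write(chunk)
            sink.flush()       # keep latency low for live audio
        total += len(chunk)
    return total


# Demonstration with in-memory streams standing in for the real pipes.
fake_audio = io.BytesIO(bytes(range(256)) * 40)
decoder_in, player_in = io.BytesIO(), io.BytesIO()
copied = tee_stream(fake_audio, [decoder_in, player_in], chunk_size=1024)
```

With real processes, the source would be `rtl_fm_process.stdout` and the sinks would be the `stdin` pipes of multimon-ng and SoX, each started with `stdin=subprocess.PIPE`. Each write blocks, so a slow consumer would still stall the loop.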
@tonyborries So I was screwing with your code to make it work with Windows, and I got the code modified up to this point:
import argparse
from collections import deque
import datetime
import os
import socket
import subprocess
import sys
import threading
import time
import keyboard
class AudioSplitter():
    """
    Main class that will receive input and split to outputs
    """

    def __init__(self, args):
        self.args = args
        # Circular Audio Buffer
        # each item is a bytearray of a complete frame of audio
        # (nBytes * nChannels)
        self.AUDIO_BUFFER = deque(maxlen=args.sample_rate * args.buffer_size)
        # used to store partial audio frames
        self.SAMPLE_BUFFER = bytearray()
        self.recording = False
        self.monitoring = False
        self.streamOutputs = set()  # Send immediately
        self.recordOutputs = set()  # Send only on record
        self.monitorOutputs = set()
        self.bufferControlLock = threading.Lock()

    def addOutput(self, output, stream=False, record=False, monitor=False):
        if stream:
            self.streamOutputs.add(output)
        if record:
            self.recordOutputs.add(output)
        if monitor:
            self.monitorOutputs.add(output)

    def setMode(self, record=None, monitor=None):
        with self.bufferControlLock:
            if record is not None:
                self.recording = record
            if monitor is not None:
                self.monitoring = monitor

            # Monitors to stop
            allOutputs = self.recordOutputs | self.monitorOutputs
            activeOutputs = set()
            if self.recording:
                activeOutputs |= self.recordOutputs
            if self.monitoring:
                activeOutputs |= self.monitorOutputs
            for output in (allOutputs - activeOutputs):
                output.stop()

            # Start monitors instantly
            if self.monitoring:
                for output in self.monitorOutputs:
                    if not output.isActive():
                        output.start()

            # If recording, start and add buffer
            if self.recording:
                startedOutputs = set()
                for output in self.recordOutputs:
                    if not output.isActive():
                        output.start()
                        startedOutputs.add(output)
                if startedOutputs:
                    for sample in list(self.AUDIO_BUFFER):
                        for output in startedOutputs:
                            output.write(sample)
                self.AUDIO_BUFFER.clear()

    def process_input(self, i_bytes: bytearray):
        """
        Either add to the buffer, or write to file if recording is active
        """
        with self.bufferControlLock:
            # Send streaming immediately
            for output in self.streamOutputs:
                output.write(i_bytes)

            # This bundle of joy is my naive attempt at ensuring synchronized
            # framing through the buffer
            def _outputFrame(b):
                if self.monitoring:
                    for output in self.monitorOutputs:
                        output.write(b)
                if self.recording:
                    recordOutputs = self.recordOutputs
                    if self.monitoring:
                        # Build a new set; `-=` would permanently remove the
                        # monitor outputs from self.recordOutputs
                        recordOutputs = self.recordOutputs - self.monitorOutputs
                    for output in recordOutputs:
                        output.write(b)
                else:
                    self.AUDIO_BUFFER.append(b)

            x = 0
            input_len = len(i_bytes)
            # Complete a partial frame left over from the previous call
            if self.SAMPLE_BUFFER:
                while len(self.SAMPLE_BUFFER) < self.args.sample_bytes and x < input_len:
                    self.SAMPLE_BUFFER.append(i_bytes[x])
                    x += 1
                if len(self.SAMPLE_BUFFER) == self.args.sample_bytes:
                    _outputFrame(self.SAMPLE_BUFFER)
                    self.SAMPLE_BUFFER = bytearray()
            while x < input_len:
                if (input_len - x) >= self.args.sample_bytes:
                    _outputFrame(i_bytes[x:x + self.args.sample_bytes])
                    x += self.args.sample_bytes
                else:
                    # Keep the remainder as a bytearray so append() works
                    # on the next call even when i_bytes is immutable bytes
                    self.SAMPLE_BUFFER = bytearray(i_bytes[x:])
                    break
class OutputBase():
    def __init__(self, args):
        self.active = False
        self.args = args

    def write(self, b: bytearray):
        pass

    def start(self):
        """
        May be called multiple times, ensure to check if started prior
        """
        self.active = True

    def stop(self):
        """
        May be called multiple times, ensure to check if started prior
        """
        self.active = False

    def isActive(self):
        return self.active


class StdoutOuput(OutputBase):
    def write(self, b: bytearray):
        sys.stdout.buffer.write(b)


class BufferedSubProcessOutputBase(OutputBase):
    def __init__(self, args):
        super().__init__(args)
        self.process = None

    def write(self, b):
        if self.process:
            self.process.stdin.write(b)

    def stop(self):
        if self.process:
            self.process.terminate()
            self.process = None

    def isActive(self):
        return self.process is not None


class BufferedWavOutput(BufferedSubProcessOutputBase):
    def start(self):
        if self.process:
            return
        filename = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + ".wav"
        filepath = os.path.join(self.args.recording_dir, filename)
        self.process = subprocess.Popen(
            f'"C:\\Program Files (x86)\\sox-14-4-2\\sox.exe" -t raw -b {self.args.sample_bytes * 8} -e signed -r {self.args.sample_rate} -c1 - "{filepath}"',
            shell=True,
            stdin=subprocess.PIPE,
        )


class BufferedMP3Output(BufferedSubProcessOutputBase):
    def start(self):
        if self.process:
            return
        filename = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + ".mp3"
        filepath = os.path.join(self.args.recording_dir, filename)
        self.process = subprocess.Popen(
            f'"C:\\Program Files (x86)\\sox-14-4-2\\sox.exe" -t raw -b {self.args.sample_bytes * 8} -e signed -r {self.args.sample_rate} -c1 - "{filepath}"',
            shell=True,
            stdin=subprocess.PIPE,
        )


class BufferedAudioOutput(BufferedSubProcessOutputBase):
    def start(self):
        if self.process:
            return
        self.process = subprocess.Popen(
            f'"C:\\Program Files (x86)\\sox-14-4-2\\sox.exe" -t raw -r {self.args.sample_rate} -es -b {self.args.sample_bytes * 8} -c 1 -V1 - -t waveaudio default',
            shell=True,
            stdin=subprocess.PIPE,
        )
def parse_arguments():
    parser = argparse.ArgumentParser(
        description="Splits an audio stream on stdin to dynamic destinations",
        prog="cliAudioTee",
    )
    ###
    # UDP Control Port
    # type=int keeps values numeric when they are passed on the command line
    parser.add_argument(
        '--udp_port',
        type=int,
        default=12345,
        help='UDP port for monitor/record control'
    )
    parser.add_argument(
        '--udp_host',
        default="0.0.0.0",
        help='UDP bind host for monitor/record control'
    )
    ###
    # Audio Format
    parser.add_argument(
        '--sample_rate',
        type=int,
        default=22050,
        help='Audio Sample Rate (Hz)',
    )
    parser.add_argument(
        '--sample_bytes',
        type=int,
        default=2,
        help='Audio Sample Byte-depth (bit depth/8)',
    )
    ###
    # Recording
    parser.add_argument(
        '--recording_dir',
        default="Recordings",
        help='Directory for recordings',
    )
    parser.add_argument(
        '--buffer_size',
        type=int,
        default=5,
        help='Audio Buffer Size for pre-recording (seconds)',
    )
    args, unknown = parser.parse_known_args()
    return args
args = parse_arguments()
audioSplitter = AudioSplitter(args)
audioSplitter.addOutput(StdoutOuput(args), stream=True)
audioSplitter.addOutput(BufferedWavOutput(args), record=True)
audioSplitter.addOutput(BufferedMP3Output(args), record=True)
audioSplitter.addOutput(BufferedAudioOutput(args), record=True, monitor=True)
shutdownEvent = threading.Event()


def shutdown():
    # only call from the main thread
    shutdownEvent.set()
    time.sleep(0.5)
    sys.exit()


def input_thread():
    while True:
        i_bytes = bytes(sys.stdin.buffer.read(128))
        if shutdownEvent.is_set():
            return
        if i_bytes:
            audioSplitter.process_input(i_bytes)
        else:
            sys.stderr.write("Lost Input - Terminating\n")
            shutdownEvent.set()
            return


def udp_control_thread(host, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((host, port))
    sock.settimeout(0.01)
    while True:
        data = None
        try:
            data = sock.recv(1)
        except socket.timeout:
            # socket.timeout is an alias of TimeoutError only on Python 3.10+;
            # catching socket.timeout also works on older versions
            pass
        if shutdownEvent.is_set():
            return
        if data:
            audioSplitter.setMode(
                record=data[0] & 0x01,
                monitor=data[0] & 0x10,
            )


i_thread = threading.Thread(target=input_thread)
i_thread.daemon = True
i_thread.start()

control_thread = threading.Thread(
    target=udp_control_thread,
    args=(args.udp_host, args.udp_port),
)
control_thread.daemon = True
control_thread.start()


def keyboard_control_thread():
    while True:
        if keyboard.is_pressed('s'):
            audioSplitter.setMode(record=True)
        elif keyboard.is_pressed('x'):
            audioSplitter.setMode(record=False, monitor=False)
        elif keyboard.is_pressed('q'):
            shutdown()
            return
        time.sleep(0.1)


control_thread = threading.Thread(target=keyboard_control_thread)
control_thread.daemon = True
control_thread.start()

while True:
    time.sleep(.1)
    if shutdownEvent.is_set():
        shutdown()
However, it only creates a WAV file and an MP3 file with 1KB sizes, and there's no audio data stored inside them. Got any insights?
I am using Python 3.10 on Linux (Mint), and I think the UDP and subprocess stuff behaves significantly differently on Windows.
I've had lots of issues around the subprocesses (first time I've tied multiple together like this). I've started to notice sometimes the subprocesses aren't closing and keep running in the background. So I'm sure I'm missing something in here still.
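On the subprocesses that keep running: one pattern that may help is to close the child's stdin first so it sees end-of-input, wait briefly for it to exit, and only then escalate to `terminate()` and `kill()`. This is a sketch of that idea, not a confirmed fix for either script. `stop_process` is a hypothetical helper, and a small Python child stands in for SoX.

```python
import subprocess
import sys


def stop_process(proc: subprocess.Popen, timeout: float = 5.0) -> int:
    """Let the child finish on EOF; escalate only if it does not exit."""
    if proc.stdin and not proc.stdin.closed:
        try:
            proc.stdin.close()          # flushes buffered data, signals EOF
        except OSError:
            pass                        # child already gone (broken pipe)
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.terminate()
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        return proc.wait()


# Stand-in child: reads stdin until EOF, then exits normally.
child = subprocess.Popen(
    [sys.executable, "-c", "import sys; sys.stdin.buffer.read()"],
    stdin=subprocess.PIPE,
)
child.stdin.write(b"\x00" * 1024)
exit_code = stop_process(child)
```

The script's `stop()` calls `terminate()` directly, which may end the process before buffered audio is written out. Also note that with `shell=True` the signal goes to the shell, which may leave the actual program running.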
For the 1KB file sizes, I had something similar with earlier versions of the input read around this:
i_bytes = bytes(sys.stdin.buffer.read(128))
In an earlier version I had a read call that wouldn't return until it received an EOF, so possibly a different read method will fix that on Windows? At least that would be my starting point to see if it is actually reading data.
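For reference on the read call: `sys.stdin.buffer.read(128)` blocks until it has 128 bytes or reaches EOF, while `read1()` returns whatever is available after at most one underlying read. Whether this is the cause on Windows is untested. The sketch below only shows the call shape, with an in-memory stream standing in for stdin, and `iter_chunks` is a hypothetical name.

```python
import io
from typing import Iterator


def iter_chunks(stream: io.BufferedReader,
                max_bytes: int = 4096) -> Iterator[bytes]:
    """Yield data as it arrives instead of waiting for a full block."""
    while True:
        chunk = stream.read1(max_bytes)   # at most one raw read per call
        if not chunk:                     # b"" means EOF
            return
        yield chunk


# In the real script `stream` would be sys.stdin.buffer.
fake_stdin = io.BufferedReader(io.BytesIO(b"\x01\x02" * 300))
received = b"".join(iter_chunks(fake_stdin, max_bytes=128))
```

Chunks returned this way can end in the middle of a sample, which is the case the script's `SAMPLE_BUFFER` logic is there to handle.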
I didn't think about the lag going to SoX. I noticed mine stutters a bit when it starts, but it eventually plays smoothly, probably because I dump a 5-second buffer into it. I guess without that buffer it may never stop stuttering.
I never thought about using a buffer… although the buffer time might be too long for my liking, but if it makes it work, then I’ll absolutely take it
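On the buffer length: the script sizes its deque as `sample_rate * buffer_size` frames, so a shorter pre-roll is just a smaller `--buffer_size`. A minimal sketch of the idea, with hypothetical names and tiny numbers for illustration, showing that only the most recent frames survive:

```python
from collections import deque


class PreRollBuffer:
    """Keep only the most recent `seconds` of audio frames."""

    def __init__(self, sample_rate: int, seconds: float):
        self.frames = deque(maxlen=int(sample_rate * seconds))

    def push(self, frame: bytes) -> None:
        self.frames.append(frame)      # oldest frame drops off when full

    def drain(self) -> bytes:
        """Return buffered audio in order and empty the buffer."""
        data = b"".join(self.frames)
        self.frames.clear()
        return data


# 4 frames/second for 1.5 seconds gives room for 6 frames.
buf = PreRollBuffer(sample_rate=4, seconds=1.5)
for n in range(10):
    buf.push(bytes([n, n]))            # one 2-byte frame per sample
pre_roll = buf.drain()
```

Here ten frames are pushed but only the last six are kept, which is the same behaviour the script relies on when it dumps its buffer into a newly started output.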
I'm leaving this issue open so I can remember to fix it later on. When recording from multimon-ng using the default input device, it works fine, but when using an RTL-SDR device to tune into NOAA Weather Radio, it STILL ONLY records the default input device. I'm not sure how to use rtl_fm to start and stop recording at a specific point, whether I need to split the stream in two, or whether this can work at all, but I will work to ensure this issue is resolved.