vsergeev / luaradio

A lightweight, embeddable software-defined radio framework built on LuaJIT
https://luaradio.io

[Core] API for Asynchronous Control Messages #40

Open rhodey opened 7 years ago

rhodey commented 7 years ago

Hi @vsergeev,

I really love luaradio and am eager to switch many of my DSP projects over to it. The main thing preventing me from doing this right now is your long-term "Add API for asynchronous control messages" TODO. Have you made any progress on this recently? Is there a design plan you could lay out that I could maybe get started on?

Lua and especially Lua FFI are not my specialties, but I'm highly motivated to migrate to luaradio. I've been working on the Radio Witness Project for about two years now, and if I can migrate to luaradio I would be eager to contribute more.

Best,

-- Rhodey

vsergeev commented 7 years ago

Hi Rhodey,

Awesome! I'm glad you like it. I'd be happy to collaborate on any features you're interested in and on the overall direction of the project.

Asynchronous control messages are the most requested feature by email, and their absence is definitely a blocker for those trying to implement interactive flow graphs. Despite being in the long-term section of the wiki, the implementation isn't too difficult. It's mostly a matter of settling on a good first API, as it would be a very public interface. So this is a great opportunity to talk about it.

I also think it's time to start implementing it experimentally. Below is what I've had in mind so far. Ideas and feedback are always welcome.

High-level

Example

This is just a sketch of what it could look like.

Example flow graph:

-- rtlsdr_am_envelope.lua example
local radio = require('radio')

local source = radio.RtlSdrSource(130e6, 1102500)
local decimator = radio.DecimatorBlock(5e3, 50)
local am_demod = radio.ComplexMagnitudeBlock()
local dcr_filter = radio.SinglepoleHighpassFilterBlock(100)
local af_filter = radio.LowpassFilterBlock(128, 5e3)
local af_gain = radio.AGCBlock('slow')
local sink = radio.PulseAudioSink(1)

local top = radio.CompositeBlock()
top:connect(source, decimator, am_demod, dcr_filter, af_filter, af_gain, sink)

top:start()

After the flow graph is started, the programmatic interface allows calls into blocks from the flow graph process:

-- Asynchronous control in Lua
source:ctrl_set_frequency(135e6)
af_filter:ctrl_set_bandwidth(3e3)
af_gain:ctrl_set_mode('fast')

These methods are monkey-patched after the top-level block is started to wrap the underlying interprocess calls.

Specific blocks can be exported to RPC under instance names:

-- Export control of these blocks to RPC, under the specified instance names
top:export(source, "rtlsdr")
top:export(af_filter, "af_filter")
top:export(af_gain, "af_gain")

and the built-in RPC server can be run with listen(), accepting a URI specifying the transport:

-- Listen for asynchronous control over RPC (blocking)
top:listen("tcp://0.0.0.0:9000")

The RPC protocol speaks JSON, and calls can be made from your language/transport of choice:

-- Block control methods namespaced under block instance
-> {"version": 1, "method": "rtlsdr.ctrl_set_frequency", "params": [135000000], "id": 1}
<- {"version": 1, "result": null, "id": 1}
-> {"version": 1, "method": "rtlsdr.ctrl_get_frequency", "id": 2}
<- {"version": 1, "result": 135000000.0, "id": 2}
-> {"version": 1, "method": "rtlsdr.ctrl_set_frequency", "params": ["foobar"], "id": 3}
<- {"version": 1, "error": "Invalid frequency.", "id": 3}
-> {"version": 1, "method": "af_filter.ctrl_set_bandwidth", "params": [2500.0], "id": 4}
<- {"version": 1, "result": null, "id": 4}

-- Top-level methods are top-level block methods
-> {"version": 1, "method": "status", "id": 5}
<- {"version": 1, "result": {"running": true}, "id": 5}
-> {"version": 1, "method": "stop", "id": 6}
<- {"version": 1, "result": null, "id": 6}
-> {"version": 1, "method": "status", "id": 7}
<- {"version": 1, "result": {"running": false}, "id": 7}
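As a rough illustration of the client side of the exchange above, here is a minimal Python sketch. The RPC interface is only a proposal at this point, so nothing here is an existing LuaRadio API: the newline-delimited framing and the helper names `encode_request`, `decode_response`, and `RpcError` are assumptions made for illustration.

```python
import itertools
import json

_ids = itertools.count(1)  # monotonically increasing request ids


class RpcError(Exception):
    """Raised when a response carries an "error" field (hypothetical helper)."""


def encode_request(method, params=None):
    """Build one request in the proposed format; returns (id, payload bytes)."""
    req_id = next(_ids)
    request = {"version": 1, "method": method, "id": req_id}
    if params is not None:
        request["params"] = list(params)
    # Assumption: one JSON object per line; the proposal does not specify framing.
    return req_id, (json.dumps(request) + "\n").encode("utf-8")


def decode_response(line, expected_id):
    """Parse one response line; return its result or raise RpcError."""
    response = json.loads(line)
    if response.get("id") != expected_id:
        raise RpcError("unexpected response id: %r" % (response.get("id"),))
    if "error" in response:
        raise RpcError(response["error"])
    return response.get("result")
```

A client would write the payload to a stream socket connected to the address given to listen(), read one line back, and pass it to `decode_response` together with the id returned by `encode_request`.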

Block control handlers are automatically registered by the prefix ctrl_ and are ordinary Lua methods that can access block state naturally:

local RtlSdrSource = block.factory("RtlSdrSource")

...

function RtlSdrSource:ctrl_set_frequency(frequency)
    assert(type(frequency) == "number", "Invalid frequency.")

    local ret = librtlsdr.rtlsdr_set_center_freq(self.dev[0], frequency)
    if ret ~= 0 then
        error("rtlsdr_set_center_freq(): " .. tostring(ret))
    end
end

function RtlSdrSource:ctrl_get_frequency()
    -- rtlsdr_get_center_freq() returns the tuned frequency in Hz, or 0 on error
    local frequency = librtlsdr.rtlsdr_get_center_freq(self.dev[0])
    if frequency == 0 then
        error("rtlsdr_get_center_freq(): failed to get frequency")
    end

    return tonumber(frequency)
end
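The prefix-based registration described above can be sketched in a few lines. This is a Python stand-in rather than LuaRadio code: the `Block` and `FakeSource` classes, the `control_handlers` and `handle_request` names, and the "Unknown method." error string are all hypothetical. It only illustrates how handlers could be discovered by the ctrl_ prefix and how a raised error could become the "error" field of a response.

```python
class Block:
    """Stand-in for a block base class; not LuaRadio's actual implementation."""

    def control_handlers(self):
        # Collect every callable attribute whose name starts with "ctrl_".
        return {
            name: getattr(self, name)
            for name in dir(self)
            if name.startswith("ctrl_") and callable(getattr(self, name))
        }

    def handle_request(self, request):
        """Dispatch one decoded request dict and build the response dict."""
        response = {"version": 1, "id": request["id"]}
        handler = self.control_handlers().get(request["method"])
        if handler is None:
            # Assumption: the proposal does not say how unknown methods are reported.
            response["error"] = "Unknown method."
            return response
        try:
            response["result"] = handler(*request.get("params", []))
        except Exception as exc:
            # Errors raised by a handler are returned to the caller as text.
            response["error"] = str(exc)
        return response


class FakeSource(Block):
    """Hypothetical source block that only stores its frequency."""

    def __init__(self, frequency):
        self.frequency = frequency

    def ctrl_set_frequency(self, frequency):
        if isinstance(frequency, bool) or not isinstance(frequency, (int, float)):
            raise ValueError("Invalid frequency.")
        self.frequency = frequency

    def ctrl_get_frequency(self):
        return self.frequency
```

Because handlers are plain methods, adding a new control point only means defining another ctrl_-prefixed method; no separate registration table has to be kept in sync.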

Composite blocks can implement control handlers too, calling underlying blocks and even nested composite blocks, as needed:

-- AM Envelope Example
local source = radio.RtlSdrSource(130e6, 1102500)
local decimator = radio.DecimatorBlock(5e3, 50)
local am_demod = radio.ComplexMagnitudeBlock()
local dcr_filter = radio.SinglepoleHighpassFilterBlock(100)
local af_filter = radio.LowpassFilterBlock(128, 5e3)
local af_gain = radio.AGCBlock('slow')
local sink = radio.PulseAudioSink(1)
local top = radio.CompositeBlock()

top:connect(source, decimator, am_demod, dcr_filter, af_filter, af_gain, sink)

-- Custom control handler for top-level composite block
function top:ctrl_reset()
    -- Reset to known state
    source:ctrl_set_frequency(130e6)
    af_filter:ctrl_set_bandwidth(5e3)
    af_gain:ctrl_set_mode('slow')
end

function top:ctrl_foo()
    return {1, 2, 3}
end

function top:ctrl_bar()
    error('bar')
end

top:start()

-- RPC to top-level methods
-> {"version": 1, "method": "ctrl_reset", "id": 7}
<- {"version": 1, "result": null, "id": 7}
-> {"version": 1, "method": "ctrl_foo", "id": 8}
<- {"version": 1, "result": [1, 2, 3], "id": 8}
-> {"version": 1, "method": "ctrl_bar", "id": 9}
<- {"version": 1, "error": "bar", "id": 9}

Performance and Other Details

The control path would probably be implemented with anonymous UNIX sockets, like the data path. The only change to the critical path of the block is the addition of a poll() or select() to monitor both control and data sockets for new data. This isn't too new -- blocks with multiple inputs already poll() across multiple data path sockets before reading -- but it does add an additional context switch to the critical path of all single input blocks (which currently just block on read()).
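To make the multiplexing idea concrete, here is a minimal Python sketch of one loop iteration that waits on both a data socket and a control socket. It is not LuaRadio's runtime loop: the `run_once` name, the `state` dictionary, and the control message format (an ASCII gain value) are assumptions chosen to keep the example small.

```python
import select
import socket


def run_once(data_sock, ctrl_sock, state):
    """Wait for either socket, apply a pending control message, then process samples.

    Returns the processed samples, or None if only a control message arrived.
    """
    # Single blocking wait across both paths, instead of a plain blocking read().
    readable, _, _ = select.select([data_sock, ctrl_sock], [], [])

    if ctrl_sock in readable:
        # Hypothetical control message: an ASCII-encoded gain value.
        state["gain"] = float(ctrl_sock.recv(64).decode("ascii"))

    if data_sock in readable:
        samples = data_sock.recv(4096)
        return [sample * state["gain"] for sample in samples]

    return None
```

With two `socket.socketpair()` pairs standing in for the anonymous UNIX sockets, writing to the control pair changes `state["gain"]` on the next call, and later samples are scaled with the new value. Control messages are handled before samples in the same iteration, so a parameter change takes effect on the very next buffer.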

The JSON RPC request decoding, execution, and response encoding would be handled in the control path of blocks. The built-in listen() RPC server in the top-level block is essentially just a reverse proxy (over TCP, etc.) to the underlying control paths (over UNIX sockets) of its blocks. It receives a request from the client, looks up the right block by the instance name, strips the instance name from the call, forwards the request to the block, and then forwards the response from the block back to the client.
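The reverse-proxy step (look up the block by instance name, strip the name, forward, relay the response) might look roughly like the following Python sketch. The `route_request` function, the `exported` mapping of instance names to forwarding callables, and the "Unknown block instance." error string are hypothetical; in the design above, each forwarding callable would be a round trip over that block's control socket.

```python
def route_request(request, exported, top_level):
    """Route one decoded request to an exported block or to the top-level block.

    exported:  dict mapping instance name -> callable(request) -> response
    top_level: callable(request) -> response for un-namespaced methods
    """
    instance, dot, method = request["method"].partition(".")
    if not dot:
        # No instance prefix, e.g. "status" or "stop": a top-level method.
        return top_level(request)

    forward = exported.get(instance)
    if forward is None:
        # Assumption: error text for an unknown instance name is not specified.
        return {"version": 1, "error": "Unknown block instance.", "id": request["id"]}

    # Strip the instance name so the block sees its own method name.
    stripped = dict(request, method=method)
    return forward(stripped)
```

Keeping the proxy this thin means the blocks remain the only place where requests are executed, which matches the split described above between the control path of the blocks and the built-in listen() server.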

Outstanding Issues/Questions

I'm leaning towards making control handler arguments purely dynamically typed for now, as this feels more Lua and makes registration very easy. Control handlers can assert types as needed, much as block constructors do now.

In the future, control handlers could associate an API help text that is accessible through the meta-API on the top-level flow graph block. It's also possible to add type annotations to control handlers later, and leave the simpler dynamic typing for backwards compatibility, if needed.

I'd love to hear what you think!

rhodey commented 7 years ago

Hey @vsergeev, my apologies for taking so long to get back to you on this, busy with a new job :/ I really appreciate your thorough response!

Asynchronous control via RPC

Asynchronous control via Lua is first of course, but for my use case I'm very interested in RPC especially. For simple FM-decoding flow graphs there's not a lot of application complexity, but when you start decoding heavy protocols like P25 I've found working in a high-level language like Java to be really helpful. 100% of the RadioWitness DSP stack is Java, and performance aside I've been very happy with it. What I'm really hoping to do is move all my downsampling/resampling into LuaRadio, possibly some decoding too, but keep the protocol logic inside Java.

Block-side

"Control handlers run in block process as block methods" seems like the sensible, intuitive thing to do :+1:. I've never had the need to propagate errors from LuaRadio blocks, so I can't speak much to that, but I understand that developers who want to implement more than just resampling & decoding will definitely need this. RE: read-process-write loop, I really like the architecture you've worked out with LuaRadio, definitely don't want to harm throughput, but I expect benchmarking to show it's possible to do async control w/ high enough throughput.

Flowgraph-side

I like all these methods, very intuitive :) block & method discovery would be neat but yeah not a blocker for me at all.

Example code

With this API would it be possible to have multiple top level blocks running at once? Maybe they're separate invocations of the same LuaRadio flow graph script, but I'd really like to be able to have one flow graph per software defined radio running in parallel.

The project I'm immediately trying to port to LuaRadio is chnlzr-server. It comes online configured to a single SDR but then exposes a TCP API for resampling the SDR sample stream into multiple smaller channels concurrently; these smaller channels are streamed in parallel to the requesting TCP clients. Maybe top level blocks could be given an ID in the namespace such that they can be distinguished individually for RPC.

Statically typed control handler args

Static typing wouldn't do me any good immediately, and I agree it seems "more Lua" to keep them dynamic. If developers want type safety hopefully they'll be able to interface with the RPC API easily enough to do their type-safe work in a more strongly typed language.

manfredhaertel commented 6 years ago

While "playing" with my new RTL2832 based dongle, I discovered luaradio and I like it very much. "Experimenting" with the dongle is much easier with luaradio than with GNU Radio or any other tool.

However, the big disadvantage of luaradio is that you cannot change parameters at runtime, not even the frequency of the dongle. There is also no (official?) way to disconnect blocks from each other and connect others.

Anyway, I wrote a Lua script that implements a luaradio source, which acts as a client for rtl_tcp and is able to change the frequency (and, if I want, also gain etc.) of the dongle. But I would like to see the IPC feature implemented, which would solve all the disadvantages and make luaradio a really great tool.

konsumer commented 4 years ago

For me, initially, I could work out the other stuff (RPC, automatic lookup of params, etc.) as needed. If I just had a basic in-Lua way to change a parameter, I could do a lot, so I started looking at monkey-patching RtlSdrSource as an example, to allow setFrequency.

I did this:

local radio = require('radio')

local ffi = require('ffi')
local librtlsdr_available, librtlsdr = pcall(ffi.load, "rtlsdr")

function radio.RtlSdrSource:setFrequency(freq)
  if self.dev and self.dev[0] then
    self.frequency = freq
    local ret = librtlsdr.rtlsdr_set_center_freq(self.dev[0], self.frequency)
    if ret ~= 0 then
        error("rtlsdr_set_center_freq(): " .. tostring(ret))
    end
  else
    print("No self.dev[0]!")
  end
end

It works (in terms of outputting radio data, as normal), and self is defined (if I print it I get RtlSdrSource [1102500 Hz]), but there is no self.dev[0]. What else do I need to do? Is this evil monkey-patching the problem, i.e. do I need to make (copy) one from scratch?

As a sidenote, I think it would be really cool to use automatic getters/setters like this. I think it makes a nicer, more Lua-native-feeling API (radioSource.frequency = 90.7e6). We could track settable/gettable params and do it all automatically, and later use that info for RPC or whatever.

konsumer commented 4 years ago

I tried copying the file, and just adding setFrequency and had the same issue. It seems like self.dev is only available in RtlSdrSource:initialize_rtlsdr() and RtlSdrSource:run(). Do I need to do something else?

vsergeev commented 4 years ago

How were you trying to call setFrequency()? Keep in mind the RtlSdrSource will be running in its own process, which is where self.dev is created and lives, so a method like setFrequency() that uses self.dev would have to be called from within the RtlSdrSource running process. However, the additional complication here is that the main librtlsdr API for receiving samples -- librtlsdr.rtlsdr_read_async() -- is blocking. So the process will be inside that function for the entire time while it's running.

The librtlsdr tools, e.g. rtl_tcp, address this by creating a separate thread, which would also have access to dev and can make parameter / tuning changes with it. A similar approach would probably need to be used here, and ultimately that control thread would run the RPC server to handle commands (much like rtl_tcp listens for commands over TCP), and would dispatch to methods like your setFrequency().
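The control-thread pattern can be illustrated without any radio hardware. In the Python sketch below, `FakeDevice`, `read_blocking`, and `control_thread` are hypothetical stand-ins: `read_blocking` plays the role of a blocking call such as rtlsdr_read_async(), and the control thread applies a list of commands to the same device object from inside the same process.

```python
import threading


class FakeDevice:
    """Stand-in for a device handle shared by threads within one process."""

    def __init__(self, frequency):
        self.frequency = frequency
        self._lock = threading.Lock()
        self._cancelled = threading.Event()

    def read_blocking(self):
        # Like a blocking read call: does not return until it is cancelled.
        self._cancelled.wait()

    def cancel(self):
        self._cancelled.set()

    def set_frequency(self, frequency):
        # The lock guards the handle against concurrent access from both threads.
        with self._lock:
            self.frequency = frequency


def control_thread(dev, commands):
    """Apply (name, value) commands to the device while the main thread is blocked."""
    for name, value in commands:
        if name == "set_frequency":
            dev.set_frequency(value)
        elif name == "stop":
            dev.cancel()
```

The main thread would start `control_thread` in a `threading.Thread` and then call `dev.read_blocking()`. Threads share the process memory, so the control thread sees the real device object, which is exactly what a forked parent process does not have.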

konsumer commented 4 years ago

I am calling it outside of RtlSdrSource on the instantiated block, in love (using my new LoveAudioSinkMono):

local LoveAudioSinkMono = require('LoveAudioSinkMono')
local radio = require('radio')

local ffi = require('ffi')
local librtlsdr_available, librtlsdr = pcall(ffi.load, "rtlsdr")

local top
local sink
local radioSource

function radio.RtlSdrSource:setFrequency(freq)
  self.frequency = freq
  if self.dev and self.dev[0] then
      local ret = librtlsdr.rtlsdr_set_center_freq(self.dev[0], self.frequency)
      if ret ~= 0 then
          error("rtlsdr_set_center_freq(): " .. tostring(ret))
      end
  else
      print("No self.dev[0]!")
  end
end

function love.load()
  sink = LoveAudioSinkMono()
  radioSource = radio.RtlSdrSource(90.7e6 - 250e3, 1102500)
  top = radio.CompositeBlock():connect(
    radioSource,                                 -- RTL-SDR source, offset-tuned to 90.7MHz-250kHz
    radio.TunerBlock(-250e3, 200e3, 5),          -- Translate -250 kHz, filter 200 kHz, decimate by 5
    radio.FrequencyDiscriminatorBlock(1.25),     -- Frequency demodulate with 1.25 modulation index
    radio.LowpassFilterBlock(128, 15e3),         -- Low-pass filter 15 kHz for L+R audio
    radio.FMDeemphasisFilterBlock(75e-6),        -- FM de-emphasis filter with 75 uS time constant
    radio.DownsamplerBlock(5),                   -- Downsample by 5
    sink 
  )
  top:start()
end

function love.update(dt)
  sink.update()
  -- this doesn't work, but I think it should....
  radioSource:setFrequency(radioSource.frequency + dt)
  print(radioSource.frequency)
end

function love.quit()
  top:stop()
  sink:release()
end

This is a full radio example that otherwise works. Is there any other way to get the reference to self.dev[0]?

vsergeev commented 4 years ago

Unfortunately, you can't call it from the parent process like that, since dev doesn't exist outside the RtlSdrSource running process. When your program called top:start(), it forked into a new process for each of the blocks in your flow graph, and started running the processing loop of the block in its own process. At that point, each block child process lives forever in run().

When your parent love process returned from calling top:start(), all of those blocks are independently running processes, with their own memory and execution. The radioSource reference you have after the flow graph started is a phantom -- its actual running state and execution doesn't exist in the parent process. The multiprocess approach is how LuaRadio achieves parallelism and flexible scheduling, but the consequences are that you need to use some form of IPC (signals, pipes, sockets, etc.) to communicate with blocks from the parent process after they've started running.

konsumer commented 4 years ago

What about if you injected a callback that had access to it, before the fork?

local frequency = 90.7e6
local oldFrequency = 90.7e6

function radioSource:handleUpdates(dev)
  if oldFrequency ~= frequency then
    oldFrequency = frequency
    local ret = librtlsdr.rtlsdr_set_center_freq(dev[0], frequency)
    if ret < 0 then
        error("rtlsdr_set_center_freq(): " .. tostring(ret))
    end
  end
end

-- set frequency over here, in a loop or whatever

and then added a call to self:handleUpdates(self.dev) to the run loop?

vsergeev commented 4 years ago

When a process is forked, the entire memory of the parent process is cloned, but it's not shared. Any method you attach to radioSource in the parent process will be cloned into the child process. Calling it in the parent process will operate on the parent's phantom instance of radioSource with an uninstantiated dev, not the child's radioSource with the real instantiated dev. You can call it in the child process to operate on the child's instantiated dev, but then you don't have the new frequency from the parent -- just the one copy from the time of fork(). In order to communicate from parent process to/from the child process, you'll need some form of IPC -- pipes, sockets, signals, etc. The two processes' state are siloed off from each other.
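The copy-versus-share behavior described above can be demonstrated with plain `os.fork()` and pipes. This Python sketch is unrelated to LuaRadio's internals, and the `fork_demo` name and message format are made up for illustration. The parent changes its own `frequency` after the fork, the child still holds the value from the time of the fork, and the new value only reaches the child through a pipe.

```python
import os


def fork_demo():
    """Return (child's stale copy, value the child received over the pipe)."""
    frequency = 90.7e6
    to_child_r, to_child_w = os.pipe()
    to_parent_r, to_parent_w = os.pipe()

    pid = os.fork()
    if pid == 0:
        # Child process: owns a private copy of `frequency` from the time of fork().
        try:
            os.close(to_child_w)
            os.close(to_parent_r)
            # Block until the parent sends the new value over the pipe (IPC).
            received = float(os.read(to_child_r, 64).decode("ascii"))
            # Report both the stale private copy and the received value.
            os.write(to_parent_w, ("%r %r" % (frequency, received)).encode("ascii"))
        finally:
            os._exit(0)

    # Parent process: this assignment changes only the parent's copy.
    os.close(to_child_r)
    os.close(to_parent_w)
    frequency = 88.5e6
    os.write(to_child_w, repr(frequency).encode("ascii"))
    reply = os.read(to_parent_r, 128).decode("ascii")
    os.waitpid(pid, 0)
    os.close(to_child_w)
    os.close(to_parent_r)

    stale, received = (float(field) for field in reply.split())
    return stale, received
```

The child reports its private copy as 90.7e6 even though the parent already assigned 88.5e6, while the value read from the pipe is the new one. This is why a callback attached before the fork cannot observe later changes made in the parent.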

POSIX shared memory (man 7 shm_overview) is also a form of IPC, and in theory you could instantiate dev there in a shared region, but that approach would require some setup. Ultimately, the RPC solution over a UNIX socket would address this problem generally for all blocks.

edit: for clarity

konsumer commented 4 years ago

I appreciate the help & info.

I think implementing the RPC is a bit beyond my time & expertise. I will probably have to switch to gnuradio for my immediate project. Luaradio is such a good fit, otherwise. The rest of my project is in lua, and the efficiency luaradio brings really helps on a pizero with limited CPU, but configurable params is pretty important on a radio frontend.

POSIX shared memory (man 7 shm_overview) is also a form of IPC, and in theory you could instantiate dev there in a shared region, but that approach would require some setup.

If you have any hints how to do this, I might be able to keep using luaradio, and that would make me very happy. Ugly is fine. I could just keep the worst part hidden in a lua-lib until proper RPC is implemented by someone, and then other love-users could also benefit.

konsumer commented 4 years ago

Also just one example block of the sort of RPC interface you expect (maybe RtlSdrSource or another central block) could probably get me going on implementing it on the others. Even though it's beyond my capabilities, I'm really motivated to help luaradio get this feature, because it's so dang awesome!

I have experience with gRPC (I co-authored a book about it) and web stuff (REST, graphql, etc) and other forms of modern RPC, so I'm sure I could help.

vsergeev commented 4 years ago

@konsumer I had a chance to look into this some more to try to put together an example for you. I wrote a variant of RtlSdrSource that created the device handle in a shared memory region opened with shm_open() and mmap(), so the device handle could be used by both the parent and child processes, and then the parent could make source:set_frequency() kind of calls. Unfortunately, the underlying libusb interface (and probably some underlying kernel API) doesn't take kindly to processes sharing a USB device, so this was a dead end (you can see it here if you're curious).

Most of the hard work for the RPC interface is in the core LuaRadio sample processing loop. For RPC support, there will be a new control message file descriptor that needs to be multiplexed with the sample data file descriptors in the main block runtime loop, and there will need to be seamless serialization and deserialization of the control messages, so unfortunately the feature is quite involved and intrusive and not easy to package up into a standalone example. Some source blocks will also present other challenges if their library API is blocking and takes control of the main thread. Those may require some sort of threading support in LuaRadio -- this is actually the case for librtlsdr, but not for libairspy, for example.

It will inevitably be implemented, but there are three more release cycles -- v0.8.0 coming out in the next week or so, v0.9.0 with a few new blocks and core improvements, and v0.10.0 with standalone applications (see https://github.com/vsergeev/luaradio/issues/65#issuecomment-690052888) -- before I can get to it.

konsumer commented 3 years ago

I keep coming back to this in my mind, as I would love to be able to use luaradio, and this is the main blocker for me, right now. I am thinking maybe I could implement something like rtl-udp for just rtl-sdr, and it would get me part of the way there. I'm not stuck on that message-format, if it seems bad, but it makes sense to me to use an existing thing as a standard, if there are no objections.

How would you feel about UDP ports opened for every block that could use dynamic control messages? One idea I had is a "find next free port" in block init, then save that in the object & start a socket-listener, then later send messages to that port to make it do stuff. This way, it will feel like the object code I wrote before (you call control updates on the parent block object) and seamlessly send UDP messages to the other thread.

Alternatively, I wonder if there isn't a better way to send messages across threads. It would be ideal, in my mind, if all the blocks didn't need to open UDP ports (like a shared port with addresses, or shared memory, or some other IPC method that doesn't open a bunch of ports/files.) Any hints would be much appreciated.

konsumer commented 3 years ago

maybe another nice wire-format would be binary OSC. It's fairly efficient, and a standard that could be used by other things easily, externally.

konsumer commented 3 years ago

I made a fltk async-control demo, but I'm not totally sure how to do the actual IPC part. I will keep playing with it.

konsumer commented 3 years ago

What about the idea of rtl_tcp as a new kind of block? I can run it with rtl_tcp and connect and send it commands with this python:

#! /usr/bin/env python3

import socket
import struct
import time

SET_FREQUENCY = 0x01
SET_SAMPLERATE = 0x02
SET_GAINMODE = 0x03
SET_GAIN = 0x04
SET_FREQENCYCORRECTION = 0x05

class RtlTCP(object):
  def __init__(self, host="localhost", port=1234, sample_rate=2048000):
    self.host = host
    self.port = port
    self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.conn.connect((self.host, self.port))
    self.set_samplerate(sample_rate)

  def set_frequency(self, freq):
    self.__send_command(SET_FREQUENCY, int(freq))

  def set_samplerate(self, rate):
    self.__send_command(SET_SAMPLERATE, int(rate))

  def set_gain(self, gain):
    self.__send_command(SET_GAIN, int(gain))

  def set_gainmode(self, mode):
    self.__send_command(SET_GAINMODE, int(mode))

  def set_frequency_correction(self, freq):
    self.__send_command(SET_FREQENCYCORRECTION, int(freq))

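  def __send_command(self, command, parameter):
    # rtl_tcp commands are 5 bytes: a 1-byte command id and a 4-byte big-endian unsigned parameter
    cmd = struct.pack(">BI", command, parameter)
    # sendall() keeps writing until the whole command has been sent
    self.conn.sendall(cmd)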

if __name__=="__main__":
  sdr = RtlTCP()
  time.sleep(0.1)
  sdr.set_frequency(88.6 * 1e6)

vsergeev commented 3 years ago

How would you feel about UDP ports opened for every block that could use dynamic control messages? One idea I had is a "find next free port" in block init, then save that in the object & start a socket-listener, then later send messages to that port to make it do stuff. This way, it will feel like the object code I wrote before (you call control updates on the parent block object) and seamlessly send UDP messages to the other thread.

The plan for messaging and transport is still as described in https://github.com/vsergeev/luaradio/issues/40#issuecomment-270885155 -- i.e. JSON-RPC for wide interoperability, stream sockets, probably initially just UNIX domain sockets, followed by other transport layers, which can be selected with a different URI scheme when launching the RPC server with top:listen(...).

The solution I'm currently working on for #35 (the last todo for the v0.9.0 release) will introduce a control file descriptor that will be monitored by blocks in their main process loop and allow for more graceful shutdown of sources instead of sending SIGTERM. This control interface will then be extended to carry asynchronous control messages, so the groundwork for all of this is definitely being laid.

What about the idea of rtl_tcp as a new kind of block? I can run it with rtl_tcp and connect and send it commands with this python:

You could use a NetworkClientSource connected to rtl_tcp, but I believe commands sent back to rtl_tcp must be on the same TCP connection (and that it only handles one connection at a time), so you may need to create a proxy that can take commands from another connection or use a tool like rtlmux.

konsumer commented 3 years ago

You could use a NetworkClientSource connected to rtl_tcp, but I believe commands sent back to rtl_tcp must be on the same TCP connection

Yeah, I figured. I couldn't get it working in gnuradio quickly to test audio, but I thought TCP worked with only one connection, so it was a bit confusing how that would work. rtlmux sounds like it might be a temporary solution to that.

The idea of using regular rtl-sdr block and sending async commands in JSON-RPC over stream-sockets sounds wonderful. I guess I was hoping there might be some shortcut by using something that already exists, but maybe that doesn't really help.

I'm not great with C (I mean I can passably use it, but no expert) but if there is anything I can do to help, let me know. Happy to write lua or python for testing GUI/messaging/etc.