open-ephys / plugin-GUI

Software for processing, recording, and visualizing multichannel electrophysiology data
https://open-ephys.org/gui
GNU General Public License v3.0

Interest in improvements for streaming / API capabilities of OpenEphys? #372

Closed · pbotros closed this issue 2 years ago

pbotros commented 4 years ago

Hi,

In our fork of OpenEphys (https://github.com/carmenalab/plugin-GUI), we've made a couple of changes to integrate it into our experimental environment that might benefit the broader OpenEphys community. I'll outline some of the changes in this fork here; if any of them seem interesting to you all, let me know and I can work on merging them into development/master, or feel free to pull in whatever you see fit yourself.

Background

For some background: we do BMI in our lab, in both rodent and NHP models. Our current rodent BMI setup involves a number of sensors, Raspberry Pis / Arduinos, and computers, with our data needs centralized through River (https://github.com/pbotros/river). We've previously used TDT to acquire our neural data, which exposes both streaming APIs (via a separate PCI-e card) and control APIs (via HTTP) that let it integrate in a modular fashion with the rest of our stack. The data acquisition is connected to the rest of the computers/sensors in a pseudo-"microservices" stack, where an underlying infrastructure passes commands between the various services in our ecosystem and each service can consume and/or produce data streams. For example, data acquisition runs as one "service", another service consumes that data and detects spikes / calculates firing rates, another outputs to a speaker as the BMI output for the animal, another manages a camera, and so on.

OpenEphys API

We're now integrating Neuropixels into our experiments, which to us meant using OpenEphys. To retain the same modularity as before -- where the data acquisition setup is an isolated module that doesn't influence the rest of the stack -- we wanted OpenEphys to expose an API through which we could interact with it and pull/push data. In particular, we wanted easy ways to interact with OpenEphys's "Core" state (like recording status or data directories), but also with processor-specific state, such as a particular flag in one of the processors in the chain.

To address this, we've implemented an HTTP server that exposes a "RESTful" interface, with endpoints for core GUI state (e.g., recording status and data directories) and for getting and setting processor-specific parameters.

Processors can expose whichever parameters they want and are in charge of responding when their own parameters are set; this reuses the already-existing Parameter class.
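
As a rough illustration of the client side (a minimal sketch -- the port, endpoint paths, and payloads here are hypothetical stand-ins, not the fork's exact API):

import requests

BASE = 'http://localhost:37497'  # hypothetical port

# Query core GUI state, e.g. whether the GUI is idle, acquiring, or recording
print(requests.get(BASE + '/api/status').json())

# Update core state, e.g. start recording
requests.put(BASE + '/api/status', json={'mode': 'RECORD'})

# Set a processor-exposed parameter by processor ID and parameter name
requests.put(BASE + '/api/processors/101/parameters/threshold',
             json={'value': -50.0})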

The above HTTP server is similar in spirit to the NetworkEvents plugin backed by ZeroMQ, but ZeroMQ's lack of structure and flexibility makes it a bit more painful to work with (on both the server and client sides). Using HTTP means the server should be integrable nearly seamlessly from just about any language and by developers of all skill levels, and RESTful interfaces are intuitive. Plus, the HTTP library integrated here is a header-only library pulled in via CMake, so it hopefully doesn't add much developer burden.

Sorting

In addition, it's time-consuming to robustly sort Neuropixels data by hand, as we previously could quickly do with lower-channel-count recordings for our BMI experiments (though even that was neither very accurate nor fun). In response, we've integrated Kilosort2 sorting as an offline step and, using the API infrastructure mentioned above, improved the existing SpikeSorter (e.g., proper threshold-crossing detection and support for more-than-2D ellipsoids in PCA-based sorting).

All of the changes above were intentionally made backwards-compatible with the existing SpikeSorter functionality, so they should hopefully be strictly net-beneficial (though testing & code review will lend more confidence on the backwards compatibility).

River Output

We made a separate plugin designed to pipe data from OpenEphys out to River, our current solution for handling streaming data. River supports high-throughput data (since it's backed by Redis) that is defined by a "schema", so all data is also structured. We use River as the "communication bus" between our services, so the idea here is to pipe particular pieces of data out of OpenEphys as needed by the rest of the stack.
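
For a flavor of what "schema-defined" means, here's a minimal sketch of creating and initializing a stream, based on River's README (the exact Python class names and signatures are assumptions):

import river

# A stream whose rows each carry one double-precision control signal
# (StreamSchema/FieldDefinition usage is an assumption based on River's README)
schema = river.StreamSchema([
    river.FieldDefinition('control_signal', river.FieldDefinition.DOUBLE, 8),
])

writer = river.StreamWriter(river.RedisConnection('127.0.0.1', 6379))
writer.initialize('my-decoder-stream', schema)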

This is similar in spirit to the ZMQInterface plugin, but, while ZeroMQ is undoubtedly higher-throughput than River/Redis, ZeroMQ requires a bit more client setup, has no strictly defined schema built in, and has no built-in support for persistence. We decided to prioritize ease of use and schemas, hence River.

We'll likely make this plugin available via the normal third-party plugin routes, but I'm flagging it here explicitly in case it interests you all. I see that there's a GitHub Project on data streaming, and I think River fits pretty well into your needs there (bias disclaimer: I'm the author).

====

We're still early in our OpenEphys integration, so we'll undoubtedly keep iterating on our experimental setup and the above functionality as we go, but if you all think it's helpful, I'm happy to work on merging some of the above changes into development. Let me know, and feel free to look through the fork. If you're not interested, I'm happy to close out this issue.

Thanks! Paul

jsiegle commented 4 years ago

Hi Paul,

Thanks for the update, I think all of these improvements would be of interest to the broader community! As you may have seen with the announcement of GUI v0.5.0, we're now distributing plugins via the built-in Plugin Installer. This would be the perfect mechanism for sharing the River Output. In order to do this, you'd need to create a separate repo for this plugin (instructions for this are coming soon on our new documentation site), which we would then fork to the open-ephys-plugins account in order to set up automated builds. Do you have examples of the types of downstream software you're using this plugin with?

The ProcessorGraphHttpServer looks awesome! We will plan on integrating this into the next major release. We're currently drafting a list of core GUI upgrades for the next release, in order to get feedback from the community, and we'll be sure to include this on there.

The SpikeSorter improvements also sound super useful. We are working on polishing that module and migrating it to its own repo, so it can be upgraded independently of the host application...this is our plan for all plugins moving forward. We will incorporate your changes when we do so (likely before the end of the year).

I saw you also added a HeartbeatNode on your fork. How does that fit into your larger setup?

pbotros commented 4 years ago

> Hi Paul,
>
> Thanks for the update, I think all of these improvements would be of interest to the broader community! As you may have seen with the announcement of GUI v0.5.0, we're now distributing plugins via the built-in Plugin Installer. This would be the perfect mechanism for sharing the River Output. In order to do this, you'd need to create a separate repo for this plugin (instructions for this are coming soon on our new documentation site), which we would then fork to the open-ephys-plugins account in order to set up automated builds. Do you have examples of the types of downstream software you're using this plugin with?

Good to know re: the new plugin distribution mechanism; I'll definitely follow that route, and I'll look forward to the documentation.

Re: some examples: sure! A straightforward example is our BMI "decoder", which bins spikes into firing rates and computes a "control signal" that can be mapped to an output. Spikes are written to their own River stream by a separate process; the decoder reads one bin's worth (~50 ms) of spikes from that stream and then writes to its own stream.

Some code that gives the gist of how it works in our stack (this is Python; River is written in C++ with NumPy-compatible Python bindings, so bindings are possible in many other languages):

# Excerpt from a larger class in our stack; `session`, `environment`, and the
# *_stream_info helpers are defined elsewhere.
import numpy as np
import river

# Initializing writing the decoder stream
self.decoder_stream = river.StreamWriter(environment.redis_connection())
self.decoder_stream.initialize(
    decoder_stream_info.stream_name(session.session_name),
    decoder_stream_info.schema(),
    {
        'sampling_rate': str(1.0 / session.configuration.decoder_bin_size),
    })
self.decoder_stream_buffer = np.empty((1,), dtype=self.decoder_stream.schema.dtype())

# Initializing reading the spikes stream
self.spike_stream = river.StreamReader(environment.redis_connection())
self.spike_stream.initialize(SpikeDetectorStreamInfo.stream_name(session.session_name), 30000)
self.spike_stream_buffer = np.empty((10000,), dtype=self.spike_stream.dtype())

# Main decoder loop:
while not self._should_stop:
    # Blocks for up to 50 ms and returns as many spikes as possible since the last read
    num_read = self.spike_stream.read(self.spike_stream_buffer, 50e-3)
    spikes = self.spike_stream_buffer[:num_read]
    # Bin spikes into firing rates, then decode them into a control signal
    firing_rates = compute_firing_rates(spikes)
    control_signal = self._decode_firing_rates(firing_rates)
    # Write one row (one bin) to the decoder's own River stream
    self.decoder_stream_buffer[0]['control_signal'] = control_signal
    self.decoder_stream.write(self.decoder_stream_buffer)

There are then more processes consuming the decoder stream, such as the process responsible for outputting an auditory tone according to the decoder's output.

In parallel to all of this, the "ingester" runs as well, automatically taking all of these various streams and persisting them to disk in .parquet files (more info in the River README). This lets us do something like the following simple post-experiment analysis, plotting the decoder output over time:

import pandas as pd
import matplotlib.pyplot as plt
df = pd.read_parquet('/path/to/river_streams/some-decoder-stream-name/data.parquet')
plt.plot(df['timestamp_ms'], df['control_signal'])

These are just some of the more straightforward examples of how we've used it - we're also using River in more complicated scenarios, where it's enabled us to scale "horizontally" instead of vertically. For example, for another project, we use River as a bridge between a local data acquisition machine and a machine in the AWS cloud for some heavy, semi-realtime computing.

Anyway - happy to chat more about River if you all are interested. I'm personally curious how you're thinking about streaming from the open-ephys perspective; there are certainly things out there I haven't considered that could be better solutions than River for addressing the "streaming problem" in neuroscience.

> The ProcessorGraphHttpServer looks awesome! We will plan on integrating this into the next major release. We're currently drafting a list of core GUI upgrades for the next release, in order to get feedback from the community, and we'll be sure to include this on there.

Great to hear!

> The SpikeSorter improvements also sound super useful. We are working on polishing that module and migrating it to its own repo, so it can be upgraded independently of the host application...this is our plan for all plugins moving forward. We will incorporate your changes when we do so (likely before the end of the year).

Migrating it to its own repo makes sense - that's a good move. Let me know if/how I can help with integrating these changes when the time comes.

> I saw you also added a HeartbeatNode on your fork. How does that fit into your larger setup?

Good question - it's related to River and synchronization. The idea is to give us a way to synchronize Neuropixels samples with the rest of the River streams, e.g. our decoders, cameras, etc. Everything written to River gets a server timestamp assigned to it (cf. https://redis.io/topics/streams-intro), so by writing an explicit entry into River containing the last-read Neuropixels sample, we can establish a map between Neuropixels sample index and River server timestamp. We then use this map in our postprocessing pipeline to align all of the various "software" pieces of data to our neural data from "hardware". This approach is similar to the more common alternative of sending a hardware TTL to the data acquisition system to synchronize neural data with other components, but we found this heartbeat solution a bit easier and more flexible when working with a soft-ish realtime system like ours.
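
A minimal sketch of the post-hoc alignment, assuming each heartbeat entry pairs a Neuropixels sample index with its River server timestamp (stream paths and column names here are illustrative):

import numpy as np
import pandas as pd

# Heartbeat stream: each row pairs the last-read Neuropixels sample index
# with the server timestamp River assigned when the entry was written.
hb = pd.read_parquet('/path/to/river_streams/heartbeat/data.parquet')

# Map any River server timestamp (e.g. from a camera or decoder stream) to a
# fractional Neuropixels sample index by interpolating between heartbeats.
def river_ts_to_sample(ts_ms):
    return np.interp(ts_ms, hb['timestamp_ms'], hb['sample_index'])

decoder = pd.read_parquet('/path/to/river_streams/decoder/data.parquet')
decoder['neuropixels_sample'] = river_ts_to_sample(decoder['timestamp_ms'])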

jsiegle commented 2 years ago

Hi @pbotros – it's been a while, but we are almost ready to make your HTTP server a standard part of the GUI. It will be included by default in our next release, version 0.6.0 (see #493 for draft release notes).

The latest implementation is here: https://github.com/open-ephys/plugin-GUI/blob/development-juce6/Source/Utils/OpenEphysHttpServer.h

We've added a few features, such as more fine-grained control over recording options, as well as the ability to use the server to broadcast messages to plugins during acquisition. Overall, it's been extremely helpful, and we think it will be an essential part of the GUI moving forward. We'd love to have your feedback on the HTTP Server, or any other aspect of v0.6.0.
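
For example, broadcasting a message to all plugins during acquisition takes a single request (a sketch -- the port and payload shape are assumptions):

import requests

# Broadcast a text message to all plugins while acquisition is running
requests.put('http://localhost:37497/api/message',
             json={'text': 'mark stimulus onset'})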

Since we copied the source code from your fork, you're not currently credited as a contributor. If you could open a simple pull request to get your GitHub account onto the contributors list, that would be greatly appreciated!

pbotros commented 2 years ago

Great news @jsiegle! The v0.6.0 release looks exciting and takes many broad steps in the right direction; I'm glad the HTTP server made the cut. Is there an approximate release date you're aiming for with v0.6.0? If it's flexible, I should be able to look over the changes in the next week or so; otherwise I can adjust my priorities. Thanks!

jsiegle commented 2 years ago

We will be testing for at least a month before the official release, so anytime in the next few weeks is good!

jsiegle commented 2 years ago

Version 0.6.0 of the GUI has now been officially released, and incorporates the HTTP Server code you developed. Thanks a lot for that!

The Spike Sorter now lives in its own repository. If you'd like to incorporate some of your changes to that plugin, you can open up a pull request there.

We are interested in making the River Output available via the Plugin Installer later this year — we'll be in touch soon about that.

pbotros commented 2 years ago

Great to hear @jsiegle - and sorry about never getting around to looking back over the existing HTTP code. Glad it got shipped!

Re: Spike Sorter: definitely - that's something I could see myself taking on in the near future. There might be a few straightforward pieces to break off from my implementation that could help the Spike Sorter, namely proper threshold-crossing detection and support for more-than-2D ellipsoids in PCA-based sorting.

Re: River: of course! Its documentation was recently revamped, if you're curious, and I'm happy to chat about it anytime. For making it available via the Plugin Installer, aside from some cleanup on the plugin itself, would I follow the "Creating a new plugin" instructions?

jsiegle commented 2 years ago

We actually split off the spike detection code from the Spike Sorter, so now it just modifies the sortedID value for spikes that were generated upstream. This prevents the need for redundant code between the Spike Detector and the Spike Sorter, and doesn't seem to add any processing overhead. We've also made a bunch of improvements to the Spike Detector, including automated threshold setting and a much more user-friendly configuration interface.

So I think the threshold-related changes would need to go into the Spike Detector, while the multi-D ellipsoids would go into the Spike Sorter.

For the River Plugin, you should start with the processor-plugin-template and add the necessary code to that. Once you have the basic functionality in place, we can fork it and configure everything for distribution via the Plugin Installer.