mne-tools / mne-lsl

A framework for real-time brain signal streaming with MNE-Python.
https://mne.tools/mne-lsl
BSD 3-Clause "New" or "Revised" License

EpochsStream acquisition_delay for separate event stream #369

Open mzgsxs opened 1 day ago

mzgsxs commented 1 day ago

Hi, I'm trying to understand and use the EpochsStream class for my work.

epochs = EpochsStream(
    stream,
    bufsize=20,
    event_id=None,
    event_channels="LeftCue",
    event_stream=stream_events,
    tmin=-0.2,
    tmax=0.5,
    baseline=(None, 0),
    picks='eeg'
).connect(acquisition_delay=0.1)
epochs.info

It seems that if I use a separate event stream and connect with acquisition_delay < tmax, EpochsStream will always reject the events in ts_events. Is this behavior normal and expected, or should this be considered a bug?

https://github.com/mne-tools/mne-lsl/blob/1c60e82e70ed4f203a4a627b6b26ccc89864d9cc/src/mne_lsl/stream/epochs.py#L921

Maybe a check should be added here to see if it's smaller than tmax?

https://github.com/mne-tools/mne-lsl/blob/1c60e82e70ed4f203a4a627b6b26ccc89864d9cc/src/mne_lsl/stream/epochs.py#L317

thank you

mscheltienne commented 9 hours ago

I don't think acquisition_delay and tmax can interact in that way; I think you are facing a different issue which yields the rejection of all events. If you take the tutorial Epoching a Stream in real-time, it comes down to:

import time

import numpy as np
from mne import annotations_from_events, find_events
from mne.io import read_raw_fif
from mne_lsl.datasets import sample
from mne_lsl.lsl import resolve_streams
from mne_lsl.player import PlayerLSL
from mne_lsl.stream import EpochsStream, StreamLSL

fname = sample.data_path() / "mne-sample" / "sample_audvis_raw.fif"
raw = read_raw_fif(fname, preload=False).pick(("meg", "stim")).load_data()
events = find_events(raw, stim_channel="STI 014")
events = events[np.isin(events[:, 2], (1, 2))]  # keep only events with ID 1 and 2
annotations = annotations_from_events(
    events,
    raw.info["sfreq"],
    event_desc={1: "ignore", 2: "event"},
    first_samp=raw.first_samp,
)
annotations.duration += 0.1  # set duration since annotations_from_events sets it to 0

raw.set_annotations(annotations)
player = PlayerLSL(
    raw, chunk_size=200, name="tutorial-epochs-2", source_id="test", annotations=True
).start()

resolve_streams()

stream = StreamLSL(bufsize=4, name="tutorial-epochs-2", source_id="test")
stream.connect(acquisition_delay=0.1, processing_flags="all")
stream.info["bads"] = ["MEG 2443"]  # remove bad channel
stream.filter(None, 40, picks="grad")  # filter signal

stream_events = StreamLSL(
    bufsize=20, name="tutorial-epochs-2-annotations", source_id="test"
)
stream_events.connect(acquisition_delay=0.1, processing_flags="all")

epochs = EpochsStream(
    stream,
    bufsize=20,  # number of epoch held in the buffer
    event_id=None,
    event_channels="event",  # this argument now selects the events of interest
    event_stream=stream_events,
    tmin=-0.2,
    tmax=0.5,
    baseline=(None, 0),
    picks="grad",
).connect(acquisition_delay=0.1)

while epochs.n_new_epochs < 10:
    time.sleep(0.5)

data = epochs.get_data(n_epochs=epochs.n_new_epochs)

epochs.disconnect()
stream.disconnect()
stream_events.disconnect()
player.stop()

Here acquisition_delay is set to 0.1 and tmax to 0.5, matching your condition, yet epochs are acquired.


acquisition_delay simply sets a timer. On every tick, the acquisition thread checks whether new data is available. The new data is always associated with its LSL timestamps.

tmin / tmax simply define how the epoch is cut out of the LSL timeline, as you would in MNE-Python with a Raw / Epochs object.
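For intuition, here is a rough sketch of that slicing (not the actual library code, just an illustration with made-up values): the event timestamp is located in the data buffer's timestamp array, and tmin / tmax are converted to a sample range around it.

import numpy as np

# fake data buffer: 4 s at 250 Hz, with its LSL timestamps
sfreq = 250.0
ts = 1000.0 + np.arange(int(4 * sfreq)) / sfreq
data = np.random.randn(4, ts.size)
t_event = 1002.0  # LSL timestamp of an event
tmin, tmax = -0.2, 0.5

onset = np.searchsorted(ts, t_event)  # sample index of the event in the buffer
start = onset + int(round(tmin * sfreq))
stop = onset + int(round(tmax * sfreq))
if start < 0 or ts.size < stop:
    print("epoch not fully in the buffer yet -> the event is pruned for now")
else:
    epoch = data[:, start:stop]  # (n_channels, n_times) cut around the event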

The steps in _prune_events are: https://github.com/mne-tools/mne-lsl/blob/1c60e82e70ed4f203a4a627b6b26ccc89864d9cc/src/mne_lsl/stream/epochs.py#L896-L928


If all of your epochs are dropped, I think the likely reason is: https://github.com/mne-tools/mne-lsl/blob/1c60e82e70ed4f203a4a627b6b26ccc89864d9cc/src/mne_lsl/stream/epochs.py#L911C1-L916C81

I would increase the buffer size of both StreamLSL objects.
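For example (the exact values are illustrative, and the names match the tutorial above), something like:

from mne_lsl.stream import StreamLSL

# larger buffers on both the data stream and the event stream (sizes are illustrative)
stream = StreamLSL(bufsize=10, name="tutorial-epochs-2", source_id="test")
stream.connect(acquisition_delay=0.1, processing_flags="all")
stream_events = StreamLSL(
    bufsize=40, name="tutorial-epochs-2-annotations", source_id="test"
)
stream_events.connect(acquisition_delay=0.1, processing_flags="all")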

mzgsxs commented 4 hours ago

Thank you so much for the reply. Because my event stream has an irregular sampling rate, I was thinking the issue might be due to acquisition_delay. I'm trying to manually create a new event stream and push events into it; here is my code:

import time
import uuid

import mne
import numpy as np
from matplotlib import pyplot as plt

from mne_lsl.lsl import (
    StreamInfo,
    StreamInlet,
    StreamOutlet,
    local_clock,
    resolve_streams,
)
from mne_lsl.stream import StreamLSL

# Connect to the LSL data stream (replace "obci_eeg1" with your stream name if different)
stream_name = "obci_eeg1"
stream = StreamLSL(name=stream_name, bufsize=10)
stream.connect(acquisition_delay=0.01, processing_flags="all")

# Create an irregularly sampled event stream with two cue channels
source_id_cue = uuid.uuid4().hex
sinfo_cue = StreamInfo(
    name="Cue-stream",
    stype="Event",
    n_channels=2,
    sfreq=0,
    dtype="float64",
    source_id=source_id_cue,
)
sinfo_cue.set_channel_names(["LeftCue", "RightCue"])
sinfo_cue.set_channel_types("cue")
sinfo_cue.set_channel_units("Bool")

outlet = StreamOutlet(sinfo_cue)

I have two streams right now:

< sInfo 'obci_eeg1' >
  | Type: EEG
  | Sampling: 250.0 Hz
  | Number of channels: 4
  | Data type: <class 'numpy.float32'>
  | Source: openbcigui

< sInfo 'Cue-stream' >
  | Type: Event
  | Sampling: Irregular
  | Number of channels: 2
  | Data type: <class 'numpy.float64'>
  | Source: c7ece86c457341738ec04465848b6dbf

The outlet event stream and this stream_events object work just fine:

stream_events = StreamLSL(name="Cue-stream", source_id=source_id_cue, bufsize=20)
stream_events.connect(acquisition_delay=0.01, processing_flags='all')
# outlet.push_sample(np.array([0., 0.1]))
outlet.push_sample(np.array([0.1, 0.]))
time.sleep(0.01)
# will need special treatment for n_new_samples = 0 case
data, ts = stream_events.get_data(stream_events.n_new_samples)
print(data, ts)
print(data.shape, ts.shape)
[[0.1]
 [0. ]] [154476.24553867]
(2, 1) (1,)
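For the n_new_samples = 0 case mentioned in the comment above, something like a small polling loop before get_data could work, e.g.:

import time

# wait (with a timeout) until at least one new event sample has been acquired
timeout = 5.0
t_start = time.time()
while stream_events.n_new_samples == 0 and time.time() - t_start < timeout:
    time.sleep(0.01)
if stream_events.n_new_samples != 0:
    data, ts = stream_events.get_data(stream_events.n_new_samples)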

But EpochsStream just doesn't work as expected

from mne_lsl.stream import EpochsStream

epochs = EpochsStream(
    stream,
    bufsize=20,
    event_id=None,
    event_channels="LeftCue",
    event_stream=stream_events,
    tmin=-1.0,
    tmax=2.0,
    baseline=(None, 0),
    picks='eeg'
).connect(acquisition_delay=0.01)
epochs.info
outlet.push_sample(np.array([0.1, 0.]))
# outlet.push_sample(np.array([0., 0.1]))
time.sleep(0.01)

The following are the input values passed to the _prune_events function; the [] at the end are the events, which have all been pruned.

[[0 0 0]
 [1 0 0]]
None
750
(2500,)
154539.77138322216 154549.76862672862
[154476.24553867 154549.80302879]
-250
[]
[]
[]
[]
[]

It seems that if I change acquisition_delay to 4.0, it kind of works, although it still fails some of the time. Am I using this class in the right way?

mzgsxs commented 4 hours ago

I think the problem is that when outlet.push_sample() is called, it triggers EpochsStream's _acquire right away. This causes the event to be dropped, as the epoch data is not ready yet; however, there is no additional _acquire planned ahead, which causes this event to be dropped permanently.

mscheltienne commented 3 hours ago

That's too many inputs for the function signature:

def _prune_events(
    events: NDArray[np.int64],
    event_id: dict[str, int] | None,
    buffer_size: int,
    ts: NDArray[np.float64],
    last_ts: float | None,
    ts_events: NDArray[np.float64] | None,
    tmin_shift: float,
) -> NDArray[np.int64]:

Can you clarify a bit?

I think the problem is that when outlet.push_sample() is called, it triggers EpochsStream's _acquire right away. This causes the event to be dropped, as the epoch data is not ready yet; however, there is no additional _acquire planned ahead, which causes this event to be dropped permanently.

If you provide acquisition_delay=0.1, the method _acquire will be called every 0.1 s in the background thread (unless you are hogging CPU resources in the main thread, in which case the background thread will try to wake up and call _acquire when the main thread gives it some headroom).
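Conceptually, the background acquisition behaves roughly like this simplified sketch (not the actual implementation):

import threading
import time

def _acquisition_loop(acquire, delay, stop_event):
    # call ``acquire`` roughly every ``delay`` seconds until asked to stop
    while not stop_event.is_set():
        acquire()
        stop_event.wait(delay)

stop = threading.Event()
thread = threading.Thread(
    target=_acquisition_loop,
    args=(lambda: print("_acquire tick"), 0.1, stop),
    daemon=True,
)
thread.start()
time.sleep(0.5)  # the loop ticks ~5 times in the background
stop.set()
thread.join()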

I'll have another look at your example tomorrow.

mzgsxs commented 1 hour ago

Thank you, and sorry for my poor explanation.

I think the problem is that when outlet.push_sample() is called, it triggers EpochsStream's _acquire right away. This causes the event to be dropped, as the epoch data is not ready yet; however, there is no additional _acquire planned ahead, which causes this event to be dropped permanently.

If I've understood your explanation correctly, acquisition_delay acts as a timer and defines how often stream_events.n_new_samples is checked. If acquisition_delay is small, _acquire is called almost right after stream_events.n_new_samples > 0. However, at this point, if tmax is greater than the time elapsed since the tick when _acquire was called (the stream buffer does not yet hold data for one complete epoch), the event is rejected and pruned. But since stream_events.n_new_samples is reset to 0 after the get_data call, if no new event occurs after this and within the stream's bufsize time (10 s in the above example), further _acquire calls won't be able to get the epoch data, as it's already out of the stream's buffer.

mscheltienne commented 59 minutes ago

If I've understood your explanation correctly, acquisition_delay acts as a timer and defines how often stream_events.n_new_samples is checked. If acquisition_delay is small, _acquire is called almost right after stream_events.n_new_samples > 0. However, at this point, if tmax is greater than the time elapsed since the tick when _acquire was called (the stream buffer does not yet hold data for one complete epoch), the event is rejected and pruned. But since stream_events.n_new_samples is reset to 0 after the get_data call, if no new event occurs after this and within the stream's bufsize time (10 s in the above example), further _acquire calls won't be able to get the epoch data, as it's already out of the stream's buffer.

Still no ;) But closer.

acquisition_delay does act as a timer, and at every tick, EpochsStream._acquire is called.

defines how often stream_events.n_new_samples is checked

This is erroneous; we don't care about the variable n_new_samples. The StreamLSL stream_events has an internal buffer of N samples (LSL timestamps + values). As long as the buffer size is not exceeded, samples are not removed from this buffer. Once the buffer size is exceeded, it acts as a circular buffer.

Every tick of the acquisition_delay timer, the method EpochsStream._acquire is called.

Thus, if an event was pruned at one iteration, it will probably get consumed at a later iteration.
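To illustrate the circular-buffer behavior (purely illustrative, not the library's implementation): once the buffer is full, pushing a new sample discards the oldest one.

import numpy as np

buffer = np.full(5, np.nan)  # fixed-size buffer of 5 samples
for sample in range(1, 9):  # push 8 samples, one at a time
    buffer = np.roll(buffer, -1)
    buffer[-1] = sample
    print(buffer)
# after the 6th push, the oldest samples start being overwritten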

mscheltienne commented 55 minutes ago

But I think you are right that the description I just gave is not what the code is doing.

That public get_data call resets the number of new samples here: https://github.com/mne-tools/mne-lsl/blob/c274e56f496ab43353580acefa5cd73cd0f1276b/src/mne_lsl/stream/base.py#L615

I need to have a look tomorrow, but that might be the problem.

mscheltienne commented 26 minutes ago

Tests are passing here: https://github.com/mne-tools/mne-lsl/pull/371 Can you give the changes in this PR a shot? You can install the wheel for your platform, available at the end of the CI summary: https://github.com/mne-tools/mne-lsl/actions/runs/12075781960?pr=371

(macos-native: intel CPUs) (macos-latest-native: arm64 CPUs)

I'll update the PR tomorrow with a test reflecting your issue.

mzgsxs commented 9 minutes ago

Thank you, I will try this first.

https://github.com/mne-tools/mne-lsl/actions/runs/12075781960?pr=371