mne-tools / mne-lsl

A framework for real-time brain signal streaming with MNE-Python.
https://mne.tools/mne-lsl
BSD 3-Clause "New" or "Revised" License
56 stars 26 forks source link

Is there an option for real-time streaming in mne-lsl? #259

Closed timonmerk closed 3 months ago

timonmerk commented 4 months ago

Thanks again @mshamalainen for providing this great tool!

When testing the MNE-LSL player in combination with the MNE-LSL stream, we observed differences how fast data batches are processed depending on the respective hardware.

With this simple script using the example data having a duration of 150 s, the processing took on my machine around 28 s. Is this behavior expected? Intuitively I would have expected those to have real-time duration.

from mne_lsl.player import PlayerLSL
from mne_lsl.stream import StreamLSL

import numpy as np
from mne_lsl.datasets import sample
import mne

import time

fname = sample.data_path() / "sample-ant-raw.fif"

print(mne.io.read_raw_fif(fname)._last_time)

player = PlayerLSL(
    fname,
    name="test_stream",
    chunk_size=10,
    n_repeat=1,
)

start_time_player = time.time()

player = player.start()

stream = StreamLSL(name="test_stream", bufsize=2).connect(timeout=2)

last_time = time.time()

time_diff = time.time() - last_time  # in s
winsize = 1
sampling_interval = 1 / 10

data_ = []
time_ = []
data = None
check_data = None

while stream.connected:
    time_diff = time.time() - last_time  # in s
    if time_diff >= sampling_interval:
        last_time = time.time()

        if data is not None:
            check_data = data

        data, timestamp = stream.get_data(winsize=winsize)
        if (
            data is not None
            and check_data is not None
            and np.array_equal(data, check_data)
        ):
            stream.disconnect()
        else:
            print("retrieved data")
            data_.append(data)
            time_.append(timestamp)

end_time_player = time.time()

print(f"Duration of stream: {end_time_player - start_time_player} s")

Tagging here @SamedVossberg since he worked on that tests

mscheltienne commented 4 months ago

Hello @timonmerk @SamedVossberg, glad this library is of use to you! I'm not done playing with the snippet you provide, but I already have a bit of feedback for you:

(1) On my computer, copy/pasting your snippet, it ran for 5+ minutes before I interrupted execution. Which is still 'false'. (2) I can not reproduce the code snippet ending early as you have. (3) There is an issue with logging in ipykernel on VSCode which I discovered through your snippet, c.f. #260 (4) np.array_equal is designed to compare arrays of integers, with floating points you should prefer np.allclose (especially as here you are streaming in Volts EEG channels, in the range of e-6)

Why would it ran for longer than the raw duration? It boils down to how much you are hogging the CPU in the same process as the player is executed. Consider this code snippet that you can run with or without the time.sleep() line.

import time

from mne.io import read_raw_fif
from mne_lsl import set_log_level
from mne_lsl.datasets import sample
from mne_lsl.player import PlayerLSL

fname = sample.data_path() / "sample-ant-raw.fif"
raw = read_raw_fif(fname, preload=True).crop(0, 10).pick("eeg")

set_log_level("INFO")
player = PlayerLSL(
    raw,
    name="test_stream",
    chunk_size=10,
    n_repeat=1,
)
player.set_channel_units({elt: "microvolt" for elt in raw.ch_names})
start = time.time()
player = player.start()
time.sleep(10)  # <- comment or uncomment this line
while player._streaming_thread is not None:  # cheating a bit here :)
    pass
stop = time.time()

print(f"Duration of stream: {stop - start} s")
print(f"Duration of raw: {raw.times[-1]} s")

With the time.sleep():

Duration of stream: 10.170230150222778 s
Duration of raw: 10.0 s

Without the time.sleep():

Duration of stream: 27.57318687438965 s
Duration of raw: 10.0 s

What's happening here? You have an extremely fast permanent while loop which is hogging all of the execution time from the CPU, leaving almost nothing for the player to push new chunks of data. You can also observe this with the legacy viewer, in a separate terminal type mne_lsl_stream_viewer while starting the code snippet above with or without the sleep. You'll notice that without the sleep, the datapoints are coming in very slowly.

The player is here to mock a stream, in practice this stream will likely be created and active through a different process. I would recommend to start the player in a separate process, i.e. a different terminal/interpreter, which should get you a behavior closer to a real system.

Note also that the `chunk_size differs greatly between systems, and thus it should be adapted in the player to reflect the system you are trying to mock.

I'll have a second look next week when able, let me know if that already helps you!

mscheltienne commented 4 months ago

Also, in your example, the variable sampling_interval = 1 / 10 is compared to time in seconds, which is not correct, the delay between 2 push operations depends on both the chunk size and on the sampling frequency of the recording replayed.

timonmerk commented 3 months ago

Many thanks @mscheltienne for this thorough answer! I think I understand now that there is no internal "clock" that pushes samples in the Player which resembles the data in real-time of the stored data file. I guess this is also very difficult given different hardware specs and Python doesn't seem to be a super good language for quick timing as I found out...

Thanks for highlighting np.allclose, that's very important!

Maybe one explanaition to the sampling. With the if time_diff >= sampling_interval we wanted to ensure a server-based clock that sequentially pulls data with a defined sampling rate, which was here called with a sampling interval of 1/10, so 100ms. But given that no real-time can be guaranteed by the player, this makes the stream.get_data also dependent on the underlying hardware.

But many thanks for investigating this point!

mscheltienne commented 3 months ago

I'm not sure I'm following here. There is an internal clock which tries to push samples at the right time. However, samples are pushed per chunks.

Let's assume a sampling frequency of 1000 Hz and a chunk size of 100.

If the chunk size is not 100 but 1 samples, then the same logic applies:

In any case, the chunk size should be adapted to match what the real-time application you are faking, mocking, is actually using. For instance, if I take the LSL application for EEG amplifier X, it might send 10 sample per 10 samples; while the application for EEG amplifier Y sends 1 samples per 1 sample.

Also the player is always pushing samples to the best of the thread ability. If other functions are executed in the same thread and are hogging all of the CPU attention, then it will not manage to reliably hit the scheduled timings because the thread does not have the opportunity to do so (it's always busy doing something else). This is why I recommend starting the player in a separate interpreter/process. Note that I might add an option to do so directly from mne_lsl API or make it the default in the future.

To get back to your sampling interval, if the idea was to check every 100 ms for new data, that's completely fine. However, the way you are doing this check with the player in the same thread/process is hogging the CPU time away from the player. Thus it does not have the opportunity to do its job and to push samples. Below 2 propositions that would work with an 100 ms update interval:

In one interpreter:

player = PlayerLSL(...)
player.start()

In a second interpreter:

time_interval = 1 / 10  # 100 ms between 2 acquisition
stream = StreamLSL(...).connect(...)
start = time.time()
while True:
    if time.time() - start < time_interval:
        continue  # let's wait longer
    data, ts = stream.get_data()
    ...

In this example, I separate the player and stream process in 2 interpreters because this part of the while loop is hogging all of the thread time and attention:

start = time.time()
while True:
    if time.time() - start < time_interval:
        continue  # let's wait longer

Instead of this while loop hogging all of the CPU time, you can simply use a sleep to give time to the thread to do other tasks. Then, you can have both a player and stream in the same process, replicating decently a real-time application:

player = PlayerLSL(...)
player.start()

stream = StreamLSL(...).connet(...)
time_interval = 1 / 100

while True:
    time.sleep(time_interval)
    data, ts = stream.get_data()
    ...

With both of those snippet, you should be able to mimic a real-time stream very closely. For instance, you can have a look to this example I recently wrote (currently part of the dev doc): https://mne.tools/mne-lsl/dev/generated/examples/00_peak_detection.html#sphx-glr-generated-examples-00-peak-detection-py It's a real-time peak detector. At the end, I measure how much time the detector needs to detect a cardiac R-peak entering the buffer, and the delays are all other the place.. because I used a player with a chunk size of 64 samples! Running the same script with a chunk size of 1 sample, I get timings around 5 to 6 ms which exactly match the timings I get if I connect this detector to an ANT EEG amplifier and run the detector on true signal.

timonmerk commented 3 months ago

Many thanks for those explanations! From what I understand, option 1 is much more stable.

I had previously encountered also this issue under Windows 11 with time.sleep(). It didn't seem possible to sleep below 10-15 ms. And for some of our applications we wanted to ensure a sampling rate of 1 kHz with chunk size 1, so each ms a new sample. The only way around it seemed to be to continuously check with time.time() if 1 ms has passed. But I agree, multiprocessing should get rid of the issue.

mscheltienne commented 3 months ago

Sure time.sleep is not perfect, but it's not as bad as 15 years ago (that post is from 2009, mentions Ubuntu 10.04 (we are 24.04 LTS) and was likely run in Python 2). Take the following code snippet:

import time

from matplotlib import pyplot as plt

for target in (0.05, 0.01, 0.001):
    delays = list()
    for k in range(100):
        start = time.time()
        time.sleep(target)
        end = time.time()
        delays.append(end - start)
    f, ax = plt.subplots(1, 1, layout="constrained")
    ax.hist(delays, bins=20)
    ax.axvline(target, color="red")
    ax.set_title(f"Target: {target} seconds.")
plt.show()

I'm on Windows at the moment and got:

image

So for the sleep with a 1 ms target, I either wake up after 1 ms.. or after 2 ms. Which is pretty close from the target I wanted ;) And you can get lower, e.g. 0.1 ms:

image

I'm missing the target by 0.4 ms, landing usually at 0.5 ms. IIRC, on windows and python 3.11+ it now uses a high resolution timer with a resolution in the nanoseconds.

mscheltienne commented 3 months ago

And for some of our applications we wanted to ensure a sampling rate of 1 kHz with chunk size 1, so each ms a new sample. The only way around it seemed to be to continuously check with time.time() if 1 ms has passed. But I agree, multiprocessing should get rid of the issue.

The way I would do this:

(1) for mock purposes, use a player in a separate interpreter (2) for real application purposes, ensure your LSL application is pushing every sample one by one with an high-precision clock

Then for the client stream side, I would indeed have a CPU intensive loop with a tiny sleep at the end.

stream = StreamLSL(...).connect(processing_flags="all")
while True:
    if stream.n_new_samples != 0:
        data, ts = stream.get_data()  
        ...  # do something with this data
    time.sleep(0.0001)  # give a bit of time outside the loop to the thread

This last sleep is here to give a bit of time to the Thread to acquire (pull) new samples in the buffer. In the future, I might add an option to manually run the acquisition step to enable this kind of loop without any need for idle time:

while True:
    stream.acquire()
    if stream.n_new_samples != 0:
        data, ts = stream.get_data()
        ...

c.f. #262

mscheltienne commented 3 months ago

I'm closing this issue: 1.4 is out, with added tutorials and documentation around this CPU hogging issue. This tutorial shows how to run a player in a separate process: https://mne.tools/mne-lsl/stable/generated/examples/00_player_separate_process.html#sphx-glr-generated-examples-00-player-separate-process-py

This method allows single threaded acquisition: https://mne.tools/mne-lsl/stable/generated/api/mne_lsl.stream.StreamLSL.html#mne_lsl.stream.StreamLSL.acquire Associated with this tutorial: https://mne.tools/mne-lsl/stable/generated/tutorials/30_stream_manual.html#sphx-glr-generated-tutorials-30-stream-manual-py

timonmerk commented 3 months ago

Many thanks! We will directly implement it :)