SeanezLab / BoMI-StartReact

https://seanezlab.github.io/BoMI-StartReact/

Use a deque for the buffer classes #18

Closed seasonedfish closed 1 year ago

seasonedfish commented 1 year ago

The buffers are fixed-length FIFOs, and they currently use list slicing to update their contents.

We should be able to use deques to achieve this functionality. According to the Python wiki, list set slice has an O(k+n) time complexity, while deque extend has an O(k) time complexity. So, using deques should make the code both easier to understand and faster.
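As a minimal sketch of the two approaches (the function names and sizes here are illustrative and not taken from the repository), a fixed-length FIFO can be updated either by slice assignment on a list or by `deque.extend` on a deque created with `maxlen`. The list version assumes the new chunk is no longer than the buffer.

```python
from collections import deque


def push_list(buf: list[float], new: list[float]) -> None:
    """Shift a fixed-length list left and write `new` at the end, in place.

    Assumes len(new) <= len(buf).
    """
    k = len(new)
    # Slice assignment copies the n - k surviving items, then the k new ones.
    buf[:] = buf[k:] + new


def push_deque(buf: deque, new: list[float]) -> None:
    """Append `new`; a deque with maxlen discards the oldest items itself."""
    buf.extend(new)


list_buf = [0.0] * 5
deque_buf = deque([0.0] * 5, maxlen=5)

push_list(list_buf, [1.0, 2.0])
push_deque(deque_buf, [1.0, 2.0])

print(list_buf)         # [0.0, 0.0, 0.0, 1.0, 2.0]
print(list(deque_buf))  # [0.0, 0.0, 0.0, 1.0, 2.0]
```

Both buffers end up with the same contents; the deque version only touches the `k` new items, which is where the O(k) figure comes from.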

kwsp commented 1 year ago

After our discussion today, I wrote a benchmark for fun. Indeed, for maintaining a fixed-length FIFO, the deque class from the Python standard library is faster. However, I just remembered that I'm only keeping this buffer in memory for visualization: every time a new packet comes in, we plot the whole buffer on screen. Benchmarking again with a read of the whole buffer added after every new packet, the np.array buffer performs much better (about 55x faster in the run below). Its elements are stored contiguously in memory, so the read is basically free, whereas the deque has to be converted to an array on every read.

```
[Bench (No access)]     BufferDeque(bufsize=1000, n_channels=3, n_packets=1000000) took 0.729s (7.29e-07s/it)
[Bench (No access)]     BufferArray(bufsize=1000, n_channels=3, n_packets=1000000) took 1.729s (1.73e-06s/it)
[Bench (with access)]   BufferDeque(bufsize=1000, n_channels=3, n_packets=1000000) took 97.989s (9.8e-05s/it)
[Bench (with access)]   BufferArray(bufsize=1000, n_channels=3, n_packets=1000000) took 1.771s (1.77e-06s/it)
```

Benchmark code

```python
from abc import ABC, abstractmethod
from collections import deque
import time

import numpy as np


class BufferBase(ABC):
    """
    A BufferBase subclass holds a buffer of size (bufsize, n_channels) of float64 values.
    """

    @abstractmethod
    def __init__(self, bufsize: int, n_channels: int):
        "Construct the Buffer class"

    @abstractmethod
    def add_packet(self, packet: tuple[float, ...]):
        "Receive a packet of `n_channels` floats."

    @abstractmethod
    def get_buf(self) -> np.ndarray:
        "Get the buffer as a np array."


class BufferDeque(BufferBase):
    def __init__(self, bufsize: int, n_channels: int):
        self.bufsize = bufsize
        self.n_channels = n_channels
        self.buf = deque([(0.0,) * n_channels for _ in range(bufsize)])

    def add_packet(self, packet: tuple[float, ...]):
        self.buf.popleft()
        self.buf.append(packet)

    def get_buf(self) -> np.ndarray:
        # Builds a new array from the deque on every call.
        return np.array(self.buf)


class BufferArray(BufferBase):
    def __init__(self, bufsize: int, n_channels: int):
        self.bufsize = bufsize
        self.n_channels = n_channels
        self.buf = np.zeros((bufsize, n_channels), dtype=np.float64)

    def add_packet(self, packet: tuple[float, ...]):
        # Shift every row up by one, then write the new packet in the last row.
        self.buf[:-1] = self.buf[1:]
        self.buf[-1] = packet

    def get_buf(self) -> np.ndarray:
        # Returns the underlying array itself, without copying.
        return self.buf


def _test_bufsize(_buf_class):
    # Test size
    bufsize = 10
    n_channels = 3
    b: BufferBase = _buf_class(bufsize, n_channels)
    _buf = b.get_buf()
    assert len(_buf) == bufsize
    assert _buf.shape[-1] == n_channels

    bufsize = 123
    b = _buf_class(bufsize, n_channels)
    _buf = b.get_buf()
    assert len(_buf) == bufsize
    assert _buf.shape[-1] == n_channels


def _test_buf(_buf_class):
    bufsize = 10
    n_channels = 3
    b: BufferBase = _buf_class(bufsize, n_channels)
    packets = np.random.random((bufsize, n_channels))
    for packet in packets:
        b.add_packet(tuple(packet))
    _buf = b.get_buf()
    assert np.allclose(_buf, packets)


def test():
    _test_bufsize(BufferDeque)
    _test_bufsize(BufferArray)
    _test_buf(BufferDeque)
    _test_buf(BufferArray)


def _bench_no_access(_buf_class, bufsize: int, n_channels: int, n_packets: int):
    b: BufferBase = _buf_class(bufsize, n_channels)
    start = time.time()
    for _ in range(n_packets):
        packet = np.random.random(n_channels)
        b.add_packet(tuple(packet))
    elapsed = time.time() - start
    print(
        f"[Bench (No access)]\t{_buf_class.__name__}({bufsize=}, {n_channels=}, {n_packets=}) took {elapsed:.3f}s ({elapsed/n_packets:.3g}s/it)"
    )
    return elapsed


def _bench_with_access(_buf_class, bufsize: int, n_channels: int, n_packets: int):
    """
    Read the whole buffer every time a new packet comes.
    """
    b: BufferBase = _buf_class(bufsize, n_channels)
    start = time.time()
    for _ in range(n_packets):
        packet = np.random.random(n_channels)
        b.add_packet(tuple(packet))
        _ = b.get_buf()
    elapsed = time.time() - start
    print(
        f"[Bench (with access)]\t{_buf_class.__name__}({bufsize=}, {n_channels=}, {n_packets=}) took {elapsed:.3f}s ({elapsed/n_packets:.3g}s/it)"
    )
    return elapsed


def bench():
    _bench_no_access(BufferDeque, 1000, 3, 1_000_000)
    _bench_no_access(BufferArray, 1000, 3, 1_000_000)
    _bench_with_access(BufferDeque, 1000, 3, 1_000_000)
    _bench_with_access(BufferArray, 1000, 3, 1_000_000)


if __name__ == "__main__":
    test()
    bench()
```

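To illustrate why the read dominates in the "with access" case, here is a small sketch (the variable names and the tiny sizes are made up for illustration, and `maxlen` is used here for brevity instead of the `popleft`/`append` pair). Reading the array-backed buffer just hands out a reference to existing memory, while converting a deque builds a fresh array from every element each time.

```python
from collections import deque

import numpy as np

bufsize, n_channels = 4, 3

# Array-backed buffer: the data already lives in one contiguous block.
array_buf = np.zeros((bufsize, n_channels), dtype=np.float64)
view = array_buf  # "reading" is just handing out a reference, no copy

# Deque-backed buffer: each row is a separate Python tuple.
deque_buf = deque([(0.0,) * n_channels for _ in range(bufsize)], maxlen=bufsize)
converted = np.array(deque_buf)  # builds a new array from every element

print(np.shares_memory(view, array_buf))  # True
print(converted.shape)                    # (4, 3)

# The converted array is a snapshot: later packets do not show up in it.
deque_buf.append((1.0, 2.0, 3.0))
print(converted[-1])                      # [0. 0. 0.]
```

So plotting from the deque means paying for a full conversion per packet, which matches the gap seen in the benchmark above.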
seasonedfish commented 1 year ago

Ah, thanks for this. Very enlightening, and good thinking about adding a benchmark with a read.