ezmsg-org / ezmsg

Pure-Python DAG-based high-performance SHM-backed pub-sub and multi-processing pattern
https://ezmsg.readthedocs.io/en/latest/
MIT License

[For Discussion] Should a Unit's core logic come from a generator method or a class instance? #116

Open cboulay opened 2 months ago

cboulay commented 2 months ago

tl;dr - a generator method is more easily inspectable with meta-programming, but so far that inspection has bugs; a class instance is more intuitive and makes other use cases viable.


For me and my colleagues, the most compelling feature of ezmsg is how reusable the processing code is outside the framework -- both for ease of unit testing and for use in other contexts. Thus, when teaching ezmsg to others, I made sure to emphasize test-driven development of isolatable stateful processing nodes. So far in ezmsg we've been doing this with "sendable generator" methods, and I was all-in. However, after going through the exercise of teaching this to others, I'm not convinced that generator methods are the best approach.

I think that if we were to use regular classes instead to encapsulate our stateful signal processing, this would be better received. It would be more familiar to Python devs, it would be easier to teach, and it would enable using ezmsg signal processing methods in the cloud. Admittedly, we'd have to put some good work into the design of the base classes and documentation. And there would be some work involved in redoing the meta-programming that @pperanich did to inspect these classes to make auto-Units.

What do you think, @pperanich and @griffinmilsap ?

griffinmilsap commented 1 month ago

@pperanich and I go back and forth on this issue quite a bit. I know Preston's initial reaction was that this wasn't super readable (perhaps too pythonic?) and that he had many of the same qualms you have. I think, though, that for simple one-input/one-output processing functions, this is a really powerful approach.

My current approach is to use a generator for simple "transformer" units that have one input and one output, and manually wrap the generator in an ez.Unit (gen_to_unit is still quite experimental). There are many more instances where my logic/behavior doesn't easily fit into this generator pattern, and I'll build those ez.Units directly; but I try to keep these to data sources/sinks/UI/interaction-related functionality -- ultimately, functionality you'd want to re-implement in a production environment.
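For concreteness, here's a rough sketch of what "manually wrap the generator in an ez.Unit" can look like. This is illustrative only: the scaler generator and the Scale* names are made up for this example, and the exact Settings/State/lifecycle spelling may differ across ezmsg versions.

import typing

import ezmsg.core as ez

def scaler(gain: float) -> typing.Generator[float, float, None]:
    # Core logic lives in a plain "sendable generator" -- testable without ezmsg.
    out = 0.0
    while True:
        msg = yield out
        out = msg * gain

class ScaleSettings(ez.Settings):
    gain: float = 1.0

class ScaleState(ez.State):
    gen: typing.Optional[typing.Generator] = None

class Scale(ez.Unit):
    SETTINGS: ScaleSettings
    STATE: ScaleState

    INPUT_SIGNAL = ez.InputStream(float)
    OUTPUT_SIGNAL = ez.OutputStream(float)

    async def initialize(self) -> None:
        # Build and prime the generator once; the Unit just shuttles messages.
        self.STATE.gen = scaler(self.SETTINGS.gain)
        next(self.STATE.gen)

    @ez.subscriber(INPUT_SIGNAL)
    @ez.publisher(OUTPUT_SIGNAL)
    async def on_signal(self, msg: float) -> typing.AsyncGenerator:
        yield self.OUTPUT_SIGNAL, self.STATE.gen.send(msg)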

FWIW, running ezmsg units in a notebook environment works beautifully so long as you force single-process execution. The biggest issue with this approach is that you must manually interrupt the system to cause it to stop (typically with the stop button in jupyter) -- it'd be REALLY cool to have the concept of daemonic coroutines in ezmsg, and if all that remained were daemonic coroutines, the system could auto-shutdown once all messages had percolated through the system.

cboulay commented 1 month ago

I think even if ezmsg.run(...) were fast and had good UX in offline analysis, I would still want each Unit to be backed by a simpler object because it's much easier to write good tests for and it can be reused outside of ezmsg. So the conversation about how to use a pipeline in an offline analysis is perhaps orthogonal to the issue of whether the simpler object is a generator or an instance of (e.g.) AxArrProcessor.
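To make that concrete, here is a minimal pytest-style sketch of testing the simpler object directly -- no pipeline, no processes, just construct, call, assert. (AddLastAndMult is the example class Preston posts further down this thread.)

def test_add_last_and_mult():
    proc = AddLastAndMult(multiplier=3)
    assert proc(1.0) == 3.0   # first value: no last_value yet
    assert proc(4.0) == 15.0  # (4 + 1) * 3
    assert proc(7.0) == 33.0  # (7 + 4) * 3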

That being said, you brought up a related topic that I've been thinking about recently.

> FWIW, running ezmsg units in a notebook environment works beautifully so long as you force single-process execution. The biggest issue with this approach is that you must manually interrupt the system to cause it to stop (typically with the stop button in jupyter)

ez.run(...) won't work for me in my offline analysis because of this. I'm using ezmsg offline to perform feature extraction that perfectly reproduces the online feature extraction, then using those features for an ML analysis. Sometimes there are hyperparameters in the feature extraction process (e.g., window size, filter cutoff frequencies) that should be optimized as part of the ML analysis, so I need to run the full pipeline for each hyperparameter set. In other words, I need to churn through multiple files, multiple times each. For some datasets, the offline feature extraction takes < 3 sec per file.

I definitely can't manually interrupt each time. Even if the system could auto-shutdown between files, the overhead of shutting down the ezmsg pipeline, waiting for shmem to clear out, spinning up a new pipeline, etc., is too long relative to the cost of processing each file. (I have another application that resets online ezmsg pipelines on source switches, and each switch takes about 8 seconds, with about 3-4 of those seconds spent resolving and attaching to the new LSL stream.)

I thought about reducing overhead by leaving the pipeline open and making sure each Unit could reset its state on file switch. For example, AxisArray could have a session_id field and a Unit would reset on session_id mismatch. (This might be a good addition to AxisArray anyway.) An additional requirement would be that exhaustion of one file triggers loading of the next file with a new session_id, but this could be handled in the custom file-playback & msg iterator. The biggest hurdle is making sure all Units in the pipeline, including all future Units developed by novices, reset their state on session_id mismatch.
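A hedged sketch of that idea (AxisArray has no session_id field today; everything here is hypothetical): the processor compares each message's session_id against the one it last saw and resets itself on mismatch.

import typing

class SessionAwareProcessor:
    def __init__(self) -> None:
        self._session_id: typing.Optional[str] = None

    def __call__(self, msg):
        # Reset internal state whenever the (hypothetical) session_id changes.
        msg_session = getattr(msg, "session_id", None)
        if msg_session != self._session_id:
            self._session_id = msg_session
            self.reset()
        return self.process(msg)

    def reset(self) -> None:
        # Drop buffers, filter state, etc. -- subclass responsibility.
        pass

    def process(self, msg):
        # Actual signal processing goes here.
        return msg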

Even with this change, the overhead is only reduced when changing files within a single hyperparameter set. Changing to a new hyperparameter set is usually managed by an optimizer, and the optimizers are all designed to work in independent contexts, so each needs its own file-loading + feature-extraction pipeline, bringing back the ezmsg.run(...) overhead.

griffinmilsap commented 1 month ago

All super valid points. There are at least three different discussion threads here haha.

Coming back to generators vs classes, I've got two thoughts to add at this point:

  1. Are there any existing stream processing libraries that have minimal dependencies (numpy/scipy), have simple implementations of the operations we're interested in for ezmsg-sigproc, and have framework-independent implementations that one can easily drop into any/all stream processing frameworks? If so, maybe it's easiest to bring that to bear here; and if not, maybe it makes sense to take what's been written with AxisArray/ezmsg-sigproc and turn it into a separate lib that we can wrap ezmsg units around as needed.
  2. Related thought: what if ezmsg Units could serve as the basis of this class-based (re)implementation of the generator backend to sigproc? Rather than the ez.run(...) entry point, which has startup overhead but is intended for indefinite runtime, we could make a new runner that works a bit more like the existing generator compose functionality but invokes the async (possibly even synchronous?!) methods within the units directly? (See the sketch below.)
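To illustrate thought 2 (nothing below exists in ezmsg today; offline_run and its semantics are hypothetical): a deterministic runner could just drive the units' core callables synchronously, message by message, returning when the source is exhausted -- the finite-runtime behavior discussed above.

import typing

def offline_run(source: typing.Iterable, *stages: typing.Callable) -> list:
    # Push each message through the stages in order; no shmem, no processes.
    results = []
    for msg in source:
        for stage in stages:
            msg = stage(msg)
        results.append(msg)
    return results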

At the end of the day, I've really come to the conclusion that LabGraph/ezmsg is primarily a useful stream processing pattern in Python, bundled with a "system runner/backend" that is capable of executing code written in this pattern. The component/unit/collection wrapper code is super lightweight and completely separated from the implementation that executes it (thanks in large part to the aspects of the implementation that were directly inspired by LabGraph). To that end, it might make sense to write a new "offline/deterministic" backend that runs ezmsg systems with finite runtime characteristics; thoughts?

EDIT: I've been thinking about this a lot over the weekend and think an initial PoC implementation would be pretty straightforward. The ezmsg.Unit class is particularly well set up for this and would allow access to STATE for capture to disk and what-not. I might give it a go this week.

pperanich commented 1 month ago

I've been doing a lot of thinking about the whole generator-based vs. class-based debate for our processing components. One thing that really bugs me is how much behind-the-scenes magic happens with object manipulation in ezmsg's metaclass for Units. With this in mind, if we were to lean towards a class-based approach, I'm a bit hesitant to create another base class that manipulates instances in non-obvious ways or enforces a strict API.

Instead, I think we could use built-in Python class mechanics! Utilizing these native structures to implement processing logic not only aligns with Pythonic principles but also promises a straightforward, repeatable pattern for developing processing components. These components, I believe, would be easier to understand, more reusable, and ultimately more accessible to developers new to our framework.

Additionally, the introduction of helper decorators could offer a seamless means of integrating arbitrary classes with ezmsg functionalities. These decorators could enable features such as publisher-only tasks or the ability to subscribe to multiple topics, all without necessitating inheritance from a specific ezmsg base class.
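Purely as illustration of the decorator idea (nothing here exists in ezmsg; as_unit and its arguments are hypothetical): a helper could attach ezmsg-facing metadata to an arbitrary class without requiring it to inherit from anything.

import typing

def as_unit(input_topic: str, output_topic: str):
    # Record topic metadata on the class; a wrapper layer could later
    # build a real ez.Unit around any class decorated this way.
    def decorate(cls: typing.Type) -> typing.Type:
        cls.__ezmsg_input_topic__ = input_topic
        cls.__ezmsg_output_topic__ = output_topic
        return cls
    return decorate

@as_unit(input_topic="SIGNAL_IN", output_topic="SIGNAL_OUT")
class Doubler:
    def __call__(self, x: float) -> float:
        return x * 2.0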

I've put together some examples below to show off the difference between sticking with generators and moving to this class-based approach. Plus, I played around with functools.partial to show how we could easily tweak processing components without rewriting them from scratch. It's a bit of a side note, but I think it's a neat trick that could come in handy. Note that this could actually be done for the generator-based approach as well.

from functools import singledispatchmethod, partial

# Demonstrating Generator-based vs Class-based Processing for Mathematical Operations

def add_last_and_mult(multiplier: float):
    """
    Generator function that multiplies the sum of the current and last received values by a multiplier.
    If it's the first value received, it only multiplies the current value.

    :param multiplier: The multiplier to apply to the sum of the current and last value.
    :yield: The current value after multiplication.
    """
    last_value = None
    current_value = 0.0
    while True:
        next_value = yield current_value
        current_value = next_value if last_value is None else next_value + last_value
        current_value *= multiplier
        last_value = next_value

class AddLastAndMult:
    """
    Class that encapsulates the behavior of adding the last and current value and then multiplying the sum.

    :param multiplier: The multiplier to apply to the sum of the current and last value.
    """

    def __init__(self, multiplier: float) -> None:
        self.multiplier = multiplier
        self.last_value = None

    def __call__(self, next_value: float) -> float:
        """
        Processes the next value by adding it to the last value (if any), multiplying the sum,
        and updating the last value.

        :param next_value: The next value to process.
        :return: The processed value after multiplication.
        """
        current_value = (
            next_value if self.last_value is None else next_value + self.last_value
        )
        current_value *= self.multiplier
        self.last_value = next_value
        return current_value

def run_add_last_and_mult():
    """
    Demonstrates the use of both the generator-based and class-based approaches to process a sequence of values.
    """
    mult = 3
    vals = list(range(1, 20, 3))

    # Generator-based processing
    proc = add_last_and_mult(mult)
    next(proc)  # Prime the generator
    print("Output from generator-based processing: ", [proc.send(val) for val in vals])

    # Class-based processing
    proc = AddLastAndMult(mult)
    print("Output from class-based processing: ", [proc(val) for val in vals])

# NOTE: This is a bit of a tangent, but I think it could be useful.
# Using functools.partial to create a variant of the AddLastAndMult class with a fixed multiplier of 5.
# This demonstrates how partials can be used to easily create different versions of processing steps,
# such as applying different types of filters or operations with predefined parameters.
AddLastAndMultBy5 = partial(AddLastAndMult, multiplier=5)

def run_partial():
    """
    Demonstrates the use of a class variant created with functools.partial for processing a sequence of values.
    This specific variant, AddLastAndMultBy5, multiplies the sum of the current and last value by 5.
    """
    # Define a sequence of values to process.
    vals = list(range(1, 20, 3))

    # Initialize the processing class with the multiplier preset to 5.
    proc = AddLastAndMultBy5()

    # Process the sequence of values and print the output.
    print("Output from AddLastAndMultBy5: ", [proc(val) for val in vals])

# Handling Different Data Types with Generator and Class-based Approaches
# See: https://docs.python.org/3/library/functools.html#functools.singledispatchmethod

def handle():
    """
    Generator function that prints messages based on the type of the received input.
    """
    while True:
        msg = yield
        if isinstance(msg, int):
            print(f"Message type is integer: {msg}")
        elif isinstance(msg, str):
            print(f"Message type is string: {msg}")
        else:
            print(f"Message type is not registered: {msg}")

class Handle:
    """
    Class that uses singledispatchmethod to handle messages of different types.
    """

    @singledispatchmethod
    def __call__(self, arg):
        """
        Default handler for unregistered message types.
        """
        print(f"Message type is not registered: {arg}")

    @__call__.register(int)
    def _(self, arg: int):
        """
        Handler for integer message types.
        """
        print(f"Message type is integer: {arg}")

    @__call__.register(str)
    def _(self, arg: str):
        """
        Handler for string message types.
        """
        print(f"Message type is string: {arg}")

def run_handle():
    """
    Demonstrates handling of different data types using both a generator and a class-based approach.
    """
    x = [10, "Hello world", 2.351]

    # Generator-based handling
    proc = handle()
    next(proc)  # Prime the generator
    for val in x:
        proc.send(val)

    # Class-based handling
    proc = Handle()
    for val in x:
        proc(val)

if __name__ == "__main__":
    run_add_last_and_mult()
    run_partial()
    run_handle()
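
For reference (computed by hand from the inputs above), running this script should print:

# Output from generator-based processing:  [3, 15, 33, 51, 69, 87, 105]
# Output from class-based processing:  [3, 15, 33, 51, 69, 87, 105]
# Output from AddLastAndMultBy5:  [5, 25, 55, 85, 115, 145, 175]
# Message type is integer: 10
# Message type is string: Hello world
# Message type is not registered: 2.351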