python-streamz / streamz

Real-time stream processing for python
https://streamz.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Generic exception handling #433

Open freol35241 opened 3 years ago

freol35241 commented 3 years ago

Work in progress: A (naive?) stab at generic exception handling within streamz.

This PR attaches an exception handler to each Stream object that acts as an optional start of a new pipeline branch.

from streamz import Stream

def problem(item):
    raise ValueError

s = Stream()
mapper = s.map(problem)

# Default
s.emit(123) # <-- raises an InvalidDataError

# explicit silencing of exceptions
mapper.on_exception().silent = True
s.emit(123) # No exception 

# Do something with the exception and the data that caused it
def problem_solver(item):
    data_that_caused_the_problem, resulting_exception = item
    # Do something
mapper.on_exception().map(problem_solver)
s.emit(123) # The exception and the faulty data are pushed downstream in the on_exception branch

This PR still needs some linting, fixes for a few failing tests (nothing major, it seems) and updated docs. Consider it a starting point for discussion, mainly in relation to #86. I would appreciate some feedback before I continue.

martindurant commented 3 years ago

I like this idea! I can't promise exactly when I can have a look, but soon. First a couple of questions:

freol35241 commented 3 years ago

Hmm, good questions:

All existing tests now pass and the linting complaints have been taken care of.

Remaining things:

But I won't continue further before you have had a chance to take a look.

martindurant commented 3 years ago

I have had a chance to glance at the code, and it had not occurred to me that the exception handling needs to go into all stream nodes, and be handled upstream of the actual exception, in order to make this possible. I can see why you can't put exception handling into the node that is actually generating the exception, but might I suggest a less general change?

You could make a node for this specific functionality, something like

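from streamz import Stream

class ExceptionHandler(Stream):
    def __init__(self, upstream, on_error, catch=Exception, **kwargs):
        self.on_error = on_error  # receives (data, exception) tuples
        self.catch = catch        # exception type(s) to intercept
        super().__init__(upstream, **kwargs)

    def update(self, x, who=None, metadata=None):
        try:
            # Anything raised further downstream propagates back to this call
            self.emit(x, metadata=metadata)
        except self.catch as e:
            self.on_error.update((x, e), self, metadata=metadata)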

Here, on_error is a streamz node, but it could perhaps be just a function. Unfortunately, the exception branch would not show up in the graph visualisation. Somewhere, probably in the checkpointing conversation, we already talked about the possibility of labelling off-track nodes (i.e., something not following upstream/downstream), but I don't think there was any progress on that.
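To make the control flow concrete, here is a minimal sketch built on a toy, stdlib-only stand-in for streamz nodes. The `Node`, `MapNode` and `ListSink` classes are hypothetical and far simpler than the real `Stream`. The sketch shows that an exception raised inside a downstream `update()` travels back up the call stack through the upstream's `emit()`, which is why the handler node has to sit upstream of the node that fails.

```python
class Node:
    """Toy push-based node: emit() calls update() on every downstream."""

    def __init__(self, upstream=None):
        self.downstreams = []
        if upstream is not None:
            upstream.downstreams.append(self)

    def emit(self, x):
        for downstream in list(self.downstreams):
            downstream.update(x)

    def update(self, x):
        self.emit(x)


class MapNode(Node):
    def __init__(self, upstream, func):
        super().__init__(upstream)
        self.func = func

    def update(self, x):
        self.emit(self.func(x))  # func may raise


class ListSink(Node):
    def __init__(self, upstream):
        super().__init__(upstream)
        self.items = []

    def update(self, x):
        self.items.append(x)


class ExceptionHandler(Node):
    def __init__(self, upstream, on_error, catch=Exception):
        super().__init__(upstream)
        self.on_error = on_error
        self.catch = catch

    def update(self, x):
        try:
            self.emit(x)  # runs the whole downstream chain synchronously
        except self.catch as exc:
            # Divert the offending input and the exception to the error branch
            self.on_error.update((x, exc))


source = Node()
errors = Node()
error_log = ListSink(errors)
guarded = ExceptionHandler(source, on_error=errors, catch=ZeroDivisionError)
results = ListSink(MapNode(guarded, lambda x: 10 // x))

for value in (5, 0, 2):
    source.emit(value)

print(results.items)    # values that made it through the map
print(error_log.items)  # (input, exception) pairs from the error branch
```

Note that the handler catches exceptions from every node downstream of it, not only from the node directly attached to it.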

Finally, InvalidDataError seems to me more like a testing entity, not something to put into the main core module.

cc @jsmaupin

freol35241 commented 3 years ago

Thanks for taking a look 👍

Yes, my opinion is that a fully generic and integrated mechanism for exception handling probably needs to be present in all nodes. That said, your suggestion is less intrusive, although it makes "building" the pipeline a bit more verbose and perhaps slightly less intuitive (maybe a highly personal preference?). So if you feel my suggestion is over the top, feel free to close this PR.

martindurant commented 3 years ago

I think I would prefer the minimal change - would you like to code it up along the lines of my suggestion? I think your example should be easy enough to make.

freol35241 commented 3 years ago

@martindurant I have been thinking a bit about this and the reason for me to put together the initial implementation in this PR was to try to get rid of my current way of "handling exceptions gracefully":

from streamz import Stream

def problem(item):
    try:
        raise ValueError
    except Exception as exc:
        return exc

s = Stream()
mapper = s.map(problem)
exception_branch = mapper.filter(lambda x: isinstance(x, Exception)) # Handle exceptions
everything_good = mapper.filter(lambda x: not isinstance(x, Exception)) # Continue as usual

After trying out your suggestion I realized I probably wouldn't use it since it messes with my mental model of the pipeline (i.e. needing to explicitly define an exception handler upstream of where an exception might happen). I can also be very sure where the exception originates from. So, I will choose to not implement your suggestion after all. Sorry for that.

On a slight side note: This could perhaps be solved by defining some kind of "composed" node which functions according to the above logic.
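One possible reading of the "composed" node idea, sketched with plain functions rather than a real streamz node: wrap the mapped function so that it never raises, and tag each result as a success or a failure so the two branches can be split with ordinary filters. The names `Outcome`, `capture` and `is_ok` are hypothetical.

```python
from typing import Any, Callable, NamedTuple, Optional


class Outcome(NamedTuple):
    """Result of one call: a value, or the input plus the exception it caused."""
    value: Any
    error: Optional[Exception] = None


def capture(func: Callable[[Any], Any]) -> Callable[[Any], Outcome]:
    """Wrap func so that exceptions are returned as data instead of raised."""
    def wrapper(item):
        try:
            return Outcome(func(item))
        except Exception as exc:
            # Keep the offending input so the error branch can inspect it
            return Outcome(item, exc)
    return wrapper


def is_ok(outcome: Outcome) -> bool:
    return outcome.error is None


parse = capture(int)
outcomes = [parse(raw) for raw in ["1", "x", "3"]]

everything_good = [o.value for o in outcomes if is_ok(o)]
exception_branch = [(o.value, o.error) for o in outcomes if not is_ok(o)]
```

In a pipeline this would presumably be wired as `s.map(capture(problem))` followed by the two `filter` calls from the snippet above, with `is_ok` as the predicate.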

martindurant commented 3 years ago

I think there is a general discussion to be had here, and perhaps @jsmaupin , @roveo and @chinmaychandak might have thoughts.

At the moment, a node can have multiple downstreamz, and they are all of equal importance. There are also some streamz nodes that take multiple upstreamz (e.g., combine_latest), where one of those inputs may be special, and the logic for this is contained within the node.

We did discuss the possibility of multiple possible output stream types for a node, and the exception case would be one of these. We could annotate them with different line styles in the graph display. Do we have multiple output types? Should there be, for example, a where which passes each event to only one of two output streamz?
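As a rough illustration of what such a `where` node could look like, here is a sketch on a toy, stdlib-only node model. `Node`, `Collect` and `Where` are hypothetical, and the attribute names `if_true` and `if_false` are assumptions made for this example, not an existing or agreed API.

```python
class Node:
    """Toy push-based node that forwards each event to all downstreams."""

    def __init__(self, upstream=None):
        self.downstreams = []
        if upstream is not None:
            upstream.downstreams.append(self)

    def update(self, x):
        for downstream in list(self.downstreams):
            downstream.update(x)


class Collect(Node):
    """Sink that stores every event it receives."""

    def __init__(self, upstream):
        super().__init__(upstream)
        self.items = []

    def update(self, x):
        self.items.append(x)


class Where(Node):
    """Pass each event to exactly one of two named output branches."""

    def __init__(self, upstream, predicate):
        super().__init__(upstream)
        self.predicate = predicate
        self.if_true = Node()
        self.if_false = Node()

    def update(self, x):
        branch = self.if_true if self.predicate(x) else self.if_false
        branch.update(x)


source = Node()
splitter = Where(source, lambda x: x % 2 == 0)
evens = Collect(splitter.if_true)
odds = Collect(splitter.if_false)

for i in range(5):
    source.update(i)
```

An exception output would then be one more named branch next to `if_true` and `if_false`, which is what would need a distinct line style in the graph display.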

The main thing I had against this PR as it stands is that the logic for handling an exception is upstream, while the downstream holds the exception output stream. On the other hand, it's nice to only have to edit _emit to have this available for all node types, rather than the many versions of update(). After this discussion, I may be persuaded, so long as we take out the InvalidData stuff.

roveo commented 3 years ago

My thoughts:

dicts = src.map(json.loads)
dicts.on_exception(ParsingError).sink_to_file(...)

instead of checking for error type in the handler function and re-raising if it's not ParsingError.
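For comparison, this is roughly what the manual version looks like when the branch cannot filter by exception type. It is only a sketch: `ParsingError` is the placeholder name from the snippet above, defined here as a hypothetical subclass of `ValueError`, and the handler receives the `(data, exception)` tuple proposed in this PR.

```python
class ParsingError(ValueError):
    """Hypothetical stand-in for the error type used in the example above."""


rejected = []


def handle(item):
    data, exc = item
    if not isinstance(exc, ParsingError):
        raise exc  # not the error this branch is meant for: let it propagate
    rejected.append(data)


# A parsing failure is handled by the branch
handle(("{bad json", ParsingError("unexpected end of input")))

# Any other exception has to be re-raised by hand
try:
    handle(("ok", KeyError("missing")))
except KeyError:
    propagated = True
else:
    propagated = False
```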

dicts = src.map(json.loads)
invalid_batches = dicts.on_exception(ParsingError).partition(100, timeout=30)
invalid_batches.sink_to_s3(...)
invalid_batches.map(make_slack_message).sink_to_slack(...)

So I want to wait 30 seconds if there are any more bad records, write them to S3 for manual inspection and notify myself. This looks like a normal stream, just a different branch.

sink = result.sink_to_db(...)
sink.on_exception(InvalidDataError).sink_to_file("rejecteddata.log")
sink.on_exception(ConnectionError).retry(wait_seconds=10, max_retries=5)

node.try().something().something_else().except(Exception).map(handle)

martindurant commented 3 years ago

Note that it's OK to break the API, since almost all streamz code lives right here, and we can edit classes as needed to achieve what we want. Of course, we'd rather not change the user-called methods (a small number).

node.try().something().something_else().except(Exception).map(handle)

That is an interesting API! Now there would be something tricky to show graphically or indeed to implement :)

freol35241 commented 2 years ago

Stumbled upon this again after not having used streamz for a while. For what it's worth, I currently use the following for exception handling. It gives me the flexibility I need, but I wouldn't be surprised if it has some nasty side effects that I have yet to discover...

from types import MethodType
from streamz import Stream, Sink

@Stream.register_api()
class on_exception(Sink):
    def __init__(self, upstream: Stream, exception=Exception, **kwargs):
        super().__init__(upstream, **kwargs)

        original_upstream_update_method = upstream.update

        def _(upstream_self, x, who=None, metadata=None):
            try:
                return original_upstream_update_method(x, who, metadata)
            except exception as exc:
                # Pass down the branch started with this stream instead
                self._emit((x, exc), metadata)

        # Bind to upstream
        upstream.update = MethodType(_, upstream)

    def update(self, x, who=None, metadata=None):
        pass  # NO-OP

martindurant commented 2 years ago

Hah, monkey-patch :)
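One side effect of this pattern can be shown on a toy object. This is a sketch, not streamz itself: `Node` and `patch_on_exception` are hypothetical and only mirror the wrapping done in the snippet above. Each call re-wraps the already patched `update`, so handlers nest, and the first-registered handler whose exception type matches swallows the error before later ones see it.

```python
from types import MethodType


class Node:
    def update(self, x):
        return int(x)  # stands in for work that may raise


def patch_on_exception(node, exception, log):
    """Wrap node.update so matching exceptions are logged instead of raised."""
    original_update = node.update  # may already be a wrapper from an earlier call

    def patched(node_self, x):
        try:
            return original_update(x)
        except exception as exc:
            log.append((x, exc))

    node.update = MethodType(patched, node)


node = Node()
first, second = [], []
patch_on_exception(node, ValueError, first)   # innermost wrapper
patch_on_exception(node, Exception, second)   # wraps the wrapper above

node.update("oops")  # int("oops") raises ValueError

print(len(first), len(second))
```

The patch is also never undone, so the wrapped `update` stays in place for the lifetime of the node.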

I still think there is merit in making this kind of operation available for everyone. I'm afraid I still like my solution, which can be fleshed out a bit:

import time
from typing import Tuple, Type

from streamz import Stream

class ExceptionHandler(Stream):
    def __init__(self, upstream, on_error: Stream,
                 catch: Tuple[Type[BaseException], ...] = (Exception,),
                 retries=0, retry_wait=1, **kwargs):
        self.on_error = on_error
        self.catch = catch
        self.retries = retries
        self.retry_wait = retry_wait
        super().__init__(upstream, **kwargs)

    def update(self, x, who=None, metadata=None):
        retries = self.retries  # retries left for this event
        while True:
            try:
                return super().emit(x, metadata=metadata)
            except self.catch as e:
                if retries > 0:  # retries=N allows N further attempts
                    time.sleep(self.retry_wait)  # or async?
                    retries -= 1
                    continue
                # Out of retries: hand the input and the exception to the error branch
                return self.on_error.update((x, e), self, metadata=metadata)
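The retry loop can be exercised in isolation. Below is a stdlib-only sketch, assuming `retries=N` is meant as "up to N further attempts after the first failure", so the counter is compared against 0. `call_with_retries`, `flaky` and `always_fails` are hypothetical names introduced for this example.

```python
import time


def call_with_retries(func, x, catch=(Exception,), retries=0, retry_wait=0.0):
    """Call func(x); on failure retry up to `retries` times, then return (x, exc)."""
    attempts_left = retries
    while True:
        try:
            return func(x)
        except catch as exc:
            if attempts_left > 0:
                time.sleep(retry_wait)  # blocking; would stall an event loop
                attempts_left -= 1
                continue
            return (x, exc)


calls = []


def flaky(x):
    """Fail on the first two calls, succeed on the third."""
    calls.append(x)
    if len(calls) < 3:
        raise ConnectionError("temporarily unavailable")
    return x * 2


def always_fails(x):
    raise ConnectionError("down")


result = call_with_retries(flaky, 21, catch=(ConnectionError,), retries=2)
fallback = call_with_retries(always_fails, 7, catch=(ConnectionError,), retries=1)
```

Here `result` is the value from the third attempt, while `fallback` is the `(input, exception)` tuple that would be sent to the error branch once the retries are exhausted.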