ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License
4.77k stars 361 forks

Errors on completion are ignored #657

Closed MainRo closed 2 years ago

MainRo commented 2 years ago

Describe the bug If an operator raises an exception in its completion handler, then this exception is lost. This leads to stalled pipelines.

To Reproduce Steps to reproduce the behavior: run the script under Code or Screenshots below.

Expected behavior The pipeline should complete, or at least we need a way to catch this exception so that the subscription can be disposed. I am not sure how to deal with this in a generic way:

In AutoDetachObserver we could add an additional try/except block around the call to self._on_completed. In the exception handler we could call the on_error handler:

    def on_completed(self) -> None:
        if self.is_stopped:
            return
        self.is_stopped = True

        try:
            try:
                self._on_completed()
            except Exception as e:
                self._on_error(e)
        finally:
            self.dispose()

This correctly propagates the error in my situation but:

Code or Screenshots

import reactivex as rx

def error_on_completed():
    def _error_on_completed(source):
        def on_subscribe(observer, scheduler):
            def on_completed():
                # Raises instead of forwarding to observer.on_completed()
                raise ValueError('Not happy')

            return source.subscribe(
                on_next=observer.on_next,
                on_error=observer.on_error,
                on_completed=on_completed,
                scheduler=scheduler,
            )
        return rx.create(on_subscribe)

    return _error_on_completed

source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

result = source.pipe(
    error_on_completed(),
).run()


dbrattli commented 2 years ago

Observers should never throw exceptions. We cannot call on_completed again, and we cannot call on_error, since that violates the contract. And what if on_error also throws? As for RxPY, we have already notified the user code, and there is basically nothing more we can do (terminate the program?). This needs to be handled by user code, e.g. by wrapping the observer in a SafeObserver or similar.
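As a rough illustration of handling this in user code, here is a minimal sketch of a guarding wrapper. The class name `GuardedObserver` and the `on_unhandled` callback are hypothetical and are not part of RxPY (this is not the library's own SafeObserver). It only assumes plain callables for the three handlers. An exception raised by a terminal handler is handed to `on_unhandled` rather than being re-sent as a second notification, so the one-terminal-event contract is kept.

```python
from typing import Any, Callable, List


class GuardedObserver:
    """Hypothetical user-side wrapper; not an RxPY API."""

    def __init__(
        self,
        on_next: Callable[[Any], None],
        on_error: Callable[[Exception], None],
        on_completed: Callable[[], None],
        on_unhandled: Callable[[Exception], None],
    ) -> None:
        self._on_next = on_next
        self._on_error = on_error
        self._on_completed = on_completed
        # Last-resort hook (assumption): log, dispose, or stop the program here.
        self._on_unhandled = on_unhandled
        self.is_stopped = False

    def on_next(self, value: Any) -> None:
        if not self.is_stopped:
            self._on_next(value)

    def on_error(self, error: Exception) -> None:
        self._terminate(lambda: self._on_error(error))

    def on_completed(self) -> None:
        self._terminate(self._on_completed)

    def _terminate(self, handler: Callable[[], None]) -> None:
        # Only the first terminal notification is delivered.
        if self.is_stopped:
            return
        self.is_stopped = True
        try:
            handler()
        except Exception as error:
            # Do not call on_error/on_completed again; report out of band.
            self._on_unhandled(error)


unhandled: List[Exception] = []


def failing_on_completed() -> None:
    raise ValueError("Not happy")


observer = GuardedObserver(
    on_next=print,
    on_error=print,
    on_completed=failing_on_completed,
    on_unhandled=unhandled.append,
)
observer.on_next("Alpha")
observer.on_completed()                     # handler raises, exception is captured
observer.on_error(RuntimeError("ignored"))  # dropped: already terminated
print(unhandled)
```

With RxPY, the three bound methods could be passed to `subscribe` as `on_next`, `on_error` and `on_completed`. What `on_unhandled` should do (log, dispose the subscription, abort) is left to the application.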

Screenshot 2022-07-19 at 10 01 40

Ref:

MainRo commented 2 years ago

Thanks. This, unfortunately, confirms my understanding. My original issue was due to a bug, but I need to gracefully handle any exception that may be raised here.

Possibly, the most natural way in my case is to catch exceptions in each completion handler and propagate an on_error downstream. This would change a completion chain to an error chain, which is valid behavior AFAIU:

source:completion -> op1:on_completed (raises an exception, chain with on_error) -> op2:on_error -> ... -> subscribe:on_error
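A minimal sketch of that chain, assuming a hypothetical helper named `forward_completion_errors` (not part of RxPY). It wraps a completion callback so that an exception raised inside it is passed to the downstream `on_error` instead of escaping. It takes plain callables, so the example below uses stand-in functions rather than a real pipeline.

```python
from typing import Callable, List


def forward_completion_errors(
    on_completed: Callable[[], None],
    on_error: Callable[[Exception], None],
) -> Callable[[], None]:
    """Hypothetical helper (not an RxPY API): turn a failing completion
    into an error notification for the downstream observer."""

    def _safe_on_completed() -> None:
        try:
            on_completed()
        except Exception as error:
            # Switch from the completion chain to the error chain.
            on_error(error)

    return _safe_on_completed


# Stand-ins for op1's completion handler and op2's on_error.
events: List[str] = []


def failing_on_completed() -> None:
    raise ValueError("Not happy")


def downstream_on_error(error: Exception) -> None:
    events.append(f"error: {error}")


handler = forward_completion_errors(failing_on_completed, downstream_on_error)
handler()
print(events)  # ['error: Not happy']
```

Inside an operator this would be used as `on_completed=forward_completion_errors(on_completed, observer.on_error)`. It only stays within the contract if the wrapped handler raises before it calls `observer.on_completed()`; otherwise downstream would receive both a completion and an error.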