ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License
4.81k stars 361 forks source link

on_error_resume_next should take a function #13

Closed caente closed 9 years ago

caente commented 10 years ago

Hey man, finally I have an actual issue :-)

One of my problems was that on_error_resume_next takes an Observable(or more) as parameter, instead of a function that takes an Exception and returns an Observable. In java/scala I can do something like this:

Observable.defer(Observable[Result[A]](_.onNext(Result(action(msg.getBody), msg)))).onErrorResumeNext {
      ex =>
        logger.error(s"Error processing ${msg.getBody}", ex)
        Observable.empty
    }

While in python I only can do something like this:

 def __process(msg, action):
        def result(m):
            return lambda: Observable(lambda s: s.on_next(Result(action(m.get_body()), m)))

        return Observable.defer(result(msg)).on_error_resume_next(Observable.empty())

It seems silly, but that logger is actually logging the exception to loggly, is a information that we really need, so is kind of a big deal to know why was an exception, even when everything should keep working as if nothing happened. In other words, the subscribers of the that Observable should never receive the error, but the error has to be logged.

So, what's the rationale to take off the exception? Is there a chance to get it back?

dbrattli commented 10 years ago

Hi, both the .NET and JS impl. of Rx swallows the error silently. But the Java way makes sense and is easy to add without breaking anything. I'll make a new release later today, and give you a ping when it's ready. Btw. which scheduler/concurrency model are you using for your project?

caente commented 10 years ago

A little bit, just a periodic scheduler to pull messages from SQS

On September 30, 2014 at 2:51:45 AM, Dag Brattli (notifications@github.com) wrote:

Hi, both the .NET and JS impl. of Rx swallows the error silently. But the Java way makes sense and is easy to add without breaking anything. I'll make a new release later today, and give you a ping when it's ready. Btw. which scheduler/concurrency model are you using for your project?

— Reply to this email directly or view it on GitHub.

dbrattli commented 10 years ago

New release pushed.

PS: About schedulers. RxPY is not thread safe yet, so you want to run it single threaded using asyncio or tornado (ioloop) schedulers. Default behaviour is to use a new thread for timeouts. I'm adding locks for thread synchronization but still many operators that are not thread safe.

caente commented 10 years ago

Yeah, I’m using IOLoopScheduler for the periodic scheduler.

thanks for the release, pulling now On September 30, 2014 at 2:58:00 PM, Dag Brattli (notifications@github.com) wrote:

New release pushed.

PS: About schedulers. RxPY is not thread safe yet, so you want to run it single threaded using asyncio or tornado (ioloop) schedulers. Default behaviour is to use a new thread for timeouts. I'm adding locks for thread synchronization but still many operators that are not thread safe.

— Reply to this email directly or view it on GitHub.

lock[bot] commented 5 years ago

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.