ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License
4.81k stars 361 forks source link

Controlled Observables Bug(s) #87

Closed sebholl closed 8 years ago

sebholl commented 8 years ago

Hey,

Thank you so much for maintaining this module. It's super useful.

  1. Are we missing a .controlled() extension method to turn anObservable into a ControlledObservable? rxJS appears to have one.
  2. I think there might be a bug in the constructor of WindowedObservable... self.subscription is referenced before it is set which throws an exception. https://github.com/ReactiveX/RxPY/blob/master/rx/backpressure/windowedobservable.py#L56
import rx

source = rx.backpressure.controlledobservable.ControlledObservable(
    rx.Observable.from_iterable([1,2,3,4,5]),
    enable_queue = False,
).windowed(3)
  1. I'm trying to get any working example for ControlledObservable to work... Even this basic chain throws an exception. Is it a bug or am I doing something wrong?
import rx
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stderr,)

source = rx.backpressure.controlledobservable.ControlledObservable(
    rx.Observable.from_iterable([1,2,3,4,5]),
    enable_queue = False,
)

for number in source.to_blocking():
    print number

Which outputs the following stack trace...

Traceback (most recent call last):
  File "sample.py", line 37, in <module>
    for number in source.to_blocking():
  File "/Library/Python/2.7/site-packages/rx/linq/observable/blocking/toiterable.py", line 63, in __iter__
    return self.to_iterable()
  File "/Library/Python/2.7/site-packages/rx/linq/observable/blocking/toiterable.py", line 29, in to_iterable
    self.observable.materialize().subscribe(on_next)
  File "/Library/Python/2.7/site-packages/rx/observable.py", line 51, in subscribe
    return self._subscribe(observer)
  File "/Library/Python/2.7/site-packages/rx/anonymousobservable.py", line 50, in _subscribe
    current_thread_scheduler.schedule(set_disposable)
  File "/Library/Python/2.7/site-packages/rx/concurrency/currentthreadscheduler.py", line 54, in schedule
    return self.schedule_relative(timedelta(0), action, state)
  File "/Library/Python/2.7/site-packages/rx/concurrency/currentthreadscheduler.py", line 68, in schedule_relative
    self.queue.run()
  File "/Library/Python/2.7/site-packages/rx/concurrency/currentthreadscheduler.py", line 37, in run
    item.invoke()
  File "/Library/Python/2.7/site-packages/rx/concurrency/scheduleditem.py", line 18, in invoke
    self.disposable.disposable = self.invoke_core()
  File "/Library/Python/2.7/site-packages/rx/concurrency/scheduleditem.py", line 33, in invoke_core
    return self.action(self.scheduler, self.state)
  File "/Library/Python/2.7/site-packages/rx/concurrency/scheduler.py", line 128, in scheduled_action
    return self.invoke_rec_immediate(scheduler, pair)
  File "/Library/Python/2.7/site-packages/rx/concurrency/scheduler.py", line 83, in invoke_rec_immediate
    action(inner_action, state)
  File "/Library/Python/2.7/site-packages/rx/linq/observable/fromiterable.py", line 39, in action
    observer.on_next(item)
  File "/Library/Python/2.7/site-packages/rx/autodetachobserver.py", line 16, in _next
    self.observer.on_next(value)
  File "/Library/Python/2.7/site-packages/rx/autodetachobserver.py", line 16, in _next
    self.observer.on_next(value)
TypeError: 'ControlledSubject' object is not callable
dbrattli commented 8 years ago

Hi, the back-pressure code has not been tested much and is mostly ported directly from RxJS. I have synced it again, but I expect there are more bugs. If you could create some failing unit-tests that would be great. Most of the work is now to port controlled.js. For an example, look at test test_pausable.py

sebholl commented 8 years ago

Awesome, thanks @dbrattli. I'll try and get time to have a look this weekend.

lock[bot] commented 5 years ago

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.