ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License
4.72k stars 356 forks source link

Examples with rx.interval() don't work #312

Closed devtud closed 5 years ago

devtud commented 5 years ago

I've just installed rxpy and I can't figure out why the examples with rx.interval() don't work.

Rx==3.0.0a2 Python 3.7.2

For example:

import rx

rx.interval(1.0).subscribe(AnonymousObserver(on_next=lambda t: print(t)))

Throws:

...
File "[PATH]/python3.7/site-packages/rx/core/observable/timer.py", line 70, in subscribe
    return _scheduler.schedule_periodic(period, action, state=0)
AttributeError: 'NoneType' object has no attribute 'schedule_periodic'

What am I doing wrong?

nevers commented 5 years ago

Just stumbled across the very same issue. The interval method has a scheduler argument and its default value is set to None, which in turn seems to trip up the timer implementation.

It seems that this was fixed in https://github.com/ReactiveX/RxPY/pull/280 so I'm not sure why we're still seeing this. (I'm running v3.0.0a2) Maybe that patch is not yet released to the masses.

In any case just set the scheduler argument to timeout_scheduler to make it work. For example:

rx.interval(1, scheduler=rx.concurrency.timeout_scheduler).subscribe_(print)
input("press any key to exit")
jcafhe commented 5 years ago

Hi, Indeed, this should be fixed now but not released yet. If it's possible, you should preferably install rxpy3 by cloning the master branch. Keep in mind that v3 is still a work in progress.

devtud commented 5 years ago

Indeed, I've install it from master branch and now it works. Thank you guys.

dbrattli commented 5 years ago

Just published rx-3.0.0a3

marcj commented 4 years ago

I'm using 3.0.1 and still get that error

import rx.scheduler
rx.interval(1, scheduler=rx.scheduler.CurrentThreadScheduler()).subscribe_(print)
Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "/Users/marc/Library/Application Support/JetBrains/Toolbox/apps/PyCharm-C/ch-0/193.6015.41/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
    pydev_imports.execfile(filename, global_vars, local_vars)  # execute the script
  File "/Users/marc/Library/Application Support/JetBrains/Toolbox/apps/PyCharm-C/ch-0/193.6015.41/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/marc/bude/deepkit-python-sdk/examples/generic/rx-test.py", line 2, in <module>
    rx.interval(1, scheduler=rx.scheduler.CurrentThreadScheduler()).subscribe_(print)
  File "/usr/local/lib/python3.7/site-packages/rx/core/observable/observable.py", line 152, in subscribe_
    current_thread_scheduler.schedule(set_disposable)
  File "/usr/local/lib/python3.7/site-packages/rx/scheduler/trampolinescheduler.py", line 50, in schedule
    return self.schedule_absolute(self.now, action, state=state)
  File "/usr/local/lib/python3.7/site-packages/rx/scheduler/trampolinescheduler.py", line 94, in schedule_absolute
    self.get_trampoline().run(item)
  File "/usr/local/lib/python3.7/site-packages/rx/scheduler/trampoline.py", line 31, in run
    self._run()
  File "/usr/local/lib/python3.7/site-packages/rx/scheduler/trampoline.py", line 52, in _run
    item.invoke()
  File "/usr/local/lib/python3.7/site-packages/rx/scheduler/scheduleditem.py", line 25, in invoke
    ret = self.scheduler.invoke_action(self.action, state=self.state)
  File "/usr/local/lib/python3.7/site-packages/rx/scheduler/scheduler.py", line 103, in invoke_action
    ret = action(self, state)
  File "/usr/local/lib/python3.7/site-packages/rx/core/observable/observable.py", line 137, in set_disposable
    if not auto_detach_observer.fail(ex):
  File "/usr/local/lib/python3.7/site-packages/rx/core/observer/autodetachobserver.py", line 62, in fail
    self._on_error(exn)
  File "/usr/local/lib/python3.7/site-packages/rx/internal/basic.py", line 34, in default_error
    raise err
  File "/usr/local/lib/python3.7/site-packages/rx/core/observable/observable.py", line 135, in set_disposable
    subscriber = self._subscribe_core(auto_detach_observer, scheduler)
  File "/usr/local/lib/python3.7/site-packages/rx/core/observable/observable.py", line 42, in _subscribe_core
    return self._subscribe(observer, scheduler) if self._subscribe else Disposable()
  File "/usr/local/lib/python3.7/site-packages/rx/core/observable/timer.py", line 76, in subscribe
    return _scheduler.schedule_periodic(period, action, state=0)
AttributeError: 'CurrentThreadScheduler' object has no attribute 'schedule_periodic'
marcj commented 4 years ago

That proposed fix

rx.interval(1, scheduler=rx.concurrency.timeout_scheduler).subscribe_(print)

doesn't work, as rx.concurrency doesn't exist.