Closed thomasnield closed 7 years ago
I just added concat_map()
, from_callable()
, and auto_connect()
to RxPy.
Here is auto_connect()
implemented in ConnectableObservable
:
from rx import AnonymousObservable
from rx.core import ObservableBase, Disposable
from rx.disposables import CompositeDisposable
class ConnectableObservable(ObservableBase):
"""Represents an observable that can be connected and disconnected."""
def __init__(self, source, subject):
self.subject = subject
self.source = source.as_observable()
self.has_subscription = False
self.subscription = None
super(ConnectableObservable, self).__init__()
def _subscribe_core(self, observer):
return self.subject.subscribe(observer)
def connect(self):
"""Connects the observable."""
if not self.has_subscription:
self.has_subscription = True
def dispose():
self.has_subscription = False
disposable = self.source.subscribe(self.subject)
self.subscription = CompositeDisposable(disposable, Disposable.create(dispose))
return self.subscription
def ref_count(self):
"""Returns an observable sequence that stays connected to the
source as long as there is at least one subscription to the
observable sequence.
"""
connectable_subscription = [None]
count = [0]
source = self
def subscribe(observer):
count[0] += 1
should_connect = count[0] == 1
subscription = source.subscribe(observer)
if should_connect:
connectable_subscription[0] = source.connect()
def dispose():
subscription.dispose()
count[0] -= 1
if not count[0]:
connectable_subscription[0].dispose()
return Disposable.create(dispose)
return AnonymousObservable(subscribe)
def auto_connect(self, subscriber_count=1):
"""Returns an observable sequence that stays connected to the
source indefinitely to the observable sequence.
Providing a subscriber_count will cause it to connect() after that many subscriptions occur.
A subscriber_count of 0 will result in emissions firing immediately without waiting for subscribers.
"""
connectable_subscription = [None]
count = [0]
source = self
is_connected = [False]
if subscriber_count == 0:
connectable_subscription[0] = source.connect()
is_connected[0] = True
def subscribe(observer):
count[0] += 1
should_connect = count[0] == subscriber_count and not is_connected[0]
subscription = source.subscribe(observer)
if should_connect:
connectable_subscription[0] = source.connect()
is_connected[0] = True
def dispose():
subscription.dispose()
count[0] -= 1
is_connected[0] = False
return Disposable.create(dispose)
return AnonymousObservable(subscribe)
Here is an example of it being used:
from rx import Observable
from random import randint
source = Observable.from_(["Bravo", "Charlie", "Tango", "Foxtrot"]) \
.map(lambda s: "{0}-{1}".format(s, randint(0, 1000))) \
.publish() \
.auto_connect(2)
source.subscribe(lambda s: print("Subscriber 1: {0}".format(s)))
source.subscribe(lambda s: print("Subscriber 2: {0}".format(s)))
OUTPUT:
Subscriber 1: Bravo-205
Subscriber 2: Bravo-205
Subscriber 1: Charlie-12
Subscriber 2: Charlie-12
Subscriber 1: Tango-479
Subscriber 2: Tango-479
Subscriber 1: Foxtrot-885
Subscriber 2: Foxtrot-885
This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.
It would be awesome to have an
auto_connect()
operator forConnectableObservable
that will automaticallyconnect()
after a certain number of subscribers subscribe to it. This has been added to RxJava last year:So instead of this:
I could just do this;