MichaelSchneeberger / rxbp

A back-pressured rxpy extension
Apache License 2.0

RxPy back-pressure extension


rxbp is an extension to the RxPY Python library that integrates back-pressure into the Observable pattern in the form of Flowables.

The rxbp library is inspired by Monix and is still experimental.

Installation

rxbp v3.x runs on Python 3.7 or above. To install the rxbp alpha version:

pip3 install --pre rxbp

Example

rxbackpressure has a syntax similar to that of RxPY.

# example taken from RxPY
import rxbp

source = rxbp.from_(["Alpha", "Beta", "Gamma", "Delta", "Epsilon"])

composed = source.pipe(
    rxbp.op.map(lambda s: len(s)),
    rxbp.op.filter(lambda i: i >= 5)
)
composed.subscribe(lambda value: print(f"Received {value}"))

Integrate RxPY

An RxPY Observable can be converted to a Flowable by using the rxbp.from_rx function. Conversely, a Flowable can be converted to an RxPY Observable by using the to_rx method.

import rx
import rxbp

rx_source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

# convert Observable to Flowable
source = rxbp.from_rx(rx_source)

composed = source.pipe(
    rxbp.op.map(lambda s: len(s)),
    rxbp.op.filter(lambda i: i >= 5)
)

# convert Flowable to Observable
composed.to_rx().subscribe(lambda value: print(f"Received {value}"))

Differences from RxPY

Flowable

Similar to an RxPY Observable, a Flowable implements a subscribe method, which makes it possible to describe a data flow from its source to a sink. The description is done with rxbp operators exposed by rxbp.op.

As in functional programming, using rxbp operators does not create any mutable state; it only composes functions without calling them yet. We first describe what we intend to do in the form of a plan and then execute the plan. A Flowable is executed by calling its subscribe method. This starts a chain reaction in which each downstream Flowable calls the subscribe method of its upstream Flowable until the sources start emitting data. Once a Flowable is subscribed, it is allowed to have internal mutable state.
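The "describe first, execute on subscribe" idea can be illustrated with a minimal sketch in plain Python. The class MiniFlowable and the helper from_iterable below are hypothetical stand-ins written for this illustration; they are not part of rxbp and ignore back-pressure entirely.

```python
from typing import Any, Callable, Iterable, List


class MiniFlowable:
    """Hypothetical stand-in for a Flowable: it holds a plan, not data."""

    def __init__(self, run: Callable[[Callable[[Any], None]], None]):
        # `run` pushes elements into an observer callback when it is invoked.
        self._run = run

    def map(self, func: Callable[[Any], Any]) -> "MiniFlowable":
        # Builds a new plan on top of the upstream plan; nothing is executed here.
        def run(on_next: Callable[[Any], None]) -> None:
            self._run(lambda value: on_next(func(value)))
        return MiniFlowable(run)

    def filter(self, predicate: Callable[[Any], bool]) -> "MiniFlowable":
        def run(on_next: Callable[[Any], None]) -> None:
            def forward_if_selected(value: Any) -> None:
                if predicate(value):
                    on_next(value)
            self._run(forward_if_selected)
        return MiniFlowable(run)

    def subscribe(self, on_next: Callable[[Any], None]) -> None:
        # Executes the plan: each stage subscribes to its upstream stage.
        self._run(on_next)


def from_iterable(values: Iterable[Any], log: List[str]) -> MiniFlowable:
    def run(on_next: Callable[[Any], None]) -> None:
        log.append("source started")
        for value in values:
            on_next(value)
    return MiniFlowable(run)


log: List[str] = []
plan = (
    from_iterable(["Alpha", "Beta", "Gamma", "Delta", "Epsilon"], log)
    .map(len)
    .filter(lambda i: i >= 5)
)
log_before_subscribe = list(log)  # building the plan did not start the source

received: List[int] = []
plan.subscribe(received.append)   # only now do elements flow
```

Building the plan leaves the log empty; the source only starts once subscribe is called on the last stage.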

Unlike RxPY Observables, however, a Flowable uses Observers that can apply back-pressure on each on_next call. As a result, certain operators behave differently from their RxPY counterparts.
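One way to picture back-pressure on an on_next call is an Observer that returns an acknowledgement, and a source that waits for that acknowledgement before sending the next element. The sketch below is loosely modeled on the acknowledgement idea found in Monix. The names Ack, SlowObserver and Source are assumptions made for this illustration and do not reflect rxbp's actual classes.

```python
from concurrent.futures import Future
from enum import Enum


class Ack(Enum):
    CONTINUE = "continue"
    STOP = "stop"


class SlowObserver:
    """Hypothetical observer that acknowledges each element only when asked to."""

    def __init__(self, limit):
        self.limit = limit
        self.received = []
        self.pending = None

    def on_next(self, value):
        self.received.append(value)
        self.pending = Future()   # handed back to the source as the acknowledgement
        return self.pending

    def finish_processing(self):
        # Signal that the last element is processed and whether more are wanted.
        ack = Ack.CONTINUE if len(self.received) < self.limit else Ack.STOP
        self.pending.set_result(ack)


class Source:
    """Hypothetical source that emits one element per acknowledgement."""

    def __init__(self, values):
        self._iterator = iter(values)
        self._observer = None
        self.emitted = 0

    def subscribe(self, observer):
        self._observer = observer
        self._send_next()

    def _send_next(self):
        try:
            value = next(self._iterator)
        except StopIteration:
            return
        self.emitted += 1
        ack_future = self._observer.on_next(value)
        # The next element is sent only after the observer has acknowledged this one.
        ack_future.add_done_callback(self._on_ack)

    def _on_ack(self, ack_future):
        if ack_future.result() is Ack.CONTINUE:
            self._send_next()


source = Source(range(100))
observer = SlowObserver(limit=3)
source.subscribe(observer)
emitted_after_subscribe = source.emitted   # the source is held back after one element

observer.finish_processing()   # acknowledges 0, the source sends 1
observer.finish_processing()   # acknowledges 1, the source sends 2
observer.finish_processing()   # the observer has enough and answers STOP
```

Although the source could produce 100 elements, it never runs ahead of the observer: it emits exactly as many elements as were acknowledged, plus the first one.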

MultiCast (experimental)

A MultiCast is used when a Flowable emits elements to more than one Observer, and can be thought of as a nested Flowable of type Observable[T[Flowable]].

The syntax to multi-cast a Flowable is quite different from RxPY, and there are good reasons for that. In RxPY, there is an operator called share that turns an Observable into a so-called hot Observable, allowing multiple downstream subscribers to receive the same elements. The first subscribe call has the side effect that subsequent subscribe calls do not propagate upstream but instead register themselves with the hot Observable. The following example illustrates the side effect that happens when a shared Observable is subscribed for the first time.

import rx
from rx import operators as op

o = rx.range(4).pipe(
    op.share(),
)

o.subscribe(print)
o.subscribe(print)      # the second time no elements are sent

The previous code outputs:

0
1
2
3

In rxbp, however, the elements of a Flowable sequence can only be multi-cast if the Flowable is nested inside a MultiCast. This can be done with the rxbp.multicast.return_flowable function. return_flowable takes a Flowable, a list of Flowables or a dictionary of Flowables and creates a MultiCast that emits the nested Flowables. Like a Flowable, a MultiCast implements a pipe method that takes a sequence of MultiCast operators, which are exposed by rxbp.multicast.op.

import rxbp

f = rxbp.multicast.return_flowable(rxbp.range(10)).pipe(
    rxbp.multicast.op.map(lambda base: base.pipe(
        rxbp.op.zip(base.pipe(
            rxbp.op.map(lambda v: v + 1),
            rxbp.op.filter(lambda v: v % 2 == 0)),
        ),
    )),
).to_flowable()
f.subscribe(print)

The previous code outputs:

(0, 2)
(1, 4)
(2, 6)
(3, 8)
(4, 10)
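The data flow of this example, in which one source feeds two branches that are zipped back together, can be mimicked in plain Python as a rough analogy. The sketch uses itertools.tee, which is not how rxbp works internally: tee buffers elements for the slower branch, whereas a MultiCast relies on back-pressure. The helper counting_source is a hypothetical name used only to show that the source is iterated once.

```python
from itertools import tee


def counting_source(n, pulls):
    # Records every element that is actually requested from the source.
    for value in range(n):
        pulls.append(value)
        yield value


pulls = []
base, branch = tee(counting_source(10, pulls))  # two views on one underlying source

# Second branch: add one, then keep the even results.
shifted_even = (v for v in (v + 1 for v in branch) if v % 2 == 0)

pairs = list(zip(base, shifted_even))
for pair in pairs:
    print(pair)
```

Each element of the source is requested exactly once, even though both branches consume it.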

match operator (experimental)

The match operator tries to match two Flowables and raises an exception if they do not match. Two Flowables match if they have the same base or if there exists a mapping that maps one base to the base of the other Flowable. These mappings are propagated internally when subscribing to a Flowable.

If two Flowables match, the elements of each Flowable sequence are first filtered and duplicated (if necessary) and then zipped together. The following example creates two Flowables: one has base 10, and the other contains a mapping from base 10 to its own base None (base None refers to an unknown Flowable sequence). The match operator applies the mapping to the Flowable of base 10 such that every second element is selected due to v % 2.

import rxbp

rxbp.from_range(10).pipe(
    rxbp.op.match(rxbp.from_range(10).pipe(
        rxbp.op.filter(lambda v: v % 2 == 1)),  # keep the odd elements, as in the output below
    )
).subscribe(print)

The previous code outputs:

(1, 1)
(3, 3)
(5, 5)
(7, 7)
(9, 9)
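Conceptually, the selection step can be sketched in plain Python with lists. The sketch uses the v % 2 predicate mentioned in the text and represents the mapping as a list of booleans over the positions of the base sequence. This representation is an assumption made for illustration and says nothing about how rxbp stores or propagates mappings internally.

```python
base = list(range(10))   # plays the role of the sequence with base 10

# Second sequence: derived from the same base by filtering.
# The "mapping" records which base positions survive the filter.
mapping = [bool(v % 2) for v in base]
filtered = [v for v, keep in zip(base, mapping) if keep]

# match: apply the mapping to the unfiltered sequence, then zip both sides.
selected = [v for v, keep in zip(base, mapping) if keep]
matched = list(zip(selected, filtered))

for pair in matched:
    print(pair)
```

Because both sides are reduced with the same mapping, the zipped elements always originate from the same position of the base sequence.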

When to use a Flowable, when an RxPY Observable?

A Flowable is used when some asynchronous stage cannot process the data fast enough or needs to synchronize the data with some other event. Take the zip operator as an example. It receives elements from two or more sources and emits a tuple once it has received one element from each source. But what happens if one source emits its elements before the other does? Without back-pressure, the zip operator has to buffer the elements from the eager source until it receives the elements from the other source. This might be acceptable, depending on how many elements need to be buffered. Often, however, it is too risky to buffer elements somewhere in the stream, as this can lead to an out-of-memory error. Back-pressure prevents buffers from growing by holding the data back until it is actually needed.

The advantage of an RxPY Observable is that it is generally faster and more lightweight.

Flowable

Create a Flowable

Transforming operators

Combining operators

Other operators

Create an rx Observable

MultiCast (experimental)

Create a MultiCast

Transforming operators

Transforming operators (Flowables)

Other operators