Kotlin multiplatform implementation of Reactive Extensions.
If you have any questions or feedback, you are welcome to join the Kotlin Slack channel: #reaktive
There are a number of modules published to Maven Central:
- reaktive - the main Reaktive library (multiplatform)
- reaktive-annotations - collection of annotations (multiplatform)
- reaktive-testing - testing utilities (multiplatform)
- utils - some utilities like Clock, AtomicReference, Lock, etc. (multiplatform)
- coroutines-interop - Kotlin coroutines interoperability helpers (multiplatform)
- rxjava2-interop - RxJava v2 interoperability helpers (JVM and Android)
- rxjava3-interop - RxJava v3 interoperability helpers (JVM and Android)

kotlin {
    sourceSets {
        commonMain {
            dependencies {
                implementation 'com.badoo.reaktive:reaktive:<version>'
                implementation 'com.badoo.reaktive:reaktive-annotations:<version>'
                implementation 'com.badoo.reaktive:coroutines-interop:<version>' // For interop with coroutines
                implementation 'com.badoo.reaktive:rxjava2-interop:<version>' // For interop with RxJava v2
                implementation 'com.badoo.reaktive:rxjava3-interop:<version>' // For interop with RxJava v3
            }
        }
        commonTest {
            dependencies {
                implementation 'com.badoo.reaktive:reaktive-testing:<version>'
            }
        }
    }
}
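
The dependency block above is written in the Gradle Groovy DSL. For projects that use the Gradle Kotlin DSL (build.gradle.kts), a roughly equivalent declaration might look like the sketch below. It assumes the Kotlin Multiplatform plugin is already applied and that the source sets are reachable through the usual `by getting` delegates; `<version>` is left as a placeholder, exactly as in the original snippet.

```kotlin
// build.gradle.kts (sketch, assumes the Kotlin Multiplatform plugin is applied)
kotlin {
    sourceSets {
        val commonMain by getting {
            dependencies {
                implementation("com.badoo.reaktive:reaktive:<version>")
                implementation("com.badoo.reaktive:reaktive-annotations:<version>")
                implementation("com.badoo.reaktive:coroutines-interop:<version>") // For interop with coroutines
            }
        }
        val commonTest by getting {
            dependencies {
                implementation("com.badoo.reaktive:reaktive-testing:<version>")
            }
        }
    }
}
```

The RxJava interop modules are omitted from this sketch because they are listed as JVM and Android only; they would go into the corresponding platform source set.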
Supported schedulers:
- computationScheduler - fixed thread pool with a size equal to the number of cores
- ioScheduler - unbounded thread pool with a caching policy
- newThreadScheduler - creates a new thread for each unit of work
- singleScheduler - executes tasks on a single shared background thread
- trampolineScheduler - queues tasks and executes them on one of the participating threads
- mainScheduler - executes tasks on the main thread

Supported sources: Observable, Maybe, Single, Completable

Subjects: PublishSubject, BehaviorSubject, ReplaySubject, UnicastSubject

Interoperability with Kotlin coroutines, with conversions for:
- suspend functions to/from Single, Maybe and Completable
- Flow to/from Observable
- CoroutineContext to Scheduler
- Scheduler to CoroutineDispatcher
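
As an illustration of how the schedulers listed above are typically used, here is a minimal sketch that subscribes on one scheduler and delivers results on another. The helper names `squares` and `printSquares` are hypothetical and exist only for this example; the import paths assume the current package layout of the reaktive module.

```kotlin
import com.badoo.reaktive.observable.Observable
import com.badoo.reaktive.observable.map
import com.badoo.reaktive.observable.observableOf
import com.badoo.reaktive.observable.observeOn
import com.badoo.reaktive.observable.subscribe
import com.badoo.reaktive.observable.subscribeOn
import com.badoo.reaktive.scheduler.Scheduler
import com.badoo.reaktive.scheduler.computationScheduler
import com.badoo.reaktive.scheduler.singleScheduler

// Hypothetical helper: the source is subscribed (and the values are squared)
// on `workScheduler`, then the results are handed over to `resultScheduler`.
fun squares(workScheduler: Scheduler, resultScheduler: Scheduler): Observable<Int> =
    observableOf(1, 2, 3)
        .subscribeOn(workScheduler)
        .map { it * it }
        .observeOn(resultScheduler)

// Hypothetical usage: do the work on the computation pool and
// receive the results on the single shared background thread.
fun printSquares() {
    squares(workScheduler = computationScheduler, resultScheduler = singleScheduler)
        .subscribe { println(it) }
}
```

Passing the schedulers as parameters is a design choice for this sketch: it lets a test substitute trampolineScheduler, while an application with a UI would more likely pass mainScheduler as the result scheduler.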
Since version 2.x, Reaktive only works with the new Kotlin/Native memory model.
Coroutines interoperability is provided by the coroutines-interop module. Please be aware of some known problems with multi-threaded coroutines on Kotlin/Native.
val flow: Flow<Int> = observableOf(1, 2, 3).asFlow()
val observable: Observable<Int> = flowOf(1, 2, 3).asObservable()

fun doSomething() {
    singleFromCoroutine { getSomething() }
        .subscribe { println(it) }
}

suspend fun getSomething(): String {
    delay(1.seconds)
    return "something"
}

val defaultScheduler = Dispatchers.Default.asScheduler()
val computationDispatcher = computationScheduler.asCoroutineDispatcher()
Reaktive provides an easy way to manage subscriptions: DisposableScope.
Take a look at the following examples:
val scope =
    disposableScope {
        observable.subscribeScoped(...) // Subscription will be disposed when the scope is disposed

        doOnDispose {
            // Will be called when the scope is disposed
        }

        someDisposable.scope() // `someDisposable` will be disposed when the scope is disposed
    }

// At some point later
scope.dispose()
class MyPresenter(
    private val view: MyView,
    private val longRunningAction: Completable
) : DisposableScope by DisposableScope() {

    init {
        doOnDispose {
            // Will be called when the presenter is disposed
        }
    }

    fun load() {
        view.showProgressBar()

        // Subscription will be disposed when the presenter is disposed
        longRunningAction.subscribeScoped(onComplete = view::hideProgressBar)
    }
}

class MyActivity : AppCompatActivity(), DisposableScope by DisposableScope() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        MyPresenter(...).scope()
    }

    override fun onDestroy() {
        dispose()

        super.onDestroy()
    }
}
Please see the corresponding documentation page: Reaktive and Swift interoperability.
Reaktive provides a Plugin API, similar to RxJava plugins. The Plugin API provides a way to decorate Reaktive sources. A plugin should implement the ReaktivePlugin interface. It can be registered using the registerReaktivePlugin function and unregistered using the unregisterReaktivePlugin function.
object MyPlugin : ReaktivePlugin {

    // Wraps every assembled Observable so that errors carry a trace of the assembly point
    override fun <T> onAssembleObservable(observable: Observable<T>): Observable<T> =
        object : Observable<T> {
            // Captured at assembly time, attached to errors later
            private val traceException = TraceException()

            override fun subscribe(observer: ObservableObserver<T>) {
                observable.subscribe(
                    object : ObservableObserver<T> by observer {
                        override fun onError(error: Throwable) {
                            observer.onError(error, traceException)
                        }
                    }
                )
            }
        }

    override fun <T> onAssembleSingle(single: Single<T>): Single<T> =
        TODO("Similar to onAssembleObservable")

    override fun <T> onAssembleMaybe(maybe: Maybe<T>): Maybe<T> =
        TODO("Similar to onAssembleObservable")

    override fun onAssembleCompletable(completable: Completable): Completable =
        TODO("Similar to onAssembleObservable")

    private fun ErrorCallback.onError(error: Throwable, traceException: TraceException) {
        // Attach the trace only once per error
        if (error.suppressedExceptions.lastOrNull() !is TraceException) {
            error.addSuppressed(traceException)
        }
        onError(error)
    }

    private class TraceException : Exception()
}