Open rorbech opened 3 years ago
As discussed internally with @rorbech, a safe usage pattern could be:
val context = ... // A context whose thread is a looper thread, e.g. Dispatchers.Main
realmObject.toFlow()
    .flowOn(context) // Ensures toFlow runs on a "looper" context
    .map { obj -> doExpensiveWork(obj) }
    .flowOn(Dispatchers.IO) // Performs the expensive operation on a pool of IO threads
    .onEach { flowObject ->
        // ...
    }
    .launchIn(CoroutineScope(Dispatchers.Main)) // launchIn takes a CoroutineScope, not a dispatcher
The first flowOn confines the execution of toFlow to a looper thread, whereas the second offloads the expensive operation to a background thread. This could, however, collide with allowQueriesOnUiThread when it is set to false, in which case users would have to create a "loopered" coroutine context themselves. This should also be highlighted in the documentation of all toFlow methods.
The problem is the implicit requirement spread across multiple calls. In the example above, the actual query's find would also have to be executed in the same context.
We could bind it together by constructing a dispatcher associated with the looper required by find...(), using https://github.com/Kotlin/kotlinx.coroutines/blob/master/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt#L69. However, it is not a quick fix and should probably be given some thought. It might make sense to scope it together with customization of flow creation #7164.
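To make the dispatcher idea concrete, here is a minimal sketch of how a looper-backed dispatcher could be built with the asCoroutineDispatcher extension from kotlinx-coroutines-android. The helper name createLooperDispatcher and the thread name are assumptions made for illustration; nothing like this exists in Realm today, and it does not address how the query and the flow would be tied to the same dispatcher.

```kotlin
import android.os.Handler
import android.os.HandlerThread
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.android.asCoroutineDispatcher

// Hypothetical helper, not part of the Realm API: starts a background thread
// that owns a Looper and wraps its Handler in a coroutine dispatcher.
fun createLooperDispatcher(
    threadName: String = "realm-flow-looper" // assumed name, purely illustrative
): Pair<HandlerThread, CoroutineDispatcher> {
    val thread = HandlerThread(threadName).apply { start() }
    // thread.looper blocks until the looper has been prepared.
    val dispatcher = Handler(thread.looper).asCoroutineDispatcher(threadName)
    return thread to dispatcher
}
```

The returned dispatcher could then be used both for running find...() and in flowOn, so that the query and the change listener share one looper thread. The caller would be responsible for calling quitSafely() on the returned HandlerThread once the flow is no longer collected.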
I guess it would be significantly easier to get the threading/dispatcher right if we postponed running the actual query until the flow is collected, by offering a method that takes the query, such as:
fun <T : RealmModel> Realm.flow(block: (realm: Realm) -> RealmResults<T>): Flow<RealmResults<T>> {
    // Frozen checks, etc. ...
    return callbackFlow {
        // Construct the result on the thread that collects the flow
        val results: RealmResults<T> = block(this@flow)
        // ...
        results.addChangeListener { listenerResults ->
            // Emit a frozen snapshot so it can safely cross threads
            trySend(listenerResults.freeze())
        }
        awaitClose {
            // ...
        }
    }
}
This could be called like this:
Realm.getInstance(configuration).flow { realm ->
    realm.where<SimpleClass>()
        .findAllAsync()
}.collect {
    // ...
}
Following the example in the documentation of https://github.com/realm/realm-java/blob/973d43725d3bc2a8bd053892ddab0ff2856daa12/realm/kotlin-extensions/src/main/kotlin/io/realm/kotlin/RealmResultsExtensions.kt#L42 can lead to a thread violation if the flow is collected on a thread other than the one on which the result was created.
The documentation should be updated to show a safe pattern, and/or we should prevent the violations through internal handling or through options to customize flow creation #7164.
The same applies to https://github.com/realm/realm-java/blob/master/realm/kotlin-extensions/src/main/kotlin/io/realm/kotlin/RealmObjectExtensions.kt#L42