Closed elizarov closed 4 years ago
"Observable value source" was discussed here: https://github.com/Kotlin/kotlinx.coroutines/pull/274
I made a similar proposal there (it had some issues); here is the synchronized version.
import kotlinx.coroutines.experimental.channels.Closed
import kotlinx.coroutines.experimental.channels.ConflatedChannel
import kotlinx.coroutines.experimental.channels.ReceiveChannel

private class SubscribableVariableImpl<T>(initialValue: T) : SubscribableVariable<T> {

    /**
     * Subscription list
     */
    private var _subscriptions: Array<Subscription>? = null

    @Volatile
    public override var value: T = initialValue
        set(newValue) {
            // lock value and notify value update
            synchronized(this) {
                field = newValue
                _subscriptions?.forEach { it.offer(newValue) }
            }
        }

    public override fun openSubscription(): ReceiveChannel<T> {
        val subscription = Subscription()
        synchronized(this) {
            _subscriptions = _subscriptions?.let { it + subscription } ?: arrayOf(subscription)
            // offer initial value
            subscription.offer(value)
        }
        return subscription
    }

    /**
     * Remove [subscription] from [_subscriptions] list
     */
    private fun removeSubscription(subscription: Subscription) {
        synchronized(this) {
            val oldSubscriptions = _subscriptions ?: error("Subscription not found")
            if (oldSubscriptions.size == 1) {
                if (oldSubscriptions[0] == subscription) {
                    _subscriptions = null
                    return
                } else {
                    error("Subscription not found")
                }
            }
            val newSubscriptions: Array<Subscription?> = arrayOfNulls(oldSubscriptions.size - 1)
            var newI = 0
            for (oldSubscription in oldSubscriptions) {
                if (oldSubscription != subscription) {
                    if (newI == newSubscriptions.size)
                        error("Subscription not found")
                    newSubscriptions[newI] = oldSubscription
                    newI++
                }
            }
            _subscriptions = newSubscriptions as Array<Subscription>
        }
    }

    /**
     * Private [ReceiveChannel] implementation
     */
    private inner class Subscription : ConflatedChannel<T>(), ReceiveChannel<T> {
        override fun onClosed(closed: Closed<T>) {
            removeSubscription(this)
            super.onClosed(closed)
        }
    }
}
In the above draft, replacing Array with MutableList reduces the code complexity and the allocation inside the synchronized blocks.
Moreover, the inner class Subscription is now unnecessary.
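To make the MutableList suggestion concrete, here is a minimal sketch that depends only on the standard library. It is not the code from the proposal: LatestSlot is a hypothetical stand-in for a conflated channel, and SubscribableVariableSketch and closeSubscription are illustrative names. It assumes that subscriptions are closed explicitly rather than through a channel's onClosed hook.

```kotlin
import java.util.concurrent.atomic.AtomicReference

/** Hypothetical stand-in for a conflated channel: keeps only the latest offered value. */
class LatestSlot<T : Any> {
    private val ref = AtomicReference<T?>(null)

    /** Overwrites any pending value with [value]. */
    fun offer(value: T) = ref.set(value)

    /** Returns and clears the pending value, or null when nothing is pending. */
    fun poll(): T? = ref.getAndSet(null)
}

/** Illustrative variant of the draft above that keeps subscriptions in a MutableList. */
class SubscribableVariableSketch<T : Any>(initialValue: T) {
    private val lock = Any()
    private val subscriptions = mutableListOf<LatestSlot<T>>()

    @Volatile
    var value: T = initialValue
        set(newValue) {
            synchronized(lock) {
                field = newValue
                // Index-based loop, so no Iterator is created for each update.
                for (i in subscriptions.indices) subscriptions[i].offer(newValue)
            }
        }

    fun openSubscription(): LatestSlot<T> {
        val slot = LatestSlot<T>()
        synchronized(lock) {
            subscriptions.add(slot)
            // Offer the initial value, as in the draft.
            slot.offer(value)
        }
        return slot
    }

    fun closeSubscription(slot: LatestSlot<T>) {
        synchronized(lock) {
            // A single remove() call replaces the manual array copy.
            check(subscriptions.remove(slot)) { "Subscription not found" }
        }
    }
}
```

With a list, adding and removing a subscription no longer copies the whole array by hand, and removal collapses to one call. The list may still grow its backing storage occasionally when a subscription is added.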
@fvasco Does it pass all the ConflatedBroadcastChannel tests? If it does, then you can consider making a PR with the code, marking it as "Fixes #395" (this issue).
Any updates on this? Maybe something changed after the release of the cold streams preview, because even now with Flow, to implement something like an "observable value source" you have to use a channel, with all of its allocation problems. Maybe we should expect some optimization at least for channels created for Flow (the flowViaChannel() builder), because it can guarantee a single producer and a single consumer.
No updates yet. We are now focused on two areas:
The idea that I have in mind, though, is that for #1082, instead of turning "ConflatedBroadcastChannel" into the "DataFlow", we can reimplement "DataFlow" from scratch, maybe dropping some of its channel features, thus making it simpler and opening the road to a lock-based, allocation-free implementation.
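As a rough illustration of what a lock-based, allocation-free value holder could look like, here is a sketch. It is not the DataFlow design: VersionedValue, Subscriber, awaitNext and hasUpdate are hypothetical names, and it blocks threads instead of suspending coroutines to stay within the standard library. The idea is that each update overwrites fields under a lock and bumps a version counter, while each subscriber remembers the last version it saw.

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

/** Hypothetical lock-based value holder; a sketch, not the DataFlow design. */
class VersionedValue<T>(initial: T) {
    private val lock = ReentrantLock()
    private val changed = lock.newCondition()
    private var current: T = initial
    private var version = 0L

    /** Publishes a new value by overwriting fields; this code creates no objects per update. */
    fun set(newValue: T) {
        lock.withLock {
            current = newValue
            version++
            changed.signalAll()
        }
    }

    /** Creates a subscriber; this is the only allocation per subscription. */
    fun subscribe(): Subscriber = Subscriber()

    inner class Subscriber {
        // Starts below any real version, so the first awaitNext() returns the current value.
        private var seen = -1L

        /** True when a value newer than the last one received is available. */
        fun hasUpdate(): Boolean = lock.withLock { seen != version }

        /** Blocks until a newer value exists, then returns the latest one (conflated). */
        fun awaitNext(): T = lock.withLock {
            while (seen == version) changed.await()
            seen = version
            current
        }
    }
}
```

Intermediate values are skipped when several updates happen between two reads, which matches the conflated behaviour discussed in this thread.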
@elizarov Thanks for the update! Yes, I understand that Flow now has higher priority. I also agree that reimplementing it as DataFlow with a more specific use case is probably a better approach than attempting to fix ConflatedBroadcastChannel. I believe it's a crucial part of kotlinx.coroutines, required for many UI use cases: even now there are already a few attempts to implement reactive Behavior/LiveData-like streams for reactive UI in MPP projects. It's also important for projects that would like to switch to kotlinx.coroutines from LiveData or RxJava (or start from scratch).
This issue will be fixed in #1974 by providing allocation-free capabilities similar to ConflatedBroadcastChannel.
ConflatedBroadcastChannel is used in UI applications as an "observable value source", and it would be useful to make it operate in a fully allocation-free way on its send-receive path, even at the cost of losing some lock-freedom (which is not of much importance there).
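The trade-off can be sketched with two small holders. This is a generic illustration under stated assumptions, not the ConflatedBroadcastChannel source: LockFreeHolder and LockedHolder are hypothetical names, and a version counter stands in for whatever extra state must change atomically together with the value. Lock-free structures typically publish such combined state as a fresh immutable object through compare-and-set, which costs one allocation per update, whereas a lock lets the fields be mutated in place.

```kotlin
import java.util.concurrent.atomic.AtomicReference

/** Lock-free style: every update publishes a new immutable snapshot. */
class LockFreeHolder<T>(initial: T) {
    private class State<T>(val value: T, val version: Long)

    private val state = AtomicReference(State(initial, 0L))

    fun set(newValue: T) {
        while (true) {
            val old = state.get()
            // One State object is allocated per attempt.
            if (state.compareAndSet(old, State(newValue, old.version + 1))) return
        }
    }

    val value: T get() = state.get().value
    val version: Long get() = state.get().version
}

/** Lock-based style: fields are updated in place while holding a monitor. */
class LockedHolder<T>(initial: T) {
    private var current: T = initial
    private var ver = 0L

    fun set(newValue: T) {
        synchronized(this) {
            // No new object is created here; writers may block each other instead.
            current = newValue
            ver++
        }
    }

    val value: T get() = synchronized(this) { current }
    val version: Long get() = synchronized(this) { ver }
}
```

Both holders expose the same behaviour to callers; they differ only in whether an update allocates or briefly holds a lock.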