Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

BroadcastChannel.asFlow().onStart(...) is invoked before the subscription is opened #1758

Closed RBusarow closed 4 years ago

RBusarow commented 4 years ago

In the docs, we have this example of onStart(...):

flowOf("a", "b", "c")
    .onStart { emit("Begin") }
    .collect { println(it) } // prints Begin, a, b, c

This works not just with emit(...) but also with sending to a Channel.

val channel = Channel<String>(4)

channel.consumeAsFlow()
    .onStart {
      channel.send("Begin")
      channel.send("a")
      channel.send("b")
      channel.send("c")
      channel.close() // lets collect() finish once the buffer is drained
    }
    .collect { println(it) } // prints Begin, a, b, c

However, with a BroadcastChannel, the value sent from onStart(...) never arrives:

runBlocking {
  val channel = broadcast {
    send(2)
  }

  channel.asFlow()
      .onStart { channel.send(1) } // sent before the subscription is opened, so it is lost
      .collect { println(it) } // prints 2
}

If onStart(...) is primarily a callback to say that "we're collecting data", instead of a fancy way to prepend data to the Flow, then this feels wrong.

The difference comes from how the subscription is created: the subscription channel isn't opened until the body of the flow { } builder runs, which is after onStart(...):

public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow {
    emitAll(openSubscription())
}

onStart(...) is being invoked at the same time as always, and the value is being sent, but the subscription's ReceiveChannel is created afterwards and never gets it.
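A minimal, self-contained sketch of the same ordering problem. BroadcastChannel and its asFlow() have since been deprecated, so this assumes a MutableSharedFlow with no replay as a stand-in (a substitution, not the reporter's code); `valueSeenAfterOnStart` is a hypothetical helper. The value emitted from onStart(...) reaches the hot flow before the collector has subscribed, so it is dropped, first() never sees it, and withTimeoutOrNull returns null.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits into a hot flow from onStart and reports what the collector then receives.
// MutableSharedFlow stands in for the deprecated BroadcastChannel here.
fun valueSeenAfterOnStart(): Int? = runBlocking {
    // replay = 0: a value emitted while nobody is subscribed is simply dropped.
    val hot = MutableSharedFlow<Int>(extraBufferCapacity = 1)
    withTimeoutOrNull(200) {
        hot.onStart {
            // Runs before the downstream collector subscribes to `hot`,
            // so there is no subscriber yet and this value is lost.
            hot.emit(1)
        }.first() // waits for a value that never comes
    }
}

fun main() {
    println(valueSeenAfterOnStart()) // null
}
```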

Proposed Solution

Make asFlow() return its own internal type (BroadcastFlow?), and give it an update(...) function similar to ChannelFlow. This function will just accumulate the action(s) and store it/them until the final collect(...) is called.

Then make onStart(...) do a type check and just call update(...) if it's a BroadcastFlow. When it's time to start collecting, create the subscription, then invoke the action, then call emitAll(...).

public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = BroadcastFlow(this)

internal class BroadcastFlow<T>(
    private val source: BroadcastChannel<T>,
    private val _startAction: suspend FlowCollector<T>.() -> Unit = {}
) : Flow<T> {

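    // Returns a new BroadcastFlow whose start action runs `startAction` first,
    // then any previously accumulated action (the same order as chained onStart calls).
    fun update(
        startAction: suspend FlowCollector<T>.() -> Unit
    ): BroadcastFlow<T> = BroadcastFlow(source) {
        startAction()
        _startAction()
    }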

    override suspend fun collect(collector: FlowCollector<T>) {
        val channel = source.openSubscription()
        collector._startAction()
        collector.emitAll(channel)
    }
}

public fun <T> Flow<T>.onStart(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = if (this is BroadcastFlow) {
    update(action)
} else {
    unsafeFlow {
        // Note: unsafe flow is used here, but safe collector is used to invoke start action
        SafeCollector<T>(this, coroutineContext).action()
        collect(this) // directly delegate
    }
}
fvasco commented 4 years ago

Your second example feels wrong. onStart pushes a message into the channel; however, it is not possible to predict who will consume that message, except when a single consumer is present, as in the example.

In your third example, the code block sends 1 to all of the channel's subscribers; honestly, it is not clear to me what use case you are trying to solve.

RBusarow commented 4 years ago

You're right, if you have multiple consumers then the second example would be a bit different, but for the sake of the example let's assume it's only being consumed by that one consumeAsFlow().

Regarding the BroadcastChannel variant, a colleague of mine ran into this issue while updating some of our Android Bluetooth code. Here's a significantly reduced version of it:

class Connection {

  val gatt: BluetoothGatt = ...
  val callback: CallbackWrapper = ...

  suspend fun writeData(data: Data) = writeMutex.withLock {

    withTimeoutOrNull(TIMEOUT) {
      callback.output.onStart {

        // We want to wait until the flow is actually being collected
        // before we send the data, so that we're sure to be observing
        // when we get the response, except that this lambda is invoked
        // before the subscription is actually created.
        gatt.writeCharacteristic(data)
      }
        .take(1)
        .collect {
          // ...
        }

    }
  }

}

class CallbackWrapper : GiantAbstractCallbackForAndroidBluetooth() {

  private val outputChannel = BroadcastChannel<Data>(20)

  val output: Flow<Data>
    // in this case, we can "fix" the issue
    // by using openSubscription().consumeAsFlow()
    get() = outputChannel.asFlow()

  override fun onCharacteristicWrite(data: Data) {
    outputChannel.sendBlocking(data)
  }

  // lots more functions here
}

I'm not actually interested in using send directly from onStart(), but it is useful in triggering actions which result in that BroadcastChannel source being updated.

In this example, writeCharacteristic() leads to onCharacteristicWrite() being called. This isn't actually response data from the peripheral; it's just the callback informing us that the data has been written. We have to wait (suspend) until the characteristic has been written before writing again. In this case, onCharacteristicWrite() is actually called from a different thread, so the subscription is usually created in time, but that's a happy accident which just made the bug more difficult to track down.

The issue I see here is that onStart() behaves slightly differently for BroadcastChannel vs Channel or a pure Flow. In cases where we just need to use a BroadcastChannel to collect events, but we're consuming them in a Flow, then we can't rely upon the current implementation of onStart() to tell us when the flow will start receiving the data.

fvasco commented 4 years ago

Hi @RBusarow, please excuse me, I am not an Android developer, so it is hard for me to follow your code. However, I suspect that your solution is too complex for the task:

it's just the callback informing us that the data has been written

Please read the official KEEP https://github.com/Kotlin/KEEP/blob/master/proposals/coroutines.md#wrapping-callbacks and reconsider your code: can you call suspendCoroutine in writeData and resume it in onCharacteristicWrite?
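One way to read this suggestion, sketched under assumptions: a single long-lived callback object (the hypothetical `WriteCallbackBridge`) parks the caller's continuation with suspendCancellableCoroutine and resumes it from onCharacteristicWrite. `startWrite` stands in for the platform write call; none of these names come from the Android API.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.resume
import kotlinx.coroutines.*

// Hypothetical stand-in for the single, long-lived Android callback object.
class WriteCallbackBridge {
    // The caller currently waiting for a write confirmation, if any.
    private val pending = AtomicReference<CancellableContinuation<ByteArray>?>(null)

    // Suspends until onCharacteristicWrite() reports the write.
    // `startWrite` is a hypothetical hook that issues the platform write.
    suspend fun writeAndAwait(startWrite: () -> Unit): ByteArray =
        suspendCancellableCoroutine { cont ->
            pending.set(cont) // register before writing, so even a synchronous callback is caught
            cont.invokeOnCancellation { pending.compareAndSet(cont, null) }
            startWrite()
        }

    // Invoked by the platform, possibly on another thread.
    fun onCharacteristicWrite(data: ByteArray) {
        pending.getAndSet(null)?.resume(data)
    }
}

fun main() = runBlocking {
    val bridge = WriteCallbackBridge()
    // Simulate a platform that confirms the write immediately.
    val written = bridge.writeAndAwait { bridge.onCharacteristicWrite(byteArrayOf(1, 2, 3)) }
    println(written.toList()) // [1, 2, 3]
}
```

Registering the continuation before calling startWrite() is what keeps a very fast callback from being missed. The sketch supports only one outstanding write at a time, which fits the writeMutex used in the Connection example earlier in the thread.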

RBusarow commented 4 years ago

No, that wouldn't work here since the callback can only be set when the connection is established. This is some pretty old Android code - not anything I have control over. The callback is actually an abstract class with 13 different callback functions (link), so the only real solutions are Channel-based.

We had a few options for this specific use-case. For now, we're just using openSubscription().consumeAsFlow(), and that's fine since the subscription creation is still lazy. I am not advocating a library change just to help this one case. 😉

My concern is really just that onStart() seems to be called too soon when using asFlow(). I think it's fair to say that the flow exists so that it can respond to new values in its source channel, but that link doesn't exist yet when it is "started". It's more like onAboutToStart().

With consumeAsFlow(), the flow is already capable of collecting events when onStart() is invoked. I believe this functionality is more useful.

Moving the callback invocation to after the subscription creation just gives us more options.

elizarov commented 4 years ago

@RBusarow Why would you want the change in behaviour that you are writing about? The example you give... did it come up while you were solving some practical problem? What was the problem you were trying to solve?

twyatt commented 4 years ago

I believe I'm faced with the same issue as @RBusarow, so I'll try to give more clarity into the problem we're trying to solve.

In Bluetooth Low-Energy (BLE), we read from and write to characteristics on the peripheral. Unfortunately, in Android BLE the object representations of these characteristics are shared and reused, so writing to a single characteristic could trample incoming data for the same characteristic, or vice versa. The common workaround is to use two characteristics for communication: one dedicated to writing and one dedicated to "reading" via change notifications.

All characteristic changes are delivered to Android's BluetoothGattCallback.onCharacteristicChanged method, so it made sense to route the characteristic changes to a Flow via a BroadcastChannel(BUFFERED). An abbreviated BLE I/O interface could look like:

interface GattIo {

    // Suspends until Android's `BluetoothGattCallback.onCharacteristicWrite` is called, indicating successful write.
    suspend fun writeCharacteristic(characteristic: BluetoothGattCharacteristic, value: ByteArray): OnCharacteristicWrite

    val onCharacteristicChanged: Flow<OnCharacteristicChanged>
}

If you want request/response style communication, then you'll need to handle sending the request (via suspend writeCharacteristic) and then picking up the response (from the Flow<OnCharacteristicChanged>).

[Diagram: BLE Coroutines IO]

In the diagram above, I've tried writing the Client.request function using various techniques:

  1. writeCharacteristic in onStart

As @RBusarow pointed out, the current behavior invokes onStart, and in turn writeCharacteristic, before subscribing to the underlying BroadcastChannel. A BLE round-trip is usually much slower than the time the Flow needs to subscribe to the BroadcastChannel, so most of the time you'll subscribe in time to receive the response. In the rare cases where the response comes back faster than the Flow can subscribe, you'll lose the response. Because the situation is rare, the bug is very hard to track down.

  2. async within a coroutineScope

In the following pseudocode, we face the same potential issue as number 1 above: async could be slow to spin up, so it may not be subscribed in time and could miss the response.

suspend fun request(request: Request): Response = coroutineScope {
    val response = async {
        onCharacteristicChanged
            .map { /* ... */ }
            .first { response -> request.id == response.id }
    }

    val bytes: ByteArray = request.toBytes()
    writeCharacteristic(transmitCharacteristic, bytes)
    response.await()
}
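The race in this async variant can be reproduced deterministically. The sketch below assumes a hypothetical device that answers instantly (writeCharacteristic emits into a MutableSharedFlow before returning) and hypothetical Request/Response types. On the single-threaded runBlocking event loop, async with the default start only schedules its body, so the response is emitted while nobody is subscribed and is lost.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical request/response payloads.
data class Request(val id: Int)
data class Response(val id: Int)

// Hot stream of responses, standing in for onCharacteristicChanged.
val onCharacteristicChanged = MutableSharedFlow<Response>(extraBufferCapacity = 16)

// Hypothetical write to a device that answers instantly: the response is
// emitted before this function even returns.
fun writeCharacteristic(request: Request) {
    onCharacteristicChanged.tryEmit(Response(request.id))
}

suspend fun request(request: Request): Response = coroutineScope {
    // Default start: the body is only scheduled here; nothing is subscribed yet.
    val response = async { onCharacteristicChanged.first { it.id == request.id } }
    writeCharacteristic(request) // no subscriber yet, so the response is dropped
    response.await()
}

fun main() = runBlocking {
    // On the single-threaded runBlocking event loop the race is lost every time.
    println(withTimeoutOrNull(200) { request(Request(1)) }) // null
}
```

On a multi-threaded dispatcher the same code usually wins the race, which is exactly what makes the bug intermittent.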

Changing the behavior (#1759) would make BroadcastChannel.asFlow a good fit for designs with a dedicated suspending write and a Flow-based read, though I'm not sure how common that is (outside of it seemingly being common in Android BLE).

Is there perhaps another approach that we've overlooked (aside from the attempts listed above) that would be better suited?

elizarov commented 4 years ago

Thanks a lot for the detailed explanation of your use case. Please note that we are currently working on a replacement for all kinds of BroadcastChannel. They will remain supported, but deprecated, which means that the replacement design will have to take your use case into account.

So, what would be a flow-based replacement for your use-case? There will be two recommended approaches to disseminate events:

  1. Use a two-step callbackFlow { .... }.shareIn(scope) approach. This way, you first convert callbacks to a cold flow, then make it hot via a sharing operator.
  2. Use a single-step MutableSharedFlow() approach (similarly to BroadcastChannel, but directly a flow). This way you explicitly create a hot flow object, emit to it, and collect events from it.
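A rough sketch of the two approaches, assuming a hypothetical listener-style `SensorApi` and a hypothetical `SensorEvents` class (neither belongs to any real library). callbackFlow, shareIn with SharingStarted.WhileSubscribed(), and MutableSharedFlow are real kotlinx.coroutines APIs (the sharing ones shipped later, as the replacement described here); the listener list is not thread-safe and is for illustration only.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical callback-based API, standing in for a platform listener interface.
class SensorApi {
    private val listeners = mutableListOf<(Int) -> Unit>() // not thread-safe: illustration only
    val listenerCount: Int get() = listeners.size
    fun register(listener: (Int) -> Unit) { listeners += listener }
    fun unregister(listener: (Int) -> Unit) { listeners -= listener }
    fun fire(value: Int) = listeners.toList().forEach { it(value) }
}

// Approach 1: convert callbacks to a cold flow, then make it hot with shareIn.
// The listener is registered only while the shared flow has subscribers.
fun SensorApi.events(scope: CoroutineScope): SharedFlow<Int> =
    callbackFlow {
        val listener: (Int) -> Unit = { trySend(it) }
        register(listener)
        awaitClose { unregister(listener) }
    }.shareIn(scope, SharingStarted.WhileSubscribed())

// Approach 2: create the hot flow explicitly and emit into it from the callback.
class SensorEvents {
    private val _events = MutableSharedFlow<Int>(extraBufferCapacity = 64)
    val events: SharedFlow<Int> = _events.asSharedFlow()
    fun onSensorCallback(value: Int) {
        _events.tryEmit(value) // non-suspending, so it can be called from a plain callback
    }
}

fun main() = runBlocking {
    val sensor = SensorEvents()
    // UNDISPATCHED: the collector subscribes before the callback fires.
    val next = async(start = CoroutineStart.UNDISPATCHED) { sensor.events.first() }
    sensor.onSensorCallback(5)
    println(next.await()) // 5
}
```

In both cases consumers only see a SharedFlow, so neither approach by itself says when a particular collector has actually subscribed.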

Anyway, consumers get a reference to a flow that they collect from. The complication is that they may also have an operator chain between the upstream and the collector:

flow.operatorChain().collect { ... }

What we want to achieve here is to have some kind of a reliable signal that the collector had, indeed, started collecting and will receive any further events from the upstream flow. onStart is not serving this purpose. Could we "fix" onStart so that it works? Unfortunately, I don't see a solution to fix onStart that would not overcomplicate the simple design of the flow itself.

Essentially the design of the flow contains just three kinds of signals that travel between the downstream collector of the events and an upstream emitter of the events:

There's no explicit "after start" signal.

The solution proposed in #1759 is local. It does not propagate through the operator chains and cannot be fixed. It means that the solution is fragile. We need a robust one. How can we have it?

The solution that preserves this simple design of Flow is to materialize the "after start" signal. The first thing the emitter should do when a new collector appears is emit a special Started event object that signals the actual start to the collector. This Started event travels downstream through all the regular operators. This works great with cold flows:

val coldFlow = callbackFlow<Event> {
    val callback = .... 
    api.registerCallback(callback) // synchronous API (sic!)
    send(Started) // tell collector callback was installed and events are flowing
    awaitClose { api.unregister(callback) } 
}

Now a collector can be written like this:

coldFlow
    .onEach { if (it is Started) sendMessageToTheDevice() }
    .collect { ... /* would not miss an answer from the device! */ }
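A runnable version of the materialized Started idea, assuming a hypothetical `FakeDevice` with a synchronous registerCallback/unregister API that answers immediately. Because Started is sent only after the callback has been registered, reacting to it in onEach cannot miss the answer, even when the device replies synchronously.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Events carried by the flow: a materialized "started" marker plus real payloads.
sealed class Event {
    object Started : Event()
    data class Answer(val text: String) : Event()
}

// Hypothetical device with a synchronous callback API that answers immediately,
// the worst case for a collector that subscribes late.
class FakeDevice {
    private var callback: ((String) -> Unit)? = null
    fun registerCallback(cb: (String) -> Unit) { callback = cb }
    fun unregister() { callback = null }
    fun sendMessage(text: String) { callback?.invoke("echo: $text") }
}

fun FakeDevice.events(): Flow<Event> = callbackFlow {
    registerCallback { trySend(Event.Answer(it)) }
    send(Event.Started) // the callback is installed: every answer from now on is delivered
    awaitClose { unregister() }
}

fun main() = runBlocking {
    val device = FakeDevice()
    val answer = device.events()
        .onEach { if (it is Event.Started) device.sendMessage("ping") }
        .filterIsInstance<Event.Answer>()
        .first()
    println(answer.text) // echo: ping
}
```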

But sharing operators pose a special challenge. When you share a single upstream collection with multiple downstream ones the upstream does not get any signal on incoming downstream collectors. They are all handled by the sharing operator itself.

val hotFlow = coldFlow.shareIn(scope) // share an expensive device connection
hotFlow
   .onEach { if (it is Started) /* oops, we might have missed Started */ }
   ....

So, the design of the sharing operators has to include this feature of materializing the "after start" signal for all incoming collectors. The draft design that I envision is to add some kind of keepFirstIf feature to sharing primitives, so that you can (optionally) instruct them to always keep a specified event in their replay buffer and replay it first to all downstream collectors:

val hotFlow = coldFlow.shareIn(scope, keepFirstIf = { it is Started })
hotFlow // now properly signals Started event downstream

What do you think?

twyatt commented 4 years ago

It sounds promising, but I'm not quite sure how to expose this paradigm on my public API.

For the Android Bluetooth Low-Energy (BLE) API, the BluetoothGattCallback is registered at time of connect, so we don't (out of the box) have the luxury of registerCallback and unregister.

I could work around that limitation by baking in my own callback registration and holding a reference to that callback (GattCallback in the following code snippet), which I provide to Android's connectGatt:

class GattCallback : BluetoothGattCallback() {

    @Volatile
    private var callback: Callback? = null

    fun register(callback: Callback) {
        this.callback = callback
    }

    fun unregister() {
        callback = null
    }

    override fun onCharacteristicChanged(
        gatt: BluetoothGatt,
        characteristic: BluetoothGattCharacteristic
    ) {
        callback?.invoke(/* ... */)
    }

    // ...
}

Since BluetoothGattCallback.onCharacteristicChanged receives "change notifications" for all characteristics, a multicast Flow felt like a versatile way to expose "characteristic changes" (but it sounds like we can't get the "subscribe before emitting Started" from a single-step MutableSharedFlow?).

Based on your example, to get the desired multicast behavior I could have:

sealed class Event {
    object Started : Event()
    data class Value(val value: OnCharacteristicChanged) : Event()
}

class Gatt(
    private val gattCallback: GattCallback
) {

    val onCharacteristicChanged: Flow<Event> = callbackFlow<Event> {
        val callback: Callback = { sendBlocking(Value(it)) }
        gattCallback.register(callback)
        send(Started)
        awaitClose { gattCallback.unregister() } 
    }.shareIn(GlobalScope, keepFirstIf = { it is Started })
}

However, it's a bit awkward to document that users of onCharacteristicChanged should ignore the Started event if they're not concerned with the bidirectional communication paradigm (e.g. a Bluetooth device that provides accelerometer data, where the latest value can be observed via characteristic change events).

If I instead expose onCharacteristicChanged as the callbackFlow without the shareIn (unicast), then it places the burden of sharing the Flow on the caller, and they need to understand the implementation details (specifically that Started will be emitted) to properly trigger their desired "on start" action.

elizarov commented 4 years ago

@twyatt You can expose onCommunicationStarted extension as a part of your API that filters away Started and calls a user-supplied lambda when it is received:

fun Flow<Event>.onCommunicationStarted(block: () -> Unit) = transform { event -> 
    if (event is Started) block() else emit(event)
}
twyatt commented 4 years ago

The extension function makes sense, but the type (Flow<Event>) for the exposed Flow property isn't desirable.

Whereas before I would've exposed it as:

data class OnCharacteristicChanged(...)

class Gatt {
    // Multicast hot-Flow backed by BroadcastChannel.asFlow
    val onCharacteristicChanged: Flow<OnCharacteristicChanged> = ...
}

To support Started, my API would need to change to:

sealed class Event {
    object Started : Event()
    data class Value(val value: OnCharacteristicChanged) : Event()
}

class Gatt {
    // Multicast hot-Flow backed by callbackFlow.shareIn
    val onCharacteristicChanged: Flow<Event> = ...
}

As you pointed out, an onCommunicationStarted extension function can be provided to handle the Started event and return a Flow that carries the familiar OnCharacteristicChanged type:

fun Flow<Event>.onCommunicationStarted(block: () -> Unit): Flow<OnCharacteristicChanged> = ...

But in situations where the consumer isn't concerned with the Started paradigm (e.g. subscribing to a Bluetooth peripheral that provides accelerometer data, where it's OK to miss a few values), I'd have to provide another extension function:

fun Flow<Event>.characteristicChanges(): Flow<OnCharacteristicChanged> = ...

Is there a possibility the Started object (and machinery to facilitate it) would be built into the Coroutines library? Rather than having libraries have to come up with their own ways of exposing the Started paradigm; being in the Coroutines library makes it a familiar concept with standardized/expected extension functions.

elizarov commented 4 years ago

Is there a possibility the Started object (and machinery to facilitate it) would be built into the Coroutines library? Rather than having libraries have to come up with their own ways of exposing the Started paradigm; being in the Coroutines library makes it a familiar concept with standardized/expected extension functions.

Here are potential solutions:

What can we have "out-of-the-box" in the library? We can provide some kind of ready-to-use StartedOrValue<T> class and an operator like Flow<StartedOrValue<T>>.onStarted(block: () -> Unit): Flow<T> to dematerialize the "started" event. Is it worth having in the library? I don't know.
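A sketch of what such a StartedOrValue<T> wrapper and onStarted operator could look like. Both names are hypothetical, not kotlinx.coroutines APIs; `block` is made `suspend` here so it can call suspending writes, a small deviation from the non-suspending signature mentioned in the comment.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical wrapper that materializes the "started" signal next to ordinary values.
sealed class StartedOrValue<out T> {
    object Started : StartedOrValue<Nothing>()
    data class Value<T>(val value: T) : StartedOrValue<T>()
}

// Hypothetical operator: runs `block` when Started arrives and unwraps everything else.
fun <T> Flow<StartedOrValue<T>>.onStarted(block: suspend () -> Unit): Flow<T> =
    transform { item ->
        when (item) {
            is StartedOrValue.Started -> block()
            is StartedOrValue.Value -> emit(item.value)
        }
    }

fun main() = runBlocking {
    val log = mutableListOf<String>()
    flowOf(StartedOrValue.Started, StartedOrValue.Value(1), StartedOrValue.Value(2))
        .onStarted { log += "started" }
        .collect { log += "value $it" }
    println(log) // [started, value 1, value 2]
}
```

Downstream of onStarted the consumer sees a plain Flow<T>, so callers who don't care about the start signal never have to handle Started themselves.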

twyatt commented 4 years ago

@elizarov thanks so much for explaining the design hurdles and why certain options are off the table.

Is it worth having in the library? I don't know.

I appreciate you hearing and working through our use-case and offering possible solutions.

For now, it seems reasonable for me to continue to expose BroadcastChannel on my public APIs so that consumers can use openSubscription().consumeAsFlow().onStart { ... } (as mentioned in https://github.com/Kotlin/kotlinx.coroutines/issues/1758#issuecomment-573945653).

I'll monitor the flow-sharing label to follow how the Flow multicast/sharing APIs evolve. Thanks again!

elizarov commented 4 years ago

@twyatt @RBusarow With upcoming SharedFlow in #2034 we'll be deprecating BroadcastChannel, which means that your use-case will have to be taken into account in the design of SharedFlow.

Unfortunately, in the current shared flow prototype, if you expose events: SharedFlow<Event> and call events.onStart { ... }.collect { ... }, there is no guarantee that events have started being received. However, for SharedFlow we can provide a dedicated operator that is specific to SharedFlow (and shared flow only!) and provides the guarantee you are looking for. What to call this operator, that is the question. The names I can come up with:

Any other ideas? We also have the option to rename onStart to make it less easily confused with this new shared flow operator (onStart is still experimental; we'd leave onStart in place as deprecated for people who are already using it).

twyatt commented 4 years ago

onStarted would be my preferred naming.

I do see how the similarity to onStart could cause confusion, but perhaps it makes them both more discoverable (IDE will suggest either as possible candidates when, for example, autocompleting for "onsta")?

Volatile-Memory commented 4 years ago
  2. async within a coroutineScope

In the following pseudo code we face the same potential issue as number 1 above, that async could be slow to spin up (and not be subscribed in time and miss the response).

suspend fun request(request: Request): Response = coroutineScope {
    val response = async {
        onCharacteristicChanged
            .map { /* ... */ }
            .first { response -> request.id == response.id }
    }

    val bytes: ByteArray = request.toBytes()
    writeCharacteristic(transmitCharacteristic, bytes)
    response.await()
}

I believe I solved/worked around this problem in my use of ABLE and previously RxAndroidBLE (bridged into coroutines) by starting collection through CoroutineStart.UNDISPATCHED:

    private suspend fun writeBytesAndAwaitResponse(command: ByteArray, configId: HAConfigID): HaConfigDataSourcePacket {
        val notification = connection.async(start = CoroutineStart.UNDISPATCHED) {
            controlPointNotifications.filter { it.opCode.configId == configId }.first()
        }
        connection.writeCharacteristicOrThrow(controlPoint, command, WRITE_TYPE_DEFAULT)
        return notification.await()
    }
Volatile-Memory commented 4 years ago

@elizarov is that the correct usage of UNDISPATCHED? is there a more idiomatic way to do this?

elizarov commented 4 years ago

Btw, we'd decided to name the SharedFlow operator onSubscription. See https://github.com/Kotlin/kotlinx.coroutines/issues/2034#issuecomment-631469395

elizarov commented 4 years ago

@elizarov is that the correct usage of UNDISPATCHED?

Yes. That is exactly the case start = UNDISPATCHED was essentially designed for.
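The UNDISPATCHED fix applied to the same kind of hypothetical instant-answer device (Request, Response, notifications, and writeCharacteristic are illustrative stand-ins). With start = CoroutineStart.UNDISPATCHED, the async body runs immediately until first() has subscribed and suspended, so the subscription exists before the write is issued.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical request/response payloads.
data class Request(val id: Int)
data class Response(val id: Int)

// Hot stream of device notifications (a stand-in for controlPointNotifications).
val notifications = MutableSharedFlow<Response>(extraBufferCapacity = 16)

// Hypothetical write to a device that answers before the call returns.
fun writeCharacteristic(request: Request) {
    notifications.tryEmit(Response(request.id))
}

suspend fun request(request: Request): Response = coroutineScope {
    // UNDISPATCHED: run the body right away, up to its first real suspension.
    // first() registers with the shared flow before it suspends.
    val response = async(start = CoroutineStart.UNDISPATCHED) {
        notifications.first { it.id == request.id }
    }
    writeCharacteristic(request) // the subscription already exists, so nothing is lost
    response.await()
}

fun main() = runBlocking {
    println(request(Request(1))) // Response(id=1)
}
```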

Volatile-Memory commented 4 years ago

This comment chain is getting rather long. Is there anything mentioned that isn't solved by async with UNDISPATCHED? Does the onSubscription method add anything that it does not handle?

twyatt commented 4 years ago

I believe I solved/worked around this problem in my use of ABLE and previously RxAndroidBLE (bridged into coroutines) by starting collection through CoroutineStart.UNDISPATCHED

@Computr0n I had missed the start = UNDISPATCHED option. TIL. Thanks!

Volatile-Memory commented 4 years ago

@twyatt sweet, glad I could help. I'm currently writing a new communications library for our new apps that is using ABLE and I like the design quite a bit.

twyatt commented 4 years ago

The UNDISPATCHED documentation states:

Immediately executes the coroutine until its first suspension point in the current thread as if the coroutine was started using Dispatchers.Unconfined.

@elizarov does that mean that using UNDISPATCHED as per @Computr0n's https://github.com/Kotlin/kotlinx.coroutines/issues/1758#issuecomment-631577424 could still potentially miss a BLE response?

When we hit the "first suspension point" (being filter in the https://github.com/Kotlin/kotlinx.coroutines/issues/1758#issuecomment-631577424 code snippet), is the Flow still in the process of "hooking everything up", or have we already successfully subscribed? In other words, is UNDISPATCHED simply narrowing the time window in which a missed response might occur, and is onSubscription still needed (and to be preferred when available) to provide the "guarantee that events have started being received"?

Volatile-Memory commented 4 years ago

filter is not a suspension point, the stream has not been assembled at that point.

the only suspension point is at collection, which is only happening after first() is called.

spreading the lines out in your IDE and looking for the suspend icon in the gutter will show this in a more visual way.

twyatt commented 4 years ago

filter is not a suspension point, the stream has not been assembled at that point.

Whoopsie, oversight on my part. 🤦 Thanks for clarifying. 👍

elizarov commented 4 years ago

UNDISPATCHED is predictable:

launch(start = CoroutineStart.UNDISPATCHED) {
    print(1)
    delay(100) // the actual suspension that matters here
    print(2)
}
print(3)

Prints: 132, playground: https://pl.kotl.in/R9w5PAcps

One advantage that onSubscription brings to the table is that the code reads easier and you can write all the code with a single operator chain. Let's take a simplified form of the example from https://github.com/Kotlin/kotlinx.coroutines/issues/1758#issuecomment-631577424

Here's the code with undispatched start:

suspend fun awaitResponse(cmd: Cmd, id: ID): Response = coroutineScope {
    val response = async(start = CoroutineStart.UNDISPATCHED) {
        sharedConnectionFlow.filter { it.id == id }.first()
    }
    connection.writeOrThrow(cmd)
    response.await()
}

Here's how you'd write the same code with onSubscription:

suspend fun awaitResponse(cmd: Cmd, id: ID): Response = 
    sharedConnectionFlow
        .onSubscription { connection.writeOrThrow(cmd) }
        .filter { it.id == id }.first()

Unlike a solution with async/await, this code can also be easily updated for cases where there are multiple responses and we need to return a flow of them. onSubscription provides a composable solution. We can have a separate function to set up a flow of responses:

fun responseFlow(cmd: Cmd, id: ID): Flow<Response> = 
    sharedConnectionFlow
        .onSubscription { connection.writeOrThrow(cmd) }
        .filter { it.id == id }

And then use various terminal operators on it like responseFlow(cmd, id).first() or others.
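A self-contained version of responseFlow, assuming a hypothetical device that answers each Cmd with two Response parts; sharedConnectionFlow is a plain MutableSharedFlow here and writeOrThrow a stand-in. onSubscription runs its action after the collector has been registered with the shared flow, so both responses are buffered for it, and the same function serves first() and a multi-response take(2).toList().

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical command and multi-part response types.
data class Cmd(val id: Int)
data class Response(val id: Int, val part: Int)

// Hot stream of device responses, standing in for sharedConnectionFlow.
val sharedConnectionFlow = MutableSharedFlow<Response>(extraBufferCapacity = 16)

// Hypothetical write: the fake device answers each command with two parts at once.
fun writeOrThrow(cmd: Cmd) {
    sharedConnectionFlow.tryEmit(Response(cmd.id, part = 1))
    sharedConnectionFlow.tryEmit(Response(cmd.id, part = 2))
}

// onSubscription fires after the collector is registered with the shared flow,
// so responses triggered by the write cannot be missed.
fun responseFlow(cmd: Cmd): Flow<Response> =
    sharedConnectionFlow
        .onSubscription { writeOrThrow(cmd) }
        .filter { it.id == cmd.id }

fun main() = runBlocking {
    println(responseFlow(Cmd(7)).first())          // Response(id=7, part=1)
    println(responseFlow(Cmd(8)).take(2).toList()) // [Response(id=8, part=1), Response(id=8, part=2)]
}
```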

Volatile-Memory commented 4 years ago

@elizarov brilliant. thank you!

elizarov commented 4 years ago

I'm closing this issue as the design of shared flows in #2034 now takes this use-case into account.