We (QLDB) recently upgraded to Kotlin Coroutines 1.3. Our code previously used Channel pervasively. During this upgrade we replaced some of those Channel uses with Flow and noticed java.lang.ClassCastExceptions.
I haven't been able to create a small reproducible sample, but we got the ClassCastExceptions in 2 scenarios:
1. A Flow operator (like filter) calls a function which has, somewhere in it, a call to withContext(Dispatchers.IO).
2. A Flow operator calls a function which itself creates and processes a Flow, and that inner Flow has a call to flowOn(Dispatchers.IO).
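Is this because of context preservation? Based on https://medium.com/@elizarov/execution-context-of-kotlin-flows-b8c151c9309b ("One is to encapsulate the appropriate context in someDataComputation itself"), it seems like what we're doing is fine.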
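For illustration only, the two scenarios have roughly the following shape. This is a sketch, not a reproducer: the helper names isEven and digits are hypothetical stand-ins for our real functions, and a small program like this is not claimed to trigger the exception.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Scenario 1 (hypothetical helper): the function switches context internally.
suspend fun isEven(n: Int): Boolean = withContext(Dispatchers.IO) {
    n % 2 == 0
}

// Scenario 2 (hypothetical helper): the function creates and processes
// its own inner Flow, which uses flowOn(Dispatchers.IO).
suspend fun digits(n: Int): List<Int> =
    flow {
        for (c in n.toString()) emit(c - '0')
    }
        .flowOn(Dispatchers.IO)
        .toList()

fun main(): Unit = runBlocking {
    val result = flowOf(12, 7, 34)
        .filter { isEven(it) } // operator calling a withContext function
        .map { digits(it) }    // operator calling a function with an inner Flow
        .toList()
    println(result)
}
```

In both cases the context switch happens inside the called function, not around an emit call of the outer Flow.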
If this is expected, how are we supposed to handle functions that change context? Our code base is full of functions of the form
    suspend fun callSomeBlockingApi(): T = withContext(Dispatchers.IO) { ... }
If we change this to
    fun callSomeBlockingApi(): T {
        ...
    }

    flow
        .map { callSomeBlockingApi(...) }
        .flowOn(Dispatchers.IO)
then not only do we have to change all those functions, but now anyone calling these functions has to know (based on reading the implementation or us adding docs) that it's blocking and they need to change the dispatcher.
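To make the trade-off concrete, here is a minimal sketch of the two styles side by side. The function names and the String.length stand-in for a blocking call are assumptions made for illustration, not our real code.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Style A (hypothetical): the function owns its dispatcher, so callers
// do not need to know that the work inside is blocking.
suspend fun lengthSelfDispatching(s: String): Int = withContext(Dispatchers.IO) {
    s.length // stand-in for a blocking API call
}

// Style B (hypothetical): a plain blocking function; every caller is
// responsible for running it on a suitable dispatcher.
fun lengthBlocking(s: String): Int = s.length

fun main(): Unit = runBlocking {
    val inputs = flowOf("a", "bb", "ccc")

    // Style A: nothing extra at the call site.
    val a = inputs.map { lengthSelfDispatching(it) }.toList()

    // Style B: the call site has to add flowOn(Dispatchers.IO) itself.
    val b = inputs.map { lengthBlocking(it) }.flowOn(Dispatchers.IO).toList()

    println(a == b)
}
```

With style B, forgetting flowOn at any call site silently runs the blocking call on whatever dispatcher the collector happens to use, which is the maintenance burden described above.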
Here is one of the stacktraces:
kotlin.coroutines.intrinsics.CoroutineSingletons cannot be cast to aws.quantum.metering.MeteringRecord: java.lang.ClassCastException
java.lang.ClassCastException: kotlin.coroutines.intrinsics.CoroutineSingletons cannot be cast to aws.quantum.metering.MeteringRecord
at com.amazonaws.ledger.storagemanagementservice.usage.MeteringSubmitterKt$tee$$inlined$map$1$1.emit(Collect.kt:137)
at com.amazonaws.ledger.storagemanagementservice.usage.MeteringSubmitterKt$status$$inlined$map$1$1.emit(Collect.kt:138)
at kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$5.collect(SafeCollector.kt:128)
at com.amazonaws.ledger.storagemanagementservice.usage.MeteringSubmitterKt$status$$inlined$map$1.collect(SafeCollector.kt:127)
at com.amazonaws.ledger.storagemanagementservice.usage.MeteringSubmitterKt$tee$$inlined$map$1.collect(SafeCollector.kt:127)
at com.amazonaws.ledger.storagemanagementservice.usage.MeteringSubmitterKt$filterForMetering$$inlined$filter$1.collect(SafeCollector.kt:127)
at com.amazonaws.ledger.storagemanagementservice.usage.MeteringSubmitterKt$diagnostics$$inlined$map$1.collect(SafeCollector.kt:127)
at com.amazonaws.ledger.storagemanagementservice.usage.MeteringSubmitterKt$diagnostics$$inlined$filter$1.collect(SafeCollector.kt:127)
at com.amazonaws.ledger.storagemanagementservice.usage.MeteringSubmitterKt$submissions$$inlined$map$1.collect(SafeCollector.kt:127)
at com.amazonaws.ledger.storagemanagementservice.usage.MeteringSubmitterKt.meter(MeteringSubmitter.kt:180)
at com.amazonaws.ledger.storagemanagementservice.usage.MeteringSubmitterKt$launchMeteringSubmitter$1$1$1.invokeSuspend(MeteringSubmitter.kt:81)
at com.amazonaws.ledger.storagemanagementservice.usage.MeteringSubmitterKt$launchMeteringSubmitter$1$1$1.invoke(MeteringSubmitter.kt)
at aws.quantum.metering.MeteringSessionKt.meteringSession(MeteringSession.kt:177)
at com.amazonaws.ledger.storagemanagementservice.usage.MeteringSubmitterKt$launchMeteringSubmitter$1$1.invokeSuspend(MeteringSubmitter.kt:78)
at com.amazonaws.ledger.storagemanagementservice.usage.MeteringSubmitterKt$launchMeteringSubmitter$1$1.invoke(MeteringSubmitter.kt)
at aws.quantum.metrics.PercentileMetricsKt$percentileMetrics$2$1.invokeSuspend(PercentileMetrics.kt:23)
at aws.quantum.metrics.PercentileMetricsKt$percentileMetrics$2$1.invoke(PercentileMetrics.kt)
at kotlinx.coroutines.intrinsics.UndispatchedKt.startUndispatchedOrReturn(Undispatched.kt:91)
at kotlinx.coroutines.BuildersKt__Builders_commonKt.withContext(Builders.common.kt:156)
at kotlinx.coroutines.BuildersKt.withContext(Unknown Source)
at aws.quantum.metrics.PercentileMetricsKt$percentileMetrics$2.invokeSuspend(PercentileMetrics.kt:22)
at aws.quantum.metrics.PercentileMetricsKt$percentileMetrics$2.invoke(PercentileMetrics.kt)
at aws.quantum.metrics.MetricHelpersKt$metrics$3.invokeSuspend(MetricHelpers.kt:98)
at aws.quantum.metrics.MetricHelpersKt$metrics$3.invoke(MetricHelpers.kt)
at kotlinx.coroutines.intrinsics.UndispatchedKt.startUndispatchedOrReturn(Undispatched.kt:91)
at kotlinx.coroutines.BuildersKt__Builders_commonKt.withContext(Builders.common.kt:156)
at kotlinx.coroutines.BuildersKt.withContext(Unknown Source)
at aws.quantum.metrics.MetricHelpersKt.metricsScope(MetricHelpers.kt:112)
at aws.quantum.metrics.MetricHelpersKt.metrics(MetricHelpers.kt:98)
at aws.quantum.metrics.PercentileMetricsKt.percentileMetrics(PercentileMetrics.kt:21)
at com.amazonaws.ledger.storagemanagementservice.usage.MeteringSubmitterKt$launchMeteringSubmitter$1.invokeSuspend(MeteringSubmitter.kt:77)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)
at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:241)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:270)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:79)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:54)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at com.amazonaws.ledger.storagemanagementservice.usage.StorageMeteringSubmitter.handleEvent(StorageMeteringSubmitter.kt:53)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)