smithy-lang / smithy-kotlin

Smithy code generator for Kotlin (in development)
Apache License 2.0

Exception in streaming call not propagated to caller's scope #1166

Open david-katz opened 1 month ago

david-katz commented 1 month ago

Describe the bug

An exception thrown within a streaming SDK call is not propagated to the caller's scope.

Regression Issue

Expected behavior

A streaming SDK call made within a given scope should propagate its exceptions back to that scope. An exception handler provided at launch should be invoked if the exception is not otherwise handled.

Current behavior

Exceptions that occur within the stream and are not handled there are not propagated to the calling scope's exception handler. Instead, the app's top-level exception handler receives the following as an unhandled exception:

    FATAL EXCEPTION: DefaultDispatcher-worker-5
    Process: myprocess, PID: 14065
    okhttp3.internal.http2.StreamResetException: stream was reset: CANCEL
    at okhttp3.internal.http2.Http2Stream.checkOutNotClosed$okhttp(Http2Stream.kt:646)
    at okhttp3.internal.http2.Http2Stream$FramingSink.emitFrame(Http2Stream.kt:557)
    at okhttp3.internal.http2.Http2Stream$FramingSink.write(Http2Stream.kt:532)
    at okio.ForwardingSink.write(ForwardingSink.kt:29)
    at okhttp3.internal.connection.Exchange$RequestBodySink.write(Exchange.kt:223)
    at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
    at okio.RealBufferedSink.write(RealBufferedSink.kt:147)
    at aws.smithy.kotlin.runtime.http.engine.okhttp.InstrumentedSink.write(MetricsInterceptor.kt:51)
    at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
    at okio.RealBufferedSink.write(RealBufferedSink.kt:147)
    at okio.RealBufferedSink.emit(RealBufferedSink.kt:262)
    at aws.smithy.kotlin.runtime.io.AbstractBufferedSinkAdapter.emit(BufferedSinkAdapter.kt:90)
    at aws.smithy.kotlin.runtime.io.SdkByteReadChannelKt$readAll$2.invokeSuspend(SdkByteReadChannel.kt:114)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:101)
    at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:113)
    at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:89)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:589)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:823)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:720)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:707)
    Suppressed: okhttp3.internal.http2.StreamResetException: stream was reset: CANCEL
    at okhttp3.internal.http2.Http2Stream.checkOutNotClosed$okhttp(Http2Stream.kt:646)
    at okhttp3.internal.http2.Http2Stream$FramingSink.emitFrame(Http2Stream.kt:557)
    at okhttp3.internal.http2.Http2Stream$FramingSink.write(Http2Stream.kt:532)
    at okio.ForwardingSink.write(ForwardingSink.kt:29)
    at okhttp3.internal.connection.Exchange$RequestBodySink.write(Exchange.kt:223)
    at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
    at okio.RealBufferedSink.write(RealBufferedSink.kt:147)
    at aws.smithy.kotlin.runtime.http.engine.okhttp.InstrumentedSink.write(MetricsInterceptor.kt:51)
    at okio.RealBufferedSink.close(RealBufferedSink.kt:280)
    at kotlin.io.CloseableKt.closeFinally(Closeable.kt:59)
    at aws.smithy.kotlin.runtime.http.engine.okhttp.StreamingRequestBody$doWriteTo$1.invokeSuspend(StreamingRequestBody.kt:64)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.UndispatchedCoroutine.afterResume(CoroutineContext.kt:266)
    at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:100)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
    ... 7 more
    Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [TelemetryContext(aws.smithy.kotlin.runtime.telemetry.DefaultTelemetryProvider@9fa78e0), LoggingContextElement({rpc=Transcribe Streaming.StartStreamTranscription, sdkInvocationId=563166c0-e5e1-4c22-bbb4-96f5faa14d9e}), aws.smithy.kotlin.runtime.telemetry.context.TelemetryContextElement@db9e799, CoroutineName(call-context:send-request-body), StandaloneCoroutine{Cancelling}@60e6a5e, Dispatchers.IO]

Steps to Reproduce

The streaming call is being started within a coroutine as follows:

        val myDispatcher = Executors
            .newSingleThreadExecutor(VoiceInteractionThreadFactory)
            .asCoroutineDispatcher()

        // supervisor is SupervisorJob()
        val myScope = CoroutineScope(myDispatcher + supervisor)

        awsTranscribeJob =
                myScope.launch(RecognitionServiceExceptionHandler) {
                    withContext(myDispatcher) {
                        try {
                            awsTranscribe?.start()
                        } catch (e: AwsTranscribeException) {
                            Timber.tag(RECOGNITION_LOGGING_TAG).e(e, "Error processing transcript from AWS Transcribe")
                            launch(Dispatchers.Main) {
                                givenCallback?.error(SpeechRecognizer.ERROR_SERVER)
                            }
                        } catch (e: IOException) {
                            Timber.tag(RECOGNITION_LOGGING_TAG).e(e, "IO error getting transcript")
                            launch(Dispatchers.Main) {
                                givenCallback?.error(SpeechRecognizer.ERROR_NETWORK)
                            }
                        }
                    }
                }

    suspend fun start(): String {
        enableHttp2FrameLogging()
        fullMessage = StringBuilder()

        client =
            TranscribeStreamingClient {
                logMode = LogMode.LogRequest + LogMode.LogResponse
                credentialsProvider = myCredentialsProvider
                region = "us-east-1"
                httpClient =
                    OkHttp4Engine {
                        enableHttp2FrameLogging()
                    }

                interceptors = mutableListOf(AwsLoggingInterceptor())
            }

        val req =
            StartStreamTranscriptionRequest {
                audioStream = audioStreamFlow
                mediaSampleRateHertz = 16000
                mediaEncoding = MediaEncoding.Flac
                languageCode = LanguageCode.DeDe
                languageModelName = "my-language-model-de-DE-v1.2"
                enablePartialResultsStabilization = false
                // partialResultsStability = PartialResultsStability.Low
            }

            client?.startStreamTranscription(req) { resp ->
                resp.transcriptResultStream
                    ?.collect { event ->
                        logger.debug("event received on transcriptResultStream")
                        when (event) {
                            is TranscriptResultStream.TranscriptEvent -> {
                                logger.debug("AwsTranscribe TranscriptEvent received")
                                event.value.transcript?.results?.forEach { result ->
                                    val transcript = result.alternatives?.firstOrNull()?.transcript
                                    if (result.isPartial) {
                                        speechRecognitionResultsListener
                                            ?.onPartialResult(
                                                transcript
                                                    ?: "",
                                            )
                                    } else {
                                        transcript?.let {
                                            fullMessage.append(it)
                                            speechRecognitionResultsListener?.onResults(fullMessage.toString())
                                            stop()
                                        }
                                    }
                                }
                            }
                            else -> error("unknown event $event")
                        }
                    }
            }
        return fullMessage.toString()
    }

Possible Solution

In aws.smithy.kotlin.runtime.http.engine.okhttp.StreamingRequestBody#doWriteTo

Perhaps the GlobalScope.launch is the issue?

    private fun doWriteTo(sink: BufferedSink) {
        val context = callContext + callContext.derivedName("send-request-body")
        if (isDuplex()) {
            // launch coroutine that writes to sink in the background
            GlobalScope.launch(context + Dispatchers.IO) {
                sink.use { transferBody(it) }
            }
        } else {
            // remove the current dispatcher (if it exists) and use the internal
            // runBlocking dispatcher that blocks the *current* thread
            val blockingContext = context.minusKey(CoroutineDispatcher)

            // Non-duplex (aka "normal") requests MUST write all of their request body
            // before this function returns. Requests are given a background thread to
            // do this work in, and it is safe and expected to block.
            // see: https://square.github.io/okhttp/4.x/okhttp/okhttp3/-request-body/is-duplex/
            runBlocking(blockingContext) {
                transferBody(sink)
            }
        }
    }
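
To make the GlobalScope question above concrete outside the SDK, here is a minimal sketch using only kotlinx.coroutines. It is not the smithy-kotlin code: `detachedFailureDemo`, `callerHandler`, and `backgroundHandler` are hypothetical names, and the background coroutine gets its own handler only so that the outcome can be observed instead of reaching the thread's uncaught exception handler.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.*

// Hypothetical demo, not SDK code: a coroutine started with GlobalScope.launch
// is not a child of the coroutine that started it.
@OptIn(DelicateCoroutinesApi::class)
fun detachedFailureDemo(): List<String> = runBlocking {
    val seen = CopyOnWriteArrayList<String>()
    val callerHandler = CoroutineExceptionHandler { _, e -> seen += "caller: ${e.message}" }
    val backgroundHandler = CoroutineExceptionHandler { _, e -> seen += "background: ${e.message}" }

    val callerScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    val caller = callerScope.launch(callerHandler) {
        // Detached: inherits neither the caller's Job nor its exception handler.
        val background = GlobalScope.launch(Dispatchers.IO + backgroundHandler) {
            throw IllegalStateException("boom")
        }
        background.join() // waits for completion, does not rethrow the failure
    }
    caller.join()
    callerScope.cancel()
    seen.toList()
}
```

Under these assumptions the function returns a list containing only `background: boom`, so the handler passed to the caller's `launch` is never invoked. Whether the SDK's `callContext` changes this picture is not something the sketch can show.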

Context

No response

Smithy-Kotlin version

1.3.17

Platform (JVM/JS/Native)

JVM

Operating system and version

Android 13

david-katz commented 1 month ago

I tried adding a .catch block to the stream processing:

                resp.transcriptResultStream
                    ?.catch { e ->
                        if (e !is Exception) throw e
                        logger.error("error in transcriptResultStream: ${e.message}")
                        speechRecognitionResultsListener?.onError(e)
                        stop()
                    }
                    ?.collect { event ->

The catch block was called, for instance when Transcribe received no audio for 15 seconds, but the StreamResetException was still unhandled.
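
One possible reading of this, sketched without the SDK: the `catch` operator only intercepts exceptions raised upstream in the flow it is attached to, so a failure in a separate coroutine never reaches it. In the sketch below the `writer` coroutine is a hypothetical stand-in for whatever sends the request body, and the messages are made up for illustration.

```kotlin
import java.io.IOException
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: Flow.catch sees the flow's own failure, but not a
// failure in an unrelated coroutine running alongside it.
@OptIn(DelicateCoroutinesApi::class)
fun catchOperatorDemo(): List<String> = runBlocking {
    val log = CopyOnWriteArrayList<String>()
    val writerHandler = CoroutineExceptionHandler { _, e -> log += "unhandled in writer: ${e.message}" }

    // Stand-in for a background request-body writer (an assumption).
    val writer = GlobalScope.launch(writerHandler) {
        throw IOException("stream was reset")
    }

    // Stand-in for the response stream failing after one event.
    flow<Int> {
        emit(1)
        throw IllegalStateException("idle timeout")
    }
        .catch { e -> log += "caught in flow: ${e.message}" }
        .collect { }

    writer.join()
    log.sorted() // the two entries can arrive in either order
}
```

Both entries end up in the log, which suggests the two failures travel on separate paths: one through the flow, one through the writer coroutine's own exception handling.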

ianbotsf commented 1 month ago

I've been able to reproduce this and I'm working on a fix and additional testing within the SDK. I'll post an update here when I have a PR.

ianbotsf commented 15 hours ago

Circling back to this, I do believe that our use of GlobalScope is not optimal and should be changed. That being said, I do not believe that using a better scope will affect the issue of exceptions disappearing from child jobs.

I think the problem lies in using launch without any additional construct that waits on the result or propagates the exception. Without one, exceptions produced by child jobs won't block the current thread; they'll merely happen in the background and be handled by the default coroutine exception handler (which prints them to the console). I suggest using coroutineScope { } or join() to propagate the exception:

    awsTranscribeJob =
        myScope.launch(RecognitionServiceExceptionHandler) {
            withContext(myDispatcher) {
                try {
                    awsTranscribe?.start()
                } catch (e: AwsTranscribeException) {
                    Timber.tag(RECOGNITION_LOGGING_TAG).e(e, "Error processing transcript from AWS Transcribe")
                    launch(Dispatchers.Main) {
                        givenCallback?.error(SpeechRecognizer.ERROR_SERVER)
                    }
                } catch (e: IOException) {
                    Timber.tag(RECOGNITION_LOGGING_TAG).e(e, "IO error getting transcript")
                    launch(Dispatchers.Main) {
                        givenCallback?.error(SpeechRecognizer.ERROR_NETWORK)
                    }
                }
            }
        }.join()
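
As a standalone illustration of the `coroutineScope { }` option, here is a minimal sketch that uses only kotlinx.coroutines. The function names and the exception message are hypothetical. It relies on `coroutineScope` waiting for its children and rethrowing a child's failure to the caller, where an ordinary try/catch can handle it.

```kotlin
import java.io.IOException
import kotlinx.coroutines.*

// Hypothetical sketch: coroutineScope waits for its children and rethrows
// the first child failure to the caller, where try/catch can handle it.
suspend fun runWithScopedChild(): String =
    try {
        coroutineScope {
            launch { throw IOException("child failed") }
            "completed" // discarded: the failed child makes the whole scope fail
        }
    } catch (e: IOException) {
        "caught: ${e.message}"
    }

fun scopedChildDemo(): String = runBlocking { runWithScopedChild() }
```

Here `scopedChildDemo()` returns `caught: child failed`. Note that `join()` by itself only suspends until the job has finished; the failure is observed by the job's parent or by an exception handler in its context, not rethrown at the `join()` call site.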

Additionally, I believe the use of SupervisorJob here means child jobs' exceptions will not cause any error to happen outside of the scope, meaning they also will disappear. Your RecognitionServiceExceptionHandler may be able to see they've occurred and log messages but recovery will be impossible.
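
A small sketch of that SupervisorJob behavior, again with plain kotlinx.coroutines and hypothetical names: the failing child's exception is delivered to the handler passed to `launch`, while the scope itself stays active and can keep running other work.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.*

// Hypothetical demo: with a SupervisorJob, a child's failure reaches the
// handler given to launch but does not cancel the scope or its siblings.
fun supervisorDemo(): Pair<List<String>, String> = runBlocking {
    val handled = CopyOnWriteArrayList<String>()
    val handler = CoroutineExceptionHandler { _, e -> handled += "handler: ${e.message}" }
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // The failure is delivered to the handler and goes no further.
    scope.launch(handler) { throw IllegalStateException("child failed") }.join()

    // The scope is still active, so later work is unaffected.
    val sibling = scope.async { "sibling ok" }.await()
    scope.cancel()
    handled.toList() to sibling
}
```

The result pairs a one-element list, `handler: child failed`, with `sibling ok`: the handler can log the failure, but nothing outside the failed child is notified through the job hierarchy.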

Lastly, I'm unclear what AwsTranscribeException is in the catch clause. The Kotlin Transcribe client will throw exceptions which inherit from TranscribeStreamingException for service-related errors, including BadRequestException which occurs when an audio stream has been idle too long.

I don't know enough about your surrounding code and desired state to be certain, but I believe your invocation of this API can be simplified to remove the explicit scope and supervisor job, handle the correct base exception type, and eliminate the need for the uncaught exception handler:

    val myDispatcher = Executors
        .newSingleThreadExecutor(VoiceInteractionThreadFactory)
        .asCoroutineDispatcher()

    awsTranscribeJob = coroutineScope {
        withContext(myDispatcher) {
            try {
                awsTranscribe?.start()
            } catch (e: TranscribeStreamingException) {
                Timber.tag(RECOGNITION_LOGGING_TAG).e(e, "Error processing transcript from AWS Transcribe")
                launch(Dispatchers.Main) {
                    givenCallback?.error(SpeechRecognizer.ERROR_SERVER)
                }
            } catch (e: IOException) {
                Timber.tag(RECOGNITION_LOGGING_TAG).e(e, "IO error getting transcript")
                launch(Dispatchers.Main) {
                    givenCallback?.error(SpeechRecognizer.ERROR_NETWORK)
                }
            }
        }
    }

This should enable exceptions to be caught and handled appropriately, including leaving the audio stream idle for too long.

Can you review and let me know if that resolves the exception propagation problem?