micronaut-projects / micronaut-core

Micronaut Application Framework
http://micronaut.io
Apache License 2.0

COROUTINE_SUSPENDED response when calling coroutine in a suspending controller function which returns Flow #11131

Open vekonypeter opened 2 weeks ago

vekonypeter commented 2 weeks ago

Expected Behavior

I have a class annotated with @Controller whose method returns Flow<...> as the response. Inside the method I have to call another suspending function, e.g. because I want to fetch data from the database or call another API, all of which works via coroutines for me. Because of this, my controller method is also suspending.

When a client calls this method it should get the data sent via the Flow.

Actual Behaviour

The status code is 200, but the response body is the following:

"COROUTINE_SUSPENDED"
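
For context, COROUTINE_SUSPENDED is the marker object that the Kotlin standard library returns from the first synchronous call into a suspending function when that function suspends instead of finishing immediately. The sketch below uses only the Kotlin standard library to make the marker visible; `neverResumed` and `rawFirstReturnValue` are hypothetical names for illustration. Whether Micronaut ends up serializing exactly this marker is an assumption, although it is consistent with the response body reported above.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
import kotlin.coroutines.suspendCoroutine

// Hypothetical stand-in for a handler that suspends before it has a result.
val neverResumed: suspend () -> String = {
    suspendCoroutine<String> { /* intentionally never resumed */ }
}

// Starts the coroutine and returns whatever the first synchronous call yields.
fun rawFirstReturnValue(): Any? =
    neverResumed.startCoroutineUninterceptedOrReturn(
        Continuation<String>(EmptyCoroutineContext) { /* result ignored */ }
    )

fun main() {
    val raw = rawFirstReturnValue()
    // The caller receives the marker, not a String, because the body suspended.
    println(raw === COROUTINE_SUSPENDED) // prints "true"
    println(raw)                         // prints "COROUTINE_SUSPENDED"
}
```

A caller that treats this raw return value as the final result, rather than waiting for the continuation to be resumed, would see the marker instead of the real data.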

Steps To Reproduce

/**
 * This returns "COROUTINE_SUSPENDED" as body
 */
@Get("foo1")
suspend fun foo1(): Flow<Char> {
    // get data from db or from other API
    val res = callDbOrOtherApi().toList()

    // want to return a flow that needs the result as an input
    return doSomeOtherStuff(res)
}

fun callDbOrOtherApi(): Flow<String> = flow {
    delay(1000) // other party needs some time to think

    // then it starts to stream the data
    listOf("val1", "val2")
        .forEach { emit(it) }
}

fun doSomeOtherStuff(input: List<String>): Flow<Char> =
    input.joinToString()
        .toList()
        .asFlow()

Environment Information

Example Application

No response

Version

4.4.2

vekonypeter commented 2 weeks ago

Workaround:

    @Get("foo2")
    fun foo2(): Flow<Char> {

        return flow {
            // get data from db or from other API
            emit(callDbOrOtherApi().toList())
        }.flatMapConcat { res ->

            // want to return a flow that needs the result as an input
            doSomeOtherStuff(res)
        }
    }

dstepanov commented 2 weeks ago

Please create a sample app that reproduces the problem. Make sure you have added Micronaut Kotlin dependency.

yawkat commented 2 weeks ago

Combining Flow and suspend seems a bit weird to me.

dstepanov commented 2 weeks ago

I have added a sample project with Flow and it's passing: https://github.com/micronaut-projects/micronaut-core/pull/11135 Please modify it to reproduce your problem.

vekonypeter commented 2 weeks ago

@dstepanov sure, here is an example application based on yours: https://github.com/vekonypeter/miconaut-core-issue-11131/tree/main

I modified your example, because in its original form it certainly works. The problem occurs when you have a suspending function that returns a Flow but also makes some other suspending call before returning it. See the example here: https://github.com/vekonypeter/miconaut-core-issue-11131/blob/main/src/main/kotlin/com/example/HelloController.kt#L14-L18

Test result here is:

Expected :Hello World
Actual   :"COROUTINE_SUSPENDED"

org.opentest4j.AssertionFailedError: expected: <Hello World> but was: <"COROUTINE_SUSPENDED">
    at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
    at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
    at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
    at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
    at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
    at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1145)
    at com.example.HelloControllerTest.testHelloWorld1(HelloControllerTest.kt:21)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at io.micronaut.test.extensions.junit5.MicronautJunit5Extension$2.proceed(MicronautJunit5Extension.java:142)
    at io.micronaut.test.extensions.AbstractMicronautExtension.interceptEach(AbstractMicronautExtension.java:162)
    at io.micronaut.test.extensions.AbstractMicronautExtension.interceptTest(AbstractMicronautExtension.java:119)
    at io.micronaut.test.extensions.junit5.MicronautJunit5Extension.interceptTestMethod(MicronautJunit5Extension.java:129)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)

Dependencies seem to be fine, micronaut-kotlin-runtime is added.

Is this a very exotic use-case? For us, it seems pretty common, because all of our database interactions and calls towards other APIs are done using suspending functions.

dstepanov commented 2 weeks ago

I see, but as Jonas wrote, it doesn't make sense to have a suspend function that returns a Flow.

yawkat commented 2 weeks ago

@vekonypeter the reason it's weird is that you're combining two reactive programming styles (coroutines and flows). From the framework's point of view it becomes a Publisher<Publisher<String>>. You can "unwrap" this in one of the following ways:

Index: src/main/kotlin/com/example/HelloController.kt
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/main/kotlin/com/example/HelloController.kt b/src/main/kotlin/com/example/HelloController.kt
--- a/src/main/kotlin/com/example/HelloController.kt    (revision abe1e617c596a15913e542f4832d531535516e00)
+++ b/src/main/kotlin/com/example/HelloController.kt    (date 1725020074789)
@@ -6,15 +6,16 @@
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.asFlow
+import kotlinx.coroutines.flow.emitAll
 import kotlinx.coroutines.flow.flow

 @Controller("/hello")
 class HelloController {

     @Get(value = "/world1")
-    suspend fun world1(): Flow<String> {
+    fun world1(): Flow<String> = flow {
         delay(1000)
-        return listOf("Hello World").asFlow()
+        emitAll(listOf("Hello World").asFlow())
     }

     @Get(value = "/world2", produces = [MediaType.TEXT_PLAIN])
Index: src/main/kotlin/com/example/HelloController.kt
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/main/kotlin/com/example/HelloController.kt b/src/main/kotlin/com/example/HelloController.kt
--- a/src/main/kotlin/com/example/HelloController.kt    (revision abe1e617c596a15913e542f4832d531535516e00)
+++ b/src/main/kotlin/com/example/HelloController.kt    (date 1725020169453)
@@ -7,14 +7,15 @@
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.asFlow
 import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.flow.single

 @Controller("/hello")
 class HelloController {

     @Get(value = "/world1")
-    suspend fun world1(): Flow<String> {
+    suspend fun world1(): String {
         delay(1000)
-        return listOf("Hello World").asFlow()
+        return listOf("Hello World").asFlow().single()
     }

     @Get(value = "/world2", produces = [MediaType.TEXT_PLAIN])

vekonypeter commented 2 weeks ago

I don't fully understand what you mean by "combining two reactive programming styles (coroutines and flows)". Flows are an integral part of Kotlin coroutines. Maybe the example is just too simple, but imagine the following use-case:

in code:

suspend fun test(): Flow<Any> {
    val res = httpClientMethodCall() // this is a suspending function therefore test() must be also suspending
    return readSomeStuffFromDb(res)
}

I don't think this is something super weird; it is pretty usual when you have everything implemented with coroutines and suspending functions. But this will return the "COROUTINE_SUSPENDED" response for sure.

I can make it work somehow like this and probably in a lot of other ways too, but it feels odd:

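fun test(): Flow<Any> = flow {
    emit(httpClientMethodCall())
}.flatMapConcat { res ->
    // flatMapConcat rather than map: readSomeStuffFromDb returns a Flow,
    // and map would emit that Flow object itself as a single item
    readSomeStuffFromDb(res)
}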
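
One way to make this workaround less odd is to fold the suspending setup into the flow builder with emitAll, so the controller method stays non-suspending and still returns a single Flow. The sketch below is a minimal illustration without Micronaut; `flowOfSuspending` is a hypothetical helper name, and `httpClientMethodCall` and `readSomeStuffFromDb` are stand-ins with assumed signatures, since their real definitions are not part of this thread.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs the suspending setup lazily, at collection time,
// and then forwards every item of the Flow it produced.
fun <T> flowOfSuspending(block: suspend () -> Flow<T>): Flow<T> =
    flow { emitAll(block()) }

// Stand-in for a suspending HTTP client call (assumed signature).
suspend fun httpClientMethodCall(): String {
    delay(10)
    return "key"
}

// Stand-in for a streaming database read (assumed signature).
fun readSomeStuffFromDb(key: String): Flow<String> = flowOf("$key-1", "$key-2")

// Non-suspending function that still performs suspending work before streaming.
fun test(): Flow<String> = flowOfSuspending {
    val res = httpClientMethodCall()
    readSomeStuffFromDb(res)
}

fun main() = runBlocking {
    println(test().toList()) // prints "[key-1, key-2]"
}
```

The body of the lambda reads almost like the original suspending function, which may be easier to maintain than chaining emit and flatMapConcat by hand.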

yawkat commented 2 weeks ago

Both a suspend method and a Flow represent the same thing: A result that will be produced asynchronously in the future (in reactive streams terms, both are a flow). Flow has some added features like multiple items and multiple consumers, but it is conceptually the same thing.

When you have a suspend method that returns a Flow, the framework first has to wait for the suspend method to complete, and then also for the flow to complete. This form of double flow is not supported.
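
The two waiting stages can be made visible with plain kotlinx.coroutines, independent of any framework. In this sketch, `suspendThenFlow` and the `events` log are hypothetical names used only for illustration: the caller first waits for the suspending function to return a Flow, and only afterwards, on collection, for the Flow's items.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Records the order in which the two stages run.
val events = mutableListOf<String>()

suspend fun suspendThenFlow(): Flow<Int> {
    events += "suspend body started"
    delay(10) // stage 1: the caller waits for the function itself
    events += "flow returned"
    return flow {
        events += "flow collection started"
        emit(1) // stage 2: the caller waits for the items
        emit(2)
    }
}

fun main() = runBlocking {
    val result = suspendThenFlow() // first wait
    events += "caller holds the Flow, nothing emitted yet"
    val items = result.toList()    // second wait
    events.forEach(::println)
    println(items) // prints "[1, 2]"
}
```

Because the returned Flow is cold, nothing inside the flow builder runs until the second stage, which is why a consumer has to handle both stages explicitly.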

FWIW, this is not exactly idiomatic in general Kotlin code either, because it can be annoying to work with even without Micronaut involved. This SO answer describes it well: https://stackoverflow.com/a/76031024