spring-cloud / spring-cloud-function

Exception caused by suspend lambda of kotlin that implements java.util.function.Consumer #1081

Open tonny1983 opened 1 year ago

tonny1983 commented 1 year ago

Describe the bug According to the docs, a lambda can be used to implement java.util.function.Consumer.

However, if the lambda body calls a suspend method, the lambda itself must be declared with the suspend keyword, which causes java.lang.UnsupportedOperationException: Multi argument Kotlin functions are not currently supported.

Sample Suppose I have a DataRepository interface that extends CoroutineCrudRepository, so DataRepository#save is a suspend method (in the CoroutineCrudRepository interface it is declared as suspend fun <S : T> save(entity: S): T).

And the function class is

@Configuration
class DataFunction {
    @Bean
    fun consumerData(dataRepository: DataRepository): suspend (Data) -> Unit =
        {
            val result = dataRepository.save(it)
        }
}

and it causes the following exception:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Multi argument Kotlin functions are not currently supported
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1770)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:598)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:520)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:325)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:323)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:973)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:942)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:608)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:737)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:439)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
    at org.springframework.boot.test.context.SpringBootContextLoader.lambda$loadContext$3(SpringBootContextLoader.java:137)
    at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:58)
    at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:46)
    at org.springframework.boot.SpringApplication.withHook(SpringApplication.java:1409)
    at org.springframework.boot.test.context.SpringBootContextLoader$ContextLoaderHook.run(SpringBootContextLoader.java:545)
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:137)
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:108)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:187)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:119)
    at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:127)
    at io.kotest.extensions.spring.SpringAutowireConstructorExtension.instantiate(SpringAutowireConstructorExtension.kt:27)
    at io.kotest.engine.spec.InstantiateSpecKt.createAndInitializeSpec(instantiateSpec.kt:30)
    at io.kotest.engine.spec.InstantiateSpecKt.instantiate(instantiateSpec.kt:11)
    at io.kotest.engine.spec.SpecRefKt.instance(SpecRef.kt:14)
    at io.kotest.engine.spec.SpecExecutor.createInstance-gIAlu-s(SpecExecutor.kt:63)
    at io.kotest.engine.spec.SpecExecutor.access$createInstance-gIAlu-s(SpecExecutor.kt:24)
    at io.kotest.engine.spec.SpecExecutor$execute$innerExecute$1.invokeSuspend(SpecExecutor.kt:40)
    at io.kotest.engine.spec.SpecExecutor$execute$innerExecute$1.invoke(SpecExecutor.kt)
    at io.kotest.engine.spec.SpecExecutor$execute$innerExecute$1.invoke(SpecExecutor.kt)
    at io.kotest.engine.spec.interceptor.ref.FinalizeSpecInterceptor.intercept-0E7RQCE(FinalizeSpecInterceptor.kt:24)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.ref.BeforeSpecStateInterceptor.intercept-0E7RQCE(BeforeSpecStateInterceptor.kt:25)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.ref.PrepareSpecInterceptor.intercept-0E7RQCE(PrepareSpecInterceptor.kt:25)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.ref.ApplyExtensionsInterceptor.intercept-0E7RQCE(ApplyExtensionsInterceptor.kt:36)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.ref.SpecFinishedInterceptor.intercept-0E7RQCE(SpecFinishedInterceptor.kt:22)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.ref.SpecStartedInterceptor.intercept-0E7RQCE(SpecStartedInterceptor.kt:20)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.ref.SpecRefExtensionInterceptor$intercept$inner$1.invokeSuspend(SpecRefExtensionInterceptor.kt:28)
    at io.kotest.engine.spec.interceptor.ref.SpecRefExtensionInterceptor$intercept$inner$1.invoke(SpecRefExtensionInterceptor.kt)
    at io.kotest.engine.spec.interceptor.ref.SpecRefExtensionInterceptor$intercept$inner$1.invoke(SpecRefExtensionInterceptor.kt)
    at io.kotest.engine.spec.interceptor.ref.SpecRefExtensionInterceptor.intercept-0E7RQCE(SpecRefExtensionInterceptor.kt:31)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.ref.RequiresTagInterceptor.intercept-0E7RQCE(RequiresTagInterceptor.kt:35)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.ref.TagsInterceptor.intercept-0E7RQCE(TagsInterceptor.kt:38)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.ref.SystemPropertySpecFilterInterceptor.intercept-0E7RQCE(SystemPropertySpecFilterInterceptor.kt:48)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.ref.SpecFilterInterceptor.intercept-0E7RQCE(SpecFilterInterceptor.kt:38)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.ref.IgnoredSpecInterceptor.intercept-0E7RQCE(IgnoredSpecInterceptor.kt:51)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.ref.EnabledIfInterceptor.intercept-0E7RQCE(EnabledIfInterceptor.kt:41)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.ref.RequiresPlatformInterceptor.intercept-0E7RQCE(RequiresPlatformInterceptor.kt:32)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
    at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline.execute-0E7RQCE(SpecRefInterceptorPipeline.kt:50)
    at io.kotest.engine.spec.SpecExecutor.execute(SpecExecutor.kt:42)
    at io.kotest.engine.ConcurrentTestSuiteScheduler$schedule$8$1$1$2.invokeSuspend(ConcurrentTestSuiteScheduler.kt:72)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
    at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:280)
    at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
    at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
    at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
    at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
    at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
    at io.kotest.common.RunBlockingKt.runBlocking(runBlocking.kt:3)
    at io.kotest.engine.launcher.MainKt.main(main.kt:34)
Caused by: java.lang.UnsupportedOperationException: Multi argument Kotlin functions are not currently supported
    at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.getFunctionRegistration(KotlinLambdaToFunctionAutoConfiguration.java:200)
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry.lookup(BeanFactoryAwareFunctionRegistry.java:165)
    at org.springframework.cloud.function.context.FunctionCatalog.lookup(FunctionCatalog.java:39)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionBindingRegistrar.afterPropertiesSet(FunctionConfiguration.java:877)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1817)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1766)
    ... 103 common frames omitted

Other tries With kotlinx.coroutines.flow.Flow, the non-suspend method CoroutineCrudRepository#saveAll can be used. The lambda signature can therefore be changed to (Flow<Data>) -> Unit, which causes no exception at startup.

However, when the function is invoked with a message, another exception occurs:

Caused by: java.lang.ClassCastException: class Data cannot be cast to class kotlinx.coroutines.flow.Flow (Data and kotlinx.coroutines.flow.Flow are in unnamed module of loader 'app')
    at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.invoke(KotlinLambdaToFunctionAutoConfiguration.java:124)
    at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.apply(KotlinLambdaToFunctionAutoConfiguration.java:99)
    at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.accept(KotlinLambdaToFunctionAutoConfiguration.java:146)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:1029)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:731)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:577)
    at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:92)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:832)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:661)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)

Ugly workaround Call the suspend method inside a runBlocking block, so that the Consumer itself does not have to be a suspend lambda.
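A minimal sketch of that workaround, assuming kotlinx-coroutines-core is on the classpath. InMemoryDataRepository and this Data class are hypothetical stand-ins for the real DataRepository and entity, and the Spring annotations are left out so the snippet runs on its own. In the application, consumerData would be a @Bean method on a @Configuration class.

```kotlin
import java.util.function.Consumer
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the entity type used in this issue.
data class Data(val id: Int)

// Hypothetical stand-in for DataRepository; only the suspend signature matters here.
class InMemoryDataRepository {
    val saved = mutableListOf<Data>()

    // Mirrors the suspend shape of CoroutineCrudRepository#save.
    suspend fun save(entity: Data): Data {
        delay(10) // simulate a suspending I/O call
        saved += entity
        return entity
    }
}

// A plain java.util.function.Consumer: no suspend modifier on the lambda,
// so it is an ordinary single-argument function.
fun consumerData(repository: InMemoryDataRepository): Consumer<Data> =
    Consumer { data ->
        // runBlocking bridges into the suspend call but blocks the calling thread.
        runBlocking {
            repository.save(data)
        }
    }

fun main() {
    val repository = InMemoryDataRepository()
    consumerData(repository).accept(Data(1))
    println(repository.saved)
}
```

The cost of this approach is that the thread delivering the message stays blocked until save completes, which is why it is only a workaround.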

The version of spring-cloud-function is 4.0.5 (spring-cloud 2022.0.4).

olegz commented 12 months ago

Have you looked at one of our tests with coroutines? - https://github.com/spring-cloud/spring-cloud-function/blob/71ee83ef4b649e673d15139ec726e8237bc60ee8/spring-cloud-function-kotlin/src/test/java/org/springframework/cloud/function/kotlin/ContextFunctionCatalogAutoConfigurationKotlinSuspendTests.java#L34

tonny1983 commented 12 months ago

Hi @olegz ,

Thanks a lot for your kind reply.

The test code you mentioned is written in Java, while I use kotest in my project.

Comparing the two, I found some further information.

1. When the "Multi argument Kotlin functions are not currently supported" exception occurs The exception occurs when a raw object is used as the input parameter: suspend (Data) -> Unit causes the exception, but suspend (Flow<Data>) -> Unit does not.

The reason is the isValidKotlinSuspendConsumer method defined in the KotlinLambdaToFunctionAutoConfiguration class, which performs four checks:

isTypeRepresentedByClass(functionType, Function2.class) &&
type.length == 3 &&
CoroutinesUtils.isFlowType(type[0]) &&
CoroutinesUtils.isContinuationUnitType(type[1])

The third check, which tests whether the input parameter is a Flow type, fails. I'm not sure whether this is the expected behaviour, because no documentation describes using coroutines in Spring Cloud Function, but with Kotlin coroutines it is normal for a suspend method to take a raw object rather than a Flow as its parameter (CoroutineCrudRepository is a good example).
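For context on why a one-parameter suspend lambda reaches the Function2 branch at all: on the JVM, a suspend lambda is compiled to a function that takes an extra Continuation argument. The following is a small standalone sketch that only uses the Kotlin standard library. The Data class and the jvmArity helper are hypothetical names introduced for illustration and are not part of Spring Cloud Function.

```kotlin
// Hypothetical stand-in for the entity type used in this issue.
data class Data(val id: Int)

// Reports which function interface the runtime object satisfies on the JVM.
// Returns -1 for anything that is neither Function1 nor Function2.
fun jvmArity(fn: Any): Int = when (fn) {
    is Function1<*, *> -> 1
    is Function2<*, *, *> -> 2
    else -> -1
}

fun main() {
    val plain: (Data) -> Unit = { }
    val suspending: suspend (Data) -> Unit = { }

    // The plain lambda is a one-argument function.
    println("plain arity: ${jvmArity(plain)}")
    // The suspend lambda carries a hidden Continuation parameter,
    // so at runtime it is a two-argument function.
    println("suspending arity: ${jvmArity(suspending)}")
}
```

This matches the shape of the checks quoted above: the library sees a Function2 with three type arguments (input, continuation, return type), and it rejects the bean when the first of them is not a Flow.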

2. The problem with using Flow<> I rewrote the Java test you mentioned in Kotlin and added a check on the function's result:

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KotlinSuspendFunctionJUnitTest {
    private lateinit var context: GenericApplicationContext

    private lateinit var catalog: FunctionCatalog

    @AfterEach
    fun close() {
        context.close()
    }

    @Test
    fun typeDiscoveryTests() {
        create(arrayOf(KotlinSuspendFlowLambdasConfiguration::class.java))
        val functionCatalog = context.getBean(FunctionCatalog::class.java)
        val kotlinFunction = functionCatalog.lookup<FunctionInvocationWrapper>("kotlinFunction")
        Assertions.assertThat(kotlinFunction.isFunction).isTrue()
        Assertions.assertThat(kotlinFunction.inputType.typeName)
            .isEqualTo("reactor.core.publisher.Flux<java.lang.String>")
        Assertions.assertThat(kotlinFunction.outputType.typeName)
            .isEqualTo("reactor.core.publisher.Flux<java.lang.String>")
        // The following code runs correctly
        val result = kotlinFunction.apply(Flux.just("abcd")) as Flux<String>
        StepVerifier.create(result).expectNext("ABCD").verifyComplete()
    }

    private fun create(types: Array<Class<*>>, vararg props: String) {
        context = SpringApplicationBuilder(*types).properties(*props).run() as GenericApplicationContext
        catalog = context.getBean(FunctionCatalog::class.java)
    }
}

As you can see, when Flux is used, which is the input and output type the assertions check for, the test passes without errors. However, when Flow is used instead, which is exactly the type in the definition of the kotlinFunction method, an exception occurs and the test fails: