micronaut-projects / micronaut-rabbitmq

Micronaut extensions to easily publish and consume messages with RabbitMQ
Apache License 2.0
18 stars 20 forks source link

Add Kotlin Coroutine support for @RabbitListener #532

Open Marek000 opened 1 year ago

Marek000 commented 1 year ago

Feature description

Please, would it be possible to support Kotlin Coroutines for @RabbitListener annotated methods? So it would be possible to use this annotation with suspend keyword on method? Currently it is not possible and I'm getting error below.

2023-08-29 10:11:24,814 [] [] [pool-4-thread-17] ERROR io.micronaut.rabbitmq.exception.DefaultRabbitListenerExceptionHandler Error processing a message for RabbitMQ consumer [cz.test.consumer.ProductListener@4cdac949]
io.micronaut.rabbitmq.exception.RabbitListenerException: An error occurred binding the message to the method
    at io.micronaut.rabbitmq.intercept.RabbitMQConsumerAdvice.lambda$process$2(RabbitMQConsumerAdvice.java:155)
    at io.micronaut.rabbitmq.intercept.RabbitMQConsumerAdvice$RecoverableConsumerWrapper$1.callbackHandle(RabbitMQConsumerAdvice.java:512)
    at io.micronaut.rabbitmq.intercept.RabbitMQConsumerAdvice$RecoverableConsumerWrapper$1.handleDelivery(RabbitMQConsumerAdvice.java:506)
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:111)
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141)
    at io.micrometer.core.instrument.Timer.lambda$wrap$0(Timer.java:196)
    at io.micronaut.scheduling.instrument.InvocationInstrumenterWrappedRunnable.run(InvocationInstrumenterWrappedRunnable.java:47)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.micronaut.core.serialize.exceptions.SerializationException: Error decoding JSON stream for type [continuation]: Cannot construct instance of `kotlin.coroutines.Continuation` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
 at [Source: (byte[])"{"timestamp":"2023-08-29T09:21:33.873585+02:00"}"; line: 1, column: 1]
    at io.micronaut.rabbitmq.serdes.JsonRabbitMessageSerDes.deserialize(JsonRabbitMessageSerDes.java:79)
    at io.micronaut.rabbitmq.bind.RabbitBodyBinder.lambda$bind$0(RabbitBodyBinder.java:55)
    at java.base/java.util.Optional.map(Optional.java:265)
    at io.micronaut.rabbitmq.bind.RabbitBodyBinder.bind(RabbitBodyBinder.java:55)
    at io.micronaut.rabbitmq.bind.RabbitDefaultBinder.bind(RabbitDefaultBinder.java:59)
    at io.micronaut.rabbitmq.bind.RabbitDefaultBinder.bind(RabbitDefaultBinder.java:28)
    at io.micronaut.core.bind.DefaultExecutableBinder.bind(DefaultExecutableBinder.java:75)
    at io.micronaut.rabbitmq.intercept.RabbitMQConsumerAdvice.lambda$process$2(RabbitMQConsumerAdvice.java:153)
    ... 10 common frames omitted

I've found a similar issue for spring framework https://github.com/spring-projects/spring-amqp/issues/1210

Or maybe I am missing something when creating suspendable consumer. Thx

sdelamo commented 10 months ago

do you have a reproducible sample app?