SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0

Improve Documentation #642

Closed obbyK closed 4 years ago

obbyK commented 4 years ago

I was searching for a solution to an error I was getting while using the AMQP connector: SRMSG00200: The method com.example.OutgoingTicketConsumer#processOutgoing has thrown an exception: java.lang.ClassCastException: class io.vertx.core.json.JsonObject cannot be cast to class com.example.models.Ticket (io.vertx.core.json.JsonObject and com.example.models.Ticket are in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @4b88ca8e). After some googling of the error message, I ended up finding the answer here, whereas the main documentation website (https://smallrye.io/smallrye-reactive-messaging/) didn't have it. It would be a much better learning experience if most of what is required could be found on the main website.
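The thread does not record which fix was found. One common way to handle this kind of mismatch is to accept the generic JSON type in the method signature and convert it to the domain type explicitly; in Vert.x, `JsonObject` offers `mapTo(Class)` for that purpose. The sketch below illustrates the idea in plain Java under stated assumptions: a `Map` stands in for the JSON object, and the `Ticket` fields (`id`, `subject`) are hypothetical because the real class is not shown.

```java
import java.util.Map;

// Illustrative sketch only: a plain Map stands in for a generic JSON object payload.
final class PayloadMappingSketch {

    // Hypothetical domain class; the real Ticket fields are not shown in the thread.
    static final class Ticket {
        final String id;
        final String subject;

        Ticket(String id, String subject) {
            this.id = id;
            this.subject = subject;
        }
    }

    // Explicit conversion from the generic representation to the domain type.
    static Ticket toTicket(Map<String, ?> json) {
        return new Ticket(String.valueOf(json.get("id")), String.valueOf(json.get("subject")));
    }

    // Accept the generic payload type and convert it, instead of declaring Ticket
    // directly and relying on a cast that cannot succeed.
    static String processOutgoing(Map<String, ?> payload) {
        Ticket ticket = toTicket(payload);
        return ticket.id + ": " + ticket.subject;
    }

    public static void main(String[] args) {
        System.out.println(processOutgoing(Map.of("id", "42", "subject", "Printer jam")));
    }
}
```

The point of the design is that the method declares the type the connector actually delivers and performs the conversion itself.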

cescoffier commented 4 years ago

This will be updated in the next release.

Antora tends to link to the old version (2) instead of 2.1.

chris-aeviator commented 3 years ago

@obbyK how did you solve the issue? I'm facing the same with Kafka + Avro.

cescoffier commented 3 years ago

@chris-aeviator can you provide the signature of the method?

chris-aeviator commented 3 years ago

@cescoffier thanks for your fast reply.

I'm seriously confused and overwhelmed by the amount of complexity and possibilities.

the error is 2020-10-19 20:15:11,624 ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-0) SRMSG00200: The method org.myzel.projectPlanner.ProjectInfoToPublishedProjectsMessageConsumer_7#consume has thrown an exception: java.lang.ClassCastException: class org.myzel.projectPlanner.domain.projectInfo cannot be cast to class java.lang.String (org.myzel.projectPlanner.domain.projectInfo is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @d400943; java.lang.String is in module java.base of loader 'bootstrap') at org.myzel.projectPlanner.ProjectInfoToPublishedProjectsMessageConsumer_7_SmallRyeMessagingInvoker_consume_a793a9cc76abe8ea0eedf6d7f2db6d132889e355.invoke(ProjectInfoToPublishedProjectsMessageConsumer_7_SmallRyeMessagingInvoker_consume_a793a9cc76abe8ea0eedf6d7f2db6d132889e355.zig:46)

I'm using

Following https://quarkus.io/blog/kafka-avro/, I've set up avro-maven-plugin with

        <outputDirectory>${project.build.directory}/generated-sources</outputDirectory>

in the generate-sources step. I'm also running mvn generate-sources, but the result does not seem to match my project folder, since I now have both a /target/classes and a /target/generated-sources directory, which seems wrong to me.

EDIT:

I can confirm that the consumer code raises the error no matter whether it's compiled into /target/classes (where the rest of the code is compiled to) or into /target/generated-sources.

EDIT 2:

Stack trace:

The method org.myzel.projectPlanner.ProjectInfoToPublishedProjectsMessageConsumer_7#consume has thrown an exception: java.lang.ClassCastException: class org.myzel.projectPlanner.domain.projectInfo cannot be cast to class java.lang.String (org.myzel.projectPlanner.domain.projectInfo is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @4a31c2ee; java.lang.String is in module java.base of loader 'bootstrap')
    at org.myzel.projectPlanner.ProjectInfoToPublishedProjectsMessageConsumer_7_SmallRyeMessagingInvoker_consume_a793a9cc76abe8ea0eedf6d7f2db6d132889e355.invoke(ProjectInfoToPublishedProjectsMessageConsumer_7_SmallRyeMessagingInvoker_consume_a793a9cc76abe8ea0eedf6d7f2db6d132889e355.zig:46)
    at io.smallrye.reactive.messaging.AbstractMediator.invoke(AbstractMediator.java:79)
    at io.smallrye.reactive.messaging.SubscriberMediator.lambda$null$3(SubscriberMediator.java:143)
    at io.smallrye.mutiny.operators.UniOnItemTransform$1.onItem(UniOnItemTransform.java:31)
    at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onItem$1(ContextPropagationUniInterceptor.java:35)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onItem(ContextPropagationUniInterceptor.java:35)
    at io.smallrye.mutiny.operators.UniSerializedSubscriber.onItem(UniSerializedSubscriber.java:72)
    at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onItem$1(ContextPropagationUniInterceptor.java:35)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onItem(ContextPropagationUniInterceptor.java:35)
    at io.smallrye.mutiny.operators.UniSerializedSubscriber.onItem(UniSerializedSubscriber.java:72)
    at io.smallrye.mutiny.operators.UniCreateFromCompletionStage.lambda$forwardFromCompletionStage$1(UniCreateFromCompletionStage.java:30)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
    at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2322)
    at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:143)
    at io.smallrye.mutiny.operators.UniCreateFromCompletionStage.forwardFromCompletionStage(UniCreateFromCompletionStage.java:22)
    at io.smallrye.mutiny.operators.UniCreateFromCompletionStage.subscribing(UniCreateFromCompletionStage.java:50)
    at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
    at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:30)
    at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.lambda$subscribing$0(ContextPropagationUniInterceptor.java:51)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.subscribing(ContextPropagationUniInterceptor.java:51)
    at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
    at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:30)
    at io.smallrye.mutiny.operators.UniOnItemTransform.subscribing(UniOnItemTransform.java:19)
    at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
    at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:30)
    at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.lambda$subscribing$0(ContextPropagationUniInterceptor.java:51)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.subscribing(ContextPropagationUniInterceptor.java:51)
    at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
    at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:30)
    at io.smallrye.mutiny.operators.UniOnItemOrFailureFlatMap.subscribing(UniOnItemOrFailureFlatMap.java:48)
    at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
    at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:30)
    at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.lambda$subscribing$0(ContextPropagationUniInterceptor.java:51)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.subscribing(ContextPropagationUniInterceptor.java:51)
    at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
    at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:30)
    at io.smallrye.mutiny.operators.UniSubscribeToCompletionStage.subscribe(UniSubscribeToCompletionStage.java:32)
    at io.smallrye.mutiny.groups.UniSubscribe.asCompletionStage(UniSubscribe.java:104)
    at io.smallrye.mutiny.Uni.subscribeAsCompletionStage(Uni.java:162)
    at io.smallrye.reactive.messaging.SubscriberMediator.lambda$processMethodReturningVoid$4(SubscriberMediator.java:145)
    at io.smallrye.mutiny.streams.stages.FlatMapCompletionStageFactory$FlatMapCompletionStage.lambda$apply$0(FlatMapCompletionStageFactory.java:46)
    at io.smallrye.mutiny.groups.MultiOnItem.lambda$transformToUni$5(MultiOnItem.java:290)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.onItem(MultiFlatMapOp.java:176)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.subscription.SerializedSubscriber.onItem(SerializedSubscriber.java:69)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.subscription.SafeSubscriber.onNext(SafeSubscriber.java:87)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.mutiny.subscription.SafeSubscriber.onNext(SafeSubscriber.java:87)
    at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:31)
    at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:81)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.streams.utils.ConnectableProcessor.onNext(ConnectableProcessor.java:122)
    at org.eclipse.microprofile.reactive.streams.operators.CompletionSubscriber$1DefaultCompletionSubscriber.onNext(CompletionSubscriber.java:85)
    at org.eclipse.microprofile.reactive.streams.operators.CompletionSubscriber$1DefaultCompletionSubscriber.onNext(CompletionSubscriber.java:85)
    at io.smallrye.reactive.messaging.SubscriberMediator$1.onNext(SubscriberMediator.java:107)
    at io.smallrye.reactive.messaging.SubscriberMediator$1.onNext(SubscriberMediator.java:98)
    at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:31)
    at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:81)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.streams.utils.WrappedSubscriber.onNext(WrappedSubscriber.java:46)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.mutiny.subscription.SafeSubscriber.onNext(SafeSubscriber.java:87)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:31)
    at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:81)
    at io.smallrye.mutiny.operators.multi.MultiSignalConsumerOp$SignalSubscriber.onItem(MultiSignalConsumerOp.java:107)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:31)
    at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:81)
    at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:31)
    at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:81)
    at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onItem(MultiOperatorProcessor.java:63)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:31)
    at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:81)
    at io.smallrye.mutiny.operators.multi.MultiSignalConsumerOp$SignalSubscriber.onItem(MultiSignalConsumerOp.java:107)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
    at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
    at io.smallrye.mutiny.vertx.MultiReadStream.lambda$subscribe$2(MultiReadStream.java:76)
    at io.vertx.kafka.client.consumer.impl.KafkaConsumerImpl.lambda$handler$1(KafkaConsumerImpl.java:80)
    at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.run(KafkaReadStreamImpl.java:230)
    at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$schedule$8(KafkaReadStreamImpl.java:185)
    at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
    at io.vertx.core.impl.EventLoopContext.lambda$executeAsync$0(EventLoopContext.java:38)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:830)

2020-10-19 20:45:02,574 ERROR [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-0) SRMSG18203: A message sent to channel `projectInfo` has been nacked, fail-stop

chris-aeviator commented 3 years ago

By the way, the consumer only crashes once a message has actually been consumed :0

chris-aeviator commented 3 years ago

I can see the record being deserialized successfully with the appropriate Avro schema before the error gets thrown.

My Avro schema:

{
  "type": "record",
  "name": "projectInfo",
  "namespace": "org.myzel.projectPlanner.domain",
  "fields": [
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "project",
      "type": "string"
    },
    {
      "name": "description",
      "type": "string"
    }
  ]
}

Is the key maybe the problem? I did not override key.deserializer, and the key is coming in as a UUID hex string.

cescoffier commented 3 years ago

Can you give me the signature of the method receiving (or expecting to receive) the message? String key should be fine, but the signature of the method should be something like:

@Incoming(...)
public void consume(projectInfo info) {
 // ...
}

BTW, the projectInfo class does not follow the Java naming convention; it should be ProjectInfo. I don't believe it's the issue here, but it's something you may want to fix at some point.

chris-aeviator commented 3 years ago

@cescoffier

I don't use @Incoming since I'm working with Kogito here. My code (and the "wiring" via the BPMN file) follows https://github.com/kiegroup/kogito-examples/tree/stable/process-kafka-quickstart-quarkus, which does not have @Incoming annotations either. It does, however, allow the data from a reactive messaging message ("travellers") to be passed into a function that is also defined in the BPMN file.

EDIT:

This is my config (screenshot).

cescoffier commented 3 years ago

From the error message, Kogito expects a String and not a ProjectInfo. I don't know anything about Kogito. The Kafka / Reactive Messaging part is OK (it does get the messages and deserializes them correctly).
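This kind of mismatch can be reproduced without Kafka or Kogito. The following is a minimal sketch in plain Java, not the actual generated invoker: `ProjectInfo` is a hypothetical stand-in for the Avro-generated class, and `invokeConsume` imitates an invoker that receives the payload as `Object` and casts it to the `String` parameter the consuming method declares.

```java
// Illustrative sketch only: imitates an invoker that casts the payload
// to the parameter type declared by the consuming method.
final class CastMismatchSketch {

    // Hypothetical stand-in for the Avro-generated payload class.
    static final class ProjectInfo {
        final String title;

        ProjectInfo(String title) {
            this.title = title;
        }
    }

    // The consuming method declares a String payload.
    static String consume(String payload) {
        return "consumed: " + payload;
    }

    // The channel hands over whatever the deserializer produced, typed as Object.
    // The cast only succeeds when the runtime type matches the declared parameter type.
    static String invokeConsume(Object payload) {
        try {
            return consume((String) payload);
        } catch (ClassCastException e) {
            return "ClassCastException: " + payload.getClass().getSimpleName()
                    + " is not a String";
        }
    }

    public static void main(String[] args) {
        System.out.println(invokeConsume("plain text"));
        System.out.println(invokeConsume(new ProjectInfo("demo")));
    }
}
```

Deserialization succeeds in both cases; the failure only appears at the moment the payload is handed to a method whose declared parameter type differs from the deserialized type.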

chris-aeviator commented 3 years ago

It seems you've pointed me in the right direction.

After renaming the class to ProjectInfo (capitalized), telling avro-maven-plugin to generate into /src/main/java, and removing my handmade ProjectInfo.java, things make much more sense; before, it seemed as if just some random part had received the message. I'm currently fiddling with the Drools part, but I will write an update whenever I solve this.

Thanks for your explanation of what's happening.

chris-aeviator commented 3 years ago

I'm ending up with the exact same issue...

chris-aeviator commented 3 years ago

In the generated sources I can now see

    @org.eclipse.microprofile.reactive.messaging.Incoming("projectInfo")
    public void consume(String payload) {
        eventConsumerFactory.get(event -> {
            ProjectInfoToPublishedProjectsModel model = new ProjectInfoToPublishedProjectsModel();
            model.setProjectInfo(event);
            return model;
        }, org.myzel.projectPlanner.domain.ProjectInfo.class, ProjectInfoToPublishedProjectsMessageDataEvent_7.class, useCloudEvents).consume(application, process, payload, "projectInfo");
    }

cescoffier commented 3 years ago

So, it expects a String. Is there anything you can configure to make it use another type?

You may want to bring this issue to Zulip where the Kogito team can help.

chris-aeviator commented 3 years ago

It is set up to take com.myzel.projectPlanner.domain.ProjectInfo. It follows the example repo. I'll try to chat with them, thanks.

chris-aeviator commented 3 years ago

@cescoffier this is indeed an issue with Kogito: they have their own deserialization for events in between, hence the bean vs. String issue. For anybody interested, they have an issue at https://issues.redhat.com/browse/KOGITO-2866 that should deal with it.

chris-aeviator commented 3 years ago

I solved this by adding a consumer, ProjectInfoConsumer, that consumes the Avro message and forwards it as a String in a reactive message:

    @Incoming("ProjectInfo")
    @Outgoing("projectInfo")
    public String consume(ProjectInfo projectInfo) {
        logger.info("received projectinfo on consumer");
        return projectInfo.toString();
    }

Here, the ProjectInfo channel carries the incoming Avro-encoded Kafka message, and the projectInfo channel carries the message that is passed on internally. Now everything is working as expected. The Kogito team was really cool and added Avro support to their feature timeline (https://issues.redhat.com/browse/KOGITO-3647).
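The shape of this workaround, a small adapter that turns the typed payload into a String before a String-only consumer sees it, can be sketched in plain Java without any messaging library. All names below are hypothetical: `ProjectInfo` is a simplified stand-in for the Avro-generated class (with a hand-written `toString`), and `stringOnlyConsumer` stands in for the downstream code that accepts only a String.

```java
import java.util.function.Function;

// Illustrative sketch only: models the bridge between a typed channel
// and a consumer that accepts nothing but String payloads.
final class BridgeSketch {

    // Simplified, hypothetical stand-in for the Avro-generated class.
    static final class ProjectInfo {
        final String title;

        ProjectInfo(String title) {
            this.title = title;
        }

        @Override
        public String toString() {
            return "{\"title\": \"" + title + "\"}";
        }
    }

    // Stand-in for the downstream consumer that declares a String parameter.
    static String stringOnlyConsumer(String payload) {
        return "handled " + payload;
    }

    // The bridge: converts the typed payload to its String form.
    static final Function<ProjectInfo, String> BRIDGE = ProjectInfo::toString;

    // Delivery goes through the bridge, so the consumer never sees a ProjectInfo.
    static String deliver(ProjectInfo info) {
        return stringOnlyConsumer(BRIDGE.apply(info));
    }

    public static void main(String[] args) {
        System.out.println(deliver(new ProjectInfo("demo")));
    }
}
```

The trade-off is that the consumer receives a serialized String rather than the typed object, so any structure has to be parsed again downstream.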