What are you trying to achieve?
You do the aggregation with:
@Incoming("consumed-channel")
@Outgoing("aggregated-channel")
public Multi<List<Message<ProcessRequest>>> aggregate(Multi<Message<ProcessRequest>> in) {
    return in.groupItems().intoLists().every(Duration.ofSeconds(1));
}
The aggregated-channel will receive List<Message<ProcessRequest>>.
Note that you will need to handle the message acknowledgment yourself in this case.
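For illustration only (this is not from the original answer), here is a minimal sketch of such a manual acknowledgment, assuming the lists arrive as the payload of the aggregated-channel and that the batch has been processed successfully before acknowledging:
@Incoming("aggregated-channel")
public CompletionStage<Void> consumeBatch(List<Message<ProcessRequest>> batch) {
    // process the whole batch here, then acknowledge every message explicitly
    CompletableFuture<?>[] acks = batch.stream()
            .map(message -> message.ack().toCompletableFuture())
            .toArray(CompletableFuture[]::new);
    return CompletableFuture.allOf(acks);
}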
Another solution would be to use pre-acknowledgement:
@Incoming("consumed-channel")
@Outgoing("aggregated-channel")
public Multi<List<ProcessRequest>> aggregate(Multi<ProcessRequest> in) {
    return in.groupItems().intoLists().every(Duration.ofSeconds(1));
}
The aggregated-channel will receive List<ProcessRequest>, and the messages from Kafka are already acknowledged (before entering the aggregate method).
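As a minimal sketch (again not part of the original answer), a downstream subscriber for those pre-acknowledged batches could look like this; handleBatch is a placeholder for your own logic:
@Incoming("aggregated-channel")
public void consumeBatch(List<ProcessRequest> batch) {
    // The Kafka offsets are already committed at this point,
    // so a failure here will not cause the records to be redelivered.
    handleBatch(batch); // placeholder for your own processing
}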
Got it, thank you. I missed reading about Multi, probably because it is not mentioned for @Incoming methods in the Quarkus reactive messaging tutorial guide.
Greetings to all. In my use case, the Kafka producer sends messages one by one. I want to read them as a list in the consumer application. I can do that with the Spring Kafka library:
spring kafka batch listeners
It would be nice if I could do this in quarkus-smallrye-reactive-messaging-kafka too.
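For context, a Spring Kafka batch listener (the feature referred to above) looks roughly like the sketch below; the topic name and the batchFactory bean name are illustrative, and the container factory must be configured as a batch listener:
// Sketch only: assumes a ConcurrentKafkaListenerContainerFactory bean named "batchFactory"
// configured with factory.setBatchListener(true).
@KafkaListener(topics = "test-consumer", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, String>> records) {
    // the whole poll() result is delivered as one list
    System.out.println("Received batch of " + records.size() + " records");
}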
@cescoffier I tried your answer, but I got this error: java.lang.ClassCastException: class ExampleConsumer cannot be cast to class io.smallrye.mutiny.Multi
@suleymancan do you have the code you used?
Oh sorry @cescoffier, here is what I am writing:
application.properties
kafka.bootstrap.servers=hosts
mp.messaging.connector.smallrye-kafka.group.id=KafkaQuick
mp.messaging.connector.smallrye-kafka.auto.offset.reset=earliest
mp.messaging.incoming.test-consumer.connector=smallrye-kafka
mp.messaging.incoming.test-consumer.value.deserializer=org.batch.TestConsumerDeserializer
TestConsumerDeserializer.class
I may have made a mistake here; I added value.deserializer because it is mandatory.
public class TestConsumerDeserializer extends JsonbDeserializer<TestConsumer> {
    public TestConsumerDeserializer() {
        // pass the class to the parent.
        super(TestConsumer.class);
    }
}
MyConsumer.class
@ApplicationScoped
public class MyConsumer {
@Incoming("test-consumer")
//@Outgoing("aggregated-channel")
public void aggregate(Multi<Message<TestConsumer>> in) {
System.out.println(in);
}
}
You are injecting CouponBatchItem while you deserialized TestConsumer.
My fault. I tried to write sample code here and misspelled some class names. I edited my message. Sorry 🙏 I keep getting casting errors when I try the suggested solution.
I tried to use your aggregate code above and I got the following cast exception.
My consumer code:
@Incoming("channel-in")
public void processFeed(Multi<RawListingRequest> request) {
System.out.println("Request Received");
}
2021-04-13 16:33:30,883 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=kafka-consumer-channel-in, groupId=b6d1a59d-1ab2-4107-a31f-38aa66f1f22a] Revoke previously assigned partitions ebayListing-0
2021-04-13 16:33:30,891 ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-0) SRMSG00201: Error caught while processing a message: java.lang.ClassCastException: com.my.listing.request.form.RawListingRequest cannot be cast to io.smallrye.mutiny.Multi
at com.sellware.feeder.EbayFeedListerConsumer_SmallRyeMessagingInvoker_aggregate_39233801fb1ef8724bfe593850d65b926ec2e4b5.invoke(EbayFeedListerConsumer_SmallRyeMessagingInvoker_aggregate_39233801fb1ef8724bfe593850d65b926ec2e4b5.zig:46)
at io.smallrye.reactive.messaging.AbstractMediator.invoke(AbstractMediator.java:94)
at io.smallrye.reactive.messaging.SubscriberMediator.lambda$null$3(SubscriberMediator.java:143)
at io.smallrye.mutiny.operators.UniOnItemTransform$1.onItem(UniOnItemTransform.java:26)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onItem$1(ContextPropagationUniInterceptor.java:31)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$1(SmallRyeThreadContext.java:530)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onItem(ContextPropagationUniInterceptor.java:31)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.onItem(UniSerializedSubscriber.java:86)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onItem$1(ContextPropagationUniInterceptor.java:31)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$1(SmallRyeThreadContext.java:530)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onItem(ContextPropagationUniInterceptor.java:31)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.onItem(UniSerializedSubscriber.java:86)
It's because a method receiving a Multi cannot return void. There is an issue to add that, but I'm still puzzled, as it would mean that you need to handle the subscription and back-pressure protocol yourself.
So, in the meantime, try with:
@Incoming("channel-in")
@Outgoing("junk")
public Multi<X> processFeed(Multi<RawListingRequest> request) {
    return request
            .groupItems().intoLists().of(5)
            .onItem().transform(list -> ...); // do your bulk logic here
}
@Incoming("junk")
public void junk(X x) { }
My business logic is pretty different. I'm receiving multiple requests from Kafka and then passing the chunk to be processed by another workflow engine (Temporal). So, is it possible to receive the whole chunk read from Kafka?
I don't want to push to another Kafka topic.
Well, just configure the grouping as you want. I just provided an example with a batch of 5, but you can do whatever you want; and if batching is not required (that was the idea of this issue), just consume the records one by one.
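For example, here is a rough sketch of a few grouping variants with Mutiny's groupItems() operator (the size-plus-delay form assumes your Mutiny version provides of(int, Duration)):
// emit a list every 5 records
request.groupItems().intoLists().of(5);

// emit whatever arrived during the last second
request.groupItems().intoLists().every(Duration.ofSeconds(1));

// emit at 5 records or after 1 second, whichever comes first (if available in your Mutiny version)
request.groupItems().intoLists().of(5, Duration.ofSeconds(1));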
Probably I'm not explaining well. What I want is to get this code working with a void return type:
@Incoming("channel-in")
public void processFeed(Multi<RawListingRequest> request) {
}
Can we get this feature working?
As I said, it's tracked, but I'm not sure it's a good idea to add this feature. Indeed, you as the user would need to subscribe to the received Multi and manage the subscription, the requests, and so on. It is NOT easy.
As posted before, there is a workaround for now.
If you need to receive the Multi, use @Inject @Channel("my-data") Multi.
So basically there are 2 workarounds: an easy one and a hard one.
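As a hedged sketch of that second (hard) workaround, with the channel name, payload type, and batch size taken from earlier comments for illustration only (and noting that the @Channel annotation package can differ between SmallRye Reactive Messaging versions):
@ApplicationScoped
public class MyDataConsumer {

    @Inject
    @Channel("my-data")
    Multi<RawListingRequest> requests;

    void onStart(@Observes StartupEvent ev) {
        // You own the subscription: subscribe().with(...) requests items unboundedly,
        // so keeping up with the stream is your responsibility.
        requests
                .groupItems().intoLists().of(5)
                .subscribe().with(batch -> {
                    // placeholder for your own processing of the batch
                    System.out.println("Received batch of " + batch.size());
                });
    }
}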
Just so I understand: what code would you have written in your method?
Is handling the subscription something you feel OK doing?
The code I wrote in my method calls a job workflow implemented with Temporal.io. I want to get the chunk together because it is related to the same user, and it is better to handle those messages together. Actually, I don't know how I could manage the subscription, but I found a way to resolve that from the workflow code by waiting a minute to group messages into a queue and then starting to process them.
The workflow also handles throttling, because we call another API with a limited quota, and it is better to send requests in bulk instead of one by one and waste the quota we were assigned.
OK, but your code would not work even if that signature were supported.
Multi is a stream, not a group of objects. If you don't subscribe to the Multi, you will never receive the items. And once subscribed, you need to request items when you have the capacity to handle them. Yes, a lot of that is hidden in general, but such a signature would require the user to deal with that kind of logic.
That's why it's better to do your logic but return another Multi, which is then consumed by a subscriber method (only @Incoming). That way the subscription and back-pressure are handled for you.
I want to batch-process the payload polled from Kafka, but the current implementation only supports getting messages one by one. Is it possible to support getting the list of polled records from Kafka?
Example