smallrye / smallrye-reactive-messaging

SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0

@Outgoing messages to Kafka get 'lost' while initialising application #845

Closed kabir closed 3 years ago

kabir commented 3 years ago

If I do something simple that is not backed by anything external, it works fine:

@ApplicationScoped
public class Bean {
    @Outgoing("source")
    public PublisherBuilder<Integer> source() {
        return ReactiveStreams.of(10, 200, 3000);
    }

    @Incoming("source")
    public void toUpperCase(Integer payload) {
        System.out.println(payload);
    }
}

However, if I adjust this to be backed by Kafka it no longer works (to-kafka and from-kafka are mapped in my microprofile-config.properties):

@ApplicationScoped
public class Bean {
    @Outgoing("to-kafka")
    public PublisherBuilder<Integer> source() {
        return ReactiveStreams.of(10, 200, 3000);
    }

    @Incoming("from-kafka")
    public void toUpperCase(Integer payload) {
        System.out.println(payload);
    }
}

I played a bit with the @Outgoing method, and this does not work either:

    @Outgoing("to-kafka")
    public Publisher<Integer> source() {
        return ReactiveStreams.of(10, 200, 3000).buildRs();
    }

In both cases I see the source() method get called before it does all the Kafka connection stuff, so perhaps that is why.

It seems to work again if I 'delay' the sending:

    private Random random = new Random();

    @Outgoing("to-kafka")
    public Publisher<Integer> source() {
        return Flowable.interval(5, TimeUnit.SECONDS)
                .map(tick -> random.nextInt(100));
    }

cescoffier commented 3 years ago

That should not be the case. Can you provide a reproducer?

kabir commented 3 years ago

Hi,

I have created a reproducer based on the Kafka example: https://github.com/smallrye/smallrye-reactive-messaging/compare/2.4.0...kabir:reproducer-845?expand=1

As it stands, it reproduces the behaviour: no messages are received.

The reproducer also contains a commented-out alternative @Outgoing method (a monstrosity) that I resorted to in the WildFly TS to simulate a delay. If I use that method instead, messages are received.

cescoffier commented 3 years ago

Hum, did you forget to set:

mp.messaging.incoming.from-kafka.auto.offset.reset=earliest

Without it, your consumer starts at the latest offset of the partition. If the consumer joins the cluster after the producer has sent its messages, that offset is 3, so the consumer does not receive any messages (offset 0 is 'hello', 1 is 'reactive' and 2 is 'messaging').
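To make the offset arithmetic concrete, here is a minimal sketch in plain Java. It is a toy model of a single partition, not the Kafka client API: the class OffsetResetSketch and its methods startOffset and poll are hypothetical names made up for illustration, and it assumes the consumer has no committed offset and that the log still starts at offset 0.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition; hypothetical names, not the Kafka client API.
class OffsetResetSketch {

    // Offset at which a consumer without a committed offset starts reading.
    // Assumption: the oldest retained record is at offset 0.
    static long startOffset(String autoOffsetReset, long logEndOffset) {
        switch (autoOffsetReset) {
            case "earliest":
                return 0L;            // start from the beginning of the log
            case "latest":
                return logEndOffset;  // start after the last existing record
            default:
                throw new IllegalArgumentException("unsupported value: " + autoOffsetReset);
        }
    }

    // Records already in the log that the consumer would receive.
    static List<String> poll(List<String> log, String autoOffsetReset) {
        int start = (int) startOffset(autoOffsetReset, log.size());
        return new ArrayList<>(log.subList(start, log.size()));
    }

    public static void main(String[] args) {
        // The three records the producer wrote before the consumer joined.
        List<String> log = List.of("hello", "reactive", "messaging");
        System.out.println("latest   -> " + poll(log, "latest"));
        System.out.println("earliest -> " + poll(log, "earliest"));
    }
}
```

With "latest" the sketch starts at offset 3 and returns nothing, which matches the symptom in this issue. With "earliest" it starts at offset 0 and returns all three records.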

With the setting, you should see something like this in the log:

18:56:31,211 INFO  [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=kafka-consumer-from-kafka, groupId=...] Resetting offset for partition ...-0 to offset 0.  <----- HERE 

Without it, you will see something like:

19:01:05,174 INFO  [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=kafka-consumer-from-kafka, groupId=...] Resetting offset for partition ...-0 to offset 3. <----- HERE

kabir commented 3 years ago

I wasn't aware of that property. It works fine now that I have added it. Thanks for the explanation!