quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0
13.44k stars 2.58k forks source link

Kafka event in a @Startup application is not emitted #12820

Closed r00ta closed 3 years ago

r00ta commented 3 years ago

Describe the bug Using the smallrye kafka extension, if a @PostConstruct method of an application annotated with @Startup creates an event, it is not emitted at all. See below an example:

import javax.annotation.PostConstruct;
import javax.inject.Singleton;

import io.quarkus.runtime.Startup;
import io.reactivex.BackpressureStrategy;
import io.reactivex.subjects.PublishSubject;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Startup
public class Emitter {

    private static final Logger LOGGER = LoggerFactory.getLogger(Emitter.class);

    private final PublishSubject<String> eventSubject;

    private int counter = 0;

    public Emitter() {
        this.eventSubject = PublishSubject.create();
    }

    @PostConstruct
    public void publish() {
        this.emit("startup");
    }

    @Outgoing("my-topic")
    public Publisher<String> getEventPublisher() {
        return eventSubject.toFlowable(BackpressureStrategy.BUFFER);
    }

    public void emit(final String payload) {
        LOGGER.info("Emit counter: " + counter++);
        eventSubject.onNext(payload);
    }
}

Expected behavior The event should be pushed to the kafka topic.

Actual behavior The event is not pushed.

To Reproduce I've created a project to reproduce the issue: https://github.com/r00ta/issues-reproducers/tree/main/quarkus-kafka-startup . The application also exposes a path /force that you can use to force the creation of the event: only such events are emitted properly.

Steps to reproduce the behavior:

  1. git clone https://github.com/r00ta/issues-reproducers.git
  2. cd quarkus-kafka-startup
  3. docker-compose up
  4. in another shell run mvn clean compile quarkus:dev
  5. open localhost:9000 and have a look at the my-topic topic on kafdrop: no events are there at all.
  6. you can get localhost:8080/force so to push events, you can check on kafdrop that the events are there.

Configuration

# Kafka Tracing
mp.messaging.outgoing.my-topic.group.id=kogito-runtimes
mp.messaging.outgoing.my-topic.connector=smallrye-kafka
mp.messaging.outgoing.my-topic.topic=my-topic
mp.messaging.outgoing.my-topic.value.serializer=org.apache.kafka.common.serialization.StringSerializer

Screenshots** Screenshot from 2020-10-20 11-05-43

As you can see the event is created before the mediators are connected, I guess the problem is there.

Environment (please complete the following information):

Additional context (Add any other context about the problem here.)

stuartwdouglas commented 3 years ago

The issue is here: https://github.com/smallrye/smallrye-reactive-messaging/blob/master/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/MediatorManager.java#L171

As part of the connection process this will get a reference to the bean. This results in the post construct method being called before everything is connected.

r00ta commented 3 years ago

Hi @stuartwdouglas , thank you very much for checking this. Would you suggest any workaround in the meanwhile?

stuartwdouglas commented 3 years ago

I think if you moved the publisher to its own class it would work around it.

r00ta commented 3 years ago

Hi @stuartwdouglas , thank you very much for the tip! With the following refactoring:

@ApplicationScoped
public class MyPublisher {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyPublisher.class);

    private final PublishSubject<String> eventSubject = PublishSubject.create();

    private int counter = 0;

    @Outgoing("my-topic")
    public Publisher<String> getEventPublisher() {
        return eventSubject.toFlowable(BackpressureStrategy.BUFFER);
    }

    public void emit(final String payload) {
        LOGGER.info("Emit counter: " + counter++);
        eventSubject.onNext(payload);
    }
}

And

@Singleton
@Startup
public class Emitter {

    @Inject
    MyPublisher myPublisher;

    public void publish(@Observes StartupEvent event) {
        myPublisher.emit("startup");
    }
}

Everything works properly!

stuartwdouglas commented 3 years ago

cc @cescoffier

cescoffier commented 3 years ago

Oh yes, that's a known issue. I need to improve the wiring.

cescoffier commented 3 years ago

Just a head's up - the new wiring algorithm that will be included in SmallRye Reactive Messaging 2.6.0 fixes it.