quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0
13.79k stars 2.68k forks source link

@Incoming should stop when application not ready #23760

Open sclorng opened 2 years ago

sclorng commented 2 years ago

Description

When the application is not ready, message should not be consumed from Channel as they will certainly fail to be processed anyway. In some case, retry logic will lead to the message being discarded or sent to a dead letter queue.

Application can be not ready due to issue with the broker itself but also with other dependencies like database.

In a k8s deployment, if the application is unready, Kubernetes does not route traffic to that instance. The app should behave the same with message listener.

Implementation ideas

No response

quarkus-bot[bot] commented 2 years ago

/cc @geoand, @iocanel

geoand commented 2 years ago

cc @cescoffier @ozangunalp

cescoffier commented 2 years ago

Which connector are you using? If there is an issue with the connection to the broker it should stop and mark the appli a unhealthy

sclorng commented 2 years ago

Which connector are you using? If there is an issue with the connection to the broker it should stop and mark the appli a unhealthy

We use rabbitmq but it's not about the connector but other dependencies like database. Retries can help for transient errors but when they are persistent, all messages goes to dead letter queue and it's a mess to recover. It should just pause and resume.

cescoffier commented 2 years ago

Ah I see. Basically it's something else that is down.

Unfortunately there is no easy way to do that (except for kafka which can be paused (without stopping calling poll or you will have other issues)). We would need to think about that and see what can be done.

First we would need a way to know that the application is unhealthy. There is no 'specified' API (and reactive messaging cannot rely on a Quarkus specific API).

Then, we would need to add pause capability to the connectors. We need to be careful as it can lead to making the app unhealthy which mean we would never be alive again.

Finally, we would need to be notified when the app become healthy again.

The problem should not happen to Kafka which will stop the consumption, but I can see the problem with the other connectors.

@radcortez do you know a way to get access to that health data? Maybe using @Observe (which would be the less intrusive way).

CC @ozangunalp

sclorng commented 2 years ago

Then, we would need to add pause capability to the connectors. We need to be careful as it can lead to making the app unhealthy which mean we would never be alive again.

Finally, we would need to be notified when the app become healthy again.

The problem should not happen to Kafka which will stop the consumption, but I can see the problem with the other connectors.

@radcortez do you know a way to get access to that health data? Maybe using @observe (which would be the less intrusive way).

CC @ozangunalp

Yep, I'd like to see some discussions around this. Maybe there is thing we can do in userland to stop the app (if it cannot be paused) but we need to observe the healthiness. I see we can inject the SmallRyeHealthReporter but I don't know if probe results are cache or request on every getWellness() call which would not possible to do before processing each message.

In Spring boot, there is something like @observe (https://www.baeldung.com/spring-liveness-readiness-probes#2-listening-to-a-change)

cescoffier commented 2 years ago

This interesting aspect is that it's not limited to @Incoming. Typically, should you also pause the @Scheduled tasks, probably the same applies to gRPC streams, infinispan continuous queries...

radcortez commented 2 years ago

@radcortez do you know a way to get access to that health data? Maybe using @observe (which would be the less intrusive way).

Yes, check here: https://github.com/smallrye/smallrye-health/blob/a369f3c9a06a2e8f6b7011e1747f89989face17e/implementation/src/main/java/io/smallrye/health/SmallRyeHealthReporter.java#L52

The problem with the Observer is that HealthChecks are implemented as you call the check and not the service calling you to tell you that is healthy. I guess if we have a periodic health checker, it could report to an observer.

cescoffier commented 2 years ago

Unfortunately, we need to avoid direct access to SmallRye implementation classes, or we would need to implement a layer in RM and implement it in Quarkus.

An idea could be to have the periodic task in Quarkus. This task would send a specific event (which state do we want alive/ready / not-ready / not-alive, but what about all the other possible checks). That event would be consumed in the various extensions and each of them will do what needs to be done (for RM, pausing the incoming connectors).

An important note: pausing a connector may not stop the delivery immediately. It will stop it eventually. So, the consumption will stop once you receive the event + this lag.

radcortez commented 2 years ago

Unfortunately, we need to avoid direct access to SmallRye implementation classes, or we would need to implement a layer in RM and implement it in Quarkus.

What if we expose this in the SR Health API layer?

cescoffier commented 2 years ago

Unfortunately, we need to avoid direct access to SmallRye implementation classes, or we would need to implement a layer in RM and implement it in Quarkus.

What if we expose this in the SR Health API layer?

If the API is in a separate 'api' only module, yes, it could work. The @Observe would not be called if it does not use SR Health, but that's fine (other impl could use the same interface or even better: push it to the spec :-))

radcortez commented 2 years ago

If the API is in a separate 'api' only module, yes, it could work.

Yes, SR Health has an API-only module. @xstefank what do you think?

xstefank commented 2 years ago

Yes, API is a standalone module, but why we can't use normal CDI observers for this? Do we need a custom API? Well, SR Health can now be used in non-CDI environment, but I guess if normal CDI observers would work, we already have a standard API.

radcortez commented 2 years ago

Do we have an observer-based implementation?

cescoffier commented 2 years ago

@xstefank but what would be the received event structures? You need classes to represent these (no no... byte[] or java.lang.Object are not great ideas)

xstefank commented 2 years ago

hmm, no, we don't have anything like this for now. But no problem adding it. Just I am trying to narrow down how such API should work. reporter.registerObserver((state) -> {}) ?

cescoffier commented 2 years ago

I would have used CDI events, which reduce even more the API surface (as CDI is a common piece).

ozangunalp commented 2 months ago

There is now a way to pause/resume channels using reactive messaging. Hence once could use that on a health-check listener/observer.

aureamunoz commented 1 week ago

Is this still relevant? Can I close it?

ozangunalp commented 1 week ago

I am not sure. For Reactive Messaging there is now a way to pause/resume consumption, but for health checks to be observed, we need #564