moleculerjs / moleculer-channels

Reliable messages for Moleculer services via external queue/channel/topic.
MIT License

[Redis] Claim "NACKed" messages #3

Closed AndreMaz closed 3 years ago

AndreMaz commented 3 years ago

Redis Streams do not have a nack() method to reject messages.

Workaround:

  1. If a message was not acknowledged for a certain amount of time, then it is considered NACKed

From the docs https://redis.io/commands/xpending

Fetching data from a stream via a consumer group, and not acknowledging such data, has the effect of creating pending entries.

  2. Each topic periodically checks for pending entries and claims them via XAUTOCLAIM
  3. Claimed messages are processed and ACKed
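
The workaround steps above could be sketched roughly as follows. This is a minimal sketch and not the adapter's actual code: it assumes an ioredis-style client exposing `xautoclaim` and `xack`, and `claimAndProcess`, `handler` and the option names are hypothetical.

```javascript
// Sketch only: `client` is assumed to be an ioredis-style client.
// claimAndProcess, handler and the option names are hypothetical.
async function claimAndProcess(client, { stream, group, consumer, minIdleTime, handler }) {
    let cursor = "0-0";
    let acked = 0;
    do {
        // XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> COUNT <n>
        // The reply starts with [nextCursor, entries]; each entry is [id, [field, value, ...]].
        const [next, entries] = await client.xautoclaim(
            stream, group, consumer, minIdleTime, cursor, "COUNT", 10
        );
        for (const entry of entries) {
            if (!entry) continue; // skip empty slots defensively
            const [id, fields] = entry;
            try {
                await handler(id, fields);
                await client.xack(stream, group, id);
                acked++;
            } catch (err) {
                // Not ACKed: the entry stays pending and can be claimed again later.
            }
        }
        cursor = next;
    } while (cursor !== "0-0");
    return acked;
}
```

A message whose handler throws is left un-ACKed on purpose, so it becomes eligible for claiming again once it has been idle for `minIdleTime` milliseconds.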

Things to discuss:

Note: XAUTOCLAIM requires Redis >= 6.2 (released on 22 February). We need to update the info in the Readme.md

AndreMaz commented 3 years ago

AMQP does not like this test and produces the following error:

image

However, it should be possible to publish messages to any channel even if nobody is listening to it at the moment.

icebob commented 3 years ago

> If a message is constantly being rejected (by not being ACKed), it will be stuck in a constant "claim-reject" loop. How should we handle it? For example, after 10 failed attempts, break the loop by ACKing the message and logging an error about the failure?

I don't think we can "drop" the failed messages. It can cause data loss. Maybe we can dedicate a list in Redis with a configurable name e.g. "{{topic}}-FAILED_MESSAGES" and put it after "x" retries. WDYT?

> What's the appropriate time to consider a message NACKed? I've set it to 10 milliseconds but it should be much higher

Yeah, it should be much greater because it can be a job queue where you process long-running tasks. So it would be good to make it configurable as both a channel option and a global option. And the default value can be 1 minute.
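
One way the channel-level and global settings could be combined is sketched below. The option name `minIdleTime` and the helper `resolveMinIdleTime` are assumptions made for illustration; only the 1-minute default comes from the discussion.

```javascript
// Hypothetical option name and helper; only the 1-minute default is from the thread.
const DEFAULT_MIN_IDLE_TIME = 60 * 1000; // 1 minute, in milliseconds

function resolveMinIdleTime(channelOpts = {}, globalOpts = {}) {
    // The channel-level setting wins, then the global one, then the default.
    return channelOpts.minIdleTime ?? globalOpts.minIdleTime ?? DEFAULT_MIN_IDLE_TIME;
}
```

With this precedence a long-running job channel can raise its own threshold without affecting other channels.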

> AMQP does not like this test and produces the following error:

I think RabbitMQ doesn't support publishing a message to a non-existing queue. And we can't create the queue-exchange pair at publishing because we don't know the consumer group names. Moreover, we should make a comparison matrix for adapters to describe which adapter supports which features... :(

Other use cases: What will happen if you have a message "test.topic" with 3 groups (users, mail, payment)? Say users and mail processed the message and ACKed it, but payment did not. Will the claimed message be processed again in the users and mail services, or just in the payment service?

AndreMaz commented 3 years ago

> I don't think we can "drop" the failed messages. It can cause data loss. Maybe we can dedicate a list in Redis with a configurable name e.g. "{{topic}}-FAILED_MESSAGES" and put it after "x" retries. WDYT?

I really don't know what the best way to handle this is. If the goal is just to ensure that the message was delivered, then we need to ACK it regardless of whether processing succeeded or failed. If the goal is to have some "retry" policy where NACKed messages get "X" processing attempts, then the {{topic}}-FAILED_MESSAGES is a good idea. However, if we go this way and place the messages in a dedicated topic, what should happen next? Just place the messages there and ignore them?

> Yeah, should be much greater because it can be a job queue where you process long-running tasks. So would be good to configure it as a channel option and a global option. And the default value can be 1 minute.

Good idea. I will do it

> I think RabbitMQ doesn't support publishing a message to a non-existing queue. And we can't create the queue-exchange pair at publishing because we don't know the consumer group names. Moreover, we should make a comparison matrix for adapters to describe which adapter supports which features... :(

Yeah, this makes the integration tests more difficult.

> What will happen if you have a message "test.topic" with 3 groups (users, mail, payment)? Say users and mail processed the message and ACKed it, but payment did not. Will the claimed message be processed again in the users and mail services, or just in the payment service?

No, the message won't be processed again by either the "users" or the "mail" service. In the example that you've provided you would have 3 dedicated consumer groups. An error in the payment service only affects its own consumer group.

However, assume that you have 2 instances (original + replica) of the "users" service and one of them rejects the message. In this case, the message would be claimed (after some time) by the other "users" service instance. If neither instance can process the message, then it will enter the "claim-reject" loop.
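
The group semantics described here can be illustrated with a toy in-memory model. It is not Redis and not the adapter's code; `ToyStream` and the consumer names are made up for the example.

```javascript
// Toy in-memory model of Redis Streams consumer groups (not real Redis).
class ToyStream {
    constructor(groups) {
        // One pending-entries map per group: id -> consumer currently holding the entry.
        this.pending = new Map(groups.map(g => [g, new Map()]));
    }
    // Every group gets its own copy; within a group exactly one consumer receives it.
    deliver(id, consumerByGroup) {
        for (const [group, pel] of this.pending) pel.set(id, consumerByGroup[group]);
    }
    ack(group, id) {
        this.pending.get(group).delete(id);
    }
    // Another consumer of the same group takes over an un-ACKed entry.
    claim(group, id, consumer) {
        const pel = this.pending.get(group);
        if (pel.has(id)) pel.set(id, consumer);
    }
    pendingIn(group) {
        return [...this.pending.get(group).entries()];
    }
}

const stream = new ToyStream(["users", "mail", "payment"]);
stream.deliver("1-0", { users: "users-1", mail: "mail-1", payment: "payment-1" });
stream.ack("users", "1-0");
stream.ack("mail", "1-0");
// payment never ACKs, so a replica in the same group claims the entry later.
stream.claim("payment", "1-0", "payment-2");
// No-op: the entry was already ACKed in the "users" group.
stream.claim("users", "1-0", "users-2");
```

After these calls only the "payment" group still holds the entry, now assigned to the replica, while "users" and "mail" have nothing pending.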

image

icebob commented 3 years ago

> However, if we go this way and place the messages in a dedicated topic, what should happen next? Just place the messages there and ignore them?

Yes. Because it depends on the business logic, the devs can write an action which reads the failed messages and republishes them, or just clears the messages, or sends emails when a message is added to the topic, etc. Imagine there is a checkout flow where you should call the payment processor, but a fresh deployment crashes the payment service, so every checkout is stuck in a pending state. The developer only notices after 1 day. If we drop these messages, then a full day of checkouts is lost. But if there is a failed topic, the developer can "replay" the failed messages after fixing the issue.

But we can make it optional. E.g. with a "failTopic" option which is null by default. If it's null, we drop the messages after x retries; if it contains a topic name, we will put the messages into that topic after the retries.
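
A rough sketch of that optional behaviour is shown below. It assumes an ioredis-style client (`xpending`, `xrange`, `xadd`, `xack`) and assumes the fail topic is itself a stream; `moveFailedMessages` and `maxRetries` are illustrative names, not the library's API.

```javascript
// Sketch only: `client` is assumed to be an ioredis-style client.
// moveFailedMessages and maxRetries are illustrative names.
async function moveFailedMessages(client, { stream, group, maxRetries, failTopic = null }) {
    // Extended XPENDING form: each row is [id, consumer, idleMs, deliveryCount].
    const pending = await client.xpending(stream, group, "-", "+", 100);
    const handled = [];
    for (const [id, , , deliveries] of pending) {
        if (Number(deliveries) < maxRetries) continue;
        if (failTopic) {
            // Copy the original entry into the fail topic before ACKing it.
            const [entry] = await client.xrange(stream, id, id);
            if (entry) await client.xadd(failTopic, "*", ...entry[1]);
        }
        // The ACK breaks the claim-reject loop; with failTopic = null the message is dropped.
        await client.xack(stream, group, id);
        handled.push(id);
    }
    return handled;
}
```

The delivery counter reported by XPENDING grows each time an entry is delivered or claimed, which is why it can stand in for the number of retries in this sketch.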

> Yeah, this makes the integration tests more difficult.

Unfortunately, yes. But I will test it; maybe there is an auto-create feature.

> No, the message won't be processed again by either the "users" or the "mail" service. In the example that you've provided you would have 3 dedicated consumer groups. An error in the payment service only affects its own consumer group.

Thanks for the clarification, it will work properly.

AndreMaz commented 3 years ago

> But we can make it optional. E.g. with a "failTopic" option which is null. If it's null, we drop the messages after x retries, if it contains a topic name, we will put the messages into the topic after retries.

Alright, looks good to me.

> Thanks for the clarification, it will work properly.

After looking closer at the code I think that we need to remove this line https://github.com/moleculerjs/moleculer-channels/blob/6d27b87e9199f6f5e01676830379445c9b133b44/src/index.js#L97

to avoid creating situations that you've described in the original question. If the users, mail, payment are in the same group then messages will be balanced across the 3 consumers, which is not the desired behavior. The correct way of balancing is by having replicas as shown in the image above.

icebob commented 3 years ago

> to avoid creating situations that you've described in the original question. If the users, mail, payment are in the same group then messages will be balanced across the 3 consumers, which is not the desired behavior. The correct way of balancing is by having replicas as shown in the image above.

I think the line is good. It's just an option that lets the dev override the group, which defaults to the service name. In most cases it will stay the service name.

AndreMaz commented 3 years ago

Ok. I'll add the FAILED_MESSAGES logic tomorrow