adjust / rmq

Message queue system written in Go and backed by Redis
MIT License
1.57k stars 206 forks source link

Differentiate between Reject, Ack, Push errors in DeliveryError #151

Closed PaulisMatrix closed 1 year ago

PaulisMatrix commented 1 year ago

Hey, is it possible to find out which error actually got recorded as DeliveryError? If so can you please let me know.

Referencing this: https://github.com/adjust/rmq/blob/93f17171da879a8e7b84ff959d2684562fab7e47/example/consumer/main.go#L115, it would be helpful to find out which error we are getting so that depending on its count we can decide what to do with it.

For example, if I get ack error count as 3, I can call Delivery.Reject() specifically for this error count only and not for others.

case *rmq.DeliveryError:
  // delivery error on ack, reject, push.
  // maintain a count to reject after X retries
  if err.Count >= retryCount {
      // reject the request
      log.Printf("request rejected after %d retries", retryCount)
      err.Delivery.Reject()
  }
  log.Print("delivery error: ", err.Delivery, err)

I would really appreciate if I get answers to my below questions :D

  1. Any reason as to why the no TTL is set for different keys which are being created so as to not maintain stale keys? Every time the server is started, again new redis keys are getting created. For example, the consumers set. We have to perform manual cleanups.

"rmq::connection::mart-ok5yeF::queue::[orders]::consumers"

"rmq::connection::mart-d63YXn::queue::[orders]::consumers"

  1. Any reason why the rejected deliveries are not directly deleted and added to a separate list? To this key delivery.rejectedKey Those would again remain stale right?

Thank you!!! @wellle

wellle commented 1 year ago

Hey!

  1. I don't think it's currently possibly which errors are from Ack and which are from Reject. The DeliveryError does contain the Delivery though, so maybe you could keep track of that yourself?
  2. There are no TTLs on most keys because we don't want data loss. As long as deliveries are in Redis they are not being lost. If you had a TTL then deliveries would be inaccessible after a while. This doesn't seem desirable.
  3. You shouldn't have to do any manual cleanups though. Please read about the cleaner here and make sure you have exactly one cleaner instance active per queue system. This will automatically move deliveries back from unacked to ready if a consumer died for some reason before it could ack/reject the delivery. If you do manual cleanup you're likely dropping some pending deliveries.
  4. Rejected deliveries get moved to a separate queue to allow reprocessing later. See Queue.ReturnRejected(). For example you may have had a temporary issue leading to failed deliveries. Once the issue is fixed you can return the rejected deliveries back to the ready list to have them be attempted again.
PaulisMatrix commented 1 year ago

Hey, thank you for replying!!

For the 3rd point, I should've been more clear. By manual cleanup I mean, manually deleting the keys those were initialized before on server startup. As I added, with each server start, they keys for queues,consumers,etc are getting created again but the ones which were already created, remain in redis right. I guess we can't really avoid that.

Yes aware of cleaner and I do have setup one cleaner per instance.

wellle commented 1 year ago

Most of the Redis keys we use are lists. Once lists are empty the keys disappear. So if you see a key and delete it you may be deleting data which has not been consumed.

Can you share which keys you see and manually delete? Feel free to redact consumer and queue names etc.

PaulisMatrix commented 1 year ago

Yeah, talking about these keys. Most of them are redis sets

1) "rmq::queues"
2) "rmq::connection::test-M31Wux::heartbeat"
3) "rmq::connection::test-M31Wux::queue::[retry-orders]::consumers"
4) "rmq::connection::test-dnuCVv::heartbeat"
5) "rmq::connections"
6) "rmq::connection::test-M31Wux::queues"
7) "rmq::connection::test-M31Wux::queue::[orders]::consumers"
wellle commented 1 year ago

I just did a test using the examples found in this repo:

  1. Start a producer, consumer and cleaner.
  2. Observe redis keys with redis-cli -n 2 keys "*". There are keys related to two queues and three connections.
  3. Stop the producer and consumer.
  4. Wait up to two minutes for the cleaner to clean up.
  5. There are keys related to two queues and one connection (the cleaner).
  6. Stop the cleaner and wait for up to a minute for the heartbeat to stop.
  7. There keys related to two queues and no connections:
    1) "rmq::queue::[foobars]::ready"
    2) "rmq::connections"
    3) "rmq::queues"
    4) "rmq::queue::[things]::rejected"
    5) "rmq::queue::[things]::ready"

You have ready and rejected deliveries in the queues. You shouldn't drop them using Redis commands. You can use PurgeReady or PurgeRejected instead if needed.

And you have the list of queues and connections. The cleaner connection is still listed because that connection hasn't been cleaned yet.

Cleaning works as expected. Nothing seems to be left behind accidentally. You should not need to delete anything manually using Redis commands.

PaulisMatrix commented 1 year ago

Alright, got it. Thank you for the detailed clarification!!

I understood my mistake. I was running the cleaner in my worker only, in a separate goroutine. So whenever I stopped worker, cleaner would also stop meaning connections will not get cleaned up.

Made the change to run them separately. 😅