onebeyond / rascal

A config driven wrapper for amqp.node supporting multi-host connections, automatic error recovery, redelivery flood protection, transparent encryption / decryption and channel pooling.
MIT License

Add the ability to cancel the channel without forcing it to close #171

Closed avra-m3 closed 3 years ago

avra-m3 commented 3 years ago

I'd like to stop consuming messages when I receive a SIGTERM, then wait for some long-running tasks to complete and ack/nack before calling broker.shutdown().

The deferCloseChannel option is not workable, as it forces me to wait an arbitrary fixed period of time, after which any tasks still running can no longer be acknowledged. Is it possible to add a parameter to avoid calling scheduleClose on the channel? Or alternatively, expose the channel so I can call cancel directly?
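For illustration, the behaviour being asked for can be sketched against a bare amqplib-style channel. This is not Rascal's API: `startConsumer` and `handleTask` are hypothetical names, and `channel` is assumed to expose the amqplib promise-API methods `consume`, `cancel`, `ack`, `nack` and `close`.

```javascript
// Sketch only: `channel` is assumed to be an amqplib promise-API channel.
// `handleTask` is a hypothetical application function doing the long-running work.
async function startConsumer(channel, queue, handleTask) {
  const inflight = new Set();

  const { consumerTag } = await channel.consume(queue, (message) => {
    if (message === null) return; // the broker cancelled the consumer
    const task = Promise.resolve()
      .then(() => handleTask(message))
      .then(
        () => channel.ack(message),
        () => channel.nack(message)
      )
      .finally(() => inflight.delete(task));
    inflight.add(task);
  });

  // Stop new deliveries, wait for the tasks already running, then close.
  return async function stop() {
    await channel.cancel(consumerTag);
    await Promise.allSettled(inflight);
    await channel.close();
  };
}
```

Calling the returned `stop` function from a SIGTERM handler would cancel the consumer without closing the channel, so outstanding messages can still be acked or nacked, with no fixed delay involved.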

cressie176 commented 3 years ago

Hi @Hammer-Inc, Agree this is a reasonable thing to do, but unfortunately not currently supported by rascal. I'll give some thought to how it might be implemented.

avra-m3 commented 3 years ago

Hey @cressie176,

If I raised a PR, would you consider it? I went diving through the code to understand whether I could achieve this, so I have a rough idea of how to go about it.

Thanks for the fast reply 😃

cressie176 commented 3 years ago

Hi @Hammer-Inc,

Very happy to accept contributions to Rascal, however in this case I haven't thought about which API we should adopt. So far I've avoided exposing channels to the user application so Rascal can manage the error handling. I'm as yet undecided on whether I would prefer a different method, or an option passed to the existing one.

I can also envisage the following complications

  1. Support for the same functionality from broker.unsubscribeAll
  2. What to do if a channel error occurs after cancelling the consumer but before closing the channel. The existing error handler would grab a new channel and start consuming again. Not having an error handler would crash your app.

I'll give it some thought over the weekend. Also happy for you to comment with a proposal / some options.

avra-m3 commented 3 years ago

Good points; I definitely agree that exposing channels is likely not the best way to solve this. I see exposing a separate method as counterintuitive to anyone familiar with amqp as it currently stands, and I'm unsure what the naming would look like. It can't be called unsubscribe, which was my first thought.

Regarding point 2, what happens if such an error occurs during the deferCloseChannel period currently? My thinking was that the same behavior would be adopted. The primary difference is that the user must manually close the channel rather than Rascal doing it for them after deferCloseChannel.

avra-m3 commented 3 years ago

I'll leave you with it over the weekend and avoid re-writing our consumer in amqplib as it's getting more complex by the minute.

Thanks again for the quick response 😄

cressie176 commented 3 years ago

On reflection I think the graceful shutdown behaviour you want is superior to the existing deferCloseChannel functionality and should be provided out of the box. I believe the work involved is as follows...

cressie176 commented 3 years ago

Precursor here. @BorePlusPlus, any chance of a review?

cressie176 commented 3 years ago

I've had a stab at the above. PR can be found here

avra-m3 commented 3 years ago

Replace the deferCloseChannel functionality (1, 2) with an inflight message counter held in the SubscriberSession, incremented when an acknowledgeable (i.e. subscriber.options.noAck != true) message is delivered and decremented after the message is acked or nacked. Rascal should also provide a timeout option in case the inflight messages are not acked or nacked within a reasonable time, but the timeout should not be defaulted.
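A minimal sketch of the counter described in that quote, purely to make the idea concrete. `InflightCounter` and its method names are hypothetical and are not taken from Rascal's SubscriberSession; the timeout is optional and has no default, as the quote suggests.

```javascript
// Hypothetical helper, not Rascal's actual implementation.
class InflightCounter {
  constructor() {
    this.count = 0;
    this.waiters = [];
  }

  // Call when an acknowledgeable message (noAck !== true) is delivered.
  increment() {
    this.count += 1;
  }

  // Call after the message has been acked or nacked.
  decrement() {
    this.count -= 1;
    if (this.count === 0) {
      this.waiters.splice(0).forEach((done) => done());
    }
  }

  // Resolves once nothing is in flight. With timeoutMs it rejects if messages
  // are still outstanding when the timer fires; without it, it waits indefinitely.
  drained(timeoutMs) {
    if (this.count === 0) return Promise.resolve();
    return new Promise((resolve, reject) => {
      let timer;
      const done = () => {
        clearTimeout(timer);
        resolve();
      };
      this.waiters.push(done);
      if (timeoutMs !== undefined) {
        timer = setTimeout(() => {
          this.waiters = this.waiters.filter((waiter) => waiter !== done);
          reject(
            new Error(`${this.count} message(s) still in flight after ${timeoutMs}ms`)
          );
        }, timeoutMs);
      }
    });
  }
}
```

A subscriber could call `increment()` on delivery, `decrement()` in its ack/nack path, and await `drained(timeoutMs)` after cancelling the consumer and before closing the channel.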

This is exactly what I was trying to achieve, awesome to see how fast you've got on to it, absolutely interested in alpha testing your work :)

cressie176 commented 3 years ago

Would love an alpha test - are you able to build directly from GitHub?

I still have to do some destructive manual testing for things like a queue deletion, heartbeat failure, server restart, network failure etc.

Also I want to check whether things like broker.nuke work properly if the unsubscribe times out.

cressie176 commented 3 years ago

Manual tests all passed.

avra-m3 commented 3 years ago

I've added your commit to our code and it works exactly as expected, no changes or hacking around required.

For reference, here is our shutdown code with this change;

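process.once("SIGTERM", async () => {
  // A repeated SIGTERM only logs a reminder; it does not restart the shutdown.
  process.on("SIGTERM", () =>
    console.log(
      "Shutdown in progress already, send SIGKILL to shutdown immediately"
    )
  );
  // SIGINT forces an immediate exit while the graceful shutdown is pending.
  process.on("SIGINT", () => process.exit(1));
  // Assumes a promise-based broker; in-flight messages are handled before it resolves.
  await broker.shutdown();
  process.exit(0);
});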

Let's just say it used to be much longer than that 😅

cressie176 commented 3 years ago

Great stuff. I hadn't ever thought of nesting a second SIGTERM to warn people that shutdown was already in progress. Nice idea.

cressie176 commented 3 years ago

@Hammer-Inc, a small change...

Now indirect calls to cancel a consumer (i.e. broker.shutdown, broker.nuke and broker.bounce) will emit an error if the timeout expires but continue, whereas direct calls (i.e. broker.unsubscribeAll and subscription.cancel) will yield/reject if they fail to complete in time.
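The distinction can be sketched roughly as follows. This is an illustration of the two error-handling styles only, not Rascal's code: `cancelWithTimeout`, `cancelDirect` and `cancelIndirect` are hypothetical names, and `work` stands in for whatever promise represents the consumer finishing its in-flight messages.

```javascript
const { EventEmitter } = require("events");

// Hypothetical stand-in for cancelling a consumer: rejects if `work`
// has not settled within timeoutMs.
function cancelWithTimeout(work, timeoutMs) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Cancel timed out after ${timeoutMs}ms`)),
      timeoutMs
    );
  });
  return Promise.race([work, timeout]).finally(() => clearTimeout(timer));
}

// Direct style: the timeout is reported to the caller as a rejection.
function cancelDirect(work, timeoutMs) {
  return cancelWithTimeout(work, timeoutMs);
}

// Indirect style (e.g. as one step of a shutdown): the timeout is emitted
// as an "error" event and the surrounding operation carries on.
// Note: emit("error") throws if the emitter has no "error" listener.
async function cancelIndirect(emitter, work, timeoutMs) {
  try {
    await cancelWithTimeout(work, timeoutMs);
  } catch (err) {
    emitter.emit("error", err);
  }
}
```

With this shape, a caller of the direct style has to handle the rejection itself, while the indirect style only needs an error listener registered and will still run whatever comes after the cancel step.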

cressie176 commented 3 years ago

FYI hoping to publish this weekend. Sorry it's been so long.

cressie176 commented 3 years ago

Published as Rascal@14 (decided to do a single release after all)