0x4b53 / amqp-rpc

🐰 Framework to use RabbitMQ as RPC
MIT License
45 stars 8 forks source link

Server should restart on failure to Ack messages #107

Closed anders-m-ek closed 5 months ago

anders-m-ek commented 5 months ago

Service got stuck with massive number of unacked messages after some network glitch. Upon failure to Ack, the server should attempt restart. Log;

could not Ack delivery (77e3447b-907e-4d70-8e06-8700cda38621): Exception (504) Reason: "channel/connection is not open"
bombsimon commented 5 months ago

Are you using the ack aware acknowledger or are you acking manually? By default messages are not acknowledge but from the log it looks like the ack middleware is used: https://github.com/0x4b53/amqp-rpc/blob/57892bfe36babbcfac9074081e2edd19ace23cb3/middleware/ack.go#L26-L29

It's a bit tricky to handle restarts when using that because in the middleware there's no shared context to the server go routine, it's all just automatically run inside the handler (which doesn't return anything): https://github.com/0x4b53/amqp-rpc/blob/57892bfe36babbcfac9074081e2edd19ace23cb3/server.go#L436

One way to do this is to simply manually ack in your handlers but that will not automatically restart the server. It will also be a bit cumbersome to have to call this from every handler.

Since you don't have to acknowledge messages I don't think there's a good way to always acknowledge and take action based on the result. Instead I think a more reasonable approach would be to run a separate restart loop and have a context aware acknowledger that can restart the server as needed. Maybe that's something we could bundle in this repo too!

Something like this (not tested, semi pseudo code):

func runForever() {
     // Do your regular setup with SIGINT listner
    shutdownCh := setupShutdownCh()

     // Create a channel with a type holding context
    myMiddlewareCh:= make(chan MyType)

    // Create a middleware that acks but knows about this channel, similar to the built in middleware
    // acknowledging ever delivery but on failure has access to the channel.
    myMiddleware := setupMiddlewareThatAcks(myMiddlewareCh)

    for {
        // Setup the server with this new middleware
        server := setupAmqpServer(myMiddleware)

        // Run it in a separate go routine
        go server.ListenAndServe()

        // Check for either a chutdown or an acknoledger failure
        select {
        case <-shutdownCh:
            // Regular shutdown, exit the loop
            break
        case myContext :=<-myMiddlewareCh:
            // We got info from our middleware, take action! I.e. shutdown and continue
           if myContext.FailedAndShouldRestart:
               server.Stop()
               continue
           }

           panic(myContext.Reason)
    }
}

Another stab at this would be to extend the existing middleware and have it retry failed acks but I'm not sure how that would work. I guess we can keep them in memory and retry the ones that fail. Or, if that's fine with the user, just hang and retry the failed ack immediately until it succeeds. However if this doesn't work and a server restart is needed this will not be a good approach.

bombsimon commented 5 months ago

I created #108 and #109. The idea is to use the combination of the new middleware signature and the restart channel to trigger the restart.

restartCh := make(chan struct{})

ackMiddlewareFn := AckDelivery(func (err error, correlationID string) {
    log.Printf("could not ack delivery (%s): %v\n", correlationID, err)
    log.Println("restarting server")

    restartCh <- struct{}{}
})

s:= NewServer(uri).
    WithRestartChan(restartCh)
    AddMiddleware(ackMiddlewareFn)

s.ListenAndServe()

However seems like we haven't pinned any version for golangci-lint so now there's a ton of linting issues. I'll try to get to those first and rebase my PRs once done.

akarl commented 5 months ago

There is unfortunately no way to Ack a message on a new Channel. The only thing we can do when this fails is to reconnect and let RabbitMQ redeliver the messages.

bombsimon commented 5 months ago

Now with the new restart functionality and the updated middleware I think it's the beat we can do from a middleware perspective. Even more fine grained handling would have to be implemented outside this framework.