streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License

Noticed A Confirms Issue After Connection Loss #459

Open houseofcat opened 4 years ago

houseofcat commented 4 years ago

When attempting to use a ChannelPool of ackable channels with a Confirmation chan hosted inside a wrapper struct (ChannelHost), I noticed that the Confirmation channel exploded with Confirmations, all with identical DeliveryTags (usually 0), no matter the size of the chan buffer, during an amqp.Connection disconnect.

The chan passed to amqp.Channel's NotifyPublish was also closed by the amqp library. Which is fine, except I became deadlocked trying to get these confirmations out of the chan once its buffer was maxed out. I have run into a pain point where I would have loved to be able to add a listener and then remove that listener manually, so I could increase the longevity of pooled amqp.Channels.
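
For what it's worth, the only consumer-side signal I can see for that close is the comma-ok form of the receive. A minimal sketch, assuming the usual errors and amqp imports (drainOne is a hypothetical helper of mine, not library API):

// drainOne pops at most one pending confirmation without blocking, and
// reports whether the chan has been closed by the amqp library.
func drainOne(confirms <-chan amqp.Confirmation) (amqp.Confirmation, bool, error) {
    select {
    case confirmation, ok := <-confirms:
        if !ok {
            // The amqp library closed the chan (channel/connection shut down);
            // the caller has to rebuild the amqp.Channel and re-register NotifyPublish.
            return amqp.Confirmation{}, false, errors.New("confirmation chan closed")
        }
        return confirmation, true, nil
    default:
        // Nothing pending in the buffer.
        return amqp.Confirmation{}, false, nil
    }
}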

What triggered the connection closure was a little chaos engineering. Imagine a pool of amqp.Channels ready to be used in a PublishWithConfirmation setup; let it run for a few seconds, then sever all amqp.Connections manually with:

rabbitmqctl.bat close_all_connections "suck it long, suck it hard trebek"

Now, on recovery, we obviously reconstructed the amqp.Channel, assigned it a new Confirmation chan, and proceeded to use it, assuming amqp.Connection recovery was successful.

Now, because we are doing a confirmation and I don't allow the individual instances of amqp.Channel to be used in parallel, it is safe for me to flush all preceding confirmations that might be sitting in the buffer and then do a synchronous publish by waiting for the very next confirmation. So I created this FlushConfirms(), which drains the confirmation buffer. This was working until my chaos engineering - so the issue only arises after severing the main amqp.Connection.

This code is called after the amqp.Connection is fully recovered and we need the amqp.Channel back in the Host.

// MakeChannel tries to create (or re-create) the channel from the ConnectionHost it's attached to.
func (ch *ChannelHost) MakeChannel() (err error) {
    ch.chanLock.Lock()
    defer ch.chanLock.Unlock()

    ch.Channel, err = ch.connHost.Connection.Channel()
    if err != nil {
        return err
    }

    if ch.Ackable {
        err = ch.Channel.Confirm(false)
        if err != nil {
            return err
        }

        ch.Confirmations = make(chan amqp.Confirmation, 100)
        ch.Channel.NotifyPublish(ch.Confirmations)
    }

    ch.Errors = make(chan *amqp.Error, 100)
    ch.Channel.NotifyClose(ch.Errors)

    return nil
}

// FlushConfirms removes all previous confirmations pending processing.
func (ch *ChannelHost) FlushConfirms() {
    ch.chanLock.Lock()
    defer ch.chanLock.Unlock()

    counter := 0
FlushLoop:
    for {
        if ch.connHost.Connection.IsClosed() {
            return
        }

        select {
        case confirmation := <-ch.Confirmations: // Some weird use case where the Channel is being flooded with confirms after connection disrupt
            counter++
            if counter == 10 {
                fmt.Printf("ChannelID: %d confirmations flooded (confirmation deliverytag: %d) - initiating bypass!\r\n", ch.ID, confirmation.DeliveryTag)
                break FlushLoop
            }
        default:
            return
        }
    }
}

This FlushConfirms() produces the following output, for example. I had to receive at least 10 messages (printing out the last of the 10), and you can see it was deliverytag 0, so the tag is getting reused even though that isn't supposed to happen. The interesting thing here is that only one channel was used for this publish! Each amqp.Channel gets its own unique chan, so everything should in theory be thread safe. I am not sure how this is happening!

API server listening at: 127.0.0.1:17827
ChannelID: 22 - confirmations flooded (confirmation deliverytag: 0) - initiating bypass!
ChannelID: 24 - confirmations flooded (confirmation deliverytag: 0) - initiating bypass!
ChannelID: 2 - confirmations flooded (confirmation deliverytag: 0) - initiating bypass!
ChannelID: 4 - confirmations flooded (confirmation deliverytag: 0) - initiating bypass!
ChannelID: 6 - confirmations flooded (confirmation deliverytag: 0) - initiating bypass!
ChannelID: 8 - confirmations flooded (confirmation deliverytag: 0) - initiating bypass!
ChannelID: 10 - confirmations flooded (confirmation deliverytag: 0) - initiating bypass!
ChannelID: 12 - confirmations flooded (confirmation deliverytag: 0) - initiating bypass!
ChannelID: 14 - confirmations flooded (confirmation deliverytag: 0) - initiating bypass!
ChannelID: 16 - confirmations flooded (confirmation deliverytag: 0) - initiating bypass!

I did confirm they all carried the exact same value and tag.
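
My best guess at the mechanics is plain Go channel behaviour rather than anything the library is actively broadcasting: once the NotifyPublish chan is closed, every receive completes immediately with the zero value, and the zero value of amqp.Confirmation is DeliveryTag 0 with Ack false - exactly the flood above. A standalone sketch (with a stand-in struct so it runs without the library):

package main

import "fmt"

// confirmation stands in for amqp.Confirmation (DeliveryTag uint64, Ack bool)
// so this sketch runs without the library.
type confirmation struct {
    DeliveryTag uint64
    Ack         bool
}

func main() {
    confirms := make(chan confirmation, 100)
    close(confirms) // what effectively happens to the NotifyPublish chan on connection loss

    // Every receive on a closed chan succeeds instantly with the zero value,
    // so a drain loop sees an endless stream of DeliveryTag 0 / Ack false.
    for i := 0; i < 3; i++ {
        c, ok := <-confirms
        fmt.Printf("ok=%v deliverytag=%d ack=%v\n", ok, c.DeliveryTag, c.Ack)
    }
}

If that's what's happening, it also explains why the chan buffer size makes no difference - the receives aren't pulling real entries out of a buffer at all.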

And finally, the Publish:

// PublishWithConfirmation sends a single message to the address on the letter with confirmation capabilities.
// This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority.
// A timeout failure drops the letter back in the PublishReceipts.
// A confirmation failure keeps trying to publish (at least until timeout failure occurs.)
func (pub *Publisher) PublishWithConfirmation(letter *Letter, timeout time.Duration) {

    if timeout == 0 {
        timeout = pub.publishTimeOutDuration
    }

    timeoutAfter := time.After(timeout)

GetChannelAndPublish:
    for {
        // Has to use an Ackable channel for Publish Confirmations.
        chanHost := pub.ConnectionPool.GetChannelFromPool()

        // Flush all previous publish confirmations
        chanHost.FlushConfirms()

    Publish:
        err := chanHost.Channel.Publish(
            letter.Envelope.Exchange,
            letter.Envelope.RoutingKey,
            letter.Envelope.Mandatory,
            letter.Envelope.Immediate,
            amqp.Publishing{
                ContentType:  letter.Envelope.ContentType,
                Body:         letter.Body,
                Headers:      amqp.Table(letter.Envelope.Headers),
                DeliveryMode: letter.Envelope.DeliveryMode,
            },
        )
        if err != nil {
            pub.ConnectionPool.ReturnChannel(chanHost, true)
            continue // Take it again! From the top!
        }

        // Wait for very next confirmation on this channel, which should be our confirmation.
        for {
            select {
            case <-timeoutAfter:
                pub.publishReceipt(letter, fmt.Errorf("publish confirmation for LetterId: %d wasn't received in a timely manner (%v) - recommend retry/requeue", letter.LetterID, timeout))
                pub.ConnectionPool.ReturnChannel(chanHost, false)
                return

            case confirmation := <-chanHost.Confirmations:

                if !confirmation.Ack { // retry publishing
                    goto Publish
                }

                pub.publishReceipt(letter, nil)
                pub.ConnectionPool.ReturnChannel(chanHost, false)
                break GetChannelAndPublish

            default:
                time.Sleep(3 * time.Millisecond)
            }
        }
    }
}

I have solved it by switching to transient channels per publish (with confirmations). I know it's not super ideal, but it is working.

// PublishWithConfirmationTransient sends a single message to the address on the letter with confirmation capabilities on transient Channels.
// This is an expensive and slow call - use this when delivery confirmation on publish is your highest priority.
// A timeout failure drops the letter back in the PublishReceipts. When combined with QueueLetter, it automatically
//   gets requeued for re-publish.
// A confirmation failure keeps trying to publish (at least until timeout failure occurs.)
func (pub *Publisher) PublishWithConfirmationTransient(letter *Letter, timeout time.Duration) {

    if timeout == 0 {
        timeout = pub.publishTimeOutDuration
    }

    timeoutAfter := time.After(timeout)

    for {
        // Has to use an Ackable channel for Publish Confirmations.
        channel := pub.ConnectionPool.GetTransientChannel(true)
        confirms := make(chan amqp.Confirmation, 1)
        channel.NotifyPublish(confirms)

    Publish:
        err := channel.Publish(
            letter.Envelope.Exchange,
            letter.Envelope.RoutingKey,
            letter.Envelope.Mandatory,
            letter.Envelope.Immediate,
            amqp.Publishing{
                ContentType:  letter.Envelope.ContentType,
                Body:         letter.Body,
                Headers:      amqp.Table(letter.Envelope.Headers),
                DeliveryMode: letter.Envelope.DeliveryMode,
            },
        )
        if err != nil {
            channel.Close()
            if pub.sleepOnErrorInterval > 0 {
                time.Sleep(pub.sleepOnErrorInterval)
            }
            continue // Take it again! From the top!
        }

        // Wait for very next confirmation on this channel, which should be our confirmation.
        for {
            select {
            case <-timeoutAfter:
                pub.publishReceipt(letter, fmt.Errorf("publish confirmation for LetterId: %d wasn't received in a timely manner (%v) - recommend retry/requeue", letter.LetterID, timeout))
                channel.Close()
                return

            case confirmation := <-confirms:

                if !confirmation.Ack {
                    goto Publish //nack has occurred, republish
                }

                // Happy Path, publish was received by server and we didn't timeout client side.
                pub.publishReceipt(letter, nil)
                channel.Close()
                return

            default:
                time.Sleep(3 * time.Millisecond)
            }
        }
    }
}

Code is located in this Repo: https://github.com/houseofcat/turbocookedrabbit

PubSub test to easily reproduce said scenario is here: https://github.com/houseofcat/turbocookedrabbit/blob/727874ecaf4548be5dcec3327178e0318460bb9f/tests/main_pubsub_test.go#L179

This may very well be an issue with my understanding of the confirmation process, but I did at least verify the transient-channel strategy against the pub/sub examples in this repo. It feels like a continuous stream of broadcasts until success. The problem with that is the Connection died, the Channel was recreated, and this chan was also closed. So it's just really weird.

Deadlock

go version go1.14.4 windows/amd64

houseofcat commented 4 years ago

After playing around with it some more...

The length of the chan prior to the select is 0, and it is in fact closed. So it never needs to be set to nil and remade... it's literally empty until it hits the select. I have no data race condition, nothing bizarre going on... so I just tried a return statement instead.

    counter := 0
FlushLoop:
    for {
        if ch.connHost.Connection.IsClosed() {
            return
        }

        if len(ch.Confirmations) == 100 {
            ch.Confirmations = nil
            ch.Confirmations = make(chan amqp.Confirmation, 100)
            ch.Channel.NotifyPublish(ch.Confirmations)
        }

        select {
        case confirmation := <-ch.Confirmations: // Some weird use case where the Channel is being flooded with confirms after connection disrupt

            counter++
            if counter == 10 {
                fmt.Printf("ChannelID: %d - confirmations flooded (confirmation deliverytag: %d) - initiating bypass!\r\n", ch.ID, confirmation.DeliveryTag)
                break FlushLoop
            }
        default:
            return
        }
    }

This just works, simply. I don't know if I have a memory leak, but it doesn't appear so. It isn't a race condition, and it isn't a goroutine leak either. I'm happily able to use the ChannelPool for PublishWithConfirmations again.

// FlushConfirms removes all previous confirmations pending processing.
func (ch *ChannelHost) FlushConfirms() {
    ch.chanLock.Lock()
    defer ch.chanLock.Unlock()

    for {
        if ch.connHost.Connection.IsClosed() {
            return
        }

        select {
        case <-ch.Confirmations: // Some weird use case where the Channel is being flooded with confirms after connection disrupt
            return
        default:
            return
        }
    }
}
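
For anyone else landing here, the variant I'd try next is a comma-ok drain, so that a chan the library has closed gets rebuilt instead of being read forever. This is a sketch against my own wrapper fields, not a drop-in fix:

// FlushConfirms (sketch): drain pending confirmations; if the library has
// closed the chan, remake it and re-register NotifyPublish instead of
// continuing to receive zero values from the closed chan.
func (ch *ChannelHost) FlushConfirms() {
    ch.chanLock.Lock()
    defer ch.chanLock.Unlock()

    for {
        if ch.connHost.Connection.IsClosed() {
            return
        }

        select {
        case _, ok := <-ch.Confirmations:
            if !ok {
                // Closed by the amqp library; remake and re-register.
                ch.Confirmations = make(chan amqp.Confirmation, 100)
                ch.Channel.NotifyPublish(ch.Confirmations)
                return
            }
            // Drained one stale confirmation; keep looping until the buffer is empty.
        default:
            return // buffer empty
        }
    }
}

(As far as I can tell, NotifyPublish on an already-closed amqp.Channel just closes the chan you hand it, so if the underlying amqp.Channel is dead, MakeChannel still has to run first or the flood reappears instantly.)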