wagslane / go-rabbitmq

A wrapper of streadway/amqp that provides reconnection logic and sane defaults
https://blog.boot.dev/golang/connecting-to-rabbitmq-in-golang-easy/
MIT License
805 stars 128 forks source link

NotifyPublish ReconnectCount - how is it intended to be used? #161

Closed jamoflaw closed 4 months ago

jamoflaw commented 6 months ago

Similar to #66, I am trying to enable publisher confirms using the NotifyPublish handler, manually keeping track of the DeliveryTag by incrementing a counter each time I successfully publish.

However, on reconnect there doesn't seem to be any mechanism for the publisher to know that a reconnect has happened, so this manual tracking of DeliveryTags breaks down when the channel's DeliveryTag silently resets to 0.

For example:

    // Publisher func: publishes each message and, on success, records it in the
    // store under a locally maintained delivery tag.
    go func() {

        publisher.NotifyPublish(func(p rabbitmq.Confirmation) {
            publisher_confirms <- p
        })

        // Initialise a delivery counter to mirror the DeliveryTags assigned to the
        // messages. It starts at 1 and is only advanced after a successful publish.
        // NOTE: nothing here resets it when a reconnect restarts the channel's own
        // DeliveryTag sequence, so after a reconnect the two drift apart.
        deliveryCounter := messagestore.DeliveryTag(1)
        for msg := range messages {

            err := publisher.PublishWithContext(
                context.Background(),
                msg.Body,
                []string{msg.RoutingKey},

                rabbitmq.WithPublishOptionsExchange("ingress"),
                rabbitmq.WithPublishOptionsContentType(msg.ContentType),
                rabbitmq.WithPublishOptionsContentEncoding(msg.ContentEncoding),
                rabbitmq.WithPublishOptionsAppID(msg.AppId),
                rabbitmq.WithPublishOptionsPersistentDelivery,
            )
            if err != nil {
                // Failed publishes are logged and skipped; the counter is not advanced.
                logger.Errorf("error publishing message, %s", err)
                continue
            }

            store.AppendMessage(deliveryCounter, msg)
            deliveryCounter++

        }
    }()

    // Confirms func: receives the confirmations delivered via NotifyPublish and
    // acks/nacks the stored message they are assumed to correspond to.
    go func() {

        for {
            select {
            case confirm := <-publisher_confirms:
                // Try to confirm
                if confirm.Ack {
                    // PROBLEM: whatever is done here, the original delivery counter cannot be
                    // recovered. ReconnectionCount is unrelated to what the counter was
                    // _before_ the channel's DeliveryTag was reset, so adding it below does
                    // not reproduce the tag the message was stored under.
                    store.Ack(messagestore.DeliveryTag(confirm.DeliveryTag + uint64(confirm.ReconnectionCount)))
                } else {
                    store.Nack(messagestore.DeliveryTag(confirm.DeliveryTag+uint64(confirm.ReconnectionCount)), true)
                }
            }
        }
    }()

Am I missing something in how it is supposed to be used? Is there any internal delivery tag which could be returned from the publisher object instead to track it?

jamoflaw commented 6 months ago

A potentially naïve fix (assuming I'm not missing how it's supposed to be used) would be to expose the GetNextPublishSeqNo() method of the underlying channel:

In channel_manager.go:

// GetNextPublishSeqNo returns the value reported by the current underlying
// channel's GetNextPublishSeqNo: the sequence number (delivery tag) the channel
// expects to assign to the next message published on it. The number belongs to
// that channel, so it starts over when the channel is replaced (e.g. on reconnect).
//
// NOTE(review): chanManager.channel is read here without taking any lock. If the
// reconnect logic can swap the channel concurrently, confirm whether this should
// hold the same lock the reconnect path uses.
func (chanManager *ChannelManager) GetNextPublishSeqNo() uint64 {
    return chanManager.channel.GetNextPublishSeqNo()
}

In publish.go:

// GetNextPublishSeqNo returns the sequence number (delivery tag) that the
// publisher's current channel expects to assign to the next published message,
// as reported by the channel manager. Callers can store a message under this
// value and later match it against Confirmation.DeliveryTag.
//
// Reading the tag and publishing are two separate calls. The pairing is only
// reliable when a single goroutine publishes on this publisher and no reconnect
// occurs in between.
//
// NOTE(review): the read lock taken here is disablePublishDueToBlockedMux, which
// by its name guards the "blocked" state rather than the sequence number; confirm
// it is the intended lock.
func (publisher *Publisher) GetNextPublishSeqNo() uint64 {
    publisher.disablePublishDueToBlockedMux.RLock()
    defer publisher.disablePublishDueToBlockedMux.RUnlock()
    return publisher.chanManager.GetNextPublishSeqNo()
}

However, I'm not 100% certain about the thread safety of the library: fetching the next delivery tag and then publishing, in two separate steps, opens up a possible race condition if the publisher were shared between two goroutines (is that even possible/supported?).

I've tested this on a simple publisher/consumer app, and it now works for an app where the producer is contained within a single goroutine (changes from the original code shown below):

    // Publisher func: publishes each message and records it in the store under
    // the delivery tag the publisher reports it will assign next.
    go func() {

        publisher.NotifyPublish(func(p rabbitmq.Confirmation) {
            publisher_confirms <- p
        })
        publisher.NotifyReturn(func(r rabbitmq.Return) {
            publisher_returns <- &r
        })

        // No local counter: each message's tag is read from the publisher just
        // before the message is published.
        for msg := range messages {

            // NOTE(review): reading the tag and publishing are two separate steps.
            // They only pair up correctly while this goroutine is the sole user of
            // the publisher and no reconnect happens between the two calls.
            deliveryTag := messagestore.DeliveryTag(publisher.GetNextPublishSeqNo())
            err := publisher.PublishWithContext(
                context.Background(),
                msg.Body,
                []string{msg.RoutingKey},

                rabbitmq.WithPublishOptionsExchange("ingress"),
                rabbitmq.WithPublishOptionsContentType(msg.ContentType),
                rabbitmq.WithPublishOptionsContentEncoding(msg.ContentEncoding),
                rabbitmq.WithPublishOptionsAppID(msg.AppId),
                rabbitmq.WithPublishOptionsPersistentDelivery,
            )
            if err != nil {
                logger.Errorf("error publishing message, %s", err)
                continue
            }

            store.AppendMessage(deliveryTag, msg)
        }
    }()

    // Confirms func: acks/nacks the stored message whose tag equals
    // confirm.DeliveryTag directly (no ReconnectionCount adjustment).
    go func() {

        for {
            select {
            case confirm := <-publisher_confirms:
                // Try to confirm
                if confirm.Ack {
                    logger.Infof("Attempting to ack DeliveryTag: %d and ReconnectionCount: %d", confirm.DeliveryTag, confirm.ReconnectionCount)
                    store.Ack(messagestore.DeliveryTag(confirm.DeliveryTag))
                } else {
                    logger.Infof("Attempting to nack DeliveryTag: %d and ReconnectionCount: %d", confirm.DeliveryTag, confirm.ReconnectionCount)
                    store.Nack(messagestore.DeliveryTag(confirm.DeliveryTag), true)
                }

            case <-publisher_returns:
                // Returned messages are received and discarded; no handling here.

            }
        }
    }()
wagslane commented 5 months ago

The reconnect count should let you know when a reconnect happens; does that not help you know when the counter is reset?