houseofcat / turbocookedrabbit

A user friendly RabbitMQ library written in Golang.
MIT License

PublishWithConfirmation infinite loop #15

Closed dfelici87 closed 3 years ago

dfelici87 commented 3 years ago

Hi,

I use the PublishWithConfirmation function, but if RabbitMQ returns an error (confirmation.Ack = false) the function enters an infinite loop, because there is a "goto Publish" statement that resets the timeout and re-publishes the message every time.

Is this correct, or is it a bug? Could you add a way to limit the number of attempts?

thanks

houseofcat commented 3 years ago

It's correct: confirmation.Ack == false means the server is telling us it did not get the message and that we need to resend it.

In theory this could loop forever if the server kept replying false, but that would be a server-side issue that needs solving. PublishWithConfirmation is working as intended: its goal is to get the message to the server, and reliability is the most important feature. There is nothing to manage in this flow other than the timeout. The timeout is applied implicitly to waiting for the response after publishing, which should be sufficient to exit on a standard communication problem.

If you want simpler Publishing then use a different Publish call/mechanism or you can write your own version while still using ChannelPools.

ppiccolo commented 3 years ago

I understand your point, but how can I catch this error?

houseofcat commented 3 years ago

The infinite loop? Well, you could write your own, or switch to this manual Publish method:

https://github.com/houseofcat/turbocookedrabbit/blob/ab307a1ac5fa0399b9c6ba40ba53565d1191b093/v2/pkg/tcr/publisher.go#L206

It will stop trying after the context you provide expires.

ppiccolo commented 3 years ago

OK, thank you. But how can I get the channel error back? Is there no way?

I think it would be useful for the client side to be able to see that there is a server-side error.

houseofcat commented 3 years ago

Why would you need to do that? In Go you can step into the package and set a breakpoint.

If you mean you want to log such a message after deployment, you have to get creative in Go without dependency injection. Here you would need to write a logging interface or return the error. This function is designed to keep retrying on error - meaning we can't return the error - because reliable transmission is the highest priority here.

If you want Publish with errors try one of the simpler publish functions. Here, for example, we output err via a chan that you can subscribe to and log all messages. https://github.com/houseofcat/turbocookedrabbit/blob/ab307a1ac5fa0399b9c6ba40ba53565d1191b093/v2/pkg/tcr/publisher.go#L100

This gives you access to the original message (presumably for retry) and the original error.

Also you can write your own version using components like the ChannelPool. You don't have to use the helpers I provided.

houseofcat commented 3 years ago

PublishWithConfirmation is just a loop on Publish.

Switch to Publish. https://github.com/houseofcat/turbocookedrabbit/blob/ab307a1ac5fa0399b9c6ba40ba53565d1191b093/v2/pkg/tcr/publisher.go#L78

ppiccolo commented 3 years ago

If you want Publish with errors try one of the simpler publish functions. Here, for example, we output err via a chan that you can subscribe to and log all messages.

This is exactly what I'm trying to do, but if I publish a message to a non-existent exchange I don't get any error back on the chan that you provide.

But maybe I'm missing something...

package main

import (
    "time"

    turbo "github.com/houseofcat/turbocookedrabbit/v2/pkg/tcr"
    log "github.com/sirupsen/logrus"
)

// ErrorHandler logs errors reported by the RabbitService.
func ErrorHandler(err error) {
    log.Errorf("Error: %v", err)
}

func main() {

    cfg := &turbo.RabbitSeasoning{
        EncryptionConfig:  &turbo.EncryptionConfig{
            Enabled:           false,
            Type:              "",
            Hashkey:           nil,
            TimeConsideration: 0,
            MemoryMultiplier:  0,
            Threads:           0,
        },
        CompressionConfig: &turbo.CompressionConfig{
            Enabled: false,
            Type:    "",
        },
        PoolConfig:        &turbo.PoolConfig{
            ApplicationName:      "test_error",
            URI:                  "amqp://guest:guest@x.x.x.x:5672/",
            Heartbeat:            1,
            ConnectionTimeout:    10,
            SleepOnErrorInterval: 1,
            MaxConnectionCount:   100,
            MaxCacheChannelCount: 100,
            TLSConfig:            nil,
        },
        ConsumerConfigs:   nil,
        PublisherConfig:   &turbo.PublisherConfig{
            AutoAck:                false,
            SleepOnIdleInterval:    1,
            SleepOnErrorInterval:   1,
            PublishTimeOutInterval: 1,
            MaxRetryCount:          2,
        },
    }
    svc, err := turbo.NewRabbitService(cfg, "", "", nil, ErrorHandler)

    if err != nil {
        log.Fatalf("%v", err)
    }

    go func() {

        for {
            select {
            case err := <-svc.CentralErr():
                log.Errorf("Error %v", err)
            case receipt := <-svc.Publisher.PublishReceipts():
                if receipt.Error != nil {
                    log.Errorf("Error %v", receipt.ToString())
                    break // leaves the select, skipping the info log below
                }
                log.Infof("Info %v", receipt.ToString())
            default:
                // Nothing ready: sleep briefly to avoid a busy loop.
                time.Sleep(10 * time.Microsecond)
            }
        }
    }()

    log.Infof("%v", "publishing...")

    for i := 0; i < 100; i++ {
        err = svc.Publish("Hi", "nothing", "nothing", "", true, nil)
        if err != nil {
            log.Fatalf("%v", err)
        }
    }

    time.Sleep(10 * time.Second)

    log.Infof("%v", "Done")
}

However, I agree with you that the behavior of PublishWithConfirmation is correct, and your suggestion fits the requirements.

Maybe this could be another issue.

Thanks.

houseofcat commented 3 years ago

I believe what you are experiencing is dead lettering. Edit: I am starting to have my doubts though. I would normally expect this to error like it does in C#.

This test publishes to an exchange that doesn't exist and uses none of my library for connectivity.

// TestBasicPublishToNonExistentExchange shows that publishing to an exchange
// that doesn't exist returns no error from Publish.
func TestBasicPublishToNonExistentExchange(t *testing.T) {
    defer leaktest.Check(t)()

    letter := tcr.CreateMockLetter("DoesNotExist", "TcrTestQueue", nil)
    amqpConn, err := amqp.Dial(Seasoning.PoolConfig.URI)
    if err != nil {
        t.Error(err)
        return
    }
    // Deferred calls run last-in-first-out: channel, connection, then the leak check.
    defer amqpConn.Close()

    amqpChan, err := amqpConn.Channel()
    if err != nil {
        t.Error(err)
        return
    }
    defer amqpChan.Close()

    // The exchange is missing, yet Publish itself reports no error.
    err = amqpChan.Publish(
        letter.Envelope.Exchange,
        letter.Envelope.RoutingKey,
        letter.Envelope.Mandatory,
        letter.Envelope.Immediate,
        amqp.Publishing{
            ContentType: letter.Envelope.ContentType,
            Body:        letter.Body,
            MessageId:   letter.LetterID.String(),
            Timestamp:   time.Now().UTC(),
            AppId:       "TCR-Test",
        })
    if err != nil {
        t.Error(err)
    }
}

As you can see, the test doesn't exit early. [image]

And that exchange does not exist. [image]

And you would expect no error in a dead-lettering scenario, which happens to be what we see here.

The reason it loops in a PublishWithConfirmation call is that I get a second message (the confirmation message). It tells me the message was not properly received server side at its destination - which is correct.

As a matter of good practice, a pattern I have seen is that devs commonly declare the queue and exchange (and bind them) on every app startup. That's why I added full topology support via a JSON file too. You get a simple way to keep a source-controlled version of your topology and can easily hop between environments.
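
The declare-on-startup pattern can be sketched as follows. This is only an illustration of the order of operations: the `topology` interface, the `binding` struct, and `ensureTopology` are hypothetical names, not the library's JSON topology support, and an in-memory `recorder` stands in for a real AMQP channel.

```go
package main

import "fmt"

// topology is a hypothetical, narrowed-down interface. In a real application
// its methods would wrap the AMQP client's exchange, queue, and bind calls.
type topology interface {
	DeclareExchange(name, kind string) error
	DeclareQueue(name string) error
	Bind(queue, routingKey, exchange string) error
}

// binding describes one exchange -> queue route to ensure at startup.
type binding struct {
	Exchange   string
	Kind       string
	Queue      string
	RoutingKey string
}

// ensureTopology declares each exchange and queue, then binds them,
// stopping at the first error.
func ensureTopology(t topology, bindings []binding) error {
	for _, b := range bindings {
		if err := t.DeclareExchange(b.Exchange, b.Kind); err != nil {
			return fmt.Errorf("declare exchange %q: %w", b.Exchange, err)
		}
		if err := t.DeclareQueue(b.Queue); err != nil {
			return fmt.Errorf("declare queue %q: %w", b.Queue, err)
		}
		if err := t.Bind(b.Queue, b.RoutingKey, b.Exchange); err != nil {
			return fmt.Errorf("bind %q to %q: %w", b.Queue, b.Exchange, err)
		}
	}
	return nil
}

// recorder is an in-memory fake that records the calls it receives.
type recorder struct{ calls []string }

func (r *recorder) DeclareExchange(name, kind string) error {
	r.calls = append(r.calls, "exchange "+name+" ("+kind+")")
	return nil
}

func (r *recorder) DeclareQueue(name string) error {
	r.calls = append(r.calls, "queue "+name)
	return nil
}

func (r *recorder) Bind(queue, routingKey, exchange string) error {
	r.calls = append(r.calls, "bind "+queue+" -> "+exchange+" ["+routingKey+"]")
	return nil
}

func main() {
	rec := &recorder{}
	// The exchange and queue names here are made up for the example.
	err := ensureTopology(rec, []binding{
		{Exchange: "orders", Kind: "direct", Queue: "orders.created", RoutingKey: "created"},
	})
	fmt.Println("err:", err)
	for _, c := range rec.calls {
		fmt.Println(c)
	}
}
```

Running this at startup before the first publish means a missing exchange or queue is created (or reported as an error) up front rather than discovered through lost messages.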

Assuming it really is dead lettering, an alternative - more of a devops / server administration solution - is to configure your dead letter policy server side. Route the messages in this situation somewhere so you don't lose them. https://www.rabbitmq.com/dlx.html

ppiccolo commented 3 years ago

Edit: I am starting to have my doubts though. I would normally expect this to error like it does in C#.

Definitely this is my expectation too... need further investigation

houseofcat commented 3 years ago

@ppiccolo Hey Paolo, a simple temporary workaround is to use Publish or PublishWithConfirmationContext for now. I believe the issue is that somewhere, somehow, streadway/amqp is not following the AMQP v0.9.1 spec. I have opened an issue with them and will keep this one open to monitor the situation.

https://github.com/streadway/amqp/issues/504

Also consider that you should always try to build your queues, exchanges, and bindings, on startup just to be safe that they are not missing.

ppiccolo commented 3 years ago

Ok, tnx a lot.