houseofcat / turbocookedrabbit

A user friendly RabbitMQ library written in Golang.
MIT License

Publish Confirmation Timeout #24

Closed: istrau2 closed this issue 3 years ago

istrau2 commented 3 years ago

config:

  Seasoning:
    EncryptionConfig:
      Enabled: false
      Type: aes
      TimeConsideration: 1
      Threads: 2
    CompressionConfig:
      Enabled: false
      Type: gzip
    PoolConfig:
      URI: amqps://user:password@instancename.rmq2.cloudamqp.com/virtualhost
      Heartbeat: 1800
      ConnectionTimeout: 5
      SleepOnErrorInterval: 1
      MaxConnectionCount: 10
      MaxCacheChannelCount: 10
    ConsumerConfigs:
      order_event_sync_ib:
        ConsumerName: order.event.sync.ib
        Enabled: true
        QueueName: order.event.sync.ib
        AutoAck: true
        Exclusive: true
        NoWait: true
        SleepOnErrorInterval: 1
    PublisherConfig:
      AutoAck: true
      SleepOnErrorInterval: 1
      PublishTimeOutInterval: 5
      MaxRetryCount: 5
  Topology:
    Exchanges:
    - Name: request.fix.itiviti
      Type: "fanout"
      PassiveDeclare: false
      Durable: true
      AutoDelete: false
      InternalOnly: false
      NoWait: true
    Queues:
    - Name: egress.itiviti
      PassiveDeclare: false
      Durable: true
      AutoDelete: false
      Exclusive: false
      NoWait: true
    QueueBindings:
    - QueueName: egress.itiviti
      ExchangeName: request.fix.itiviti
      NoWait: true

code:

func NewRabbitMQProvider(cfg *RabbitMQConfig, applicationName string) (*RabbitMQProvider, error) {
    cp, err := tcr.NewConnectionPool(cfg.Seasoning.PoolConfig)
    if err != nil {
        return nil, util.WrapError(err)
    }

    // NewTopologer returns only the Topologer, so there is no error to check here.
    t := tcr.NewTopologer(cp)
    err = t.BuildTopology(&cfg.Topology, false)
    if err != nil {
        return nil, util.WrapError(err)
    }

    return &RabbitMQProvider{
        seasoning: cfg.Seasoning,
        connPool:  cp,
    }, nil
}

func (p *RabbitMQProvider) Publish(letter *tcr.Letter, skipReceipt bool, publishReceiptHandler func(*tcr.PublishReceipt)) {
    publisher := p.ProvidePublisher()
    p.handlerChan <- publishReceiptIDHandler{
        handler:  publishReceiptHandler,
        letterID: letter.LetterID.String(),
    }
    publisher.PublishWithConfirmationTransient(letter, 10)
}

letter := &tcr.Letter{
    LetterID: id,
    Body:     []byte(nos.ToMessage().String()),
    Envelope: &tcr.Envelope{
        Exchange:     "request.fix.itiviti",
        RoutingKey:   "egress.itiviti",
        ContentType:  "text/plain",
        Mandatory:    true, // put this message on at least one queue; if that fails, send it back
        Immediate:    true, // send immediately or not at all
        Priority:     0,    // not relevant
        DeliveryMode: 1,    // transient: do not store messages to disk for resend after a server crash; storing to disk adds latency and would cause messages to be sent asynchronously
    },
}

b.rabbitMQProvider.Publish(letter, false, func(r *tcr.PublishReceipt) {
    if r.Success {
        fmt.Printf("letter %s published successfully\n", letter.LetterID.String())
    } else {
        fmt.Printf("letter %s errored on publish with: %s\n", letter.LetterID.String(), r.Error.Error())
    }
})

I am seeing the following error message:

letter 3183b158-2a7e-11ec-a460-acde48001122 errored on publish with: publish confirmation for LetterID: 3183b158-2a7e-11ec-a460-acde48001122 wasn't received in a timely manner (10ms) - recommend retry/requeue

houseofcat commented 3 years ago

Have you verified that the exchange, queue, and queue binding exist?

Also try without mandatory or priority.

istrau2 commented 3 years ago

@houseofcat Thanks for the response.

Yes, the exchange, queue, and binding exist; in fact, they were created by the Topologer using the code in the OP.

I tried setting Mandatory to false and Priority to 1, to no avail. A couple more points that may help solve this:

  1. tcr.NewConnectionPool(cfg.Seasoning.PoolConfig) takes somewhere between 5 and 10 seconds to return; the length of time surprised me.
  2. I am using an amqps:// URL without providing certificates via the TLSConfig.

I am unsure how to debug this. I keep tweaking various settings and checking the RabbitMQ management UI to see whether any messages went through, but none ever do. I always get the same error back in the publish receipt as well.

houseofcat commented 3 years ago

5671 is the port for amqps, not 5672. Assuming your CloudAMQP instance is hooked up correctly and amqps is enabled, it could be something else.

You might not be forming a connection. https://www.cloudamqp.com/docs/amqp.html

Also, since you are publishing remotely, you should allow a generous timeout. 10 ms is probably too short for a long-distance remote AMQP server.
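To rule out the port mix-up, one quick check is to see which port the pool URI actually targets. This is a minimal sketch using only net/url; effectivePort is a hypothetical helper, and it assumes the conventional defaults of 5672 for amqp:// and 5671 for amqps:// when the URI carries no explicit port (the pool URI in the OP has none).

```go
package main

import (
	"fmt"
	"net/url"
)

// effectivePort is a hypothetical helper that reports which port an AMQP URI
// will target. It assumes the conventional defaults (5672 for amqp, 5671 for
// amqps) when the URI does not specify a port explicitly.
func effectivePort(rawURI string) (string, error) {
	u, err := url.Parse(rawURI)
	if err != nil {
		return "", err
	}
	if p := u.Port(); p != "" {
		return p, nil // an explicit port always wins
	}
	switch u.Scheme {
	case "amqp":
		return "5672", nil
	case "amqps":
		return "5671", nil
	default:
		return "", fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
}

func main() {
	for _, uri := range []string{
		"amqps://user:password@instancename.rmq2.cloudamqp.com/virtualhost",
		"amqps://user:password@instancename.rmq2.cloudamqp.com:5672/virtualhost", // TLS scheme on the plain port
	} {
		port, err := effectivePort(uri)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s -> port %s\n", uri, port)
	}
}
```

An amqps:// URI that resolves to 5672 is worth a second look, since that pairs a TLS handshake with the plain AMQP port.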

istrau2 commented 3 years ago

@houseofcat I think it is connecting fine, because I don't get any errors from the connection pool and the Topologer was able to create the exchanges, queues, and bindings.

In terms of the timeout, I tried changing:

PublisherConfig:
  PublishTimeOutInterval: 5000

but still got the same message (with 10ms):

letter 40d1e192-2ab1-11ec-9b10-acde48001122 errored on publish with: publish confirmation for LetterID: 40d1e192-2ab1-11ec-9b10-acde48001122 wasn't received in a timely manner (10ms) - recommend retry/requeue

I had assumed that the unit was seconds in PublishTimeOutInterval. Come to think of it, I had it set to 5 before and it still responded with an error mentioning 10ms. Thoughts?

houseofcat commented 3 years ago

It sounds like it's not reading the value, and I haven't tested the YAML-based configuration you are using. I would verify you are editing the right config and not accidentally loading JSON, etc.

You should step through your code when it calls publish to verify the timeout value and where it's getting assigned. You have custom code sitting on top of your publish, and I can confirm that the publish timeout unit test is working as expected.
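One way to act on the advice to verify which config is actually being read is to make the loader refuse keys it does not recognize, so a misspelled key cannot silently leave a field at its zero value. This is a minimal sketch with encoding/json's Decoder.DisallowUnknownFields; publisherConfig is a hypothetical stand-in that mirrors the PublisherConfig keys from the YAML in the OP, not tcr's own type, and a YAML loader would need its own strict mode.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// publisherConfig is a hypothetical mirror of the PublisherConfig keys used in
// this thread; it is not tcr's own type.
type publisherConfig struct {
	AutoAck                bool
	SleepOnErrorInterval   uint32
	PublishTimeOutInterval uint32 // milliseconds, as the thread later establishes
	MaxRetryCount          uint32
}

// loadPublisherConfig decodes strictly: a key that matches no field is an
// error instead of being silently ignored. Note that encoding/json matches
// keys to field names case-insensitively, so only non-matching names fail.
func loadPublisherConfig(data []byte) (publisherConfig, error) {
	var cfg publisherConfig
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.DisallowUnknownFields()
	if err := dec.Decode(&cfg); err != nil {
		return publisherConfig{}, err
	}
	return cfg, nil
}

func main() {
	cfg, err := loadPublisherConfig([]byte(`{"PublishTimeOutInterval": 5000, "MaxRetryCount": 5}`))
	fmt.Println(cfg.PublishTimeOutInterval, err) // 5000 <nil>

	// "PublishTimeOut" is not a field name, so strict decoding reports it.
	_, err = loadPublisherConfig([]byte(`{"PublishTimeOut": 5000}`))
	fmt.Println(err)
}
```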

houseofcat commented 3 years ago

@istrau2 https://github.com/houseofcat/turbocookedrabbit/blob/d988bddf1d5cdf1f8359d4a2be8cf44d51c67933/v2/pkg/tcr/publisher.go#L269

https://github.com/houseofcat/turbocookedrabbit/blob/d988bddf1d5cdf1f8359d4a2be8cf44d51c67933/v2/pkg/tcr/publisher.go#L309

And based on your code, you are calling it with a value of 10: publisher.PublishWithConfirmationTransient(letter, 10)

While publishTimeOutDuration is used by PublishWithConfirmation as well, a lot of the feedback was that developers occasionally want to set the timeout dynamically based on specific considerations, so I added this as an override. You have a global timeout and a local timeout override.

I maintain that the RTT to CloudAMQP should be greater than 10 ms. An ICMP ping from central Florida averages 19-25 ms, and I would hazard a guess that an actual RabbitMQ payload would be slower. Our library would never succeed with a value of 10 set.

istrau2 commented 3 years ago

@houseofcat Thanks for the responses.

I've fixed the timeout. Unfortunately, I am still seeing the error:

letter c7012262-2b26-11ec-ab3f-acde48001122 errored on publish with: publish confirmation for LetterID: c7012262-2b26-11ec-ab3f-acde48001122 wasn't received in a timely manner (1000ms) - recommend retry/requeue

istrau2 commented 3 years ago

@houseofcat please see #25, which fixed the timeout issue.

Now, I am seeing an error here: https://github.com/houseofcat/turbocookedrabbit/blob/master/v2/pkg/tcr/publisher.go#L297

channel/connection is not open

Why would the channel not be open? What is the best way to debug this?

Thanks again

istrau2 commented 3 years ago

After looking through the logs I found:

operation basic.publish caused a connection exception not_implemented: "immediate=true"

which led me to this thread: https://github.com/streadway/amqp/issues/45

After setting the Immediate flag in the letter to false, the publish works.