rabbitmq / rabbitmq-stream-go-client

A client library for RabbitMQ streams
MIT License

go-stream-locator #210

Closed. Rozztig closed this issue 4 months ago.

Rozztig commented 1 year ago

Describe the bug

Our service, which uses rabbitmq-stream-go-client, gets stuck at random times, resulting in downtime. The issue starts when the RabbitMQ server logs "unknown command":

    unknown command {request,11,{close,1,<<79,75>>}}, closing connection.
    unknown command {response,18,{close,1}}, closing connection.

After the unknown command appears, our service fails to create new consumers. Instead we see an increasing number of go-stream-locator connections in the RabbitMQ UI. This goes on until our service runs out of memory and restarts, so the problem is that the go-stream-locator connections get stuck forever. (The service usually reaches around 30-40k connections, all of them go-stream-locators, before it OOMs.)

Reproduction steps

  1. Throttle RabbitMQ (resource-wise, in Kubernetes).
  2. Generate a churn of roughly 200 connection opens and closes per second (see the sketch after this list). NOTE: In production our churn is around 3/s and the issue still happens once or twice a week.
  3. Look for "unknown command" in the RabbitMQ logs.
  4. Look in the RabbitMQ UI for stuck go-stream-locator connections.
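
A minimal sketch of the churn in step 2, paced with a ticker at roughly 200 opens/closes per second. The rate, the per-iteration stream names, and the import paths are illustrative assumptions; the client calls mirror the snippet posted further down in this thread:

    package main

    import (
        "log"
        "time"

        "github.com/google/uuid"
        "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
        "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
    )

    func main() {
        env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().SetVHost("/"))
        if err != nil {
            log.Fatalf("cannot create environment: %v", err)
        }

        // One open/close cycle every 5ms, i.e. roughly 200 per second (illustrative).
        tick := time.NewTicker(5 * time.Millisecond)
        defer tick.Stop()
        for range tick.C {
            streamName := uuid.New().String()
            if err := env.DeclareStream(streamName, nil); err != nil {
                log.Printf("declare stream failed: %v", err)
                continue
            }
            consumer, err := env.NewConsumer(streamName, func(_ stream.ConsumerContext, message *amqp.Message) {
                log.Printf("message received %s", message.GetData())
            }, nil)
            if err != nil {
                log.Printf("new consumer failed: %v", err)
            } else {
                consumer.Close()
            }
            _ = env.DeleteStream(streamName)
        }
    }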

Expected behavior

For the go-stream-locator connections to go away and messages to reach the consumers, instead of hanging and eating up resources until the service crashes.

Additional context

We have had this issue for 6 months now. We have reduced its frequency by opening and closing connections less often, but it still happens randomly even with plenty of resources available. We are using RabbitMQ 3.11.14 and rabbitmq-stream-go-client 1.1.2.

Gsantomaggio commented 1 year ago

If you could provide a repository or source code that reproduces the issue, it would help us understand the problem. Thank you

michaelklishin commented 1 year ago

Or a Wireshark traffic capture and a set of exact steps used to "throttle RabbitMQ". Guessing is a very expensive way of troubleshooting distributed systems.

Gsantomaggio commented 1 year ago

Also, you should have some logs on the client side. I am trying to reproduce the issue, so far without success, with:

    package main

    import (
        "bufio"
        "fmt"
        "os"

        "github.com/google/uuid"
        "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
        "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
    )

    // CheckErr fails fast on any error returned by the client.
    func CheckErr(err error) {
        if err != nil {
            fmt.Printf("error: %s\n", err)
            os.Exit(1)
        }
    }

    func main() {
        reader := bufio.NewReader(os.Stdin)
        env, err := stream.NewEnvironment(
            stream.NewEnvironmentOptions().
                SetVHost("/"))
        CheckErr(err)

        fmt.Print("Starting test")
        // Buffered channel of size 1: at most one stream is churned at a time.
        waitChan := make(chan struct{}, 1)
        for i := 0; i < 5000; i++ {
            waitChan <- struct{}{}
            if i%500 == 0 {
                fmt.Printf(" progress:%d \n", i)
            }
            go func() {
                streamName := uuid.New().String()
                // := keeps err local to the goroutine instead of writing the outer variable
                err := env.DeclareStream(streamName, nil)
                CheckErr(err)
                producer, err := env.NewProducer(streamName, nil)
                CheckErr(err)
                producer.Close()
                consumer, err := env.NewConsumer(streamName, func(consumerContext stream.ConsumerContext, message *amqp.Message) {
                    fmt.Printf("message received %s \n", message.GetData())
                }, nil)
                CheckErr(err)
                consumer.Close()
                _ = env.DeleteStream(streamName)
                <-waitChan
            }()
        }
        fmt.Print("Test completed, press enter to exit")
        _, _ = reader.ReadString('\n')
    }
Rozztig commented 1 year ago

> If you could provide a repository or source code that reproduces the issue, it would help us understand the problem. Thank you

Thank you for your fast replies. I will try to create an example project today that reproduces the issue.

During our testing we throttled RabbitMQ in Kubernetes by setting resource limits, giving the pod a maximum of cpu: 500m and memory: 2Gi. This is just to trigger the issue faster. In production we give the pod 24Gi of memory and 2 vCPU cores, but we are nowhere near using that much and still experience the issue randomly.

I will reply later once we have a repo with an example reproducing the issue. Thanks!

Gsantomaggio commented 4 months ago

Closed due to inactivity.