apache / pulsar-client-go

Apache Pulsar Go Client Library
https://pulsar.apache.org/
Apache License 2.0

Got "Failed to create consumer at reconnect" when stopping the Pulsar broker and restarting it later #1290

Open zhangbiao2009 opened 1 month ago

zhangbiao2009 commented 1 month ago

Expected behavior

No error is expected.

Actual behavior

[Screenshot of the client log showing the "Failed to create consumer at reconnect" error]

Steps to reproduce

  1. Run the Go example program below.
  2. Stop the Pulsar broker.
  3. Watch the output of the Go example. Once you see this error message, start the Pulsar broker again: error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
  4. The "Failed to create consumer at reconnect" error now appears.

Example program to reproduce:

package main

import (
    "context"
    "fmt"
    "log"

    "net/http"
    _ "net/http/pprof"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {

    // Expose pprof on localhost:6060 so that running goroutines can be inspected.
    go func() {
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()

    // Create a Pulsar client
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatalf("Could not create Pulsar client: %v", err)
    }
    defer client.Close()

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "my-topic",
        SubscriptionName: "my-subscription",
        Type:             pulsar.KeyShared,
    })
    if err != nil {
        fmt.Printf("Could not create consumer: %v\n", err)
        return
    }
    go func() {
        for {
            msg, err := consumer.Receive(context.Background())
            if err != nil {
                log.Printf("Error receiving message: %v", err)
                break
            }

            // Process the message
            fmt.Printf("Received message msgId: %v -- content: '%s'\n",
                msg.ID(), string(msg.Payload()))

            // Acknowledge the message
            consumer.Ack(msg)
        }
    }()

    // Keep the main function alive to allow the goroutine to run
    select {}
}

System configuration

MacBook Pro 2021
Go version: 1.23.0
Pulsar Go SDK version: 0.13.1
Pulsar broker version: latest Docker image, run with this command:

    docker run -it \
      -p 6650:6650 \
      -p 8080:8080 \
      apachepulsar/pulsar:latest \
      bin/pulsar standalone

nodece commented 1 month ago

my-topic does not exist.

zhangbiao2009 commented 1 month ago

> my-topic does not exist.

@nodece Thanks for the reply.

I think I understand what you mean: the reconnect fails because the topic no longer exists after I re-ran the docker command. The topic was stored locally inside the Docker container, so it was lost when I restarted the container. This error wouldn't happen in a real environment because topics are stored in BookKeeper. Is that correct?

I also tried specifying a volume so that topics are stored persistently:

    docker run -it \
      -p 6650:6650 \
      -p 8080:8080 \
      -v pulsar_data:/pulsar/data \
      apachepulsar/pulsar:latest \
      bin/pulsar standalone

With this setup, the consumer reconnects successfully when I repeat my test.

However, I still think this is an issue. I checked the running goroutines when the Topic Not Found error happened, and consumer.Receive() was still blocked. If the client cannot reconnect successfully, consumer.Receive() should return an error; otherwise the caller is stuck there and can do nothing about it. What do you think?

nodece commented 1 month ago

We need to introduce a Go context, which can help us interrupt the receive operation.
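
As a caller-side illustration of the context idea, here is a minimal sketch that uses only the standard library. The names receiver, blockedConsumer, and receiveWithTimeout and the 50 ms timeout are hypothetical and are not part of pulsar-client-go. blockedConsumer only imitates a Receive call that never gets a message, on the assumption that the real consumer.Receive(ctx) also returns once its context is done. It is not a fix for the reconnect behavior itself.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// receiver is a hypothetical interface capturing only the Receive call
// discussed in this thread; the real pulsar.Consumer returns a pulsar.Message.
type receiver interface {
	Receive(ctx context.Context) (string, error)
}

// blockedConsumer is a hypothetical stand-in for a consumer that never gets
// a message (for example, because reconnect failed). It returns only when
// the caller's context is done.
type blockedConsumer struct{}

func (blockedConsumer) Receive(ctx context.Context) (string, error) {
	<-ctx.Done()
	return "", ctx.Err()
}

// receiveWithTimeout bounds a single Receive call with a deadline so the
// caller gets control back even if no message ever arrives.
func receiveWithTimeout(r receiver, timeout time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return r.Receive(ctx)
}

func main() {
	_, err := receiveWithTimeout(blockedConsumer{}, 50*time.Millisecond)
	if errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("receive timed out; the caller can log, retry, or recreate the consumer")
		return
	}
	fmt.Println("unexpected result:", err)
}
```

In the example program above, the same pattern would mean replacing context.Background() in the receive loop with a context that has a deadline or can be cancelled, and treating context.DeadlineExceeded as a signal to check the consumer's health rather than as a fatal error.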