confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0
4.5k stars 647 forks source link

Work with go gin error #1160

Open darrkz opened 3 months ago

darrkz commented 3 months ago

Description

I use confluent-kafka-go with github.com/gin-gonic/gin web server

all code logic is : get message info from http get convert it to json string write the json string to kafka using confluent-kafka-go asyncProducer

code like this

func AsyncProducer(p *kafka.Producer, topic string, msg string) {
    //-- -------------------------------
    //--> @Description
    //-->
    // https://docs.confluent.io/kafka-clients/go/current/overview.html#asynchronous-writes
    deliveryChan := make(chan kafka.Event, 1000)
    err := p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          []byte(msg),
        Headers:        []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
    }, deliveryChan)

    if err != nil {
        fmt.Println("Failed to produce message")
        fmt.Printf("some error: \n%v\n", err)
        panic(err)
    }

    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("=Failed to deliver message: %v\n%s\n=", ev.TopicPartition, ev.TopicPartition.Error)
                } else {
                    fmt.Printf("Successfully produced record to topic %s partition [%d] @ offset %v\n",
                        *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
                }
            case kafka.Error:
                // Generic client instance-level errors, such as
                // broker connection failures, authentication issues, etc.
                //
                // These errors should generally be considered informational
                // as the underlying client will automatically try to
                // recover from any errors encountered, the application
                // does not need to take action on them.
                fmt.Printf("kafka Error: %v\n", ev)
            default:
                fmt.Printf("Ignored kafka event: %s\n", ev)
            }
        }
    }()
}

when I access the web server , it retuen http 500 and return error

Local: Invalid argument or configuration

if I add os.Exit(0) before run go-gin, no message was writen to kafka if I add os.Exit(0) before run go-gin and sleep for a while , about 10 second, then the messages used to test can write to kafka correctly

if I remove the os.Exit(0) , and the gin web server start ok , but when I access the server with http get method, it returns http 500

So I don't konw why, and how to fix this

How to reproduce

Checklist

Please provide the following information:

Local: Invalid argument or configuration /mnt/e/go/go_gin_kafka/utils/bigdata/gokafka.go:154 (0x59a4c4) AsyncProducer: panic(err) /mnt/e/go/go_gin_kafka/webserver.go:78 (0x86718b) WriteKafka: goKafka.AsyncProducer(p, ConfMap["topic"], message) /home/darkz/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/context.go:174 (0x86765b) (Context).Next: c.handlersc.index /mnt/e/go/go_gin_kafka/webserver.go:141 (0x867649) DataCheck: c.Next() /home/darkz/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/context.go:174 (0x86749e) (Context).Next: c.handlersc.index /mnt/e/go/go_gin_kafka/webserver.go:123 (0x86748c) Url2Json: c.Next() // 作用类似于goto.. 直接跳到下一个中间件执行完再返回 /home/darkz/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/context.go:174 (0x83b599) (Context).Next: c.handlersc.index /home/darkz/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/recovery.go:102 (0x83b587) CustomRecoveryWithWriter.func1: c.Next() /home/darkz/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/context.go:174 (0x83a6dc) (Context).Next: c.handlersc.index /home/darkz/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/logger.go:240 (0x83a6c3) LoggerWithConfig.func1: c.Next() /home/darkz/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/context.go:174 (0x839caa) (Context).Next: c.handlersc.index /home/darkz/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/gin.go:656 (0x839c97) serveError: c.Next() /home/darkz/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/gin.go:649 (0x839a04) (Engine).handleHTTPRequest: serveError(c, http.StatusNotFound, default404Body) /home/darkz/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/gin.go:576 (0x8394f1) (Engine).ServeHTTP: engine.handleHTTPRequest(c) /home/darkz/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.0.linux-amd64/src/net/http/server.go:3137 (0x6e0b6d) serverHandler.ServeHTTP: handler.ServeHTTP(rw, req) /home/darkz/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.0.linux-amd64/src/net/http/server.go:2039 (0x6dc6e7) (conn).serve: serverHandler{c.server}.ServeHTTP(w, w.req) /home/darkz/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.0.linux-amd64/src/runtime/asm_amd64.s:1695 (0x47a420) goexit: BYTE $0x90 // NOP

 - [x] Provide broker log excerpts

None....


 - [x] Critical issue
milindl commented 3 months ago

Hi @darrkz , can you log before producing the topic name? it might be because topic string is empty

darrkz commented 3 months ago

@milindl that's OK, I found I miss a var setting outer function main, and use := redefined it in main, so it could not user in other where. And I think the log not show this err clearly, cost so much time....

milindl commented 3 months ago

Alright @darrkz thanks for the update . Would be be interested in making a PR to improve the messaging around the error? I'm not certain how the error was solved so it might be useful to others in the future.

darrkz commented 3 months ago

@milindl Just my mistake, and not something wrong about confluent-kafka-go...

The pr I will make it later...

darrkz commented 3 months ago

@milindl pr is here https://github.com/confluentinc/confluent-kafka-go/pull/1162

Please review for it