bsm / sarama-cluster

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 [DEPRECATED]
MIT License
1.01k stars 222 forks source link

Receiving protobuf data triggers a kafka unknown error #296

Closed hookover closed 5 years ago

hookover commented 5 years ago

version: kafka: 1.1.0 sarama-cluster: 2.1.15 sarama:1.22.1

topic: A, string data eg:

producer
>a
>b 
>c

topic: B, protobuf byte data (cannal read binlog data send to kafka https://github.com/alibaba/canal) eg:

  *��
mysql-bin.000001ǟ�. *UTF-80��ɷ-8B
                                 rupiah_loanJuser_loan_planP�Xb

kafka-console-consumer customer A、B: no any errors topic A customer: no any errors topic B customer errors:

//Every 3 seconds print this: 
Errors kafka: error while consuming rupiah_loan_user_loan/0: kafka server: Unexpected (unknown?) server error.

code

    config := cluster.NewConfig()
    config.Consumer.Return.Errors = true
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    customer, err := cluster.NewConsumer([]string{app.Cfg.App.GetString("KAFKA_BROKER_LIST")}, "dev.test", []string{"credit_data_plus_topic","rupiah_loan_user_loan_plan"}, config)
    if err != nil {
        fmt.Println("kafaka启动失败")
    }
    for {
        select {
        case noti := <-customer.Notifications():
            fmt.Printf("noti: %v\n", noti)
        case err := <-customer.Errors():
            fmt.Printf("errors:%v\n", err)
        case msg := <-customer.Messages():
            fmt.Println(msg.Topic, msg.Offset)
        default:
            time.Sleep(time.Second * 1)
            fmt.Println("间隔1s")
        }
    }

log:

rupiah_loan_user_loan_plan 208
rupiah_loan_user_loan_plan 210
rupiah_loan_user_loan_plan 212
rupiah_loan_user_loan_plan 214
rupiah_loan_user_loan_plan 216
rupiah_loan_user_loan_plan 218
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
rupiah_loan_user_loan_plan 220
rupiah_loan_user_loan_plan 222
rupiah_loan_user_loan_plan 224
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
rupiah_loan_user_loan_plan 226
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
..........

But when I restart the program, it will be consumed from beginning to end. However, if config.Consumer.Offsets.Initial is configured as sarama.OffsetNewest, a piece of data is consumed and the error is always reported. like this

#  go run main.go
rupiah_loan_user_loan_plan 250
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
^Csignal: interrupt

# go run main.go
rupiah_loan_user_loan_plan 260
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
hookover commented 5 years ago

Using other libraries to solve the problem

wbrucelee commented 5 years ago

I also meet this problem