twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.8+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

unexpected data in plain response #458

Status: Closed (closed by suxiangdong 1 year ago)

suxiangdong commented 1 year ago

When consuming CLS (Tencent's logging service) data via the Kafka protocol with franz-go, authentication fails over the "sasl_plaintext" protocol. With the same configuration, the sarama library authenticates and consumes data normally. I can't find the cause and need your help, thanks.

https://cloud.tencent.com/document/product/614/72651

Versions: sarama v1.38.1, franz-go v1.10.0

sarama demo:

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

var kafkaAddresses = []string{"kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096"}
var topic = "xxxx"

func main() {
    config := sarama.NewConfig()

    fmt.Println("clientID: ", config.ClientID)
    config.Net.SASL.Mechanism = "PLAIN"
    config.Net.SASL.Version = int16(1)
    config.Net.SASL.Enable = true
    config.Net.SASL.User = "xxxxxx"
    config.Net.SASL.Password = "xxxxxx"
    //config.Version = sarama.V1_0_0_0

    consumer, err := sarama.NewConsumer(kafkaAddresses, config)
    if err != nil {
        fmt.Printf("fail to start consumer, err:%v\n", err)
        return
    }
    partitionList, err := consumer.Partitions(topic)
    if err != nil {
        fmt.Printf("fail to get list of partitions, err:%v\n", err)
        return
    }
    fmt.Println("partitionList:", partitionList)

    // Range over the partition IDs themselves, not the slice indexes.
    for _, partition := range partitionList {
        pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d, err:%v\n", partition, err)
            return
        }
        defer pc.AsyncClose()

        // Pass pc as a named argument so each goroutine reads from its own partition consumer.
        go func(pc sarama.PartitionConsumer) {
            for msg := range pc.Messages() {
                fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
            }
        }(pc)
    }

    // Block forever without spinning the CPU.
    select {}
}

franz-go demo:

package main

import (
    "context"
    "fmt"
    "os"

    "github.com/twmb/franz-go/pkg/kgo"
    "github.com/twmb/franz-go/pkg/sasl/plain"
)

func main() {
    opts := []kgo.Opt{kgo.SeedBrokers([]string{"kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096"}...),
        kgo.WithLogger(kgo.BasicLogger(os.Stdout, kgo.LogLevelDebug, func() string {
            return ""
        })),
        kgo.ConsumeTopics("xxxxxxxx"),
        //kgo.DisableAutoCommit(),
        kgo.SASL(plain.Auth{
            User: "xxxxx",
            Pass: "xxxxx",
        }.AsMechanism()),
        //kgo.DialTLSConfig(&tls.Config{
        //  InsecureSkipVerify: false,
        //}),
        //kgo.BlockRebalanceOnPoll(),
        //kgo.RequireStableFetchOffsets(),
    }
    cli, err := kgo.NewClient(opts...)
    if err != nil {
        fmt.Println(err)
        return
    }
    if err := cli.Ping(context.Background()); err != nil {
        fmt.Println(err)
        return
    }
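    ctx := context.Background()
    for {
        fetches := cli.PollRecords(ctx, 1000)
        if fetches.IsClientClosed() {
            fmt.Println("closed")
            return
        }
        // Surface per-partition fetch errors instead of silently dropping them.
        fetches.EachError(func(topic string, partition int32, err error) {
            fmt.Printf("fetch error on %s[%d]: %v\n", topic, partition, err)
        })
        fetches.EachRecord(func(r *kgo.Record) {
            fmt.Println(string(r.Value))
        })
    }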
}

error:

[DEBUG] opening connection to broker; addr: kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096, broker: seed 0
[INFO] immediate metadata update triggered; why: client initialization
[DEBUG] connection opened to broker; addr: kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096, broker: seed 0
[DEBUG] issuing api versions request; broker: seed 0, version: 3
[DEBUG] wrote ApiVersions v3; broker: seed 0, bytes_written: 31, write_wait: 55.375µs, time_to_write: 71.833µs, err: <nil>
[DEBUG] read ApiVersions v3; broker: seed 0, bytes_read: 114, read_wait: 22.875µs, time_to_read: 47.764584ms, err: <nil>
[DEBUG] issuing SASLHandshakeRequest; broker: seed 0
[DEBUG] wrote SASLHandshake v1; broker: seed 0, bytes_written: 24, write_wait: 14.292µs, time_to_write: 59.125µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: seed 0, bytes_read: 23, read_wait: 12µs, time_to_read: 47.269458ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: seed 0, addr: kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: seed 0, version: 0, step: 0
[DEBUG] wrote SASLAuthenticate v0; broker: seed 0, bytes_written: 128, write_wait: 47.625µs, time_to_write: 61.25µs, err: <nil>
[DEBUG] read SASLAuthenticate v0; broker: seed 0, bytes_read: 123, read_wait: 14.208µs, time_to_read: 48.619375ms, err: <nil>
[ERROR] unable to initialize sasl; broker: seed 0, err: unexpected data in plain response
[DEBUG] connection initialization failed; addr: kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096, broker: seed 0, err: unexpected data in plain response
[DEBUG] opening connection to broker; addr: kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096, broker: seed 0
unexpected data in plain response

twmb commented 1 year ago

It looks like Sarama does not enforce that the response is empty. Per RFC 4616, the server does not send a challenge. I guess the response can just be ignored.
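
For reference, RFC 4616 defines the PLAIN client message as `[authzid] NUL authcid NUL passwd`, sent in a single step with no server challenge. A minimal sketch of building that message follows, assuming an empty authorization identity; the `plainMessage` helper is hypothetical and is not franz-go's API.

```go
package main

import "fmt"

// plainMessage builds an RFC 4616 PLAIN message: [authzid] NUL authcid NUL passwd.
// Hypothetical helper for illustration only; not part of franz-go.
func plainMessage(authzid, user, pass string) []byte {
	return []byte(authzid + "\x00" + user + "\x00" + pass)
}

func main() {
	msg := plainMessage("", "user", "pass")
	fmt.Printf("%q\n", msg) // prints "\x00user\x00pass"
}
```

Since the mechanism completes in that one client message, a strict client has no defined meaning for any bytes the server sends back.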

twmb commented 1 year ago

What error do you get with this patch: https://github.com/twmb/franz-go/commit/ec2ba2f9b7a6226828d93598fccfcb24a4f76be8 ? (be sure to strip any password if Tencent is returning that in the response)

I have another branch: https://github.com/twmb/franz-go/tree/plain_challenge_detail but if I can change the patch to be specific to something Tencent is saying, then I will do some string comparison rather than removing the check entirely.

suxiangdong commented 1 year ago

> What error do you get with this patch: ec2ba2f ? (be sure to strip any password if Tencent is returning that in the response)
>
> I have another branch: https://github.com/twmb/franz-go/tree/plain_challenge_detail but if I can change the patch to be specific to something Tencent is saying, then I will do some string comparison rather than removing the check entirely.

The error message returned contains the username and password used for authentication. I have removed the sensitive information.

error:

[DEBUG] opening connection to broker; addr: kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096, broker: seed 0
[INFO] immediate metadata update triggered; why: querying metadata for consumer initialization
[DEBUG] connection opened to broker; addr: kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096, broker: seed 0
[DEBUG] issuing api versions request; broker: seed 0, version: 3
[DEBUG] wrote ApiVersions v3; broker: seed 0, bytes_written: 61, write_wait: 61.792µs, time_to_write: 26.208µs, err: <nil>
[DEBUG] read ApiVersions v3; broker: seed 0, bytes_read: 114, read_wait: 132.292µs, time_to_read: 69.705958ms, err: <nil>
[DEBUG] issuing SASLHandshakeRequest; broker: seed 0
[DEBUG] wrote SASLHandshake v1; broker: seed 0, bytes_written: 24, write_wait: 37.75µs, time_to_write: 29.709µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: seed 0, bytes_read: 23, read_wait: 38.416µs, time_to_read: 70.716084ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: seed 0, addr: kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: seed 0, version: 0, step: 0
[DEBUG] wrote SASLAuthenticate v0; broker: seed 0, bytes_written: 128, write_wait: 68.292µs, time_to_write: 38.833µs, err: <nil>
[DEBUG] read SASLAuthenticate v0; broker: seed 0, bytes_read: 123, read_wait: 40.667µs, time_to_read: 97.642417ms, err: <nil>
[ERROR] unable to initialize sasl; broker: seed 0, err: unexpected data in plain response: "\x00${USERNAME}\x00${PASSWORD}"
[DEBUG] connection initialization failed; addr: kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096, broker: seed 0, err: unexpected data in plain response: "\x00${USERNAME}\x00${PASSWORD}"
[DEBUG] opening connection to broker; addr: kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096, broker: seed 0

suxiangdong commented 1 year ago

> It looks like Sarama does not enforce the response is empty. Per RFC-4616, the server does not send a challenge. I guess it can just be ignored.

After ignoring the challenge, it works fine. Is the server not supposed to return any data? I couldn't work out where RFC 4616 specifies that.
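
The redacted log above shows the broker's response following the `\x00${USERNAME}\x00${PASSWORD}` pattern, which is the shape of the PLAIN message the client itself sends. One narrower alternative to dropping the check, along the lines of the string comparison floated earlier in the thread, would be to accept a response only when it is empty or echoes the request. A minimal sketch, assuming a hypothetical `checkPlainResponse` helper; this is not the patch that was released and not franz-go's code.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// checkPlainResponse accepts an empty server response, or one that merely
// echoes the client's own PLAIN message, and rejects anything else.
// Hypothetical helper for illustration only; not franz-go's actual code.
func checkPlainResponse(sent, resp []byte) error {
	if len(resp) == 0 || bytes.Equal(sent, resp) {
		return nil
	}
	return errors.New("unexpected data in plain response")
}

func main() {
	sent := []byte("\x00user\x00pass")
	fmt.Println(checkPlainResponse(sent, nil))            // <nil>
	fmt.Println(checkPlainResponse(sent, sent))           // <nil>
	fmt.Println(checkPlainResponse(sent, []byte("oops"))) // unexpected data in plain response
}
```

The trade-off is that this stays strict for any other broker that returns unexpected bytes, at the cost of special-casing one server's behavior.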

twmb commented 1 year ago

Strange. No, it's not expected to return the information. RFC 4422 dictates that extra information should be explained, and RFC 4616 does not include any explanation for extra information. If you pinned Sarama to v0.11, I think Sarama would also fail: Sarama's v0 enforces no extra data in the response (Sarama actually enforces four null bytes for the v0 version of the SASL handshake, but four null bytes corresponds to "no size").
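
To illustrate the "four null bytes" remark: assuming the v0 response is framed with a 4-byte big-endian size prefix, four null bytes decode to a size of zero, meaning an empty payload. A small sketch; the `payloadSize` helper is hypothetical and is not taken from Sarama or franz-go.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// payloadSize decodes a 4-byte big-endian length prefix.
// Hypothetical helper for illustration only.
func payloadSize(prefix [4]byte) int32 {
	return int32(binary.BigEndian.Uint32(prefix[:]))
}

func main() {
	// Four null bytes are a length prefix of zero: a response with no payload.
	fmt.Println(payloadSize([4]byte{0, 0, 0, 0})) // prints 0
}
```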

I'll do a bit more checking today and if I can't find anything, I guess it can be fine to ignore the unexpected response... maybe. I'll think about it.

suxiangdong commented 1 year ago

I get it, thanks.

I can temporarily fork the repository to use in my project. If ignoring this unexpected response gets implemented in franz-go, I'll switch back.

twmb commented 1 year ago

It looks like I can skip the data. I'll release a patch for this.

suxiangdong commented 1 year ago

Okay, thanks.

twmb commented 1 year ago

Sorry for the delay, busy lately. I'll see about trying to close this out today.