segmentio / kafka-go

Kafka library in Go
MIT License

Guide how to use Client and why it is public #578

Closed sunshine69 closed 3 years ago

sunshine69 commented 3 years ago

Describe the solution you'd like

A recently merged PR allows kafka.Client to be used directly, but how to use it is confusing.

I am trying to use the new Client; please see the code snippet below.

func playground() {
    ctx := context.Background()
    MyAddr := net.TCPAddr{
        IP:   []byte("10.22.21.21"),
        Port: 9092,
    }
    timeout, _ := time.ParseDuration("5s")
    client := kafka.Client{
        Addr:    &MyAddr,
        Timeout: timeout,
    }

    dcreq := kafka.DescribeConfigsRequest{
        Addr: &MyAddr,
        Resources: []kafka.DescribeConfigRequestResource{{
            ResourceType: kafka.ResourceTypeTopic,
            ResourceName: "de21197.datafabric.api.request.logic.orchestrator",
        }},
    }
    dcres, err := client.DescribeConfigs(ctx, &dcreq)
    if err != nil {
        log.Fatalf("error %v\n", err)
    }
    fmt.Printf("%v\n", dcres)
}

That code builds without errors, which suggests it should work. It does not. When I run it, I get:

stevek@work 10:56 ~/a/p/c/d/g/k/backup-topic> go test -run '^TestBackupTopics$' -v
=== RUN   TestBackupTopics
2020/12/27 11:02:06 error kafka.(*Client).DescribeConfigs: [8] Broker Not Available: not a client facing error and is used mostly by tools when a broker is not alive
exit status 1
FAIL    backup-topic    0.004s

So it looks like the Client is constructed but never calls Dial when we use it. That is confusing: if the Client can be used on its own, it should be able to connect by itself.

In short, if my code compiles then it should work. Otherwise, make Client private and prevent its use in the first place.

So I modified the code as follows:

func playground() {
    ctx := context.Background()
    conn, _ := kafka.DialContext(ctx, "tcp", "10.22.21.21")
    defer conn.Close()

    MyAddr := conn.RemoteAddr()
    timeout, _ := time.ParseDuration("5s")
    client := kafka.Client{
        Addr:    MyAddr,
        Timeout: timeout,
    }

    dcreq := kafka.DescribeConfigsRequest{
        Addr: MyAddr,
        Resources: []kafka.DescribeConfigRequestResource{{
            ResourceType: kafka.ResourceTypeTopic,
            ResourceName: "de21197.datafabric.api.request.logic.orchestrator",
        }},
    }
    dcres, err := client.DescribeConfigs(ctx, &dcreq)
    if err != nil {
        log.Fatalf("error %v\n", err)
    }
    fmt.Printf("%v\n", dcres)
}

And it seems to do what is expected.

stevek@work 11:02 ~/a/p/c/d/g/k/backup-topic> go test -run '^TestBackupTopics$' -v
=== RUN   TestBackupTopics
&{0s [{2 de21197.datafabric.api.request.logic.orchestrator [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker []}]}
--- PASS: TestBackupTopics (4.24s)
PASS
ok      backup-topic    4.249s

Now I don't know whether this is the correct usage of Client. Is it?


achille-roussel commented 3 years ago

Hello @sunshine69!

Your usage looks correct, and the errors you are getting appear to be runtime errors, which would indicate that there is a problem with the configuration of the Kafka cluster.

Did the de21197.datafabric.api.request.logic.orchestrator topic exist in the cluster you were connecting to?

sunshine69 commented 3 years ago

Hi,

The result of the last snippet is expected, as the topic does not actually exist. My complaint is: why does the first code snippet not work?

If I construct an address and pass it to the client, the client does not automatically dial the server when a connection is not yet established. This is confusing; I would expect it to do that, since the first snippet is the more intuitive one.

The second snippet works because we have already connected, and the address we pass to the client comes from that connection. That is not very intuitive.

My opinion is that the first case should be fixed so that it works, but I have moved to another project and have no time for it.

Thanks a lot for your reply. All the best.
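
The thread never settles why the first snippet fails. One possible factor, which is an assumption and is not confirmed by anyone above, is how the address is built: `[]byte("10.22.21.21")` converts the text of the address into 11 raw bytes, whereas a valid `net.IP` is 4 or 16 bytes long. The second snippet gets its address from `conn.RemoteAddr()`, which is already well formed. The sketch below uses only the standard library to show the difference; `rawBytesAddr` and `parsedAddr` are hypothetical helper names introduced for illustration, and nothing here demonstrates that this is what produced the `Broker Not Available` error.

```go
package main

import (
	"fmt"
	"net"
)

// rawBytesAddr builds the address the way the first snippet does: the textual
// IP is converted byte-for-byte, so "10.22.21.21" becomes an 11-byte slice
// instead of a 4- or 16-byte IP.
func rawBytesAddr(ip string, port int) *net.TCPAddr {
	return &net.TCPAddr{IP: []byte(ip), Port: port}
}

// parsedAddr builds the address with net.ParseIP, which returns a valid
// net.IP, or nil if the text is not an IP address.
func parsedAddr(ip string, port int) *net.TCPAddr {
	return &net.TCPAddr{IP: net.ParseIP(ip), Port: port}
}

func main() {
	bad := rawBytesAddr("10.22.21.21", 9092)
	good := parsedAddr("10.22.21.21", 9092)

	// The raw-bytes form has the wrong length and is not a usable IPv4 address.
	fmt.Println(len(bad.IP), bad.IP.To4() == nil) // prints "11 true"

	// The parsed form renders as the expected host:port pair.
	fmt.Println(good.String()) // prints "10.22.21.21:9092"
}
```

If this is indeed the cause, building `MyAddr` with `net.ParseIP` in the first snippet would be worth trying before concluding that the Client requires a prior call to `DialContext`.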