google / go-cloud

The Go Cloud Development Kit (Go CDK): A library and tools for open cloud development in Go.
https://gocloud.dev/
Apache License 2.0
9.45k stars, 799 forks

kafkapubsub topicName merge wrong #3413

Closed: ilvelhs closed this issue 3 months ago

ilvelhs commented 3 months ago
// OpenTopicURL opens a pubsub.Topic based on u.
func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) {
    for param, value := range u.Query() {
        switch param {
        case "key_name":
            if len(value) != 1 || len(value[0]) == 0 {
                return nil, fmt.Errorf("open topic %v: invalid query parameter %q", u, param)
            }

            o.TopicOptions.KeyName = value[0]
        default:
            return nil, fmt.Errorf("open topic %v: invalid query parameter %q", u, param)
        }
    }

    topicName := path.Join(u.Host, u.Path)
    return OpenTopic(o.Brokers, o.Config, topicName, &o.TopicOptions)
}

Why does topicName have u.Host prepended to it?

With url = kafka://localhost:9092/a/b/c, that gives topicName = localhost:9092/a/b/c ???

vangent commented 3 months ago
// URLOpener opens Kafka URLs like "kafka://mytopic" for topics and
// "kafka://group?topic=mytopic" for subscriptions.
//
// For topics, the URL's host+path is used as the topic name.
// The default URL opener will connect to a default set of Kafka brokers based
// on the environment variable "KAFKA_BROKERS", expected to be a comma-delimited
// set of server addresses.

So you shouldn't put the server hostname in the URL; put it (or them, if there are several brokers) in the KAFKA_BROKERS environment variable.
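
To make the host+path rule concrete, here is a minimal standard-library sketch of the path.Join(u.Host, u.Path) step from the quoted OpenTopicURL. The helper name topicNameFromURL is hypothetical and exists only for illustration; it is not part of the Go CDK.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// topicNameFromURL is a hypothetical helper that mirrors the
// path.Join(u.Host, u.Path) step quoted from OpenTopicURL.
func topicNameFromURL(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	// Everything after "kafka://" up to the first "/" is parsed as the host,
	// so a broker address placed there becomes part of the topic name.
	return path.Join(u.Host, u.Path), nil
}

func main() {
	for _, raw := range []string{
		"kafka://mytopic",
		"kafka://localhost:9092/a/b/c",
	} {
		name, err := topicNameFromURL(raw)
		if err != nil {
			fmt.Println("parse error:", err)
			continue
		}
		fmt.Printf("url=%s topicName=%s\n", raw, name)
	}
}
```

The first URL yields the topic name mytopic; the second yields localhost:9092/a/b/c, which is the behavior reported in this issue.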

ilvelhs commented 3 months ago

I call OpenTopic in pubsub.go with urlstr = kafka://localhost:9092/mytopic, and I can't change the URL inside the Kafka OpenTopicURL func:

// OpenTopic opens the Topic identified by the URL given.
// See the URLOpener documentation in driver subpackages for
// details on supported URL formats, and https://gocloud.dev/concepts/urls
// for more information.
func OpenTopic(ctx context.Context, urlstr string) (*Topic, error) {
    return defaultURLMux.OpenTopic(ctx, urlstr)
}

vangent commented 3 months ago

I do not understand the question. You should be calling OpenTopic with urlstr = kafka://mytopic. The localhost:9092 address, if that is where your Kafka broker is running, should be set in the KAFKA_BROKERS environment variable.
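
If existing configuration already stores URLs in the kafka://broker/topic form, one possible way to adapt them is sketched below. The helper splitBrokerURL is hypothetical (it is not part of the Go CDK) and assumes a single broker and no query parameters; it only separates the broker address from the topic so that the broker can go into KAFKA_BROKERS and the remaining kafka://mytopic URL can be passed to pubsub.OpenTopic.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
	"strings"
)

// splitBrokerURL is a hypothetical helper, not a Go CDK function.
// Given kafka://localhost:9092/mytopic it returns the broker address
// ("localhost:9092") and a topic-only URL ("kafka://mytopic").
func splitBrokerURL(raw string) (broker, topicURL string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", err
	}
	topic := strings.Trim(u.Path, "/")
	if u.Scheme != "kafka" || u.Host == "" || topic == "" {
		return "", "", fmt.Errorf("expected kafka://broker/topic, got %q", raw)
	}
	return u.Host, "kafka://" + topic, nil
}

func main() {
	broker, topicURL, err := splitBrokerURL("kafka://localhost:9092/mytopic")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// Assumption: the variable is set before the topic is opened; setting it
	// in the process environment before startup is the safer option.
	if err := os.Setenv("KAFKA_BROKERS", broker); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("KAFKA_BROKERS =", os.Getenv("KAFKA_BROKERS"))
	fmt.Println("URL to pass to pubsub.OpenTopic:", topicURL)
}
```

For several brokers, KAFKA_BROKERS takes a comma-delimited list of server addresses, as the URLOpener documentation quoted above states.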

ilvelhs commented 3 months ago

Nailed it, thanks!