cloudevents / sdk-go

Go SDK for CloudEvents
https://cloudevents.github.io/sdk-go/
Apache License 2.0

Need to wrap `sarama.AsyncProducer` for tracing (OpenTelemetry/Datadog) #745

Open rahulchheda opened 2 years ago

rahulchheda commented 2 years ago

I need to wrap the `sarama.AsyncProducer`, which is an unexported field in the `Sender` struct. Is this possible?

n3wscott commented 2 years ago

We have typically wrapped the Sender and pulled metrics off the Sender API. See the OpenTelemetry observability folder: https://github.com/cloudevents/sdk-go/tree/main/observability/opentelemetry/v2

Now, if this is not going to work for you, I would like to know and we can work out a way to make this work for your use-case. Thanks!!

rahulchheda commented 2 years ago

So, I need to use Datadog's APM, i.e. https://github.com/DataDog/dd-trace-go/blob/v1/contrib/Shopify/sarama/example_test.go. How can I use this with cloudevents?

n3wscott commented 2 years ago

Oh yes I see, you need to be able to wrap the producer directly...

    producer = saramatrace.WrapAsyncProducer(cfg, producer)

ok! let's take a look here.

n3wscott commented 2 years ago

Oh hey, it looks like if you create the producer yourself, you can make this work with the cloudevents lib via https://github.com/cloudevents/sdk-go/blob/c80789d5488a078e05dc0db1a05f6562aca8168c/protocol/kafka_sarama/v2/sender.go#L45. Example:

    cfg := sarama.NewConfig()
    cfg.Version = sarama.V0_11_0_0 // minimum version that supports headers, which are required for tracing
    cfg.Producer.Return.Successes = true // required by sarama.NewSyncProducer

    // NewSenderFromSyncProducer takes a sarama.SyncProducer, so create and wrap a sync producer.
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
    if err != nil {
        panic(err)
    }
    defer producer.Close()

    producer = saramatrace.WrapSyncProducer(cfg, producer)

    // Not this way: sender, err := kafka_sarama.NewSender([]string{"127.0.0.1:9092"}, cfg, "test-topic")

    // This way: create the cloudevents protocol from a pre-created producer that is already wrapped.
    cesender, err := kafka_sarama.NewSenderFromSyncProducer("test-topic", producer)
    if err != nil {
        log.Fatalf("failed to create protocol: %s", err.Error())
    }

    defer cesender.Close(context.Background())

    c, err := cloudevents.NewClient(cesender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
    if err != nil {
        log.Fatalf("failed to create client, %v", err)
    }

This example combines https://github.com/DataDog/dd-trace-go/blob/v1/contrib/Shopify/sarama/example_test.go and https://github.com/cloudevents/sdk-go/blob/main/samples/kafka/sender/main.go, using the `NewSenderFromSyncProducer` function.

Does that get you unblocked?

rahulchheda commented 2 years ago

Got it, thanks @n3wscott. Closing this one out. :)

rahulchheda commented 2 years ago

@n3wscott So, this worked for wrapping the Producer. Any ideas about wrapping the Consumer with this?

n3wscott commented 2 years ago

I am not 100% sure, but I think you use this: https://pkg.go.dev/go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama#WrapConsumer

rahulchheda commented 2 years ago

So, I tried this, but I actually need a cloudevents Consumer instance rather than a sarama consumer, and I can create a new cloudevents consumer with a sarama consumer.