DataDog / dd-trace-py

Datadog Python APM Client
https://ddtrace.readthedocs.io/

confluent-kafka distributed tracing #9772

Closed joshverma closed 2 months ago

joshverma commented 3 months ago

Summary of problem

I'm running a producer service and a consumer service, each in its own Docker container, under ddtrace-run. I have the following environment variables set (in addition to the other required ddtrace environment variables); a programmatic alternative is sketched after the list:

      - DD_KAFKA_PROPAGATION_ENABLED=true
      - DD_KAFKA_DISTRIBUTED_TRACING_ENABLED=true
      - DD_TRACE_KAFKA_ENABLED=true
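For reference, the same instrumentation can also be enabled programmatically rather than via ddtrace-run. A minimal sketch (the broker address is an assumption, not from the report):

    # Programmatic alternative to ddtrace-run: patch confluent-kafka
    # before any Producer or Consumer objects are created.
    from ddtrace import patch
    patch(kafka=True)

    from confluent_kafka import Producer
    producer = Producer({"bootstrap.servers": "kafka:9092"})  # assumed broker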

Here is my code calling produce():

    import pickle

    # Serialize the message value; renamed from "bytes" to avoid
    # shadowing the built-in type.
    payload = pickle.dumps(data)
    self.producer.produce(topic=self.topic, value=payload, callback=type(self).verify_sink(payload))

This causes the following error:

    ddtrace.internal.utils.ArgumentError: headers (at position 6) is invalid
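One possible workaround (an untested assumption based on the traceback below, not a confirmed fix) is to pass an explicit empty headers argument so the instrumentation has an existing value to overwrite:

    # Untested workaround sketch: give ddtrace an existing "headers"
    # keyword argument that its traced produce() wrapper can overwrite.
    payload = pickle.dumps(data)
    self.producer.produce(
        topic=self.topic,
        value=payload,
        callback=type(self).verify_sink(payload),
        headers=[],  # confluent-kafka accepts a list of (key, value) tuples
    )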

Which version of dd-trace-py are you using?

v2.9.2

Which version of pip are you using?

v23.0.1

Which libraries and their versions are you using?

`pip freeze`:

    arrow==1.1.1
    attrs==23.2.0
    bytecode==0.13.0
    cattrs==23.1.2
    certifi==2024.7.4
    charset-normalizer==2.0.12
    confluent-kafka==2.4.0
    coverage==7.2.7
    dataclasses==0.6
    dataclasses-json==0.5.4
    datadog==0.42.0
    ddsketch==3.0.1
    ddtrace==2.9.2
    Deprecated==1.2.14
    envier==0.5.2
    exceptiongroup==1.2.1
    freezegun==0.3.15
    grpcio==1.39.0
    grpcio-tools==1.39.0
    gtfs-realtime-bindings==0.0.5
    idna==3.7
    importlib-metadata==6.5.0
    iniconfig==2.0.0
    marshmallow==3.19.0
    marshmallow-enum==1.5.1
    mypy-extensions==1.0.0
    opentelemetry-api==1.22.0
    packaging==24.0
    pendulum==2.0.5
    pluggy==0.13.1
    protobuf==3.20.3
    py==1.11.0
    pytest==6.2.4
    pytest-cov==2.12.1
    python-dateutil==2.9.0.post0
    python-json-logger==2.0.2
    pytzdata==2020.1
    requests==2.26.0
    six==1.16.0
    stringcase==1.2.0
    toml==0.10.2
    typing-inspect==0.9.0
    typing_extensions==4.7.1
    urllib3==1.26.19
    wrapt==1.16.0
    xmltodict==0.12.0
    zipp==3.15.0

How can we reproduce your problem?

Bring up a producer container and a consumer container with the environment variables listed above, run each under ddtrace-run, and call produce(). A minimal producer script is sketched below.
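For concreteness, a minimal producer along these lines (the broker address, topic name, and payload are assumptions, not taken from the report):

    # repro_producer.py -- run as: ddtrace-run python repro_producer.py
    import pickle

    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": "kafka:9092"})  # assumed broker address

    def on_delivery(err, msg):
        # Delivery report callback: err is None on success.
        print("delivery:", err)

    payload = pickle.dumps({"example": "data"})  # assumed payload
    # produce() is called without a headers argument -- the call shape
    # the traceback below points at.
    producer.produce(topic="example-topic", value=payload, callback=on_delivery)
    producer.flush()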

What is the result that you get?

When ddtrace intercepts the producer sending messages to the Kafka topic, I get the error:

File "/usr/local/lib/python3.7/site-packages/ddtrace/contrib/kafka/patch.py", line 201, in traced_produce
    args, kwargs = set_argument_value(args, kwargs, 6, "headers", headers)
File "/usr/local/lib/python3.7/site-packages/ddtrace/internal/utils/__init__.py", line 71, in set_argument_value
    raise ArgumentError("%s (at position %d) is invalid" % (kw, pos))
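For context, the failing helper behaves roughly as follows (a simplified paraphrase for illustration, not the actual dd-trace-py source):

    # Simplified paraphrase of ddtrace.internal.utils.set_argument_value;
    # illustrative only -- the real helper has additional options.
    from ddtrace.internal.utils import ArgumentError

    def set_argument_value(args, kwargs, pos, kw, value):
        if len(args) > pos:
            # Argument was passed positionally: replace it in the tuple.
            return args[:pos] + (value,) + args[pos + 1:], kwargs
        if kw in kwargs:
            # Argument was passed by keyword: overwrite it.
            kwargs[kw] = value
            return args, kwargs
        # Neither positional nor keyword -> the error quoted above.
        raise ArgumentError("%s (at position %d) is invalid" % (kw, pos))

On this reading, the error fires because produce() was called without any headers argument for the tracer to overwrite.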

What is the result that you expected?

I thought that running the application with ddtrace-run and the above environment variables would automatically instrument distributed tracing. Is there anything else that needs to be done to remedy this error? Do I need to manually create a span or send a header in the Kafka message?

Thanks in advance!

joshverma commented 2 months ago

@brettlangdon @wconti27, do I still need to manually propagate the trace context, like this:

    with tracer.trace("kafka.produce", service="service") as span:
        tracer.sample(span)
        headers = {
            'x-datadog-trace-id': str(span.context.trace_id),
            'x-datadog-parent-id': str(span.context.span_id),
            'x-datadog-sampling-priority': str(span.context.sampling_priority)
        }

        # Send the headers in the message to the Kafka topic for distributed tracing
        self.producer.produce(topic=self.topic,
                              value=payload,
                              callback=type(self).callback_func(payload),
                              headers=headers)

Or will the headers automatically be injected with the trace context after setting one of these environment variables? (A consumer-side sketch of the manual approach follows the list below.)

- DD_KAFKA_PROPAGATION_ENABLED=true
- DD_KAFKA_DISTRIBUTED_TRACING_ENABLED=true
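For reference, the consumer-side counterpart of the manual approach might look like the sketch below. It uses ddtrace's HTTPPropagator to extract the propagated context; the broker address, group id, topic name, and process() handler are assumptions for illustration:

    # Sketch of manual context extraction on the consumer side.
    from confluent_kafka import Consumer
    from ddtrace import tracer
    from ddtrace.propagation.http import HTTPPropagator

    consumer = Consumer({
        "bootstrap.servers": "kafka:9092",  # assumed broker address
        "group.id": "example-group",        # assumed consumer group
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe(["example-topic"])   # assumed topic name

    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        # Kafka headers arrive as a list of (key, bytes) tuples;
        # HTTPPropagator.extract expects a str -> str mapping.
        headers = {k: v.decode("utf-8") for k, v in (msg.headers() or [])}
        context = HTTPPropagator.extract(headers)
        # Continue the trace started on the producer side.
        with tracer.start_span("kafka.consume", child_of=context):
            process(msg.value())  # hypothetical message handler

On the producer side, HTTPPropagator.inject(span.context, headers) could replace the hand-built header dict, assuming manual propagation turns out to be needed at all.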