open-telemetry / opentelemetry-python-contrib

OpenTelemetry instrumentation for Python modules
https://opentelemetry.io
Apache License 2.0

Celery publish from celery creating separate traces #609

Open zionsofer opened 3 years ago

zionsofer commented 3 years ago

Describe your environment python==3.7.4/3.7.6; Platform==Local - macOS (Darwin-20.5.0-x86_64-i386-64bit), Remote - Linux (many distros); OTel release==0.22b0 (verified on OTel main as well - same code and same behavior)

Steps to reproduce When running two Celery apps where one sends tasks to the other, and both are instrumented with OpenTelemetry, the second Celery worker creates its span in a separate trace from the first one.

Reproduction example: otel_celery_first.py

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor

from celery import Celery
from celery.signals import worker_process_init

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    trace.set_tracer_provider(TracerProvider())
    span_processor = BatchSpanProcessor(ConsoleSpanExporter())
    trace.get_tracer_provider().add_span_processor(span_processor)
    CeleryInstrumentor().instrument()

app = Celery("first", broker="amqp://localhost")

@app.task(name="first.ping")
def ping():
    print("first ping")
    second_app = Celery("second", broker="amqp://localhost")
    second_app.send_task("second.ping", queue="second")

otel_celery_second.py

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor

from celery import Celery
from celery.signals import worker_process_init

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    trace.set_tracer_provider(TracerProvider())
    span_processor = BatchSpanProcessor(ConsoleSpanExporter())
    trace.get_tracer_provider().add_span_processor(span_processor)
    CeleryInstrumentor().instrument()

app = Celery("second", broker="amqp://localhost")

@app.task(name="second.ping")
def ping():
    print("second ping")

test_celery.py

from celery import Celery

app = Celery("first", broker="amqp://localhost")

app.send_task("first.ping", queue="first")

Running the workers:

celery -A otel_celery_first worker -n first@%h -Q first
celery -A otel_celery_second worker -n second@%h -Q second

Sending the task: python test_celery.py

What is the expected behavior? Two spans, with the same trace ID.

First celery worker output:

[2021-07-27 20:35:03,691: WARNING/ForkPoolWorker-8] first ping
{
    "name": "run/first.ping",
    "context": {
        "trace_id": "0x15ac5bd9daf196693bf11a2dc5973763",
        "span_id": "0x83b707a77a03780a",
        "trace_state": "[]"
    },
    "kind": "SpanKind.CONSUMER",
    "parent_id": null,
    "start_time": "2021-07-27T17:35:03.691780Z",
    "end_time": "2021-07-27T17:35:03.724748Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "celery.action": "run",
        "celery.state": "SUCCESS",
        "messaging.conversation_id": "8bc5049e-0ad0-461f-a91b-e2e182ffd30d",
        "messaging.destination": "first",
        "celery.delivery_info": "{'exchange': '', 'routing_key': 'first', 'priority': 0, 'redelivered': False}",
        "messaging.message_id": "8bc5049e-0ad0-461f-a91b-e2e182ffd30d",
        "celery.reply_to": "bf7eae06-9d67-35ba-976c-4e58d3612c7e",
        "celery.hostname": "gen39617@Zion-MacBook-Pro",
        "celery.task_name": "first.ping"
    },
    "events": [],
    "links": [],
    "resource": {
        "telemetry.sdk.language": "python",
        "telemetry.sdk.name": "opentelemetry",
        "telemetry.sdk.version": "1.1.0",
        "service.name": "unknown_service"
    }
}

Second celery worker output:

[2021-07-27 20:35:03,731: WARNING/ForkPoolWorker-8] second ping
{
    "name": "run/second.ping",
    "context": {
        "trace_id": "0x15ac5bd9daf196693bf11a2dc5973763",
        "span_id": "0xa8ff39fb540c86cc",
        "trace_state": "[]"
    },
    "kind": "SpanKind.CONSUMER",
    "parent_id": null,
    "start_time": "2021-07-27T17:35:03.731708Z",
    "end_time": "2021-07-27T17:35:03.733191Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "celery.action": "run",
        "celery.state": "SUCCESS",
        "messaging.conversation_id": "6dead959-c702-4aed-960d-68641bec23e4",
        "messaging.destination": "second",
        "celery.delivery_info": "{'exchange': '', 'routing_key': 'second', 'priority': 0, 'redelivered': False}",
        "messaging.message_id": "6dead959-c702-4aed-960d-68641bec23e4",
        "celery.reply_to": "dbf19329-41f0-32da-9b83-626ea38407f8",
        "celery.hostname": "gen39573@Zion-MacBook-Pro",
        "celery.task_name": "second.ping"
    },
    "events": [],
    "links": [],
    "resource": {
        "telemetry.sdk.language": "python",
        "telemetry.sdk.name": "opentelemetry",
        "telemetry.sdk.version": "1.1.0",
        "service.name": "unknown_service"
    }
}

What is the actual behavior? Two spans, with a different trace ID for each:

First celery worker output:

[2021-07-27 20:35:03,691: WARNING/ForkPoolWorker-8] first ping
{
    "name": "run/first.ping",
    "context": {
        "trace_id": "0x15ac5bd9daf196693bf11a2dc5973763",
        "span_id": "0x83b707a77a03780a",
        "trace_state": "[]"
    },
    # same attributes
}

Second celery worker output:

[2021-07-27 20:35:03,731: WARNING/ForkPoolWorker-8] second ping
{
    "name": "run/second.ping",
    "context": {
        "trace_id": "0x2488345758db06e139f722b4bba08cb3,
        "span_id": "0xa8ff39fb540c86cc",
        "trace_state": "[]"
    },
    # same attributes
}

I would also expect an apply_async span, which is likewise missing from the first worker's output.

Additional context I believe this has to do with the _trace_before_publish signal handler:

def _trace_before_publish(self, *args, **kwargs):
    task = utils.retrieve_task_from_sender(kwargs)
    task_id = utils.retrieve_task_id_from_message(kwargs)

    if task is None or task_id is None:
        return

The handler tries to retrieve the task from the Celery registry by the sender name, which is the name of the task being sent. But the first worker never declares the tasks it only publishes, so the lookup does not find the task and returns None, which makes the function exit prematurely. Perhaps there's a better way to handle this?
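
For illustration only (a minimal sketch of the lookup behavior, not the instrumentation's code): Celery's per-app task registry only contains tasks that were declared on that app, so a task that is only ever sent by name via send_task is simply not there.

from celery import Celery

app = Celery("first", broker="amqp://localhost")

@app.task(name="first.ping")
def ping():
    print("first ping")

# Declared on this app, so a lookup by name succeeds (returns a Task instance).
print(app.tasks.get("first.ping"))

# "second.ping" is only ever sent with send_task and never declared here,
# so a lookup by name finds nothing and returns None.
print(app.tasks.get("second.ping"))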

github-actions[bot] commented 3 years ago

This issue was marked stale due to lack of activity. It will be closed in 30 days.

yossicohn commented 3 years ago

Any news on this one? It is problematic for Celery usage. Any workaround advice?

srikanthccv commented 3 years ago

I don't think there is any workaround here other than fixing the bug.

yossicohn commented 2 years ago

@owais, @lonewolf3739 @zionsofer and I found a workaround for this.

Workaround We have found that adding the task definition ("second.ping") to the first service, i.e. registering the task in that worker's task registry, enables the _trace_before_publish function to find the task in the registry.

It would suffice to add, for example:

@app.task(name="second.ping")
def ping():
    pass
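
For example, placed in otel_celery_first.py it could look like the sketch below (the function name is illustrative and deliberately different from the existing ping so it doesn't shadow it; the body never runs on this worker):

# otel_celery_first.py, after the existing first.ping definition

# Stub registration so "second.ping" exists in this worker's task registry.
# The real implementation runs on the second worker; this body is never executed here.
@app.task(name="second.ping")
def second_ping_stub():
    pass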

Now task = utils.retrieve_task_from_sender(kwargs) can find the task, and from there it mostly works: the trace contains both spans as expected.

If the workaround is OK, we can update the documentation, as this is a major use case with Celery: usually a worker (first service) publishes a task that is consumed by another service (second worker).

@owais another thing is the SpanKind created for the processing worker. In _trace_prerun I see that SpanKind=CONSUMER is used, and it seems SpanKind=SERVER is what should be there. With the workaround above, the resulting service map does not show a node for the second Celery worker, even though it should, as it is a server in the system. Changing it to SpanKind=SERVER fixes that. WDYT?

owais commented 2 years ago

SpanKind should definitely NOT be SERVER. This looks like a limitation in the APM backend you are using.

I don't think users should have to worry about the workaround. Ideally, instrumenting with Otel should require zero code changes especially to existing non-otel related code. IMO this is a bug in the Celery instrumentation and should be fixed so it works for all users.

blumamir commented 2 years ago

IMO this is a bug in the Celery instrumentation and should be fixed so it works for all users.

@owais - I ran into this issue and am wondering what the right fix is. I see that we currently use the task instance to store a dictionary from task_id to span. Since we store the dict on the task object with setattr, we cannot support tasks that are only referenced by name (a str). I wonder if it makes sense to instead store the open spans in a Celery-global dictionary (kept as an instrumentation property), keyed by (task_name, task_id, is_publish), in order to resolve this issue.
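
As a rough, purely illustrative sketch of that idea (hypothetical names, not the current utils implementation), the bookkeeping could look something like this:

from typing import Any, Dict, Tuple

# (task_name, task_id, is_publish) -> (span, context activation token)
_open_spans: Dict[Tuple[str, str, bool], Tuple[Any, Any]] = {}

def attach_span(task_name: str, task_id: str, span: Any, token: Any, is_publish: bool = False) -> None:
    # Keyed by name rather than by task instance, so tasks sent only by
    # name (send_task) can be tracked as well.
    _open_spans[(task_name, task_id, is_publish)] = (span, token)

def retrieve_span(task_name: str, task_id: str, is_publish: bool = False):
    return _open_spans.get((task_name, task_id, is_publish))

def detach_span(task_name: str, task_id: str, is_publish: bool = False) -> None:
    # Pop so entries for finished tasks do not accumulate.
    _open_spans.pop((task_name, task_id, is_publish), None)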

srikanthccv commented 2 years ago

@blumamir You might want to check this proposed WIP fix by a contributor https://github.com/goatsthatcode/opentelemetry-python-contrib/pull/1. I think there are multiple Github issues created for this same problem.

blumamir commented 2 years ago

@blumamir You might want to check this proposed WIP fix by a contributor goatsthatcode#1. I think there are multiple Github issues created for this same problem.

Thanks! I commented on the PR