Did you instrument the celery program which is creating the tasks?
@srikanthccv Yes, in my API I do call CeleryInstrumentor; I have this located in the main.py file which launches the FastAPI application.
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from app.core.config import SETTINGS
from app.core.tracer import set_traceing
from app.api.api_v1.api import api_router
from app.api.api_v1.metadata import metadata
app = FastAPI(
    title=SETTINGS.PROJECT_NAME_SHORT,
    description=SETTINGS.PROJECT_DESCRIPTION,
    version=SETTINGS.PROJECT_VERSION,
    openapi_tags=metadata,
    docs_url="/",
    openapi_url="/api/v1/openapi.json",
)

TRACER = set_traceing(__name__)

# Export spans to the local Jaeger agent
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Instrument the Celery client and the FastAPI app
CeleryInstrumentor().instrument()
FastAPIInstrumentor.instrument_app(app, excluded_urls="/api/v1/openapi.json")
app.include_router(api_router, prefix="/api/v1")
Ideally, trace context propagation should happen automatically. If you can share a repo with a minimal reproducible application, that would greatly help debug the issue.
Hey, I'm running into this issue as well. I tried to reproduce it in a sample app I just made (which closely follows the dependency versions of everything) and I'm unable to reproduce it there, but I am experiencing a similar issue in the original codebase I based this off of.
The issue in my case, though, is that the CeleryInstrumentor().instrument() call is not instrumenting the Celery client (though it is instrumenting the worker), which is why the spans are not included under the same trace ID.
I will update the above example if I am able to reproduce my issue locally here. Any ideas or suggestions would be greatly appreciated :)
I have created a project demonstrating the issue. https://github.com/alucarddelta/issue_opentelm_celery_linking
Test1 - FastAPI -> Celery Worker -> FastAPI
Test2 - FastAPI -> FastAPI (checks that the FastAPI instrumentation is working)
Test3 - FastAPI -> Celery Worker -> same Celery Worker (different task) -> FastAPI
Test4 - FastAPI -> Celery Worker -> different Celery Worker -> FastAPI
The app1 FastAPI notes contain details of each of the issues observed.
Example of the same Task1: App1 is not linked to the "run/no_modification" task, while /test1_b does have it linked, but with a disconnected span.
Inside the worker, I'm able to set_attribute on the CeleryInstrumentor span, so it does appear the span is created and is accessible as expected. But the issue of linking back to the FastAPI spans via the CeleryInstrumentor remains.
from celery import Task
from opentelemetry import trace

# CELERY is the Celery application instance defined elsewhere in the worker module
@CELERY.task(name="test_task", bind=True, track_started=True)
def test_task(self: Task, obj_in, **kwargs):
    current_span = trace.get_current_span()
    current_span.set_attribute("var", obj_in["some_var"])
@alucarddelta I was able to identify and reproduce this issue in my own example code above, and looking over your example code I think we are experiencing the same issue (you are using send_task and I'm using signature, but I think the net effect is the same).
What I see happening is that the worker tasks need to be discoverable by the calling service (e.g. FastAPI), since the instrumentation needs the task object itself (based on this line): one of the _trace_before_publish() checks pulls the string id of the task being sent and then looks up the task object in the task registry. It looks like in both examples the tasks are not imported or shared between modules and are only invoked through the broker by string identifier.
I believe this is a bug, since Celery does support this use case (calling unknown tasks by string identifier), but maybe there is a way to stub the registration as a workaround for now (see the sketch below). I can update here if I find more.
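As a rough illustration of that stub idea (untested; the task name "worker.test_task" and the broker URL are placeholders, not from either repo), the producer could register a no-op task under the same name so the registry lookup performed by _trace_before_publish() succeeds:

from celery import Celery

# Producer-side Celery app; the real task implementation still lives in the worker.
CELERY = Celery("api", broker="redis://localhost:6379/0")

@CELERY.task(name="worker.test_task")
def _test_task_stub(*args, **kwargs):
    # Never executed on the producer; it only exists so the task name resolves
    # in the local task registry when the message is published.
    raise NotImplementedError

# Publishing by name now finds a registered task object on the producer side:
# CELERY.send_task("worker.test_task", kwargs={"obj_in": {"some_var": 1}})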
@goatsthatcode I think I also hit a wall with similar findings. I can't recall the specific details exactly, as it has been some time since I looked into this issue, but I was stuck at this point: https://github.com/celery/celery/discussions/7189.
I am able to reproduce the issue with this minimalistic snippet:
from celery import Celery, group, signature
BROKER_URL = 'redis://localhost:6379/0'
app = Celery('tasks', broker=BROKER_URL)
@app.task
def foo():
    return 'hello foo'

group([signature("tasks.bar"), foo.s()])()
When I instrument and run this code, I get only one "publish" span for the "foo" task. The task published as signature("tasks.bar")
does not generate any "publish" span and does not inject any propagation context into the headers.
This is a great example. I'm going to update the above PR to try and adapt this into a unit test, since I wasn't sure it would reproduce the issue with the app being initialized in the same module namespace. I guess I agree with the idea that sending a request to a task that doesn't even exist should still create a publish span, since that is what is happening in the code (even though in this case we would never expect the task to actually complete).
I have recently run into this behavior and think there may be an additional wrinkle. Specifically, I have found that, due to the way CeleryInstrumentor registers the span context propagation on the before_task_publish signal, the _trace_before_publish() callable is only available in processes where CeleryInstrumentor().instrument() has actually been executed, i.e. not on the client if you follow the documented usage of executing it on the worker_process_init signal. Interestingly, calling CeleryInstrumentor().instrument() on the FastAPI client then "resolves" the problem, because the client also has the callable registered and therefore it gets executed.
This can be confirmed by starting both the client and the worker in a REPL and executing:
>>> import celery
>>> any('_trace_before_publish' in str(r) for r in celery.signals.before_task_publish.receivers)
The client instance (after startup) will return False, while the worker instance returns True. Once I added the instrumentation call to both the app startup path of the FastAPI client and the worker (via the suggested worker_process_init signal), trace contexts were properly joined.
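For reference, a rough sketch of that setup (module layout and names are illustrative, and the two halves would normally live in separate processes; the worker-side hook is the documented worker_process_init pattern):

# FastAPI client process: register the instrumentation in the app startup path
# so the before_task_publish receiver also exists where tasks are published.
from fastapi import FastAPI
from opentelemetry.instrumentation.celery import CeleryInstrumentor

app = FastAPI()

@app.on_event("startup")
def instrument_celery_client():
    CeleryInstrumentor().instrument()

# Celery worker process: instrument on worker_process_init as documented.
from celery.signals import worker_process_init

@worker_process_init.connect(weak=False)
def instrument_celery_worker(*args, **kwargs):
    CeleryInstrumentor().instrument()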
It's exactly the solution we were looking for!! @naormalca It worked for us 🤗 Is it known when the new version with this fix is going to be released? We are really looking forward to it! @srikanthccv
Hi,
I have a FastAPI frontend (with FastAPIInstrumentor set up) that sends tasks to a Celery backend using CeleryInstrumentor. However, I am having issues with linking the traces together to see an end-to-end request.
When I view the trace information in Jaeger, the trace appears as two separate services, an API service and a Celery service. There is no information linking the two together to show one full service trace.
However, I am able to partially correct this. If I send some context information manually to the Celery task, I can then attach the Context to the span inside the worker. This works to a point: while all child spans are linked back to the API, the main CeleryInstrumentor parent span is not updated and therefore is not linked.
Am I missing something that would link, or at least update, the CeleryInstrumentor parent span back to the API spans?
API send Celery task: [code snippet omitted]
Celery task: [code snippet omitted]
[Screenshots: search result; unlinked CeleryInstrumentor span; linked spans with updated child context]
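For reference, a minimal sketch of the manual context propagation described in this issue (the task name, the otel_ctx field, and the broker URL are illustrative, not taken from the original code); as noted above, this links the child spans created inside the task but leaves the CeleryInstrumentor's own parent span detached:

# Producer (FastAPI) side: inject the current trace context into the task payload.
from opentelemetry.propagate import inject

def submit_task(celery_app, obj_in: dict):
    carrier = {}
    inject(carrier)  # writes traceparent/tracestate entries into the dict
    celery_app.send_task("test_task", kwargs={"obj_in": obj_in, "otel_ctx": carrier})

# Worker side: extract the carrier and start child spans under that context.
from celery import Celery
from opentelemetry import trace
from opentelemetry.propagate import extract

CELERY = Celery("worker", broker="redis://localhost:6379/0")

@CELERY.task(name="test_task", bind=True)
def test_task(self, obj_in, otel_ctx=None, **kwargs):
    ctx = extract(otel_ctx or {})
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("test_task.work", context=ctx) as span:
        span.set_attribute("var", obj_in["some_var"])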