Open jianyuan opened 6 years ago
Thank you for your feedback. Celery support is in our backlog. Please feel free to share more information or submit pull requests.
I've done something to work with celery but it is not finished yet.
# Message-header key used to carry the X-Ray parent (trace) id from the
# publishing process to the worker that executes the task.
X_RAY_HEADER_KEY = "_x_ray_parent_id"
@celery.task
@app_context
def ping():
    """Trivial health-check task: always returns the string "pong"."""
    return "pong"
@signals.before_task_publish.connect()
def push_xray_headers(headers=None, **kwargs):  # noqa pylint: disable=unused-argument
    """Propagate the current X-Ray trace id to a task being published.

    If a new task is called within a segment we send its trace id as a
    message header. This also works if a task is called from a web worker
    which uses one of the provided middlewares.
    """
    current_segment = xray_recorder.current_segment()
    # Guard against a missing headers mapping: the signal normally supplies a
    # dict, but the parameter defaults to None and the original code would
    # raise TypeError when mutating it.
    if headers is not None and isinstance(current_segment, Segment):
        headers[X_RAY_HEADER_KEY] = current_segment.trace_id
@signals.task_prerun.connect()
def pull_xray_headers(**kwargs):  # noqa pylint: disable=unused-argument
    """Start an X-Ray segment for the task about to run.

    Before the task runs, check whether the publisher sent an X-Ray header
    and pull it. Strangely, it will not be inside task.request.headers
    (which is always None); the headers live directly on task.request.
    A new segment is then begun with the extracted parent id.
    """
    current_task: Task = celery.current_worker_task
    parent_id = current_task.request.get(X_RAY_HEADER_KEY, None)
    segment_name = f"celery-task-{current_task.__name__}"
    xray_recorder.begin_segment(segment_name, parent_id=parent_id)
@signals.task_postrun.connect()
def end_xray_segment(state=None, retval=None, **kwargs):  # noqa pylint: disable=unused-argument
    """End the running X-Ray segment; on failure, attach the exception first."""
    segment: Optional[Segment] = xray_recorder.current_segment()
    if not segment:
        # Nothing to close — e.g. tracing disabled for this task.
        return
    if state == CELERY_FAILURE:
        # On failure Celery hands us the exception as retval.
        tb = retval.__traceback__
        extracted_traceback = traceback.extract_tb(tb) if tb else []
        segment.add_exception(retval, extracted_traceback)
    xray_recorder.end_segment()
One thing that was not optimal is related to the service name. In my main module I configured X-Ray with these parameters:
# Global X-Ray recorder configuration for the main module.
aws_xray_sdk.core.xray_recorder.configure(
    service=os.environ.get("AWS_XRAY_TRACING_NAME", "Name not defined"),
    context_missing="LOG_ERROR",
    # NOTE(review): `config` appears to be a settings helper with boolean
    # casting — confirm where it reads from.
    sampling=config("ENABLE_AWS_XRAY", cast=config.boolean, default=False),
)
but instead of grouping the tasks' executions under the configured service name, it always shows them as individual services, each using the same name that was passed to begin_segment.
like this one
As I understood it should show as configured in service CaveCeleryQa1
in this case but only the segment name is there.
I got it working by using aws_xray_sdk.ext.util.construct_xray_header and aws_xray_sdk.ext.util.inject_trace_header.
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.utils import stacktrace
from aws_xray_sdk.ext.util import construct_xray_header, inject_trace_header
from celery import signals
def xray_before_task_publish(**kwargs):
    """Open a remote subsegment for a task publish and inject the trace header."""
    task_name = kwargs.get('sender')
    headers = kwargs.get('headers', {})
    body = kwargs.get('body', {})
    # Celery 3 carries the task id in the body, Celery 4 in the headers.
    task_id = headers.get('id') or body.get('id')
    subsegment = xray_recorder.begin_subsegment(name=task_name, namespace='remote')
    if subsegment is None:
        # Not inside an active segment — nothing to annotate.
        return
    subsegment.put_metadata('task_id', task_id, namespace='celery')
    if headers:
        inject_trace_header(headers, subsegment)
def xray_after_task_publish(**kwargs):
    """Close the 'remote' subsegment opened by xray_before_task_publish."""
    xray_recorder.end_subsegment()
def xray_task_prerun(**kwargs):
    """Begin a worker-side segment that continues the publisher's trace."""
    task = kwargs.get('sender')
    task_id = kwargs.get('task_id')
    # Reconstruct the trace context injected at publish time.
    trace_header = construct_xray_header(task.request.headers)
    segment = xray_recorder.begin_segment(
        name=task.name,
        traceid=trace_header.root,
        parent_id=trace_header.parent,
    )
    segment.save_origin_trace_header(trace_header)
    segment.put_metadata('task_id', task_id, namespace='celery')
def xray_task_postrun(**kwargs):
    """End the segment opened by xray_task_prerun once the task finishes."""
    xray_recorder.end_segment()
def xray_task_failure(**kwargs):
    """Attach the failing task's exception to the current X-Ray segment.

    Fixes two issues in the original: it read the private attribute
    ``_max_trace_back`` instead of the public ``max_trace_back`` property
    (the later revision of this snippet uses the public name), and it
    dereferenced the segment without checking that one exists.
    """
    einfo = kwargs.get('einfo')
    segment = xray_recorder.current_segment()
    if einfo and segment:
        stack = stacktrace.get_stacktrace(limit=xray_recorder.max_trace_back)
        segment.add_exception(einfo.exception, stack)
def connect_celery_signal_receivers():
    """Wire every X-Ray receiver to its corresponding Celery signal."""
    receivers = (
        (signals.task_prerun, xray_task_prerun),
        (signals.task_postrun, xray_task_postrun),
        (signals.task_failure, xray_task_failure),
        (signals.before_task_publish, xray_before_task_publish),
        (signals.after_task_publish, xray_after_task_publish),
    )
    for signal, receiver in receivers:
        signal.connect(receiver)
@jianyuan That looks nice. Just one correction: on version 4 the headers are not on task.request.headers but on task.request. My exact version is 4.2.1.
Thank you for providing all these examples @jaysonsantos @jianyuan. We're happy to work with you to add this change to our SDK if you have time to submit a PR. As mentioned before this library is also in our backlog so if you don't have time for the PR, please let us know if we can re-use some of the work you've done.
Any news on that?
Any updates?
Updated @jianyuan's code. Tested on Celery 5.3.1.
@signals.before_task_publish.connect
def xray_before_task_publish(sender, headers, **kwargs):
    """Open a remote subsegment for the publish and inject the trace header."""
    task_id = headers.get("id")
    # Use the short task name (module path stripped) as the subsegment name.
    short_name = sender.split(".")[-1]
    subsegment = xray_recorder.begin_subsegment(name=short_name, namespace="remote")
    if subsegment is None:
        # Not inside an active segment — nothing to annotate.
        return
    subsegment.put_metadata("task_id", task_id, namespace="celery")
    if headers:
        inject_trace_header(headers, subsegment)
@signals.after_task_publish.connect
def xray_after_task_publish(**kwargs):
    """Close the 'remote' subsegment opened by xray_before_task_publish."""
    xray_recorder.end_subsegment()
@signals.task_prerun.connect
def xray_task_prerun(task_id, task, **kwargs):
    """Open a worker-side segment that continues the publisher's trace."""
    incoming = construct_xray_header(task.request.headers)
    # Short task name (module path stripped) to match the publish subsegment.
    short_name = task.name.split(".")[-1]
    segment = xray_recorder.begin_segment(
        name=short_name,
        traceid=incoming.root,
        parent_id=incoming.parent,
    )
    segment.save_origin_trace_header(incoming)
    segment.put_metadata("task_id", task_id, namespace="celery")
@signals.task_postrun.connect
def xray_task_postrun(**kwargs):
    """End the segment opened by xray_task_prerun once the task finishes."""
    xray_recorder.end_segment()
@signals.task_failure.connect
def xray_task_failure(einfo, **kwargs):
    """Record the failing task's exception on the current X-Ray segment.

    Guards against a missing segment (e.g. when tracing is disabled or the
    prerun handler did not run) — the original dereferenced ``segment``
    unconditionally and would raise AttributeError when it is None.
    """
    segment = xray_recorder.current_segment()
    if einfo and segment:
        stack = stacktrace.get_stacktrace(limit=xray_recorder.max_trace_back)
        segment.add_exception(einfo.exception, stack)
Is there any update on this?
Celery exposes some useful signals in which the SDK can hook onto to propagate the trace.
References: