Closed Etienne-Carriere closed 1 year ago
Thanks for opening your first issue here! Be sure to follow the issue template!
How is Opentracing / Opentelemetry different from Sentry? We have native integration with Sentry and I wonder if we should combine these two features or develop it independently. http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com/docs/apache-airflow/latest/logging-monitoring/errors.html
Hello @mik-laj , thank you for the link about sentry. It will helps to design a hook (in a first time in an external repo and perhaps merged in a second time) Opentracing/Opentelemetry is an open-source standard for APM (in the same level of sentry)
Hello @Etienne-Carriere @mik-laj -> we started recently seeing those annoying logs in Airflow 2.0.
{opentelemetry_tracing.py:29} INFO - This service is instrumented using OpenTelemetry. OpenTelemetry could not be imported; please add opentelemetry-api and opentelemetry-instrumentation packages in order to get BigQuery Tracing data.
Is this something you know about ?
@potiuk Looks like it's from something BigQuery related.
I know @mjpieters was playing/hacking about with adding opentracing to Airflow too
Yeah. It can likely be disabled by adding some custom logging configuration - possibly that's what we should do I think
Added #13131
👍 For this feature.
@mik-laj
I created a little test script to illustrate building a trace from a dag run:
https://github.com/dm03514/airflow-tracer/blob/main/README.md
Tracing is a different type of telemetry (alongside statsd and sentry). It provides a "Gantt" Chart view into a dag. Some tracing providers (like LIghtstep) provide advanced dashboarding and alerting.
The POC project above builds a trace by querying airflows database. I'm hoping that there is a way to add tracing support inside airflow so that when dags are being executed they can emit open telemetry traces!
With tracing each task span needs a reference to the global span and i'm not sure how easy / hard this is to share data across tasks?
Start the root span when the dagrun starts:
root_span = tracer.start_span(
name=dag.dag.dag_id
)
Start a task specific span when each task starts:
ctx = trace.set_span_in_context(root_span)
span = tracer.start_span(
name=ti.task_id,
context=ctx
)
... # invoke task
span.end() # task is complete
Close the root span when all tasks are complete
root_span.end()
def main(cli):
dag = serialized_dag.SerializedDagModel.get(dag_id=cli.dag_id)
dagrun = dag.dag.get_dagrun(execution_date=cli.execution_date)
tis = dagrun.get_task_instances()
root_span = tracer.start_span(
name=dag.dag.dag_id,
start_time=dt_to_ns_epoch(dagrun.start_date)
)
root_span.end(end_time=dt_to_ns_epoch(dagrun.end_date))
for ti in tis:
ctx = trace.set_span_in_context(root_span)
span = tracer.start_span(
name=ti.task_id,
context=ctx,
start_time=dt_to_ns_epoch(ti.start_date),
)
span.set_attribute('airflow.state', ti.state)
span.set_attribute('airflow.operation', ti.operator)
if ti.job_id is not None:
span.set_attribute('airflow.job_id', ti.job_id)
if ti.state != 'success':
span.set_attribute('error', True)
span.end(end_time=dt_to_ns_epoch(ti.end_date))
thank you!
Interesting little test; but note it generates a trace from the tasks after the fact without support for additional spans created inside each task invocation, and you the tracing context is not shared overy any RPC / REST / etc. calls to other services to inherit the tracing context.
I integrated Jaeger tracing (opentracing) support into a production Airflow setup using Celery. Specific challenges I had to overcome:
The dagrun span is 'virtual', in that it exists from when the dagrun is created until the last task is completed. The scheduler will then update the dagrun state in the database. But tasks need a valid parent span to attach their own spans to.
I solved this by creating a span in a monkey-patch to DAG.create_dagrun()
, injecting the span info in to the dagrun configuration together with the start time, then discarding the span. Then, in DagRun.set_state()
, when the state changes to a finished state, I create a jaeger_client.span.Span()
object from scratch using the dagrun conf-stored data, and submit that to Jaeger.
Tasks inherit the parent (dagrun) span context from the dagrun config; I patched Task.run_raw_task()
to run the actual code under a tracer.start_active_span()
context manager. This captures timing and any exceptions.
You need an active tracer for traces to be captured and sent on to the tracer agent. So I registered code to run in the cli_action_loggers.register_pre_exec_callback()
hook when the scheduler
or dag_trigger
sub-commands run, which then registers a closer with cli_action_loggers.register_post_exec_callback
. Closing a tracer in dag_trigger
takes careful work with the asyncio / tornado loop used by the Jaeger client, you'll lose traces if you don't watch out. I found that you had to go hunt for the I/O loop attached to the trace reporter object and call tracer.close()
from a callback sent to that loop as the only fail-proof method of getting the last traces out. I don't know if opentracing needs this level of awareness of the implementation details.
You generally want to start a trace when the Airflow webserver receives a trigger, so instrument the Flask layer too. This is where the sampling decision needs to be taken too; tracing often only instruments a subset of all jobs, but in this project I set the sampling frequency to 100% at this level.
I added a custom configuration section to airflow.cfg to allow me to tweak Jaeger tracing parameters.
But, with that work in place, we now get traces in mostly real time, with full support for tracing contexts being shared with other services called from Airflow tasks. We can trace a job through the frontend, submitting a job to Airflow, then follow any calls from tasks to further REST APIs, all as one system.
I'd prefer it if the tracing context was not shoehorned into the dagrun configuration; I'd have created additional database tables or columns for this in the Airflow database model if I had to do this inside the Airflow project itself.
Note that I did not use the Celery task hooks here to track the task spans, because Celery has its own overhead that we wanted to keep separate. The opentracing_instrumentation
already has Celery hooks you can use, but I needed the timings to be closer to the actual task invocation.
Anothing thing to consider is per-DAG or per-task tagging you want to add to the spans. For this project I needed to track specific data from the submitted DAG config so we can compare task runs using the same input configuration.
@mjpieters can you share any of the code at all? I think I understand what you're saying but I think also I would be rediscovering problems you have already solved without the code.
@WattsInABox the code is part of a private project, and the majority is either project specific or specific to the jaeger_client
implementation, neither of which apply here. E.g. the custom configuration section and how to close the client are tied very closely to the jaeger_client
codebase, as is how you create a jaeger_client.span.Span()
instance from scratch.
I can see about talking to the client about scrubbing the project-specific details from the code, however.
What's a good place to set up instrumentation and exporting in the various components: scheduler, webserver, workers - ?
Indeed I think OpenTelemetry integration might be a good idea. I've heard (CC: @subashcanapathy) that Amazon has been very involved in OpenTelemetry, so maybe that's a possibility that somehow Amazon team can help with that?
@potiuk what do you think injecting this under theses modules ? airflow.hooks.base airflow.sensors.base airflow.models.baseoperator
@dm03514 thanks, i tried your test code, it is exporting spans successfully. But exporting spans directly from tasks would be much useful in terms of tracing different operations like hook execution, api calls etc.
@mjpieters I tried exporting span from a airflow task:
packages: apache-airflow==2.1.3 opentelemetry-api==1.9.1 opentelemetry-sdk==1.9.1 opentelemetry-exporter-otlp==1.9.1 opentelemetry-instrumentation==0.28b1
env variables related to opentelemetry: OTEL_TRACES_EXPORTER=otlp OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:55680" OTEL_RESOURCE_ATTRIBUTES="service.name=test_airflow_worker" OTEL_TRACES_SAMPLER="always_on"
Airflow task:
from airflow.models import BaseOperator
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
class TracingTestOperator(BaseOperator):
def execute(self, context):
resource = Resource(attributes={
"service.name": "test_airflow_worker"
})
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer(__name__)
otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:55680", insecure=True)
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
with tracer.start_as_current_span("test_task_span"):
print("Hello Airflow!")
Above code is not exporting spans to the collector, even though collector (otlp) is up. Strangely, same code is exporting span successfully when I run it as standalone python (directly invoking execute method).
We are just about to formalize all the open-telemetry experiences and proposal into an AIP (Airlfow Improvement Proposal). Stay tuned for a draft AIP proposal on that one.
sure, I would be happy to contribute in this.
I'm using OpenTelemetry with Celery on our worker processes, so maybe this will help.
We have a python module that hooks into the Celery worker processes directly via a Signal as suggested by the OpenTelemetry Celery docs.
For us to hook into Celery from Airflow, we have a Python module that re-exports Airflow's DEFAULT_CELERY_CONFIG
, provided the environment variable AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS
. This helps us load our own module to inject the Celery signal when prior to the Celery app is being created. We then instrument the worker processes with whatever OpenTelemetry providers we need in a worker_process_init
signal.
my_app/tracing.py
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
from celery.signals import worker_process_init
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import sampling
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
CELERY_CONFIG = DEFAULT_CELERY_CONFIG
@worker_process_init.connect(weak=False) # type: ignore
def instrument_worker(*args: typing.Any, **kwargs: typing.Any) -> None:
tracer_provider = TracerProvider(
resource=Resource(attributes={'service.name': 'my-service'}),
sampler=sampling.ALWAYS_ON
)
tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(tracer_provider)
GrpcInstrumentorClient().instrument()
RequestsInstrumentor().instrument()
Our worker process is then started with the following configuration option:
export AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=my_app.tracing.CELERY_CONFIG
For parent context's when executing spans, we embed a OpenTelemetry trace ID into the DAG run configuration from whatever service is executing the DAG. Operators simply pull that trace ID out of the DAG run configuration and start their own trace contextmanager.
There is however two strange things that occur with this.
BatchSpanProcessor
as this runs in a separate thread queuing up spans, but it seems that not all the spans were being exported to our OTLP collector and records were missing. Using the SimpleSpanProcessor
did not have that issue.OTLPSpanExporter
may cause the following errors below. I haven't been able to pin-point why that occurs, but the ConsoleSpanExporter
does not suffer from that issue. It doesn't seem like any tasks are failing and largely this seems to occur on Workers that have no tasks being currently ran. Celery's worker_process_init
is limited to a 4-second blocking call, so maybe it's a startup timing issue on the Worker process that causes it? I attempted to increase Celery's timeout (worker_proc_alive_timeout) there to something higher like 20 seconds and still saw the same issue though.[2022-02-07 20:34:21,364: ERROR/MainProcess] Process 'ForkPoolWorker-32' pid:3506 exited with 'signal 11 (SIGSEGV)' [2022-02-07 20:34:21,421: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV) Job: 245.')
Hi ,
I want to trace dags status in open telemetry. I am not able to export traces on open telemetry collector. I hope @aa3pankaj also faced same issue.
Please let me know if any solution for this.
Thanks
See https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-49+OpenTelemetry+Support+for+Apache+Airflow - OpenTelemetry is not implemented in Airflow. There are some ways you can try to autoinstrument Airflow but it has limitations and you have to wait for the https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-49+OpenTelemetry+Support+for+Apache+Airflow to be completed (or maybe even contribute in implementing it once it is approved)
Hi folks! Just curious: is there a way we could auto instrument Airflow through the use of cluster policies? For example, what if we created a task policy that a wraps the callback and execute methods of a task? The wrapper could perform actions similar to the Celery example above.
I'm just mindful that our observability/telemetry concerns can be very specific. And that makes me worry that the generic opentelemetry framework outlined here may not fit all our needs. Either way, though, the changes proposed seem quite substantial and that makes me wonder if there's a more lightweight first step we can make in that direction.
Would love to hear folks' thoughts/feedback!
Hi folks! Just curious: is there a way we could auto instrument Airflow through the use of cluster policies? For example, what if we created a task policy that a wraps the callback and execute methods of a task? The wrapper could perform actions similar to the Celery example above.
You could use cluster policies, yes. But also you have (in Airlfow 2.5.0) listeners that you could use for that purpose. https://airflow.apache.org/docs/apache-airflow/stable/listeners.html - they were designed for a different purpose, but I believe you can go a long way by utilising these two mechanisms if you think the "standard" approach of Airlfow will not be good enough for you. Airlow in it's "2" line (now at 2.5) is more and more "platform" that you can extend in various ways and we are approaching it in the way that you can write your own extensions by plugging in your code with well-defined APIs in the way that will allow you to customize it heavily, while relying on the fact that you will be able to upgrade Airflow without losing your customizations. We have a few documentation updates coming in 2.5.1 specifically targetting to make it clearer what you CAN and what you CAN't modify in this way. Both cluster policies and listeners are "public interface" of Airlow and you can utilize them for that purpose.
@dm03514 thanks, i tried your test code, it is exporting spans successfully. But exporting spans directly from tasks would be much useful in terms of tracing different operations like hook execution, api calls etc.
@mjpieters I tried exporting span from a airflow task:
packages: apache-airflow==2.1.3 opentelemetry-api==1.9.1 opentelemetry-sdk==1.9.1 opentelemetry-exporter-otlp==1.9.1 opentelemetry-instrumentation==0.28b1
env variables related to opentelemetry: OTEL_TRACES_EXPORTER=otlp OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:55680" OTEL_RESOURCE_ATTRIBUTES="service.name=test_airflow_worker" OTEL_TRACES_SAMPLER="always_on"
Airflow task:
from airflow.models import BaseOperator from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor class TracingTestOperator(BaseOperator): def execute(self, context): resource = Resource(attributes={ "service.name": "test_airflow_worker" }) trace.set_tracer_provider(TracerProvider(resource=resource)) tracer = trace.get_tracer(__name__) otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:55680", insecure=True) span_processor = BatchSpanProcessor(otlp_exporter) trace.get_tracer_provider().add_span_processor(span_processor) with tracer.start_as_current_span("test_task_span"): print("Hello Airflow!")
Above code is not exporting spans to the collector, even though collector (otlp) is up. Strangely, same code is exporting span successfully when I run it as standalone python (directly invoking execute method).
Hi @aa3pankaj , I have the same problem, did you manage to solve it?
Hello @arodrber, instead of BatchSpanProcessor try using SimpleSpanProcessor once.
FYI. Suppport and POC on how to do the OpenTelemetry integration is already aproved https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-49+OpenTelemetry+Support+for+Apache+Airflow and @ferruzzi works on integration.
Thanks for that update, sorry I didn't do it myself. :+1:
Current state, if anyone cares:
I have a Breeze dev environment that launches Otel-Collector, Prometheus, and Grafana and configures them all to communicate here. On the Airflow side, I have (I believe) metrics parity with StatsD working,m but it only prints the metrics to the console at the moment, I'm still sorting out why it won't emit them to Otel-Collector. But once I get that connection sorted out I THINK I have a working POC with full StatsD feature parity and the option to choose between them using Airflow config.
Once I have that working and verified, I'll put an email in the dev list and see if there are any metrics we'd like added while I'm in there, and start on implementing Spans.
I have been away on vacation for the last week-ish and just getting caught up a bit, but I expect to have the POC complete and published on Monday I hope, barring nay other unforeseen hurdles.
Thanks for that update, sorry I didn't do it myself. 👍
Current state, if anyone cares:
I have a Breeze dev environment that launches Otel-Collector, Prometheus, and Grafana and configures them all to communicate here. On the Airflow side, I have (I believe) metrics parity with StatsD working,m but it only prints the metrics to the console at the moment, I'm still sorting out why it won't emit them to Otel-Collector. But once I get that connection sorted out I THINK I have a working POC with full StatsD feature parity and the option to choose between them using Airflow config.
Once I have that working and verified, I'll put an email in the dev list and see if there are any metrics we'd like added while I'm in there, and start on implementing Spans.
I have been away on vacation for the last week-ish and just getting caught up a bit, but I expect to have the POC complete and published on Monday I hope, barring nay other unforeseen hurdles.
Hi @ferruzzi - were you able to publish the POC? Would love to see how you instrumented metrics. I am instrumenting OTEL in Airflow now, exporting to Google Cloud Trace for dashboarding. Thanks a bunch!
Hey. I am still actively working on integrating OTel and have a few groundwork PRs merged. I am hoping to have a PR up to get all current StatsD Counters working next week.
@CodeIronChef - This is the current state, if you want to see where I am at. It currently emits only Counters, and you can see and graph them in Grafana and all that, but calls them Gauges for some reason. Still definitely a WIP. I'm going to get that sorted out as far as why it thinks they are Gauges, the move on to implement actual Gauges and Timers.
@ferruzzi Hi, maybe you can tell me the current status of integration with airflow? anyway thanks for the input
Airflow 2.7 will have full support for OTel metrics at feature parity with StatsD. Traces, Spans, and Logging are coming at a future date.
If you have a dev branch checked out, you can start playing with it now. Documentation (pre-release) is here.
There are a few known issues with missing metrics here if anyone has time and inclination to look into one.
@ferruzzi Given that Airflow 2.7 is now available, can we close this?
IMHO I believe this is a beginning of the journey - we have now just metrics produced via OTEL but there is a lot more to it: https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-49+OpenTelemetry+Support+for+Apache+Airflow (integrating OTEL metrics is just the first step of the first phase).. I am much more excited about Traces though , which is more what the issue is more about... But ssh
... there is a talk coming at Airflow Summit https://airflowsummit.org/sessions/2023/apache-airflow-and-opentelemetry/ ... so I guess we learn more then :D
@potiuk Totally agree. I guess I was just wondering if we still need to have this open given that there's now a platform to build on compared to when it was just a vague hope back when this was raised in Dec 2020. I'm sucker for trying to shrink backlogs!
Yeah, I'm with @potiuk. At an absolute minimum, I think we need Traces implemented before we can consider this complete.
There have been a few people asking about it already, I'm going to take some time to put together a Project/Issue with some subtasks in case anyone wants to take a small bite out of it. I'm going to be camping for the next two weeks, so it won't be until after that. As always, if someone else gets to it before that, that would be welcome.
I think we should close this issue infavor of dedicated, well defined scoped tasks.
This issue speaks about motivation and possible solutions. We already have that covered with AIP-49. What we need is AIP-49 project to open relevant tasks with enough information so the community can start pick up tasks.
I believe this specific issue has no additional value at thia point
Fine for me :). yeah. I agree it is superfluous.
Description
I would like to instrument airflow with tracing technology like Opentracing (https://opentracing.io/) / Opentelemetry (https://opentelemetry.io/)
Use case / motivation
The motivation are to :
Implementation ideas
For example, on Celery, we use celery signals to hook before and after each task (https://github.com/uber-common/opentracing-python-instrumentation/blob/master/opentracing_instrumentation/client_hooks/celery.py)
I am ok to contribute to the code but interested by some hints on where to look to .