Airflow Provider for OpenTelemetry is an Airflow provider composed of the following: an OTEL hook and an OTEL listener.
Airflow release 2.10.0 now has OTEL traces implemented, which means that Airflow can natively emit traces for jobs and DAG runs in OTEL format. This provider works with Airflow whether that native tracing is enabled or disabled. When it is enabled (Airflow 2.10+), the otel hook takes its connection configuration not from its connection info, but directly from Airflow's own OTEL configuration. That means users don't have to create a redundant connection. When it is disabled, the otel hook uses the connection configuration from connections of type otel. Note that without one of these two sources of configuration, the hook will NOT function.
After checking out this repo in your local environment, you can install the OpenTelemetry provider using pip:
pip install ./airflow_provider_opentelemetry
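These two configuration modes map directly onto how the hook is constructed in your DAG code; a minimal sketch (otel_conn is just an example connection id):

from airflow_provider_opentelemetry.hooks.otel import OtelHook

# Airflow 2.10+ with native OTEL tracing enabled: the parameterless constructor
# shares the server's own OTLP configuration, so no extra connection is needed.
otel_hook = OtelHook()

# Otherwise, point the hook at a connection of type 'otel' that you have created.
otel_hook = OtelHook("otel_conn")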
An OTEL connection has the following parameters in its configuration UI:
- The URL for emitting OTEL data (in OTLP HTTP protocol), for example http://my-otel-endpoint:4318.
- The HTTP header name for the API key. Sometimes the endpoint may require you to specify one; in that case, you may provide it here.
- The API key that is paired with the HTTP header name.
- The interval, in milliseconds, at which metrics data is sent (e.g. every 30 seconds).
- A 'disable' flag. This is currently NOT implemented (coming in a future release), but when checked it will effectively disable the hook and listener. You may need to restart Airflow for it to take effect, but it is a convenient way to turn the provider off without deleting the connection.
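For reference, such a connection can also be created programmatically rather than through the UI. The sketch below is only an assumption about the field mapping (the endpoint in the host field, the remaining options in extra); check the connection form for the exact field names:

from airflow import settings
from airflow.models import Connection

# Hypothetical field mapping: the keys used in 'extra' are assumptions, not the provider's documented schema.
conn = Connection(
    conn_id="otel_conn",
    conn_type="otel",
    host="http://my-otel-endpoint:4318",  # OTLP HTTP endpoint
    extra='{"api_key_header": "x-api-key", "api_key": "changeme", "interval": 30000}',
)
session = settings.Session()
session.add(conn)
session.commit()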
You can use the span decorator to indicate that a particular function should emit its own span when running.
import time

from airflow.operators.python import PythonOperator
from opentelemetry import trace

from airflow_provider_opentelemetry.hooks.otel import OtelHook

# get otel hook first (pass a connection id, or use the parameterless constructor in 2.10+ to share the server's OTLP config)
otel_hook = OtelHook("otel_conn")
...
# use otel_hook's span decorator
@otel_hook.span
def setup():
    span = trace.get_current_span()
    span.set_attribute('app.msg', 'sleeping for 1 second')
    time.sleep(1)

# simple setup operator in python
t0 = PythonOperator(
    task_id="setup",
    python_callable=setup
)
You can also use a special parameter called dag_context to receive the DAG run's context as input to the function and initiate a new span.
from airflow_provider_opentelemetry.hooks.otel import OtelHook
...
def setup(**dag_context):
    with otel_hook.start_as_current_span(name="do_setup", dag_context=dag_context) as s:
        s.set_attribute("data quality", "fair")
        s.set_attribute("description", "You can add attributes in otel hook to have business or data specific details on top of existing task instance span.")
        with otel_hook.start_as_current_span(name="do_sleep") as ss:
            ss.set_attribute("sleep for", "one second")
            time.sleep(1)

# simple setup operator in python
t0 = PythonOperator(
    task_id="setup",
    python_callable=setup
)
The hook exposes a TracerProvider that can be used to send additional traces through the same OTLP connection. To add additional traces or to use instrumentation libraries, use the tracer_provider property as below:
from airflow.operators.python import PythonOperator
from airflow_provider_opentelemetry.hooks.otel import OtelHook
import requests
from opentelemetry import trace
from opentelemetry.instrumentation.requests import RequestsInstrumentor
# get the Open Telemetry hook
otel_hook = OtelHook()
RequestsInstrumentor().instrument(tracer_provider=otel_hook.tracer_provider)
tracer = trace.get_tracer("trace_test.tracer", tracer_provider=otel_hook.tracer_provider)
@otel_hook.span
def call_github():
    # N.B. This is using `tracer`, not `otel_hook`
    with tracer.start_as_current_span(name="Make GitHub call"):
        r = requests.get('https://api.github.com/events')

# simple setup operator in python
t0 = PythonOperator(
    task_id="call_github",
    python_callable=call_github
)
You can also submit your very own log message, which can then be automatically converted to an OTEL log record and linked within the span context.
def setup(**dag_context):
    with otel_hook.start_as_current_span(name="do_setup", dag_context=dag_context) as s:
        s.set_attribute("data quality", "fair")
        s.set_attribute("description", "You can add attributes in otel hook to have business or data specific details on top of existing task instance span.")
        with otel_hook.start_as_current_span(name="do_sleep") as ss:
            ss.set_attribute("sleep for", "one second")
            # emit 'info' log.
            otel_hook.otellog('info', 'this is an information log')
            time.sleep(1)

# simple setup operator in python
t0 = PythonOperator(
    task_id="setup",
    python_callable=setup
)
⚠️ Metrics, unlike traces, do not carry context (e.g. DAG run or trace), and are reported on a fixed interval. Because of this, if the work you instrument is very short and finishes before the configured interval elapses (e.g. 5 seconds), those metrics will not have time to be emitted. In that case, generating spans or span events may be a better way to instrument it.
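For example, a very short step might be better captured as a span event than as a metric; a minimal sketch (the span name and event attributes are illustrative):

# record a span event instead of a metric for work that finishes quickly
with otel_hook.start_as_current_span(name="quick_step") as span:
    span.add_event("cache.miss", attributes={"key": "user:42"})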
You can use the otel hook to increment (and decrement) a counter for certain actions:
otel_hook.incr('my.counter')
...
otel_hook.decr('my.counter')
You can use the otel hook to set the value of a gauge-type metric:
otel_hook.gauge('my.gauge', 12345)
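Putting these together, a task might update both a counter and a gauge while it runs; a small sketch (process_rows is a hypothetical helper):

@otel_hook.span
def load_data():
    rows = process_rows()                  # hypothetical helper doing the actual work
    otel_hook.incr('my.load.runs')         # count one more completed load run
    otel_hook.gauge('my.load.rows', rows)  # record how many rows this run handled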
You can use timing to record a time delta:
from airflow.utils import timezone

# 'processor.start_time' stands in for whatever timestamp was captured when the measured work began
last_finish_time = timezone.utcnow()
last_duration = last_finish_time - processor.start_time
file_name = 'myfile.txt'
otel_hook.timing("my.last_duration", last_duration, tags={"file_name": file_name})
This provider can add external links in the trace instance view. You may need to define the following environment variable to do so:
AIRFLOW_OTEL_TRACE_LINK='https://ui.honeycomb.io/demo/environments/otel-workshop/trace?trace_id={trace_id}&span={span_id}&trace_start_ts={start_date_ts}'
When the environment variable is defined, you will be able to see a button named OTEL Trace Link under the Extra Links section.
The example above applies when you are using Honeycomb as the OTEL backend to ingest the traces.
The following attributes are supported in the template string: