apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Missing dagrun.duration.success/failed.<dag_id> metric in statsd exporter for airflow deployed on Kubernetes cluster #18630

Closed: bhupixb closed this issue 1 year ago

bhupixb commented 3 years ago

Apache Airflow version

2.1.3

Operating System

Debian GNU/Linux 10 (buster)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==2.0.0
apache-airflow-providers-celery==2.0.0
apache-airflow-providers-cncf-kubernetes==2.0.2
apache-airflow-providers-docker==2.0.0
apache-airflow-providers-elasticsearch==2.0.2
apache-airflow-providers-ftp==2.0.0
apache-airflow-providers-google==5.0.0
apache-airflow-providers-grpc==2.0.0
apache-airflow-providers-hashicorp==2.0.0
apache-airflow-providers-http==2.0.0
apache-airflow-providers-imap==2.0.0
apache-airflow-providers-microsoft-azure==2.0.0
apache-airflow-providers-mysql==2.1.0
apache-airflow-providers-postgres==2.0.0
apache-airflow-providers-redis==2.0.0
apache-airflow-providers-sendgrid==2.0.0
apache-airflow-providers-sftp==2.1.0
apache-airflow-providers-slack==4.0.0
apache-airflow-providers-sqlite==2.0.0
apache-airflow-providers-ssh==2.1.0

Deployment

Other

Deployment details

We have modified the official Airflow Helm chart to meet our needs. Kubernetes version: 1.15.12

What happened

We have followed the official documentation for setting up metrics in Airflow using statsd, and we are using Prometheus to pull these metrics from statsd. Here is our ConfigMap for the statsd mapping.yaml: https://ideone.com/cotYSG.

The issue we are facing is that the metrics dagrun.duration.success.<dag_id> and dagrun.duration.failed.<dag_id> are not showing up in statsd. Most other metrics are coming through fine.

Our statsd configuration:

metrics:
  statsd_on: 'True'
  statsd_port: 9125
  statsd_prefix: airflow
  statsd_host: airflow-statsd

What you expected to happen

The metrics dagrun.duration.success.<dag_id> and dagrun.duration.failed.<dag_id> should also reach statsd. These metrics are required to set up alerts in Prometheus, e.g. for long-running DAGs.

How to reproduce

  1. Deploy Airflow with the official helm chart
  2. Set up Statsd exporter for metric collection
  3. Schedule 5-6 runs of any example DAG over a period of 30 minutes
  4. The metrics will not be available in Prometheus

Anything else

This issue happens about 99% of the time; occasionally we do see the above two metrics in Prometheus, but we are unable to find any correlation explaining why they came through at that specific time and for that DAG.

Are you willing to submit a PR?

Code of Conduct

boring-cyborg[bot] commented 3 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

potiuk commented 3 years ago

It's an interesting one. The statsd client uses UDP packets to send metrics, and I think the lack of those metrics (which are emitted JUST before the task completes) indicates that the UDP packets are either not sent at all by the process running the task or not received by the statsd daemon. I think the first case is most likely.

Are you using Kubernetes or Celery Executor?

I'd say if it's the former, the most likely case is that the process finishes, more often than not, before the UDP packet is actually sent. In the Celery Executor the processes that send the metrics are not terminated after the task ends; they should be (I am not 100% sure of the forking strategy there) running all the time, but in the Kubernetes Executor the process might exit before the UDP packet is actually sent.

bhupixb commented 3 years ago

Yes we are using Kubernetes Executor only.

potiuk commented 3 years ago

@ashb - I believe the UDP sending delay before the process stops might be the real culprit here. I could not find any way to "wait" until the UDP "buffers" are empty; any other idea how we can fix it without introducing an artificial delay before exiting (which is kinda lame)?

potiuk commented 3 years ago

Interesting discussion here https://www.toolbox.com/tech/programming/question/ensure-udp-packet-finished-sending-072214/

potiuk commented 3 years ago

We could potentially switch to TCPStatsClient and give that option to users
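
For illustration, a minimal sketch of the two client types in the statsd package (host, port, and prefix are taken from the reporter's configuration above; the metric name and timing value are hypothetical):

import statsd

# UDP client (Airflow's default): metrics are fire-and-forget datagrams, so a
# packet emitted just before the process exits can be dropped silently.
udp = statsd.StatsClient(host="airflow-statsd", port=9125, prefix="airflow")
udp.timing("dagrun.duration.success.example_dag", 42.0)

# TCP client: keeps a connection to the daemon, so delivery problems surface as
# exceptions, but the statsd daemon must be configured to accept TCP connections.
tcp = statsd.TCPStatsClient(host="airflow-statsd", port=9125, prefix="airflow")
tcp.timing("dagrun.duration.success.example_dag", 42.0)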

potiuk commented 3 years ago

Hey @bhupixb - I looked at the code and it seems it might even be fixable without any change to Airflow.

We already have the https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#statsd-custom-client-path configuration parameter, and setting it to statsd.TCPStatsClient might solve the problem.

Can you please try it? (You need to make sure that your statsd daemon accepts TCP connections.)
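
For reference, a hedged sketch of what that could look like in airflow.cfg (the statsd values are copied from the reporter's configuration above; the environment-variable form follows Airflow's usual AIRFLOW__SECTION__KEY convention):

[metrics]
statsd_on = True
statsd_host = airflow-statsd
statsd_port = 9125
statsd_prefix = airflow
statsd_custom_client_path = statsd.TCPStatsClient

# or, equivalently, as an environment variable:
# AIRFLOW__METRICS__STATSD_CUSTOM_CLIENT_PATH=statsd.TCPStatsClient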

ashb commented 3 years ago

Interesting discussion here https://www.toolbox.com/tech/programming/question/ensure-udp-packet-finished-sending-072214/

That is talking about fully reliable delivery, which is more than we need here.

My understanding was that even if it's stuck in a buffer somewhere and the process exits right after, it would still be sent, but I'm not 100% sure of that. But given it sometimes shows up, you are likely right about the cause, Jarek.

Though actually: those metrics are emitted from within dagrun.update_state() -- and I had a quick look and couldn't see it being called from within TaskInstance or LocalTaskJob, so it might be coming from the scheduler? In which case the process exit is unlikely to be the cause.

potiuk commented 3 years ago

Though actually: those metrics are emitted from within dagrun.update_state() -- and I had a quick look and couldn't see it being called from within TaskInstance or LocalTaskJob, so it might be coming from the scheduler? In which case the process exit is unlikely to be the cause.

Could it be the mini-scheduler running in KubernetesPod?

ashb commented 3 years ago

Nope, the mini-scheduler only looks at TaskInstances and doesn't change the state of the DagRun unless the DAG is paused (the Scheduler ignores runs of paused DAGs, so the LocalTaskJob will update the state from running to success, but only when the DAG is paused).

bhupixb commented 3 years ago

Hey @bhupixb - I looked at the code and it seems it might even be fixable without any change to Airflow.

We already have the https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#statsd-custom-client-path configuration parameter, and setting it to statsd.TCPStatsClient might solve the problem.

Can you please try it? (You need to make sure that your statsd daemon accepts TCP connections.)

Will try this and update here.

ags2121 commented 3 years ago

Hey @bhupixb - I looked at the code and it seems it might even be fixable without any change to Airflow. We already have the https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#statsd-custom-client-path configuration parameter, and setting it to statsd.TCPStatsClient might solve the problem. Can you please try it? (You need to make sure that your statsd daemon accepts TCP connections.)

Will try this and update here.

Curious if this works for you; I'm seeing a similar thing with the CeleryExecutor on ECS Fargate/Redis (same setup otherwise). These are the only metrics coming through at the moment, but I expect to see a lot more:

airflow.can_connect
airflow.dagbag_size
airflow.collect_dags
airflow.executor.open_slots
airflow.scheduler_heartbeat
airflow.dagbag_import_errors
airflow.executor.queued_tasks
airflow.executor.running_tasks
airflow.dag.loading_duration.avg
airflow.dag.loading_duration.max
airflow.dag_processing.processes
airflow.dag.loading_duration.count
airflow.dag.loading_duration.median
airflow.dag_processing.import_errors
check_run.airflow.can_connect.critical
airflow.dag_processing.last_runtime.avg
airflow.dag_processing.last_runtime.max
airflow.dag_processing.total_parse_time
airflow.dag_processing.last_duration.avg
airflow.dag_processing.last_duration.max
airflow.dag.loading_duration.95percentile
airflow.dag_processing.last_runtime.count
airflow.dag_processing.last_duration.count
airflow.dag_processing.last_runtime.median
airflow.dag_processing.last_duration.median
airflow.dag_processing.last_run.seconds_ago
airflow.dag_processing.last_runtime.95percentile
airflow.dag_processing.last_duration.95percentile

ags2121 commented 3 years ago

Hey @bhupixb - I looked at the code and it seems it might even be fixable without any change to Airflow. We already have the https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#statsd-custom-client-path configuration parameter, and setting it to statsd.TCPStatsClient might solve the problem. Can you please try it? (You need to make sure that your statsd daemon accepts TCP connections.)

Will try this and update here.

I'm seeing this exception when I set the custom client:

airflow.exceptions.AirflowConfigException: Your custom Statsd client must extend the statsd.StatsClient in order to ensure backwards compatibility.

setting it like this:

AIRFLOW__METRICS__STATSD_CUSTOM_CLIENT_PATH=statsd.TCPStatsClient

bhupixb commented 3 years ago

+1 to @ags2121. I'm getting the same error as @ags2121 mentioned. @potiuk, any comments on this?

potiuk commented 3 years ago

Ah. Interesting - it seems that we do indeed explicitly check that the client is a UDP one:

https://github.com/apache/airflow/blob/2f88009bbf8818f3b4b553a04ae3b848af43c4aa/airflow/stats.py#L379

You could probably try (in your image) changing the import to from statsd.base import StatsBase and changing the check to StatsBase a few lines below.

I am not 100% sure if it would work though :(
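
For context, a quick standalone check (assuming the statsd PyPI package is installed) that shows why the validation rejects TCPStatsClient and why checking against the shared base class would accept it; this is an illustration, not the exact Airflow code:

import statsd
from statsd.client.base import StatsClientBase

# TCPStatsClient does not inherit from the UDP StatsClient, which is presumably
# why Airflow's check raises the AirflowConfigException quoted above.
print(issubclass(statsd.TCPStatsClient, statsd.StatsClient))  # False

# Both clients share StatsClientBase, so relaxing the check as discussed above
# lets the TCP client load.
print(issubclass(statsd.TCPStatsClient, StatsClientBase))  # True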

ags2121 commented 3 years ago

I got the import change to work using:

from statsd.client.base import StatsClientBase as StatsClient

and AIRFLOW__METRICS__STATSD_CUSTOM_CLIENT_PATH is set to statsd.TCPStatsClient

and I updated my statsd server to run on TCP; however, now I get this error from the scheduler:

scheduler  | [2021-10-04 13:48:59,423] {stats.py:390} INFO - Successfully loaded custom Statsd client
scheduler  | Traceback (most recent call last):
scheduler  |   File "/home/airflow/.local/bin/airflow", line 8, in <module>
scheduler  |     sys.exit(main())
scheduler  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/__main__.py", line 40, in main
scheduler  |     args.func(args)
scheduler  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
scheduler  |     return func(*args, **kwargs)
scheduler  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
scheduler  |     return f(*args, **kwargs)
scheduler  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 70, in scheduler
scheduler  |     job.run()
scheduler  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 236, in run
scheduler  |     Stats.incr(self.__class__.__name__.lower() + '_start', 1, 1)
scheduler  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/stats.py", line 233, in wrapper
scheduler  |     return fn(_self, stat, *args, **kwargs)
scheduler  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/stats.py", line 270, in incr
scheduler  |     return self.statsd.incr(stat, count, rate)
scheduler  |   File "/home/airflow/.local/lib/python3.9/site-packages/statsd/client/base.py", line 35, in incr
scheduler  |     self._send_stat(stat, '%s|c' % count, rate)
scheduler  |   File "/home/airflow/.local/lib/python3.9/site-packages/statsd/client/base.py", line 59, in _send_stat
scheduler  |     self._after(self._prepare(stat, value, rate))
scheduler  |   File "/home/airflow/.local/lib/python3.9/site-packages/statsd/client/base.py", line 74, in _after
scheduler  |     self._send(data)
scheduler  |   File "/home/airflow/.local/lib/python3.9/site-packages/statsd/client/stream.py", line 33, in _send
scheduler  |     self.connect()
scheduler  |   File "/home/airflow/.local/lib/python3.9/site-packages/statsd/client/stream.py", line 59, in connect
scheduler  |     self._sock.connect(addr)
scheduler  | ConnectionRefusedError: [Errno 111] Connection refused
potiuk commented 3 years ago

You need to enable TCP in your statsd daemon. By default it uses UDP only.
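
As an illustration only: if the StatsD side is the Prometheus statsd_exporter (as used by the official Airflow Helm chart), it can be started with a TCP listener in addition to UDP; the mapping-config path below is hypothetical, and the flag names should be checked against your exporter version:

statsd_exporter \
  --statsd.listen-udp=":9125" \
  --statsd.listen-tcp=":9125" \
  --statsd.mapping-config="/etc/statsd-exporter/mappings.yml"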

ags2121 commented 3 years ago

Yeah, I thought I had TCP configured... I'm thinking now that it might not be possible to switch to TCP (we're using a Datadog agent as the statsd service).

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. Kindly recheck the report against the latest Airflow version and let us know if the issue is reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.

github-actions[bot] commented 1 year ago

This issue has been closed because it has not received a response from the issue author.

LSparkzwz commented 1 week ago

Hello, I'm having the same problem in Airflow v2.10.2

As said in the OP, I'm getting the other metrics but not dagrun.duration.success.<dag_id> and dagrun.duration.failed.<dag_id> in statsd.

Airflow has been deployed in Kubernetes using this Helm Chart: https://github.com/apache/airflow/tree/main/chart

I read the suggestions in this thread, but I can't build a custom image. Is there a way to force TCP through the Helm chart values?