Closed - dlesco closed this 9 months ago
Thanks for the analysis and the details shared! This helps in understanding your concerns directly.
Regarding (1): I understand.
Regarding (2): I believe the standard in async is really to have short-lived calls, with I/O made via async libs. Async code should not block for I/O or include sleeps or heavy number-crunching. By default, if debug mode is enabled, async calls generate warnings starting at 100 ms. I would not recommend setting a higher default, else the effectiveness of the triggerer is reduced if "bad code" is incrementally introduced. I'd rather follow (3).
Regarding (3): this is reasonable.
Some questions: Is a CPU limit of <=1 a strong requirement on your side? If you are not using deferred tasks, have you thought about disabling the triggerer altogether?
And as you already made this investigation, would you be able to contribute to the improvement?
Yes. Very good analysis (thanks); however, I believe it has some flawed assumptions.
I would love to get some more insights on that one, and I have two questions @dlesco
Question 1) What is a (reasonable, deliberate, and well-thought-out) reason you would like to limit CPU on the triggerer?
For me it makes no sense whatsoever. The triggerer - by definition - runs an active async-io loop, and - again by definition - it should run this (single) async-io loop as fast as humanly possible. Its main async-io loop will take no more than 1 CPU (again almost by definition, because the async-io loop is just that: an active loop). No threads, no multi-processing, just run your triggers as fast as you can. And if you do not have triggers in the queue, it will take almost no CPU at all.
So literally, IMHO, it makes no sense to allocate 0.5 or less CPU to the triggerer. What purpose would it serve? Could you please elaborate on why you would want to set a limit for the triggerer?
So IMHO at most this issue should end up (hopefully you could contribute it) in some production-guideline documentation to never set the triggerer's CPU limit to < 1 (it was not specified, but your observation is right that setting a lower limit will result in some unexpected results).
So it would be great - if we agree in the discussion that this is the case - for you to submit, as a way to contribute back, an update to the documentation explaining it. But maybe I am wrong and there are good reasons why you would deliberately set it to 0.5 CPU, for example (I do not see them currently).
Question 2) What is your proposal for solving it, if you do not agree with my understanding explained in 1)?
The reason a CPU limit is used is because of Google Kubernetes Engine Autopilot. Google Composer 2 only runs on Autopilot. Autopilot always sets the cpu limit equal to the cpu requests, and cpu requests are also required; if you do not specify cpu requests, they will be set to 250m, and therefore the cpu limit is set to 250m.
https://cloud.google.com/kubernetes-engine/docs/concepts/autopilot-resource-requests#resource-limits
> The reason a CPU limit is used is because of Google Kubernetes Engine Autopilot. Google Composer 2 only runs on Autopilot. Autopilot always sets the cpu limit equal to the cpu requests, and cpu requests are also required; if you do not specify cpu requests, they will be set to 250m, and therefore the cpu limit is set to 250m.
Then the Google Composer team should set it to 1 (both requests and limits). This is a problem of Composer then. I see no reason why it could not be set to 1 in the Composer deployment (unless there is one, of course).
And just to explain it - as I see it now.
The problem is precisely as you described it; your description captures the unintended behaviour here very well - the resulting bursts of the triggerer doing stuff (but then potentially pausing).
The whole idea of the triggerer loop is to make sure triggers are fired as soon as they happen, but limiting its CPU hampers its ability to do so. The whole purpose of enabling the triggerer is to free resources for everyone else, so that the triggerer can process triggers in a tight event loop and involve the others (workers mainly) to be triggered as soon as possible; then Autopilot should automatically scale them up as they are needed (via Autopilot mechanisms).
So having a busy, working triggerer run on an instance where it can rely on having 1 CPU is crucial to process things as soon as possible and scale them as soon as possible. This way you save a lot on the workers (which do not have to occupy busy slots waiting in sensors, for example, or constantly serialize/deserialize just to check what happened). The price to pay is a triggerer that has 1 CPU available and can "command" the deferred workers to be woken up and deserialized as soon as an event happens. With a 250m limit it means it will work in bursts, and if it happens that processing a single trigger event takes 300 ms, it will automatically get delayed by almost a second extra (and the effects of that might compound).
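For illustration, the burst arithmetic behind that "almost a second extra" can be sketched, assuming the typical 100 ms cgroup period (the numbers are the ones from this discussion):

```python
# Back-of-the-envelope sketch of the burst behaviour described above,
# assuming a 100 ms cgroup period and a 250m (0.25 CPU) limit.
PERIOD_MS = 100          # cgroup cpu period, typically 100 ms
CPU_LIMIT = 0.25         # 250m limit
quota_ms = PERIOD_MS * CPU_LIMIT     # 25 ms of CPU time per period

cpu_work_ms = 300        # CPU time needed to process one trigger event
periods_needed = cpu_work_ms / quota_ms        # 12 periods
wall_time_ms = periods_needed * PERIOD_MS      # 1200 ms of wall-clock time
extra_delay_ms = wall_time_ms - cpu_work_ms    # 900 ms of added latency

print(wall_time_ms, extra_delay_ms)  # 1200.0 900.0
```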
So having request CPU = 1 makes sense in this case.
And of course, if you have an idea how to improve it for lower limits, I think you are still free to do so, though I see very little sense in setting a CPU limit < 1. Maybe you could come up with a proposal to only include active time in the calculations - it should be possible, I think.
Google Composer 2 does allow setting cpu requests/limit to 1: the three options allowed by Composer are (0.5, 0.75, 1). You can see I included logs with the limit set to 1. But with a limit set to 1, it still takes more than 200 ms sometimes. That's because you have a workload that is more than 1 CPU; besides the async-task thread, you have:
For Composer 2, the livenessProbe runs a script that calls the airflow jobs check command. So it's starting up a whole new Python process, loading modules, etc., which takes a lot of CPU. It would've been better if the gunicorn server had a /liveness URL (implemented to call the same Python function that the airflow jobs check command calls) to do an HTTP liveness check; then it would not spend a lot of cpu starting up a new process. You can see from my logs that the magnitude of the delays is smaller when I removed the livenessProbe: with the livenessProbe, the max delay was 0.78 s; without the livenessProbe, 0.37 s.
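To illustrate the suggestion, here is a hypothetical sketch of an in-process liveness check; the class, names, and grace period are made up for illustration and are not Airflow APIs:

```python
# Hypothetical sketch: an in-process liveness check that compares the last
# recorded heartbeat against a grace period, instead of spawning a new
# `airflow jobs check` process for every probe. All names are illustrative.
import time

HEARTBEAT_GRACE_SECONDS = 30.0

class LivenessState:
    """Tracks the last heartbeat written by the running job."""

    def __init__(self):
        self.last_heartbeat = time.monotonic()

    def beat(self):
        # The job's main loop would call this periodically.
        self.last_heartbeat = time.monotonic()

    def is_alive(self, now=None):
        # A /liveness URL handler would call this and return 200 or 503;
        # no new Python interpreter start-up per probe.
        if now is None:
            now = time.monotonic()
        return (now - self.last_heartbeat) <= HEARTBEAT_GRACE_SECONDS

state = LivenessState()
state.beat()
print(state.is_alive())                             # True: fresh heartbeat
print(state.is_alive(now=time.monotonic() + 60.0))  # False: stale heartbeat
```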
An additional thing I thought of after writing last comment.
The triggerer is written as two threads: the _run_trigger_loop on the main thread, and the async task thread. That means that when the main thread gets scheduled, the Global Interpreter Lock prevents the async task thread from running while the main thread is running.
This adds some latency to the async task thread.
> It would've been better if the gunicorn server had a /liveness URL
Absolutely, that would be a good contribution to make if we do not have it yet in the triggerer. That would bring it closer to the ideal.
> The Global Interpreter Lock prevents the async task thread from running while the main thread is running.
Correct. That one takes some CPU cycles away. So yes, it can impact the loop (a bit) - but not as much as a 250m limit, because the main thread mostly interacts with the DB and invokes I/O operations which do not hold the GIL, whereas a 250m limit will effectively block the loop from running for 750 ms within every second. So it's quite a different mechanism - it might delay the loop for "short" times, but it should not really starve it. Same with airflow jobs check, which, yes, takes CPU, but at most competes with the triggerer process for CPU rather than starving it.
Again - if you have another way to make such a warning, it would be great to propose it; otherwise it's just stating that these 200 ms fluctuate when there are other things running.
What's your proposal to solve it better, @dlesco? You seem to be pretty knowledgeable about all the internal details, so surely you can propose a sensible approach there? It would be really good if you could contribute those things as PRs - since you can also test them in your environment.
I think what we need to do is throw away blips.
Have the watchdog calculate its latency, and append the latency value to a global list.
Then the main thread's _run_trigger_loop, in its loop, would make a local variable reference to the list, then reset the global list reference to a new empty list. The main loop would do either a trimmed-mean or median statistic on the list of latency values, and alarm on that.
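A minimal sketch of that statistic (the threshold mirrors the 0.2 s the current watchdog hardcodes; the function name is illustrative):

```python
# Sketch of the "throw away blips" idea: collect watchdog latency samples
# and alarm on the median rather than any single reading, so a one-off
# scheduling hiccup does not trigger a warning.
import statistics

THRESHOLD = 0.2  # seconds, same value the current watchdog hardcodes

def should_alarm(samples):
    """Alarm only if the median latency exceeds the threshold."""
    if not samples:
        return True  # the watchdog never ran at all: definitely blocked
    return statistics.median(samples) > THRESHOLD

print(should_alarm([0.1, 0.1, 0.9]))  # False: one blip, median is 0.1
print(should_alarm([0.3, 0.4, 0.5]))  # True: sustained delay, median is 0.4
```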
> I think what we need to do is throw away blips.
> Have the watchdog calculate its latency, and append the latency value to a global list.
> Then the main thread's _run_trigger_loop, in its loop, would make a local variable reference to the list, then reset the global list reference to a new empty list. The main loop would do either a trimmed-mean or median statistic on the list of latency values, and alarm on that.
Sounds cool - feel free to contribute it :)
I won't mark it back as "good-first-issue" as it is quite a bit beyond the scope of a "first" issue for new contributors, but for someone knowledgeable in the internal matters, it sounds like a cool contribution.
I drafted some code, but won't be able to test it today.
diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index bb151b32cc..0eba7a5c91 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -369,6 +369,8 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
self.emit_metrics()
# Idle sleep
time.sleep(1)
+ # Check watchdog
+ self.trigger_runner.check_watchdog()
def load_triggers(self):
"""Query the database for the triggers we're supposed to be running and update the runner."""
@@ -445,6 +447,17 @@ class TriggerRunner(threading.Thread, LoggingMixin):
# Should-we-stop flag
stop: bool = False
+ # Intervals between block_watchdog runs
+ watchdog_stats: deque[float]
+
+ # Next time to do watchdog checks
+ next_watchdog_check_time: float
+
+ # Watchdog constants
+ WATCHDOG_INTERVAL = 0.1
+ WATCHDOG_ERROR_THRESHOLD = 0.2
+ WATCHDOG_CHECK_INTERVAL = 10.0
+
def __init__(self):
super().__init__()
self.triggers = {}
@@ -453,12 +466,45 @@ class TriggerRunner(threading.Thread, LoggingMixin):
self.to_cancel = deque()
self.events = deque()
self.failed_triggers = deque()
+ self.watchdog_stats = deque()
+ self.next_watchdog_check_time = time.monotonic() + self.WATCHDOG_CHECK_INTERVAL
self.job_id = None
def run(self):
"""Sync entrypoint - just run a run in an async loop."""
asyncio.run(self.arun())
+ def check_watchdog(self):
+ """Main thread loop calls this to check on block_watchdog delays."""
+
+ now = time.monotonic()
+ if now < self.next_watchdog_check_time:
+ return
+ self.next_watchdog_check_time = now + self.WATCHDOG_CHECK_INTERVAL
+ stats_len = len(self.watchdog_stats)
+ if not stats_len:
+ self.log.error(
+ "Triggerer's async thread was blocked for %.2f seconds, "
+ "likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 "
+ "to get more information on overrunning coroutines.",
+ self.WATCHDOG_CHECK_INTERVAL,
+ )
+ Stats.incr("triggers.blocked_main_thread")
+ return
+ stats = list()
+ for i in range(stats_len):
+ stats.append(self.watchdog_stats.popleft())
+ stats.sort()
+ median_time = stats[len(stats)//2]
+ if median_time > self.WATCHDOG_ERROR_THRESHOLD:
+ self.log.error(
+ "Triggerer's async thread median task scheduling time was %.2f seconds, "
+ "likely because of a badly-written trigger. Set PYTHONASYNCIODEBUG=1 "
+ "to get more information on overrunning coroutines.",
+ median_time,
+ )
+ Stats.incr("triggers.blocked_main_thread")
+
async def arun(self):
"""
Run trigger addition/deletion/cleanup; main (asynchronous) logic loop.
@@ -568,18 +614,9 @@ class TriggerRunner(threading.Thread, LoggingMixin):
"""
while not self.stop:
last_run = time.monotonic()
- await asyncio.sleep(0.1)
- # We allow a generous amount of buffer room for now, since it might
- # be a busy event loop.
+ await asyncio.sleep(self.WATCHDOG_INTERVAL)
time_elapsed = time.monotonic() - last_run
- if time_elapsed > 0.2:
- self.log.info(
- "Triggerer's async thread was blocked for %.2f seconds, "
- "likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 "
- "to get more information on overrunning coroutines.",
- time_elapsed,
- )
- Stats.incr("triggers.blocked_main_thread")
+ self.watchdog_stats.append(time_elapsed)
@staticmethod
def set_individual_trigger_logging(trigger):
Maybe open a PR and we can iterate? And test too? That's the best way to start a discussion - over "real" code :)
OK, I'm working on setting up my local environment so that pre-commit, etc, is enabled, before submitting.
This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.
Hey @dlesco - any progress with this?
This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.
This issue has been closed because it has not received response from the issue author.
Apache Airflow version
Other Airflow 2 version (please specify below)
What happened
The triggerer emits "Triggerer's async thread was blocked for x.xx seconds" messages even on a cluster where none of the DAGs have been updated to use deferred mode for operators, so there is no load on the triggerer.
I have a detailed analysis of why.
In the "Anything Else" section, I am including a series of logs showing the problem, under differing conditions. But they all share the same condition that there is no load on the triggerer.
The issue is the triggerer's block_watchdog async function, which hardcodes a timeout of 0.2 seconds for consecutive runs of the async function. This especially does not play well if you set, in Kubernetes, spec.template.spec.containers[].resources.limits.cpu. For Google Composer, all pods on GKE Autopilot must have a cpu limit set, but it's an issue for any k8s deployment that sets a low cpu limit.
The k8s resources.limits.cpu feature makes use of the Linux kernel's cgroups cpu-throttling feature. There is a cpu period, typically 100 ms, and if all the threads in a container consume more than their allotted cpu time, the container is not scheduled onto a cpu for the rest of the period. So, if you have a cpu limit of 0.5, and over the course of 25 ms, 50 ms of cpu time is used by the threads in the container, then the container will not run for the next 75 ms. This introduces latency into the threads.
An article about it: https://medium.com/@ramandumcs/cpu-throttling-unbundled-eae883e7e494
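The throttling numbers above can be checked with a small sketch:

```python
# Worked example of the cgroup throttling arithmetic described above:
# a 100 ms period, a 0.5 cpu limit (50 ms quota per period), and 50 ms
# of cpu time consumed within the first 25 ms of the period.
PERIOD_MS = 100
CPU_LIMIT = 0.5
quota_ms = PERIOD_MS * CPU_LIMIT   # 50.0 ms of cpu time per period

elapsed_ms = 25                    # wall-clock time into the period
used_ms = 50                       # cpu time consumed by all threads so far

if used_ms >= quota_ms:
    # Quota exhausted: the container is off-cpu until the period ends.
    throttled_ms = PERIOD_MS - elapsed_ms
    print("throttled for", throttled_ms, "ms")  # throttled for 75 ms
```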
This is not the only problem. If multiple threads share one cpu, then context-switching to the other threads also introduces latency. For the triggerer container, among the threads of execution is the main thread, running the _run_trigger_loop function.
A hardcoded 200 ms is a "real-time" constraint. In order to satisfy such a real-time constraint, you would actually need multiple physical cpu-threads, so that the other concurrent threads do not introduce latencies, and you'd need no cpu throttling enabled.
But we're doing batch jobs, not real-time stock trading or something.
What you think should happen instead
Since Airflow is a batch job processing system, we should not have expensive real-time constraints that require you to dedicate multiple CPUs to the triggerer. Instead:
How to reproduce
Have the helm chart values set a triggerer limit of 0.5 or 1.0 cpu, and let it run for a day. You should see log messages similar to those I'm including.
Operating System
NAME="Ubuntu" VERSION="20.04.6 LTS (Focal Fossa)" ID=ubuntu ID_LIKE=debian PRETTY_NAME="Ubuntu 20.04.6 LTS" VERSION_ID="20.04" HOME_URL="https://www.ubuntu.com/" SUPPORT_URL="https://help.ubuntu.com/" BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/" PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy" VERSION_CODENAME=focal UBUNTU_CODENAME=focal
Versions of Apache Airflow Providers
No response
Deployment
Google Cloud Composer
Deployment details
composer-2-4-6-airflow-2-6-3
Anything else
0.5 CPU limit
1.0 cpu limit
1.0 cpu, livenessProbe removed from the Deployment
Are you willing to submit PR?
Code of Conduct