Create a sub-process for calling `airflow celery stop` instead of directly importing the Celery provider CLI. The reason is that, within the entrypoint, we don't yet have all the necessary environment variables, so the CLI emits warnings about the DB because it thinks it is not initialized. I plan to do some refactoring later, but for now it is easier to just create a sub-process.
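The sub-process approach could look roughly like the sketch below. The helper name `run_airflow_cli` and the error handling are illustrative, not the actual implementation; the point is that the CLI runs in a child process that inherits the fully populated environment instead of being imported into the current interpreter.

```python
import subprocess

def run_airflow_cli(args: list[str]) -> str:
    """Run a CLI command in a child process and return its stdout.

    Sketch only: invoking the CLI as a sub-process lets it pick up the
    environment variables exported by the entrypoint, avoiding the DB
    warnings triggered when the Celery provider CLI is imported too early.
    """
    result = subprocess.run(args, capture_output=True, text=True)
    if result.returncode != 0:
        # Surface the CLI's stderr so failures are not silently swallowed.
        raise RuntimeError(f"{args!r} failed: {result.stderr}")
    return result.stdout

# The worker shutdown then becomes:
# run_airflow_cli(["airflow", "celery", "stop"])
```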
Move the import of the `Stats` object in `task_monitor.py` into the `__init__` method of `WorkerTaskMonitor`. This avoids imports at the top level, which trigger some Airflow initialization, potentially before the updated environment variables are in place (related to the previous point, though not yet a complete solution to the problem).
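The deferred-import pattern can be sketched as follows. In the real code the deferred import would be Airflow's `Stats` object; here the stdlib `json` module stands in so the sketch is self-contained and runnable.

```python
class WorkerTaskMonitor:
    """Deferred-import sketch: the heavy module is imported in __init__,
    not at module top level, so nothing Airflow-related is initialized
    before the entrypoint has finished exporting environment variables."""

    def __init__(self) -> None:
        # Stand-in for the real `Stats` import; executed at construction
        # time, not at module load time.
        import json
        self.stats = json

# The module can be imported (class defined) without touching the heavy
# dependency; it is only loaded when a monitor is actually constructed.
```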
Add a couple more informational logs to the auto-scaling logic.
Fix the logging in `Subprocess._kill_subprocess` to make sure the logs end up on the service side, not the customer side.
Set `AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT` to `1800.0` to match the internal code.
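Airflow maps environment variables of the form `AIRFLOW__<SECTION>__<KEY>` onto its config, so this corresponds to `task_queued_timeout` in the `[scheduler]` section. Where exactly the entrypoint exports it is an assumption; the value itself is just:

```python
import os

# Queued-task timeout in seconds (30 minutes), matching the internal code.
os.environ["AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT"] = "1800.0"
```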
Ensure that the following metrics are always reported, even if we don't end up calling `_return_abandoned_task_to_queue()`:
- `clean_celery_message_error_no_queue`
- `clean_celery_message_error_sqs_op`
- `clean_celery_message_success`
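A hypothetical sketch of this reporting guarantee: every exit path of the cleanup routine increments exactly one of the three counters, including the early-return paths that never reach `_return_abandoned_task_to_queue()`. The function signature and the minimal `Stats` stand-in are illustrative, not the actual code.

```python
class Stats:
    """Tiny stand-in for a statsd-style client, for the sketch only."""
    counters: dict = {}

    @classmethod
    def incr(cls, name: str) -> None:
        cls.counters[name] = cls.counters.get(name, 0) + 1


def clean_celery_message(message, queue_exists: bool, sqs_ok: bool) -> None:
    # Each exit path reports a metric, so the three counters always sum
    # to the number of cleanup attempts.
    if not queue_exists:
        Stats.incr("clean_celery_message_error_no_queue")
        return
    if not sqs_ok:
        Stats.incr("clean_celery_message_error_sqs_op")
        return
    Stats.incr("clean_celery_message_success")
```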
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
Issue #, if available: N/A