apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

DagTimeout and AirflowTaskTimeout from running multiple dags at one time #18448

Closed hpatel-higi closed 3 years ago

hpatel-higi commented 3 years ago

Apache Airflow version

2.1.4 (latest released)

Operating System

Linux

Versions of Apache Airflow Providers

apache-airflow-providers-ftp==2.0.1
apache-airflow-providers-http==2.0.1
apache-airflow-providers-imap==2.0.1
apache-airflow-providers-microsoft-azure==3.1.1
apache-airflow-providers-microsoft-mssql==2.0.1
apache-airflow-providers-odbc==2.0.1
apache-airflow-providers-postgres==2.2.0
apache-airflow-providers-slack==4.0.1
apache-airflow-providers-snowflake==2.1.1
apache-airflow-providers-sqlite==2.0.1
apache-airflow-providers-ssh==2.1.1

Deployment

Docker-Compose

Deployment details

Deployed to Azure app service

What happened

I triggered a DAG run on every DAG (around 16 of them) at the same time. I then started seeing errors, and Airflow crashed.

What you expected to happen

All DAG runs complete over time.

How to reproduce

No response

Anything else

No response

Jorricks commented 3 years ago

Hello @hpatel-higi,

This is not enough to go on to figure out the issues. What about sharing a bit more specific information about:

hpatel-higi commented 3 years ago

The DAGs are each in a Python file of their own. There are 20 DAGs in total. If I create a DAG run for all of them at the same time, that's when I get a DAGTimeout error and sometimes an AirflowTaskTimeout error. I can't easily get the logs since this is happening on Azure App Service, but I will try and see what I can get.

Also, when I was getting these errors I had my SECRET_KEY set to b'asdfasdfasdf'. I'm not sure whether that is correct, but I changed the key to one generated with the following command: python -c 'import secrets; print(secrets.token_hex(16));'

I am seeing a lot fewer errors than before.
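
For reference, the key-generation one-liner above can also be written as a small script. This is only a minimal sketch: `generate_secret_key` is a hypothetical helper name, and it assumes the result is pasted into the `secret_key` option of the `[webserver]` section of airflow.cfg.

```python
import secrets


def generate_secret_key(num_bytes: int = 16) -> str:
    """Return a random hex string; each byte becomes two hex characters."""
    # secrets.token_hex draws from the OS's cryptographically secure RNG.
    return secrets.token_hex(num_bytes)


if __name__ == "__main__":
    # Prints a fresh 32-character key on every run.
    print(generate_secret_key())
```

The value is different on every run, so generate it once and reuse the same string wherever the webserver configuration is deployed.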

hpatel-higi commented 3 years ago

This is the error I got while just 4 DAGs were running. I didn't have these problems in 1.10.8.

2021-09-21T15:44:31.288689942Z [2021-09-21 15:44:31,281] {{timeout.py:36}} ERROR - Process timed out, PID: 3002
2021-09-21T15:44:31.495731398Z [2021-09-21 15:44:31,287] {{dagbag.py:329}} ERROR - Failed to import: /usr/local/airflow/dags/build_dbs/build_api_audit_logs.py
2021-09-21T15:44:31.495763098Z Traceback (most recent call last):
2021-09-21T15:44:31.495778898Z File "/usr/local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 326, in _load_modules_from_file
2021-09-21T15:44:31.495784798Z loader.exec_module(new_module)
2021-09-21T15:44:31.495788798Z File "<frozen importlib._bootstrap_external>", line 850, in exec_module
2021-09-21T15:44:31.495793298Z File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
2021-09-21T15:44:31.495797698Z File "/usr/local/airflow/dags/build_dbs/build_api_audit_logs.py", line 5, in <module>
2021-09-21T15:44:31.495802198Z from helper_functions.snowflake_operator import SnowflakeOperator
2021-09-21T15:44:31.495806298Z File "/usr/local/airflow/dags/helper_functions/snowflake_operator.py", line 1, in <module>
2021-09-21T15:44:31.495810298Z from airflow.contrib.operators.snowflake_operator import SnowflakeOperator as sfo
2021-09-21T15:44:31.495814198Z File "/usr/local/lib/python3.9/site-packages/airflow/contrib/operators/snowflake_operator.py", line 22, in <module>
2021-09-21T15:44:31.495818398Z from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator # noqa
2021-09-21T15:44:31.495822398Z File "/usr/local/lib/python3.9/site-packages/airflow/providers/snowflake/operators/snowflake.py", line 21, in <module>
2021-09-21T15:44:31.495826598Z from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
2021-09-21T15:44:31.495830398Z File "/usr/local/lib/python3.9/site-packages/airflow/providers/snowflake/hooks/snowflake.py", line 25, in <module>
2021-09-21T15:44:31.495834598Z from snowflake import connector
2021-09-21T15:44:31.495838298Z File "/usr/local/lib/python3.9/site-packages/snowflake/connector/__init__.py", line 15, in <module>
2021-09-21T15:44:31.495842398Z from .connection import SnowflakeConnection
2021-09-21T15:44:31.495846198Z File "/usr/local/lib/python3.9/site-packages/snowflake/connector/connection.py", line 65, in <module>
2021-09-21T15:44:31.495867199Z from .cursor import LOG_MAX_QUERY_LENGTH, SnowflakeCursor
2021-09-21T15:44:31.495882499Z File "/usr/local/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 51, in <module>
2021-09-21T15:44:31.495886999Z from .file_transfer_agent import SnowflakeFileTransferAgent
2021-09-21T15:44:31.495890499Z File "/usr/local/lib/python3.9/site-packages/snowflake/connector/file_transfer_agent.py", line 45, in <module>
2021-09-21T15:44:31.495894399Z from .gcs_util import SnowflakeGCSUtil
2021-09-21T15:44:31.495898999Z File "/usr/local/lib/python3.9/site-packages/snowflake/connector/gcs_util.py", line 15, in <module>
2021-09-21T15:44:31.495902999Z from .encryption_util import EncryptionMetadata
2021-09-21T15:44:31.495906499Z File "/usr/local/lib/python3.9/site-packages/snowflake/connector/encryption_util.py", line 15, in <module>
2021-09-21T15:44:31.495931599Z from Cryptodome.Cipher import AES
2021-09-21T15:44:31.495935899Z File "/usr/local/lib/python3.9/site-packages/Cryptodome/Cipher/__init__.py", line 36, in <module>
2021-09-21T15:44:31.495939899Z from Cryptodome.Cipher._mode_gcm import _create_gcm_cipher
2021-09-21T15:44:31.495943499Z File "/usr/local/lib/python3.9/site-packages/Cryptodome/Cipher/_mode_gcm.py", line 84, in <module>
2021-09-21T15:44:31.495947499Z _ghash_portable = _get_ghash_portable()
2021-09-21T15:44:31.495951099Z File "/usr/local/lib/python3.9/site-packages/Cryptodome/Cipher/_mode_gcm.py", line 81, in _get_ghash_portable
2021-09-21T15:44:31.495954999Z lib = load_pycryptodome_raw_lib("Cryptodome.Hash._ghash_portable", api)
2021-09-21T15:44:31.495958899Z File "/usr/local/lib/python3.9/site-packages/Cryptodome/Util/_raw_api.py", line 293, in load_pycryptodome_raw_lib
2021-09-21T15:44:31.495979099Z return load_lib(pycryptodome_filename(dir_comps, filename),
2021-09-21T15:44:31.495982599Z File "/usr/local/lib/python3.9/site-packages/Cryptodome/Util/_raw_api.py", line 95, in load_lib
2021-09-21T15:44:31.495986399Z lib = ffi.dlopen(name)
2021-09-21T15:44:31.495990099Z File "/usr/local/lib/python3.9/site-packages/cffi/api.py", line 150, in dlopen
2021-09-21T15:44:31.495993899Z lib, function_cache = _make_ffi_library(self, name, flags)
2021-09-21T15:44:31.496015900Z File "/usr/local/lib/python3.9/site-packages/cffi/api.py", line 832, in _make_ffi_library
2021-09-21T15:44:31.496020000Z backendlib = _load_backend_lib(backend, libname, flags)
2021-09-21T15:44:31.496023700Z File "/usr/local/lib/python3.9/site-packages/cffi/api.py", line 818, in _load_backend_lib
2021-09-21T15:44:31.496027600Z path = ctypes.util.find_library(name)
2021-09-21T15:44:31.496031200Z File "/usr/local/lib/python3.9/ctypes/util.py", line 330, in find_library
2021-09-21T15:44:31.496035000Z _get_soname(_findLib_gcc(name)) or _get_soname(_findLib_ld(name))
2021-09-21T15:44:31.496038800Z File "/usr/local/lib/python3.9/ctypes/util.py", line 131, in _findLib_gcc
2021-09-21T15:44:31.496045800Z trace = proc.stdout.read()
2021-09-21T15:44:31.496049600Z File "/usr/local/lib/python3.9/site-packages/airflow/utils/timeout.py", line 37, in handle_timeout
2021-09-21T15:44:31.496053500Z raise AirflowTaskTimeout(self.error_message)
2021-09-21T15:44:31.496057800Z airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /usr/local/airflow/dags/build_dbs/build_api_audit_logs.py after 30.0s, PID: 3002
2021-09-21T15:44:31.500108928Z [2021-09-21 15:44:31,496] {{local_executor.py:128}} ERROR - Failed to execute task dag_id could not be found: build_api_audit_logs. Either the dag did not exist or it failed to parse..
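
The traceback shows the 30-second DagBag import timeout firing while the DAG file was still importing the Snowflake connector at module level. One way to check how long such an import takes is to time it in isolation. A minimal sketch follows; `time_import` is a hypothetical helper, not part of Airflow, and the stdlib module used in the demo is only a stand-in for whatever the DAG file imports at the top.

```python
import importlib
import time


def time_import(module_name: str) -> float:
    """Return the wall-clock seconds spent importing module_name.

    Hypothetical helper for illustration; not part of Airflow.
    A module already present in sys.modules comes back from the cache
    almost instantly, so use a fresh interpreter for a meaningful number.
    """
    start = time.perf_counter()
    importlib.import_module(module_name)
    return time.perf_counter() - start


if __name__ == "__main__":
    # "json" is a stand-in so the sketch runs anywhere; replace it with
    # e.g. "snowflake.connector" to time the import seen in the traceback.
    name = "json"
    print(f"{name}: {time_import(name):.3f}s")
```

If the import alone takes a large share of the 30 seconds when several processes are running, raising `dagbag_import_timeout` in the `[core]` section may be one option worth trying.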

hpatel-higi commented 3 years ago

This is what my airflow.cfg file looks like

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# This is the template for Airflow's default configuration. When Airflow is
# imported, it looks for a configuration file at $AIRFLOW_HOME/airflow.cfg. If
# it doesn't exist, Airflow uses this template to generate it by replacing
# variables in curly braces with their global values from configuration.py.

# Users should not modify this file; they should customize the generated
# airflow.cfg instead.

# ----------------------- TEMPLATE BEGINS HERE -----------------------

[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /usr/local/airflow/dags

# Hostname by providing a path to a callable, which will resolve the hostname.
# The format is "package.function".
#
# For example, default value "socket.getfqdn" means that result from getfqdn() of "socket"
# package will be used as hostname.
#
# No argument should be required in the function specified.
# If using IP address as hostname is preferred, use value ``airflow.utils.net.get_host_ip_address``
hostname_callable = socket.getfqdn

# Default timezone in case supplied date times are naive
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
default_timezone = America/Chicago

# The executor class that airflow should use. Choices include
# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,
# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the
# full import path to the class when using a custom executor.
executor = LocalExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines.
# More information here:
# http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres:5432/airflow

# The encoding for the databases
sql_engine_encoding = utf-8

# Collation for ``dag_id``, ``task_id``, ``key`` columns in case they have different encoding.
# This is particularly useful in case of mysql with utf8mb4 encoding because
# primary keys for XCom table has too big size and ``sql_engine_collation_for_ids`` should
# be set to ``utf8mb3_general_ci``.
# sql_engine_collation_for_ids =

# If SqlAlchemy should pool database connections.
sql_alchemy_pool_enabled = True

# The SqlAlchemy pool size is the maximum number of database connections
# in the pool. 0 indicates no limit.
sql_alchemy_pool_size = 5

# The maximum overflow size of the pool.
# When the number of checked-out connections reaches the size set in pool_size,
# additional connections will be returned up to this limit.
# When those additional connections are returned to the pool, they are disconnected and discarded.
# It follows then that the total number of simultaneous connections the pool will allow
# is pool_size + max_overflow,
# and the total number of "sleeping" connections the pool will allow is pool_size.
# max_overflow can be set to ``-1`` to indicate no overflow limit;
# no limit will be placed on the total number of concurrent connections. Defaults to ``10``.
sql_alchemy_max_overflow = 10

# The SqlAlchemy pool recycle is the number of seconds a connection
# can be idle in the pool before it is invalidated. This config does
# not apply to sqlite. If the number of DB connections is ever exceeded,
# a lower config value will allow the system to recover faster.
sql_alchemy_pool_recycle = 1800

# Check connection at the start of each connection pool checkout.
# Typically, this is a simple statement like "SELECT 1".
# More information here:
# https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic
sql_alchemy_pool_pre_ping = True

# The schema to use for the metadata database.
# SqlAlchemy supports databases with the concept of multiple schemas.
sql_alchemy_schema =

# Import path for connect args in SqlAlchemy. Defaults to an empty dict.
# This is useful when you want to configure db engine args that SqlAlchemy won't parse
# in connection string.
# See https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine.params.connect_args
# sql_alchemy_connect_args =

# This defines the maximum number of task instances that can run concurrently in Airflow
# regardless of scheduler count and worker count. Generally, this value is reflective of
# the number of task instances with the running state in the metadata database.
parallelism = 32

# The maximum number of task instances allowed to run concurrently in each DAG. To calculate
# the number of tasks that is running concurrently for a DAG, add up the number of running
# tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``concurrency``,
# which is defaulted as ``dag_concurrency``.
dag_concurrency = 16

# Are DAGs paused by default at creation
dags_are_paused_at_creation = True

# The maximum number of active DAG runs per DAG. The scheduler will not create more DAG runs
# if it reaches the limit. This is configurable at the DAG level with ``max_active_runs``,
# which is defaulted as ``max_active_runs_per_dag``.
max_active_runs_per_dag = 16

# The maximum number of queued dagruns for a single DAG. The scheduler will not create more DAG runs
# if it reaches the limit. This is not configurable at the DAG level.
max_queued_runs_per_dag = 16

# Whether to load the DAG examples that ship with Airflow. It's good to
# get started, but you probably want to set this to ``False`` in a production
# environment
load_examples = False

# Whether to load the default connections that ship with Airflow. It's good to
# get started, but you probably want to set this to ``False`` in a production
# environment
load_default_connections = False

# Path to the folder containing Airflow plugins
plugins_folder = /usr/local/airflow/plugins

# Should tasks be executed via forking of the parent process ("False",
# the speedier option) or by spawning a new python process ("True" slow,
# but means plugin changes picked up by tasks straight away)
execute_tasks_new_python_interpreter = False

# Secret key to save connection passwords in the db
fernet_key =

# Whether to disable pickling dags
donot_pickle = False

# How long before timing out a python file import
dagbag_import_timeout = 30

# Should a traceback be shown in the UI for dagbag import errors,
# instead of just the exception message
dagbag_import_error_tracebacks = True

# If tracebacks are shown, how many entries from the traceback should be shown
dagbag_import_error_traceback_depth = 2

# How long before timing out a DagFileProcessor, which processes a dag file
dag_file_processor_timeout = 50

# The class to use for running task instances in a subprocess.
# Choices include StandardTaskRunner, CgroupTaskRunner or the full import path to the class
# when using a custom task runner.
task_runner = StandardTaskRunner

# If set, tasks without a ``run_as_user`` argument will be run with this user
# Can be used to de-elevate a sudo user running Airflow when executing tasks
default_impersonation =

# What security module to use (for example kerberos)
security =

# Turn unit test mode on (overwrites many configuration options with test
# values at runtime)
unit_test_mode = False

# Whether to enable pickling for xcom (note that this is insecure and allows for
# RCE exploits).
enable_xcom_pickling = False

# When a task is killed forcefully, this is the amount of time in seconds that
# it has to cleanup after it is sent a SIGTERM, before it is SIGKILLED
killed_task_cleanup_time = 60

# Whether to override params with dag_run.conf. If you pass some key-value pairs
# through ``airflow dags backfill -c`` or
# ``airflow dags trigger -c``, the key-value pairs will override the existing ones in params.
dag_run_conf_overrides_params = False

# When discovering DAGs, ignore any files that don't contain the strings ``DAG`` and ``airflow``.
dag_discovery_safe_mode = True

# The number of retries each task is going to have by default. Can be overridden at dag or task level.
default_task_retries = 0

# Updating serialized DAG can not be faster than a minimum interval to reduce database write rate.
min_serialized_dag_update_interval = 30

# Fetching serialized DAG can not be faster than a minimum interval to reduce database
# read rate. This config controls when your DAGs are updated in the Webserver
min_serialized_dag_fetch_interval = 10

# Whether to persist DAG files code in DB.
# If set to True, Webserver reads file contents from DB instead of
# trying to access files in a DAG folder.
# (Default is ``True``)
# Example: store_dag_code = True
# store_dag_code =

# Maximum number of Rendered Task Instance Fields (Template Fields) per task to store
# in the Database.
# All the template_fields for each of Task Instance are stored in the Database.
# Keeping this number small may cause an error when you try to view ``Rendered`` tab in
# TaskInstance view for older tasks.
max_num_rendered_ti_fields_per_task = 30

# On each dagrun check against defined SLAs
check_slas = True

# Path to custom XCom class that will be used to store and resolve operators results
# Example: xcom_backend = path.to.CustomXCom
xcom_backend = airflow.models.xcom.BaseXCom

# By default Airflow plugins are lazily-loaded (only loaded when required). Set it to ``False``,
# if you want to load plugins whenever 'airflow' is invoked via cli or loaded from module.
lazy_load_plugins = True

# By default Airflow providers are lazily-discovered (discovery and imports happen only when required).
# Set it to False, if you want to discover providers whenever 'airflow' is invoked via cli or
# loaded from module.
lazy_discover_providers = True

# Number of times the code should be retried in case of DB Operational Errors.
# Not all transactions will be retried as it can cause undesired state.
# Currently it is only used in ``DagFileProcessor.process_file`` to retry ``dagbag.sync_to_db``.
max_db_retries = 3

# Hide sensitive Variables or Connection extra json keys from UI and task logs when set to True
#
# (Connection passwords are always hidden in logs)
hide_sensitive_var_conn_fields = True

# A comma-separated list of extra sensitive keywords to look for in variables names or connection's
# extra JSON.
sensitive_var_conn_names =

[logging]
# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /usr/local/airflow/logs

# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Set this to True if you want to enable remote logging.
remote_logging = False

# Users must supply an Airflow connection id that provides access to the storage
# location.
remote_log_conn_id =

# Path to Google Credential JSON file. If omitted, authorization based on `the Application Default
# Credentials
# <https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__ will
# be used.
google_key_path =

# Storage bucket URL for remote logging
# S3 buckets should start with "s3://"
# Cloudwatch log groups should start with "cloudwatch://"
# GCS buckets should start with "gs://"
# WASB buckets should start with "wasb" just to help Airflow select correct handler
# Stackdriver logs should start with "stackdriver://"
remote_base_log_folder =

# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False

# Logging level.
#
# Supported values: ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``.
logging_level = INFO

# Logging level for Flask-appbuilder UI.
#
# Supported values: ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``.
fab_logging_level = WARN

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# Example: logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class = log_config.DEFAULT_LOGGING_CONFIG

# Flag to enable/disable Colored logs in Console
# Colour the logs when the controlling terminal is a TTY.
colored_console_log = True

# Log format for when Colored logs is enabled
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {{%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d}} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter

# Format of Log line
log_format = [%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s

# Specify prefix pattern like mentioned below with stream handler TaskHandlerWithCustomFormatter
# Example: task_log_prefix_template = {{ti.dag_id}}-{{ti.task_id}}-{{execution_date}}-{{try_number}}
task_log_prefix_template =

# Formatting for how airflow generates file names/paths for each task run.
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log

# Formatting for how airflow generates file names for log
log_processor_filename_template = {{ filename }}.log

# full path of dag_processor_manager logfile
dag_processor_manager_log_location = /usr/local/airflow/dag_processor_manager/dag_processor_manager.log

# Name of handler to read task instance logs.
# Defaults to use ``task`` handler.
task_log_reader = task

# A comma-separated list of third-party logger names that will be configured to print messages to
# consoles.
# Example: extra_loggers = connexion,sqlalchemy
extra_loggers =

[metrics]

# StatsD (https://github.com/etsy/statsd) integration settings.
# Enables sending metrics to StatsD.
statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

# If you want to avoid sending all the available metrics to StatsD,
# you can configure an allow list of prefixes (comma separated) to send only the metrics that
# start with the elements of the list (e.g: "scheduler,executor,dagrun")
statsd_allow_list =

# A function that validate the statsd stat name, apply changes to the stat name if necessary and return
# the transformed stat name.
#
# The function should have the following signature:
# def func_name(stat_name: str) -> str:
stat_name_handler =

# To enable datadog integration to send airflow metrics.
statsd_datadog_enabled = False

# List of datadog tags attached to all metrics(e.g: key1:value1,key2:value2)
statsd_datadog_tags =

# If you want to utilise your own custom Statsd client set the relevant
# module path below.
# Note: The module path must exist on your PYTHONPATH for Airflow to pick it up
# statsd_custom_client_path =

[secrets]
# Full class name of secrets backend to enable (will precede env vars and metastore in search path)
# Example: backend = airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend
backend =

# The backend_kwargs param is loaded into a dictionary and passed to __init__ of secrets backend class.
# See documentation for the secrets backend you are using. JSON is expected.
# Example for AWS Systems Manager ParameterStore:
# ``{{"connections_prefix": "/airflow/connections", "profile_name": "default"}}``
backend_kwargs =

[cli]
# In what way should the cli access the API. The LocalClient will use the
# database directly, while the json_client will use the api running on the
# webserver
api_client = airflow.api.client.local_client

# If you set web_server_url_prefix, do NOT forget to append it here, ex:
# ``endpoint_url = http://localhost:8080/myroot``
# So api will look like: ``http://localhost:8080/myroot/api/experimental/...``
endpoint_url = http://localhost:$WEB_SERVER_PORT

[debug]
# Used only with ``DebugExecutor``. If set to ``True`` DAG will fail with first
# failed task. Helpful for debugging purposes.
fail_fast = False

[api]
# Enables the deprecated experimental API. Please note that these APIs do not have access control.
# The authenticated user has full access.
#
# .. warning::
#
#   This `Experimental REST API <https://airflow.readthedocs.io/en/latest/rest-api-ref.html>`__ is
#   deprecated since version 2.0. Please consider using
#   `the Stable REST API <https://airflow.readthedocs.io/en/latest/stable-rest-api-ref.html>`__.
#   For more information on migration, see
#   `UPDATING.md <https://github.com/apache/airflow/blob/main/UPDATING.md>`_
enable_experimental_api = False

# How to authenticate users of the API. See
# https://airflow.apache.org/docs/apache-airflow/stable/security.html for possible values.
# ("airflow.api.auth.backend.default" allows all requests for historic reasons)
auth_backend = airflow.api.auth.backend.basic_auth

# Used to set the maximum page limit for API requests
maximum_page_limit = 100

# Used to set the default page limit when limit is zero. A default limit
# of 100 is set on OpenApi spec. However, this particular default limit
# only work when limit is set equal to zero(0) from API requests.
# If no limit is supplied, the OpenApi spec default is used.
fallback_page_limit = 100

# The intended audience for JWT token credentials used for authorization. This value must match on the client and server sides. If empty, audience will not be tested.
# Example: google_oauth2_audience = project-id-random-value.apps.googleusercontent.com
google_oauth2_audience =

# Path to Google Cloud Service Account key file (JSON). If omitted, authorization based on
# `the Application Default Credentials
# <https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__ will
# be used.
# Example: google_key_path = /files/service-account-json
google_key_path =

# Used in response to a preflight request to indicate which HTTP
# headers can be used when making the actual request. This header is
# the server side response to the browser's
# Access-Control-Request-Headers header.
access_control_allow_headers =

# Specifies the method or methods allowed when accessing the resource.
access_control_allow_methods =

# Indicates whether the response can be shared with requesting code from the given origin.
access_control_allow_origin =

[operators]
# The default owner assigned to each new operator, unless
# provided explicitly or passed via ``default_args``
default_owner = airflow
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0

# Default queue that tasks get assigned to and that worker listen on.
default_queue = default

# Is allowed to pass additional/unused arguments (args, kwargs) to the BaseOperator operator.
# If set to False, an exception will be thrown, otherwise only the console message will be displayed.
allow_illegal_arguments = False

[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is used in automated emails that
# airflow sends to point links to the right web server
base_url = http://localhost:$WEB_SERVER_PORT

# Default timezone to display all dates in the UI, can be UTC, system, or
# any IANA timezone string (e.g. Europe/Amsterdam). If left empty the
# default value of core/default_timezone will be used
# Example: default_ui_timezone = America/New_York
default_ui_timezone = America/Chicago

# The ip specified when starting the web server
web_server_host = 0.0.0.0

# The port on which to run the web server
web_server_port = $WEB_SERVER_PORT

# Paths to the SSL certificate and key for the web server. When both are
# provided SSL will be enabled. This does not change the web server port.
web_server_ssl_cert =

# Paths to the SSL certificate and key for the web server. When both are
# provided SSL will be enabled. This does not change the web server port.
web_server_ssl_key =

# Number of seconds the webserver waits before killing gunicorn master that doesn't respond
web_server_master_timeout = 120

# Number of seconds the gunicorn webserver waits before timing out on a worker
web_server_worker_timeout = 120

# Number of workers to refresh at a time. When set to 0, worker refresh is
# disabled. When nonzero, airflow periodically refreshes webserver workers by
# bringing up new ones and killing old ones.
worker_refresh_batch_size = 1

# Number of seconds to wait before refreshing a batch of workers.
worker_refresh_interval = 30

# If set to True, Airflow will track files in plugins_folder directory. When it detects changes,
# then reload the gunicorn.
reload_on_plugin_change = False

# Secret key used to run your flask app. It should be as random as possible. However, when running
# more than 1 instances of webserver, make sure all of them use the same ``secret_key`` otherwise
# one of them will error with "CSRF session token is missing".
secret_key = a08edd33546e509829b48d8c05310cf9

# Number of workers to run the Gunicorn web server
workers = 4

# The worker class gunicorn should use. Choices include
# sync (default), eventlet, gevent
worker_class = sync

# Log files for the gunicorn webserver. '-' means log to stderr.
access_logfile = -

# Log files for the gunicorn webserver. '-' means log to stderr.
error_logfile = -

# Access log format for gunicorn webserver.
# default format is %%(h)s %%(l)s %%(u)s %%(t)s "%%(r)s" %%(s)s %%(b)s "%%(f)s" "%%(a)s"
# documentation - https://docs.gunicorn.org/en/stable/settings.html#access-log-format
access_logformat =

# Expose the configuration file in the web server
expose_config = False

# Expose hostname in the web server
expose_hostname = True

# Expose stacktrace in the web server
expose_stacktrace = True

# Default DAG view. Valid values are: ``tree``, ``graph``, ``duration``, ``gantt``, ``landing_times``
dag_default_view = tree

# Default DAG orientation. Valid values are:
# ``LR`` (Left->Right), ``TB`` (Top->Bottom), ``RL`` (Right->Left), ``BT`` (Bottom->Top)
dag_orientation = LR

# The amount of time (in secs) webserver will wait for initial handshake
# while fetching logs from other worker machine
log_fetch_timeout_sec = 5

# Time interval (in secs) to wait before next log fetching.
log_fetch_delay_sec = 2

# Distance away from page bottom to enable auto tailing.
log_auto_tailing_offset = 30

# Animation speed for auto tailing log display.
log_animation_speed = 1000

# By default, the webserver shows paused DAGs. Flip this to hide paused
# DAGs by default
hide_paused_dags_by_default = False

# Consistent page size across all listing views in the UI
page_size = 100

# Define the color of navigation bar
navbar_color = #fff

# Default dagrun to show in UI
default_dag_run_display_number = 25

# Enable werkzeug ``ProxyFix`` middleware for reverse proxy
enable_proxy_fix = False

# Number of values to trust for ``X-Forwarded-For``.
# More info: https://werkzeug.palletsprojects.com/en/0.16.x/middleware/proxy_fix/
proxy_fix_x_for = 1

# Number of values to trust for ``X-Forwarded-Proto``
proxy_fix_x_proto = 1

# Number of values to trust for ``X-Forwarded-Host``
proxy_fix_x_host = 1

# Number of values to trust for ``X-Forwarded-Port``
proxy_fix_x_port = 1

# Number of values to trust for ``X-Forwarded-Prefix``
proxy_fix_x_prefix = 1

# Set secure flag on session cookie
cookie_secure = False

# Set samesite policy on session cookie
cookie_samesite = Lax

# Default setting for wrap toggle on DAG code and TI log views.
default_wrap = False

# Allow the UI to be rendered in a frame
x_frame_enabled = True

# Send anonymous user activity to your analytics tool
# choose from google_analytics, segment, or metarouter
# analytics_tool =

# Unique ID of your account in the analytics tool
# analytics_id =

# 'Recent Tasks' stats will show for old DagRuns if set
show_recent_stats_for_completed_runs = True

# Update FAB permissions and sync security manager roles
# on webserver startup
update_fab_perms = True

# The UI cookie lifetime in minutes. User will be logged out from UI after
# ``session_lifetime_minutes`` of non-activity
session_lifetime_minutes = 43200

# Sets a custom page title for the DAGs overview page and site title for all pages
# instance_name =

[email]

# Configuration email backend and whether to
# send email alerts on retry or failure
# Email backend to use
email_backend = airflow.utils.email.send_email_smtp

# Email connection to use
email_conn_id = smtp_default

# Whether email alerts should be sent when a task is retried
default_email_on_retry = True

# Whether email alerts should be sent when a task failed
default_email_on_failure = True

# File that will be used as the template for Email subject (which will be rendered using Jinja2).
# If not set, Airflow uses a base template.
# Example: subject_template = /path/to/my_subject_template_file
# subject_template =

# File that will be used as the template for Email content (which will be rendered using Jinja2).
# If not set, Airflow uses a base template.
# Example: html_content_template = /path/to/my_html_content_template_file
# html_content_template =

[smtp]

# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
# smtp_user =
# Example: smtp_password = airflow
# smtp_password =
smtp_port = 25
smtp_mail_from = airflow@example.com
smtp_timeout = 30
smtp_retry_limit = 5

[sentry]

# Sentry (https://docs.sentry.io) integration. Here you can supply
# additional configuration options based on the Python platform. See:
# https://docs.sentry.io/error-reporting/configuration/?platform=python.
# Unsupported options: ``integrations``, ``in_app_include``, ``in_app_exclude``,
# ``ignore_errors``, ``before_breadcrumb``, ``before_send``, ``transport``.
# Enable error reporting to Sentry
sentry_on = false
sentry_dsn =

[scheduler]
# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI), this defines the frequency at which they should
# listen (in seconds).
job_heartbeat_sec = 5

# How often (in seconds) to check and tidy up 'running' TaskInstances
# that no longer have a matching DagRun
clean_tis_without_dagrun_interval = 15

# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 5

# The number of times to try to schedule each DAG file
# -1 indicates unlimited number
num_runs = -1

# The number of seconds to wait between consecutive DAG file processing
processor_poll_interval = 1

# Number of seconds after which a DAG file is parsed. The DAG file is parsed every
# ``min_file_process_interval`` number of seconds. Updates to DAGs are reflected after
# this interval. Keeping this number low will increase CPU usage.
min_file_process_interval = 60

# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 300

# How often should stats be printed to the logs. Setting to 0 will disable printing stats
print_stats_interval = 30

# How often (in seconds) should pool usage stats be sent to statsd (if statsd_on is enabled)
pool_metrics_interval = 5

# If the last scheduler heartbeat happened more than scheduler_health_check_threshold
# ago (in seconds), scheduler is considered unhealthy.
# This is used by the health check in the "/health" endpoint
scheduler_health_check_threshold = 30

# How often (in seconds) should the scheduler check for orphaned tasks and SchedulerJobs
orphaned_tasks_check_interval = 300
child_process_log_directory = /usr/local/airflow/scheduler/logs

# Local task jobs periodically heartbeat to the DB. If the job has
# not heartbeat in this many seconds, the scheduler will mark the
# associated task instance as failed and will re-schedule the task.
scheduler_zombie_task_threshold = 300

# Turn off scheduler catchup by setting this to ``False``.
# Default behavior is unchanged and
# Command Line Backfills still work, but the scheduler
# will not do scheduler catchup if this is ``False``,
# however it can be set on a per DAG basis in the
# DAG definition (catchup)
catchup_by_default = True

# This changes the batch size of queries in the scheduling main loop.
# If this is too high, SQL query performance may be impacted by one
# or more of the following:
# - reversion to full table scan
# - complexity of query predicate
# - excessive locking
# Additionally, you may hit the maximum allowable query length for your db.
# Set this to 0 for no limit (not advised)
max_tis_per_query = 512

# Should the scheduler issue ``SELECT ... FOR UPDATE`` in relevant queries.
# If this is set to False then you should not run more than a single
# scheduler at once
use_row_level_locking = True

# Max number of DAGs to create DagRuns for per scheduler loop.
max_dagruns_to_create_per_loop = 10

# How many DagRuns should a scheduler examine (and lock) when scheduling
# and queuing tasks.
max_dagruns_per_loop_to_schedule = 20

# Should the Task supervisor process perform a "mini scheduler" to attempt to schedule more tasks of the
# same DAG. Leaving this on will mean tasks in the same DAG execute quicker, but might starve out other
# dags in some circumstances
schedule_after_task_execution = True

# The scheduler can run multiple processes in parallel to parse dags.
# This defines how many processes will run.
parsing_processes = 2

# One of ``modified_time``, ``random_seeded_by_host`` and ``alphabetical``.
# The scheduler will list and sort the dag files to decide the parsing order.
#
# * ``modified_time``: Sort by modified time of the files. This is useful on large scale to parse the
#   recently modified DAGs first.
# * ``random_seeded_by_host``: Sort randomly across multiple Schedulers but with same order on the
#   same host. This is useful when running with Scheduler in HA mode where each scheduler can
#   parse different DAG files.
# * ``alphabetical``: Sort by filename
file_parsing_sort_mode = modified_time

# Turn off scheduler use of cron intervals by setting this to False.
# DAGs submitted manually in the web UI or with trigger_dag will still run.
use_job_schedule = True

# Allow externally triggered DagRuns for Execution Dates in the future
# Only has effect if schedule_interval is set to None in DAG
allow_trigger_in_future = False

# DAG dependency detector class to use
dependency_detector = airflow.serialization.serialized_objects.DependencyDetector

[smart_sensor]
# When `use_smart_sensor` is True, Airflow redirects multiple qualified sensor tasks to
# smart sensor task.
use_smart_sensor = False

# `shard_code_upper_limit` is the upper limit of `shard_code` value. The `shard_code` is generated
# by `hashcode % shard_code_upper_limit`.
shard_code_upper_limit = 10000

# The number of running smart sensor processes for each service.
shards = 5

# comma separated sensor classes support in smart_sensor.
sensors_enabled = NamedHivePartitionSensor

potiuk commented 3 years ago

I think that if importing your DAG takes more than 30 seconds, you are simply trying to do too much in your DAG's top-level code.

I just published some updates to the documentation related to that. Please read the docs and follow them:

https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code

and:

https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#reducing-dag-complexity
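
As a rough illustration of the top-level-code advice, here is a plain-Python sketch. It is not taken from the linked docs, and `load_settings` and `process` are hypothetical names. The idea is that anything at module level runs every time the DAG file is parsed, so slow work belongs inside the callable that a task executes.

```python
import time


def load_settings():
    """Hypothetical stand-in for slow work such as a database query or HTTP call."""
    time.sleep(0.2)
    return {"table": "events"}


# Anti-pattern: uncommenting the next line would run the slow call on every
# parse of this file, which counts against the DAG import timeout.
# SETTINGS = load_settings()


def process():
    """Task callable: the slow work runs only when the task itself executes."""
    settings = load_settings()
    return f"processing {settings['table']}"
```

In a real DAG file, a function like `process` would typically be handed to an operator such as `PythonOperator(python_callable=process)`, so parsing the file stays cheap.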

hpatel-higi commented 3 years ago

Like I mentioned in the previous comment, in version 1.10.8 we didn't have this issue.

It just started happening after we upgraded to 2.1.4. Our DAGs are already following the best practices.

Another thing to note is that CPU usage is usually around 100% when these errors happen.

potiuk commented 3 years ago

Like I mentioned in the previous comment, in version 1.10.8 we didn't have this issue.

It just started happening after we upgraded to 2.1.4. Our DAGs are already following the best practices.

How do you know that? Those best practices were published LITERALLY 10 minutes ago.

About 90% of the content there did not exist a week ago, even in a PR, and was built based on the experience of people who had similar problems, so there is no way you could have followed it.

You can also increase the DagBag import timeout if, after reviewing the practices, you confirm that you followed them. I am converting this into a discussion, because clearly the problem is with the import timeout.
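
Before raising the timeout, it may help to measure how long each DAG file takes to import. The sketch below uses only the standard library. `time_import` and `slow_files` are hypothetical helpers, and the 30-second limit is assumed to mirror the `dagbag_import_timeout` option in the `[core]` section. It imports files in the current process, so it only approximates what the scheduler's parser sees, especially on a host whose CPU is already saturated.

```python
import importlib.util
import time
from pathlib import Path

# Assumed to mirror [core] dagbag_import_timeout (in seconds).
IMPORT_TIMEOUT_SEC = 30.0


def time_import(path):
    """Import a Python file as a throwaway module and return the elapsed seconds."""
    path = Path(path)
    spec = importlib.util.spec_from_file_location(f"_timed_{path.stem}", path)
    module = importlib.util.module_from_spec(spec)
    start = time.perf_counter()
    spec.loader.exec_module(module)  # runs the file's top-level code
    return time.perf_counter() - start


def slow_files(paths, limit=IMPORT_TIMEOUT_SEC):
    """Return the paths whose import took longer than `limit` seconds."""
    return [p for p in paths if time_import(p) > limit]
```

If the files import quickly in isolation but still time out under load, the timeout can be raised with `dagbag_import_timeout` in `airflow.cfg` or with the `AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT` environment variable.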