apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Celery Executor: Issue with broker connection when 'celery_ssl_active' is True #39210

Open rdeveloper21 opened 2 months ago

rdeveloper21 commented 2 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.7.2

What happened?

Our Airflow setup used a RabbitMQ broker URL for the Celery executor. Later, to address reported vulnerabilities, we enabled SSL on the RabbitMQ connection. To support this, we changed the following config variables as per the documentation:

"AIRFLOW__CELERY__BROKER_URL": "amqps://user:password@ip:port/"
"AIRFLOW__CELERY__SSL_ACTIVE": "True"
"AIRFLOW__CELERY__SSL_CACERT": "/path/to/cacert.pem"

However, we still observed the following Airflow exception:

raise AirflowException(
airflow.exceptions.AirflowException: Exception: There was an unknown Celery SSL Error. Please ensure you want to use SSL and/or have all necessary certs and key (The broker you configured does not support SSL_ACTIVE to be True. Please use RabbitMQ or Redis if you would like to use SSL for broker.).

After going through the source code, we found the cause (screenshot: airflow_ssl_bug). The exception is raised because of the condition "amqp://" in broker_url, which an amqps:// URL does not satisfy. After updating the condition and also commenting out "keyfile" and "certfile" in broker_use_ssl, we were able to connect to RabbitMQ and proceed.
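To illustrate why the quoted condition rejects an amqps:// URL, here is a minimal, self-contained sketch. It does not reproduce the provider's actual code: the function names `substring_check` and `scheme_check` and the host `rabbit.example:5671` are hypothetical, and the scheme-based variant is only one possible way to revise the condition.

```python
from urllib.parse import urlsplit


def substring_check(broker_url: str) -> bool:
    """Mirror the condition quoted in this report: a plain substring test."""
    return "amqp://" in broker_url


def scheme_check(broker_url: str) -> bool:
    """Hypothetical alternative: compare the parsed URL scheme instead."""
    return urlsplit(broker_url).scheme in {"amqp", "amqps"}


if __name__ == "__main__":
    # Placeholder credentials and host, for illustration only.
    url = "amqps://user:password@rabbit.example:5671/"
    # "amqps://" does not contain the substring "amqp://", because the
    # letter "s" sits between "amqp" and "://".
    print("substring check:", substring_check(url))
    print("scheme check:   ", scheme_check(url))
```

With the substring test, an amqps:// broker URL falls through to the branch that raises the "does not support SSL_ACTIVE" error, which matches the behaviour described above.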

What you think should happen instead?

The condition applied when 'celery_ssl_active' is True should be revisited and updated. The check "amqp://" in broker_url is not justified, because it rejects amqps:// broker URLs.
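As a rough sketch of the workaround described in this report (only a CA certificate configured, with "keyfile" and "certfile" left out), the SSL options could be assembled as below. The helper `build_broker_use_ssl` is hypothetical and is not part of Airflow. The dictionary keys are the ones named in this report plus `cert_reqs`, which Celery's `broker_use_ssl` setting accepts for AMQP brokers.

```python
import ssl
from typing import Any, Dict, Optional


def build_broker_use_ssl(
    ca_certs: str,
    keyfile: Optional[str] = None,
    certfile: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a broker_use_ssl-style dict, adding the client key and
    certificate only when both are provided (hypothetical helper)."""
    options: Dict[str, Any] = {
        "ca_certs": ca_certs,
        "cert_reqs": ssl.CERT_REQUIRED,  # verify the broker's certificate
    }
    if keyfile and certfile:
        options["keyfile"] = keyfile
        options["certfile"] = certfile
    return options


if __name__ == "__main__":
    # Path taken from the report; no client key or certificate configured.
    print(build_broker_use_ssl("/path/to/cacert.pem"))
```

Whether a client key and certificate should be optional depends on how the broker is configured. This sketch only covers the server-verification case the reporter describes.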

How to reproduce

Implement SSL for the RabbitMQ broker. Update the variables below and try to connect to the broker:

"AIRFLOW__CELERY__BROKER_URL": "amqps://user:password@ip:port/"
"AIRFLOW__CELERY__SSL_ACTIVE": "True"
"AIRFLOW__CELERY__SSL_CACERT": "/path/to/cacert.pem"

Operating System

linux

Versions of Apache Airflow Providers

apache-airflow-providers-celery==3.3.4

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 2 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

RNHTTR commented 2 months ago

Are you able to try to reproduce on the latest version of the celery provider (3.6.2)?

NBardelot commented 2 months ago

Wouldn't the amqps:// protocol only make TLS-enabled connections? It seems AIRFLOW__CELERY__SSL_ACTIVE must then be True in this case and Airflow behaviour is correct.

See https://www.rabbitmq.com/docs/uri-spec#the-amqps-uri-scheme

The "amqps" URI scheme is used to instruct a client to make an secured connection to the server.

And disabling TLS would then go with switching back to the amqp:// URI scheme.

rdeveloper21 commented 2 months ago

@NBardelot My mistake. There is one correction to the steps to reproduce above. Implement SSL for the RabbitMQ broker, update the variables below, and try to connect to the broker:

"AIRFLOW__CELERY__BROKER_URL": "amqps://user:password@ip:port/"
"AIRFLOW__CELERY__SSL_ACTIVE": "True"
"AIRFLOW__CELERY__SSL_CACERT": "/path/to/cacert.pem"

Instead of False, the value is True.

rdeveloper21 commented 2 months ago

@RNHTTR We install all the required packages using the command below:

pip install 'apache-airflow[celery]==2.7.2' --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.2/constraints-3.8.txt"

I tried with the celery provider (3.6.2) and observe the same error. Below is the traceback:

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/Admin/miniconda3/envs/airflow/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/Users/Admin/miniconda3/envs/airflow/lib/python3.8/site-packages/airflow/__main__.py", line 59, in main
    args.func(args)
  File "/Users/Admin/miniconda3/envs/airflow/lib/python3.8/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/Users/Admin/miniconda3/envs/airflow/lib/python3.8/site-packages/airflow/utils/cli.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "/Users/Admin/miniconda3/envs/airflow/lib/python3.8/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/Users/Admin/miniconda3/envs/airflow/lib/python3.8/site-packages/airflow/cli/commands/celery_command.py", line 142, in worker
    from airflow.providers.celery.executors.celery_executor import app as celery_app
  File "/Users/Admin/miniconda3/envs/airflow/lib/python3.8/site-packages/airflow/providers/celery/executors/celery_executor.py", line 107, in __getattr__
    from airflow.providers.celery.executors.celery_executor_utils import app
  File "/Users/Admin/miniconda3/envs/airflow/lib/python3.8/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 99, in <module>
    app = _get_celery_app()
  File "/Users/Admin/miniconda3/envs/airflow/lib/python3.8/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/Users/Admin/miniconda3/envs/airflow/lib/python3.8/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 78, in _get_celery_app
    celery_configuration = conf.getimport("celery", "celery_config_options")
  File "/Users/Admin/miniconda3/envs/airflow/lib/python3.8/site-packages/airflow/configuration.py", line 1214, in getimport
    return import_string(full_qualified_path)
  File "/Users/Admin/miniconda3/envs/airflow/lib/python3.8/site-packages/airflow/utils/module_loading.py", line 37, in import_string
    module = import_module(module_path)
  File "/Users/Admin/miniconda3/envs/airflow/lib/python3.8/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "/Users/Admin/miniconda3/envs/airflow/lib/python3.8/site-packages/airflow/providers/celery/executors/default_celery.py", line 131, in <module>
    raise AirflowException(
airflow.exceptions.AirflowException: Exception: There was an unknown Celery SSL Error. Please ensure you want to use SSL and/or have all necessary certs and key (The broker you configured does not support SSL_ACTIVE to be True. Please use RabbitMQ or Redis if you would like to use SSL for broker.).
[2024-04-25T19:51:22.857+0530] {settings.py:351} DEBUG - Disposing DB connection pool (PID 70576)

(airflow) LM0003366:myairflow Admin$ pip freeze |grep celery
apache-airflow-providers-celery==3.6.2
celery==5.3.4