apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Can't read S3 remote logs when using gevent/eventlet webserver workers. #8212

Closed webster-chainalysis closed 1 year ago

webster-chainalysis commented 4 years ago

Hey everyone. I've upgraded to 1.10.9. It appears that the logging changes broke the functionality for reading S3 remote logs in the Web UI (writing is ok). The changelog mentions that Airflow's logging mechanism has been refactored to use Python's builtin logging module:

[AIRFLOW-1611] Customize logging

I followed the directions in the changelog and created the following log config:

import os
import six

from airflow import AirflowException
from airflow.configuration import conf
from airflow.utils.file import mkdirs
from typing import Dict, Any

LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()

FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper()

LOG_FORMAT = conf.get('core', 'LOG_FORMAT')

COLORED_LOG_FORMAT = conf.get('core', 'COLORED_LOG_FORMAT')

COLORED_LOG = conf.getboolean('core', 'COLORED_CONSOLE_LOG')

COLORED_FORMATTER_CLASS = conf.get('core', 'COLORED_FORMATTER_CLASS')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')

PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')

DAG_PROCESSOR_MANAGER_LOG_LOCATION = \
    conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')

FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')

PROCESSOR_FILENAME_TEMPLATE = conf.get('core', 'LOG_PROCESSOR_FILENAME_TEMPLATE')

FORMATTER_CLASS_KEY = '()' if six.PY2 else 'class'

#
# Getting this from environment because the changelog for 1.10.9 says to set
# the path of `REMOTE_BASE_LOG_FOLDER` explicitly in the config. The
# `REMOTE_BASE_LOG_FOLDER` key is not used anymore.
#
REMOTE_BASE_LOG_FOLDER = os.environ.get('AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER')

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
        'airflow_coloured': {
            'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            FORMATTER_CLASS_KEY: COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter'
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}  # type: Dict[str, Any]

However, the task log reader is always defaulting to using the FileTaskHandler. This should not occur because I have the following settings in airflow.cfg:

remote_logging = True
remote_base_log_folder = s3://my-bucket-name
remote_log_conn_id = aws_default
task_log_reader = s3.task

The s3.task handler passed to the task_log_reader setting should cause an instance of the S3TaskHandler class to be used to read the task logs from S3. This happens when rendering the get_logs_with_metadata view in www/views.py.
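For context, the lookup works roughly like this (a paraphrased sketch of the 1.10.x behaviour, not the exact source): the value of task_log_reader is matched against the names of the handlers attached to the 'airflow.task' logger, which is why the handler key in the logging dict config has to match the setting.

import logging
from airflow.configuration import conf

# Paraphrased sketch: resolve the task log reader handler by name.
task_log_reader = conf.get('core', 'task_log_reader')   # e.g. 's3.task'
task_logger = logging.getLogger('airflow.task')
handler = next(
    (h for h in task_logger.handlers if h.name == task_log_reader),
    None,
)
# `handler` should be the S3TaskHandler instance when task_log_reader is 's3.task'.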


Apache Airflow version: 1.10.9
Kubernetes version: 1.15

Environment:

What happened: Logs did not appear in the Airflow Web UI. The FileTaskHandler tries to fetch the file locally or from the worker on port 8793. However, the logs do not exist in either location since we are using the Kubernetes Executor. This produces the following error messages:

*** Log file does not exist: /usr/local/airflow/logs/MY_DAG_NAME/MY_TASK_NAME/2020-04-07T20:59:19.312402+00:00/6.log
*** Fetching from: http://MY_DAG_NAME-0dde5ff5a786437cb14234:8793/log/MY_DAG_NAME/MY_TASK_NAME/2020-04-07T20:59:19.312402+00:00/6.log
*** Failed to fetch log file from worker. HTTPConnectionPool(host='MY_DAG_NAME-0dde5ff5a786437cb14234', port=8793): Max retries exceeded with url: /log/MY_DAG_NAME/MY_TASK_NAME/2020-04-07T20:59:19.312402+00:00/6.log (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f708332fc90>: Failed to establish a new connection: [Errno -2] Name or service not known'))

What you expected to happen:

The logs should be rendered in the Web UI using the S3TaskHandler class.

boring-cyborg[bot] commented 4 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

dimon222 commented 4 years ago

@webster-chainalysis I noticed the same issue on 1.10.10. Do you also see the same behaviour on that version?

webster-chainalysis commented 4 years ago

@dimon222 - Yes.

dimon222 commented 4 years ago

@webster-chainalysis Switching the worker class in airflow.cfg from gevent/eventlet to "sync" seems to resolve the issue for me.

webster-chainalysis commented 4 years ago

@dimon222 - That makes sense. I'm currently using gevent behind an Amazon elastic load balancer. When using the sync worker class with the ELB I had unacceptable performance.

TRReeve commented 4 years ago

I have had this issue as well on Airflow 1.10.10 and have been tearing my hair out over it for a couple of days now. I'm running using the KubernetesOperator, and the logging seems to upload to S3 fine; it is only the reading of the logs back from the web UI that is always in local mode.

TRReeve commented 4 years ago

Having gotten remote logging working on 1.10.10, I've noticed there seems to be a difference in how the workers handle the log upload versus the webserver component. The Connection you define needs to point to the same folder as the remote_base_log_folder. E.g. if your remote base log folder is data/airflow/logs, then your connection used for remote logging also needs to point to it, e.g. s3://access_key:secret@data/airflow/logs. You cannot just have a generic "AWS connection" and then have it figure out which folder it needs to point to on its own using the file path from the worker.
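For illustration, the setup described here would look something like this as environment variables (hypothetical bucket, path, and connection names; note that the next comment reports not needing the connection to embed the path):

AIRFLOW_CONN_S3_LOGS=s3://MY_ACCESS_KEY:MY_SECRET_KEY@data/airflow/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_logs
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://data/airflow/logs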

marclamberti commented 4 years ago

@TRReeve I didn't need to do that. Actually, if you want to have remote logging working with the Kubernetes executor, you have to define additional Kubernetes environment variables so that your pods are in sync with the scheduler/webserver. E.g.:

AIRFLOW__CORE__REMOTE_LOGGING=True
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://my-bucket/my-key
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=myawsconn
AIRFLOW__CORE__FERNET_KEY=myfernetkey

AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_LOGGING=True
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://my-bucket/my-key
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_LOG_CONN_ID=myawsconn
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__FERNET_KEY=myfernetkey

The connection myawsconn has the type S3, with the credentials set in its extra field.

Also I didn’t need to change the task handler as it is automatically changed when remote logging is set to True, and I didn’t have to define a custom logging class.

Hope it helps :)

TRReeve commented 4 years ago

@marclamberti Your answer is how I understood it would work as well, and it half worked for me: the logs upload fine into the S3 bucket, but when I went to "view logs" in the UI it would give the "logs not found" error, with no output in the logs to indicate it was using the S3 connection or the read_key function to retrieve anything.

It would be really nice if I could just define AIRFLOW_CONN_S3_URI=s3://user:pass@S3, then have REMOTE_BASE_LOG_FOLDER=s3://airflow-stuff/logs, and the UI would build the path, but I could only get logs uploading. My working helm template for Airflow on k8s builds the connection s3://access_key:secret_key@{{ mys3path }} and then the remote log path is s3://{{ mys3path }}. Aside from that it's exactly the same as you defined above, with the same variables defined under AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES.

And yes, I can confirm for anyone reading that there was no need for any custom logging classes or task handlers.

marclamberti commented 4 years ago

Well, in my case it works for both reading and writing logs from S3 with Airflow 1.10.10 and the Kubernetes executor. I agree it could be simpler :)

ashb commented 4 years ago

@webster-chainalysis I've just opened #9118 to improve the debugging of S3 log handler.

Could you also try the debug steps I put in the description there -- let's rule out that sort of problem.

prithvisathiya commented 4 years ago

I'm also experiencing the exact same issue after upgrading to 1.10.9, but I'm still using the LocalExecutor. I can clearly see from the S3 console that the logs are getting uploaded, but the Airflow UI is unable to read them back. It attempts to read the local folder and then just gives up with the following error:

*** Log file does not exist: /app/logs/dag_name/task_name/2020-07-05T19:21:09.715128+00:00/1.log
*** Fetching from: http://airflow-test-scheduler-5f8ccc76df-tc8j8:8793/log/dag_name/task_name/2020-07-05T19:21:09.715128+00:00/1.log
*** Failed to fetch log file from worker. HTTPConnectionPool(host='airflow-test-scheduler-5f8ccc76df-tc8j8', port=8793): Max retries exceeded with url: /log/dag_name/task_name/2020-07-05T19:21:09.715128+00:00/1.log 

My configurations below:

AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://{bucket}/logs/
AIRFLOW__CORE__REMOTE_LOGGING=True
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws_default
AIRFLOW_CONN_AWS_DEFAULT=aws://?region_name=us-west-2

Update: also, when I switched from the gevent to the sync worker class, it seemed to read fine again.

alanbrent commented 4 years ago

Can confirm that using a threaded gunicorn worker (gevent in our case) breaks the web component's ability to show task logs that are in S3. Moving back to sync works for us, and since we're running on Kubernetes a few more web pods to absorb the ALB/ELB health checks isn't (so far) a performance concern.

Would love to see this fixed, however.

Using Airflow default aws_default for the S3 connection. Relevant configuration shared among web, scheduler, and worker pods below the fold:

- AIRFLOW__CORE__REMOTE_LOGGING=True
- AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws_default
- AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://${OUR-BUCKET}/logs
- AIRFLOW__WEBSERVER__WORKER_CLASS=sync
- AIRFLOW__WEBSERVER__WORKER_REFRESH_INTERVAL=3600 # the default 30s worker lifetime is *way* too short
- AWS_DEFAULT_REGION=us-east-1
- AWS_ROLE_ARN=arn:aws:iam::${OUR_AWS_ACCOUNT_ID}:role/${OUR_AIRFLOW_APP_ROLE}
- AWS_WEB_IDENTITY_TOKEN_FILE=/var/run/secrets/eks.amazonaws.com/serviceaccount/token # This is how EKS does IAM<>Pod stuff

cmlad commented 4 years ago

The issue seems to be infinite recursion due to an interaction between gevent's monkey patching and the botocore library used by S3TaskHandler:

Traceback (most recent call last):
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/airflow/utils/log/s3_task_handler.py", line 131, in s3_log_exists
    return self.hook.get_key(remote_log_location) is not None
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/airflow/hooks/S3_hook.py", line 224, in get_key
    obj = self.get_resource_type('s3').Object(bucket_name, key)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/airflow/contrib/hooks/aws_hook.py", line 186, in get_resource_type
    config=config, verify=self.verify)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/boto3/session.py", line 389, in resource
    aws_session_token=aws_session_token, config=config)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/boto3/session.py", line 263, in client
    aws_session_token=aws_session_token, config=config)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/session.py", line 823, in create_client
    credentials = self.get_credentials()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/session.py", line 428, in get_credentials
    'credential_provider').load_credentials()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/session.py", line 919, in get_component
    self._components[name] = factory()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/session.py", line 149, in _create_credential_resolver
    self, region_name=self._last_client_region_used
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/credentials.py", line 70, in create_credential_resolver
    container_provider = ContainerProvider()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/credentials.py", line 1803, in __init__
    fetcher = ContainerMetadataFetcher()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/utils.py", line 1578, in __init__
    timeout=self.TIMEOUT_SECONDS
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/httpsession.py", line 180, in __init__
    self._manager = PoolManager(**self._get_pool_manager_kwargs())
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/httpsession.py", line 188, in _get_pool_manager_kwargs
    'ssl_context': self._get_ssl_context(),
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/httpsession.py", line 197, in _get_ssl_context
    return create_urllib3_context()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/httpsession.py", line 72, in create_urllib3_context
    context.options |= options
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/ssl.py", line 507, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/ssl.py", line 507, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/ssl.py", line 507, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  [Previous line repeated 409 more times]
RecursionError: maximum recursion depth exceeded

I still need to dive deeper into this

dimon222 commented 4 years ago

@cmlad I'll make a very bad suggestion, but would it work with a regular HTTP endpoint instead (without SSL)? Just something to try out.

dimon222 commented 4 years ago

@cmlad yes, it's the reason for sure. And HTTP vs HTTPS makes no difference.

I randomly found a potential fix in one of the previously mentioned issues: https://github.com/apache/airflow/issues/8164#issuecomment-629621571. Prepending the monkey patching in dagbag.py works. What is invoking dagbag.py so early? Not sure.

Siddharthk commented 4 years ago

How can I read logs from worker pods? This is important since we need to see logs in real time to see what's happening; S3 logs are available only when the task completes. I am currently getting the error below:

Log file does not exist: /opt/airflow/logs/mydag/mytask/2020-07-21T11:58:55.019748+00:00/2.log
*** Fetching from: http://taskpod-49ccd964791a4740b199:8793/log/mydag/mytask/2020-07-21T11:58:55.019748+00:00/2.log
*** Failed to fetch log file from worker. HTTPConnectionPool(host='taskpod-49ccd964791a4740b199', port=8793): Max retries exceeded with url: /log/mydag/mytask/2020-07-21T11:58:55.019748+00:00/2.log (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f486c9b8be0>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution',))

TRReeve commented 4 years ago

@Siddharthk If you're using any kind of distributed execution (including Kubernetes), you won't be able to read logs from the Airflow UI while the task is running using Airflow "out of the box". Airflow pushes the log to remote storage when the task is completed, and then the UI reads it from there.

I would guess that if you do need real-time logs from batch tasks while they are running, you could set up some sort of worker image with fluentd installed, or something similar to scrape stdout and push it somewhere else while Airflow is running on it. Either way there will have to be some sort of external integration.

dimon222 commented 4 years ago

The solution I mentioned above:

@cmlad yes, it's the reason for sure. And HTTP vs HTTPS makes no difference.

I randomly found a potential fix in one of the previously mentioned issues: #8164 (comment). Prepending the monkey patching in dagbag.py works. What is invoking dagbag.py so early? Not sure.

This no longer works in 1.10.12. Perhaps something gets loaded earlier again and messes up urllib3. The issue is still present.

dimon222 commented 4 years ago

Update: the new solution is adding gevent monkey patching to the top of config_templates/airflow_local_settings.py. It works.
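In practice that means something like the following as the very first lines of the file, before anything else gets imported (a sketch of the workaround; the exact snippet is spelled out again later in the thread):

from gevent import monkey
monkey.patch_all()  # later comments narrow this down to monkey.patch_ssl()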

peycho commented 4 years ago

We're facing the same recursion error when using gevent workers.

RecursionError: maximum recursion depth exceeded while calling a Python object

Airflow version 1.10.9

toderesa97 commented 4 years ago

We are experiencing the same issue here. Airflow does successfully write logs to S3, but we are getting:

Log file does not exist: /opt/airflow/logs/mydag/mytask/2020-07-21T11:58:55.019748+00:00/2.log
*** Fetching from: http://taskpod-49ccd964791a4740b199:8793/log/mydag/mytask/2020-07-21T11:58:55.019748+00:00/2.log
*** Failed to fetch log file from worker. HTTPConnectionPool(host='taskpod-49ccd964791a4740b199', port=8793): Max retries exceeded with url: /log/mydag/mytask/2020-07-21T11:58:55.019748+00:00/2.log (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f486c9b8be0>: Failed to establish a new connection: [Errno -3] Connection refused',))

I have configured my airflow.cfg as follows:

remote_logging = True
remote_log_conn_id = S3Conn
remote_base_log_folder = s3://dlakeufd-dev-audit/logs/airflow
encrypt_s3_logs = False
logging_config_class = airflow.utils.log.s3_task_handler.S3TaskHandler
task_log_reader = s3.task

But we are getting this exception when the Docker image is being provisioned into an ECS container (i.e., nothing to do with ECS):

...
File "/usr/local/lib/python3.7/site-packages/airflow/logging_config.py", line 53, in configure_logging
2020-09-03 21:16:15   .format(logging_class_path, err)
2020-09-03 21:16:15 ImportError: Unable to load custom logging from airflow.utils.log.s3_task_handler.S3TaskHandler due to
...

(There is no further explanation after the due to)

I don't understand why, after extracting the attribute of the class, it is expected to be a dict type, which clearly fails. See this class, where the exception is triggered at line 43.

nielstenboom commented 4 years ago

I also ran into this problem today. What I found out was that boto complains about not having credentials set, so the webserver pod needed the AWS environment variables.

AWS_ACCESS_KEY_ID=<id>
AWS_SECRET_ACCESS_KEY=<secret>

Making sure the webserver pod had those credentials as env variables, in addition to having an S3 connection string, solved my problem!

danielpoonwj commented 4 years ago

Update: the new solution is adding gevent monkey patching to the top of config_templates/airflow_local_settings.py. It works.

Previously I had to manually patch botocore to fix this, by removing its bypassing of the patched SSLContext, but that broke in Airflow 1.10.12.

I ran into IOErrors on the scheduler when using monkey.patch_all(), similar to this, and the suggested fix monkey.patch_all(thread=False, socket=False) caused warnings on threading.

In the end I figured the issue was only with monkey-patching SSL, as somehow it's imported earlier than gunicorn expects, as seen from the warning in the webserver logs.

Sep 15 02:10:45 ubuntu-xenial pipenv[22660]: /etc/airflow/.local/share/virtualenvs/airflow-bTdwlyD1/lib/python3.8/site-packages/gunicorn/workers/ggevent.py:53: MonkeyPatchWarning: Monkey-patching ssl after ssl has already been imported may lead to errors, including RecursionError on Python 3.6. It may also silently lead to incorrect behaviour on Python 3.7. Please monkey-patch earlier. See https://github.com/gevent/gevent/issues/1016. Modules that had direct imports (NOT patched): ['urllib3.util (/etc/airflow/.local/share/virtualenvs/airflow-bTdwlyD1/lib/python3.8/site-packages/urllib3/util/__init__.py)', 'urllib3.util.ssl_ (/etc/airflow/.local/share/virtualenvs/airflow-bTdwlyD1/lib/python3.8/site-packages/urllib3/util/ssl_.py)'].
Sep 15 02:10:45 ubuntu-xenial pipenv[22660]:   monkey.patch_all()

In the end adding this to the top of airflow_local_settings.py worked for me.

from gevent import monkey
monkey.patch_ssl()

soumilshah1995 commented 3 years ago

OK, I have the same issue. Can someone please explain what I need to do? I'm using docker-compose and have tried almost everything, but could not get this to work:

[2021-02-04 17:51:08,227] {s3_task_handler.py:52} ERROR - Could not create an S3Hook with connection id "AWSS3LogStorage". Please make sure that airflow[s3] is installed and the S3 connection exists.

scaoupgrade commented 3 years ago

The issue seems to be infinite recursion due to an interaction between gevent's monkey patching and the botocore library used by S3TaskHandler:

Traceback (most recent call last):
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/airflow/utils/log/s3_task_handler.py", line 131, in s3_log_exists
    return self.hook.get_key(remote_log_location) is not None
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/airflow/hooks/S3_hook.py", line 224, in get_key
    obj = self.get_resource_type('s3').Object(bucket_name, key)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/airflow/contrib/hooks/aws_hook.py", line 186, in get_resource_type
    config=config, verify=self.verify)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/boto3/session.py", line 389, in resource
    aws_session_token=aws_session_token, config=config)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/boto3/session.py", line 263, in client
    aws_session_token=aws_session_token, config=config)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/session.py", line 823, in create_client
    credentials = self.get_credentials()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/session.py", line 428, in get_credentials
    'credential_provider').load_credentials()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/session.py", line 919, in get_component
    self._components[name] = factory()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/session.py", line 149, in _create_credential_resolver
    self, region_name=self._last_client_region_used
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/credentials.py", line 70, in create_credential_resolver
    container_provider = ContainerProvider()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/credentials.py", line 1803, in __init__
    fetcher = ContainerMetadataFetcher()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/utils.py", line 1578, in __init__
    timeout=self.TIMEOUT_SECONDS
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/httpsession.py", line 180, in __init__
    self._manager = PoolManager(**self._get_pool_manager_kwargs())
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/httpsession.py", line 188, in _get_pool_manager_kwargs
    'ssl_context': self._get_ssl_context(),
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/httpsession.py", line 197, in _get_ssl_context
    return create_urllib3_context()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/httpsession.py", line 72, in create_urllib3_context
    context.options |= options
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/ssl.py", line 507, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/ssl.py", line 507, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/ssl.py", line 507, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  [Previous line repeated 409 more times]
RecursionError: maximum recursion depth exceeded

I still need to dive deeper into this

This is the root cause. It is related to the monkey patching and the import order of the requests module: https://github.com/gevent/gevent/issues/1016#issuecomment-328529454

ashb commented 3 years ago

Closing this as it seems to be an interaction between botocore and gevent, so not something Airflow can fix.

Happy to re-open this if I've misunderstood!

dimon222 commented 3 years ago

@ashb as mentioned above, the logging is completely broken for S3 when using gevent, so I don't think this ticket should be closed. The solution is to ensure the monkey patching happens as early as possible, before anything else gets imported; I believe currently that is the very top of airflow_local_settings.py as of version 1.10.14 (potentially the same for 2.0.1, but I haven't validated).

ashb commented 3 years ago

Gevent is not something Airflow uses though - and airflow_local_settings is a user controlled file.

ashb commented 3 years ago

If we can fix this easily though, we'll gladly accept a PR.

soumilshah1995 commented 3 years ago

I made a video on how to use it: https://youtu.be/33Yv19YSYxk


dimon222 commented 3 years ago

Gevent is not something Airflow uses though - and airflow_local_settings is a user controlled file.

Based on my very brief look, gevent is currently used just as an alternative for webserver request handling (so it's true that this entire issue is only about reading S3 logs in the webserver component; the rest, i.e. the scheduler or Airflow workers if there are any, shouldn't be touched at all, as they work correctly and this monkeypatching can actually break them instead). About airflow_local_settings: I think that's not quite true because of https://github.com/apache/airflow/blob/1.10.14/airflow/settings.py#L416. Attention: it doesn't matter that we may not call methods in airflow_local_settings; what matters is the package import order, where the gevent monkeypatching should be the first call in the entrypoint, or as soon as possible (in this case, before the botocore and requests imports).

What I mentioned is more of a "quick and dirty" fix that I found via reading the stack trace and brute-forcing, since I have only a small picture of how the startup flow of the Airflow webserver process goes (especially when gevent worker forking impacts the result). Only a dev who has a clear understanding of where everything begins for a forked gevent worker might know where to include this monkeypatching, and they would also have to ensure that it happens only for gevent, as such an action would instead cause issues for other webserver worker types (sync, eventlet).

PS: in any case, since the issue is present and might be critical for infrastructure handling, it's unreasonable to have this closed regardless of how difficult it is to fix (it's not difficult, it just requires a thorough look).

ashb commented 3 years ago

Re airflow_local_settings -- that file does not exist in the Airflow install; it is a place for site admins to be able to customize settings.

Using gevent is a fairly uncommon scenario (precisely because its monkey patching makes it hard to work with many other libraries), so we are unlikely to have the time to fix this ourselves.

We will happily review and accept a PR that does this however.

alannesta commented 3 years ago

We are using the Celery worker for Airflow (v1.10.12) and S3 logging is broken. It seems to be Celery's gevent patching that has caused the issue: https://github.com/celery/celery/blob/6eb5b718843d69e31bb2c90e3efa2e2aa39f5f94/celery/__init__.py#L114

The celery executor is imported from Airflow's views.py: https://github.com/apache/airflow/blob/1.10.14/airflow/www_rbac/views.py#L975

dimon222 commented 3 years ago

If anyone lands on this issue, I wanted to mention that as of 2.1.3 the issue is still present, and the solution that still works is prepending this:

from gevent import monkey; monkey.patch_ssl()

to the very top line of the file airflow/config_templates/airflow_local_settings.py (or your respective actual file, if you have made one) of the Airflow webserver component.

Make sure not to touch any other Airflow components in your cluster, like the Airflow worker or the Airflow scheduler (so if you have a single-node setup, this may not work for you), as those are not going to behave correctly with SSL monkeypatched.

dimon222 commented 2 years ago

Unfortunately, the hack mentioned above worked in 2.1.4, but no longer works on 2.2.0. Some code refactoring has affected the loading flow of the Airflow webserver, so we're no longer patching SSL early enough (so monkeypatching exceptions are popping up). I'm still digging in the code to identify where we should plug the hack now.

UPD a few hours later: now you need to add it to the very top of the file airflow/settings.py instead of airflow_local_settings.py.

alannesta commented 2 years ago

Unfortunately, the hack mentioned above worked in 2.1.4, but no longer works on 2.2.0. Some code refactoring has affected the loading flow of the Airflow webserver, so we're no longer patching SSL early enough (so monkeypatching exceptions are popping up). I'm still digging in the code to identify where we should plug the hack now.

UPD a few hours later: now you need to add it to the very top of the file airflow/settings.py instead of airflow_local_settings.py.

This would require you to maintain your own fork of airflow, which will be a pain

uranusjr commented 2 years ago

If anyone is willing to dig into what exactly is causing the web stuff to load too early, we can implement some workaround to keep local settings working.

dimon222 commented 2 years ago

Unfortunately, the hack mentioned above worked in 2.1.4, but no longer works on 2.2.0. Some code refactoring has affected the loading flow of the Airflow webserver, so we're no longer patching SSL early enough (so monkeypatching exceptions are popping up). I'm still digging in the code to identify where we should plug the hack now. UPD a few hours later: now you need to add it to the very top of the file airflow/settings.py instead of airflow_local_settings.py.

This would require you to maintain your own fork of airflow, which will be a pain

As this is an internal component of Airflow, obviously. I'm just prepending the line in the Dockerfile, so my use case for K8s is fairly simple. As long as it works and the client is happy, I'm fine... This issue has been present for years, so my hopes left me long ago.

iBalag commented 2 years ago

I faced a similar issue when I tried to get a secret from the secrets backend (HashiCorp Vault) in webserver_config.py.

from airflow.models import Variable
...
AUTH_LDAP_BIND_USER = Variable.get("ldap-bind-user")
AUTH_LDAP_BIND_PASSWORD = Variable.get("ldap-bind-password")

So, based on the answers from @dimon222, I had to patch airflow/settings.py during my custom Docker image build. Below is the proof of concept:

FROM apache/airflow:2.2.3-python3.8
USER airflow
RUN sed -i '1s/^/from gevent import monkey; monkey.patch_ssl() \n/' /home/airflow/.local/lib/python3.8/site-packages/airflow/settings.py

And it works fine! I hope this patch will be included in the official Airflow code.

alannesta commented 2 years ago

After trying the monkey patch and causing tons of unexpected issues here and there, I find the simplest solution is to set worker_class to sync (which is the default) instead of gevent: https://github.com/apache/airflow/blob/main/airflow/config_templates/default_airflow.cfg#L573
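For reference, that is the [webserver] worker_class option, settable in airflow.cfg or via the environment:

[webserver]
worker_class = sync

# equivalent environment variable
AIRFLOW__WEBSERVER__WORKER_CLASS=sync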

YevhenKv commented 2 years ago

Changing worker_class to sync on 2.2.5 doesn't help.

aa3pankaj commented 1 year ago

Any update? I'm facing a similar error on Airflow 2.4.2 as well.

potiuk commented 1 year ago

I believe this has been fixed in https://github.com/apache/airflow/pull/27297 and will be available in 2.4.3. Closing it provisionally. @aa3pankaj, try this workaround to see if it solves the problem for 2.4.2:

if the preload fix is indeed the fix, you can suppress this for now by setting the GUNICORN_CMD_ARGS environment variable to --preload

Suggested here: https://github.com/apache/airflow/issues/23512#issuecomment-1297310890
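For anyone trying it, that workaround just means exporting the variable before starting the webserver, for example:

export GUNICORN_CMD_ARGS="--preload"
airflow webserver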

aa3pankaj commented 1 year ago

@potiuk monkey patching ssl via airflow_local_settings.py worked for me (in airflow 2.4.2)

potiuk commented 1 year ago

@potiuk monkey patching ssl via airflow_local_settings.py worked for me (in airflow 2.4.2)

Can you try the --preload case instead?

potiuk commented 1 year ago

OK. I think I see what the problem is now.

potiuk commented 1 year ago

@aa3pankaj -> can you please apply and try my fix from #27546 instead? I think it should finally solve the problem for you once we merge it, without further need of patching Airflow.

Also @dimon222 -> maybe you can also try this one. I know it's been years, but I stumbled upon this one now, and after a bit of reading it seems the solution is easy and should have no side effects.

aa3pankaj commented 1 year ago

@potiuk I faced this issue even with worker_class = eventlet; it looks like your PR fixes it for the gevent worker_class only?

potiuk commented 1 year ago

I only handled gevent as this was the only one I knew about. But once you confirm it works for you for gevent: I see that there is a similar issue https://github.com/apache/airflow/issues/15116, and it seems we can use a similar technique by adding the eventlet configuration:

    import eventlet
    eventlet.monkey_patch()

But this should be a separate PR once we get confirmation that the approach works for you. Since you have S3 logging configured and are very interested, I will hold off until I get confirmation.