villasv / aws-airflow-stack

Turbine: the bare metals that gets you Airflow
https://victor.villas/aws-airflow-stack/
MIT License

Error sending Celery task:An error occurred (AccessDenied) when calling the ListQueues operation: Access to the resource https://eu-central-1.queue.amazonaws.com/ is denied. #183

Closed ilyanoskov closed 4 years ago

ilyanoskov commented 4 years ago

I tried writing a simple DAG that copies files from SFTP to S3. Then the Airflow UI stopped loading and I saw the error in the title. Here is my DAG:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.s3_to_sftp_operator import S3ToSFTPOperator
from airflow.contrib.operators.sftp_to_s3_operator import SFTPToS3Operator
import airflow.hooks.S3_hook as S3_hook
import airflow.contrib.hooks.sftp_hook as sftp_hook
import awswrangler as wr
import pandas as pd
import pendulum

aws_session = S3_hook.S3Hook('aws').get_session(region_name='eu-central-1')
cowen_sftp = sftp_hook.SFTPHook('sftp')

s3_files = wr.s3.list_objects(path='s3://sftp', boto3_session=aws_session)

default_args = {
    'owner': 'processor',
    'depends_on_past': False,
    'start_date': datetime(2020, 5, 24, tzinfo=pendulum.timezone('Asia/Timezone')),
    'email': ['t@gmail.com', 'i@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=3)
}

dag = DAG('sync_c_to_s3', catchup=False, default_args=default_args, schedule_interval='0 * * * *')

def printer():
    print(s3_files)

with dag:
    print_s3_files = PythonOperator(
        task_id='print_s3_files',
        python_callable=printer
    )

    print_s3_files

This issue seems to be very similar to https://github.com/villasv/aws-airflow-stack/issues/180

ilyanoskov commented 4 years ago

Could you please advise how I can adjust these permissions myself? I already have a cluster running and would like to avoid creating a new one. Thanks a lot in advance.

ilyanoskov commented 4 years ago

I also get a RuntimeError: can't start new thread error whenever I try to run this DAG.

villasv commented 4 years ago

Could you please advise how I can adjust these permissions myself?

You can check your CloudFormation deployment to see the name of the IAM Role created in the TurbineScheduler nested stack (or search for it directly in the IAM console). It has a bunch of policies that you can alter, and you can also add your own.
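
A minimal boto3 sketch of that lookup, assuming admin credentials and a placeholder nested stack name (substitute the actual TurbineScheduler stack name from your deployment):

# Sketch: find the IAM role created by the TurbineScheduler nested stack
# and list its inline policies. The stack name is a placeholder.
import boto3

cfn = boto3.client('cloudformation', region_name='eu-central-1')
iam = boto3.client('iam')

scheduler_stack = 'my-airflow-TurbineScheduler-XXXX'  # placeholder

resources = cfn.describe_stack_resources(StackName=scheduler_stack)['StackResources']
role_names = [r['PhysicalResourceId'] for r in resources
              if r['ResourceType'] == 'AWS::IAM::Role']

for role_name in role_names:
    # Inline policies can be edited in the IAM console or rewritten with
    # iam.put_role_policy(RoleName=..., PolicyName=..., PolicyDocument=...)
    print(role_name, iam.list_role_policies(RoleName=role_name)['PolicyNames'])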

villasv commented 4 years ago

This issue seems to be very similar to #180

Indeed. Does that error come from the scheduler instance?

Here's the IAM policy that is attached to it, which should have allowed that request:

        - PolicyName: !Sub TurbineAirflowSchedulerQueueRWPolicy-${AWS::StackName}
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - sqs:ListQueues
                Resource:
                  - !Sub arn:aws:sqs:${AWS::Region}:${AWS::AccountId}:*

I don't see anything wrong with it at first. I'm going to deploy it to eu-central-1 later and check it out.
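
A quick sketch for checking this from the instance itself, using the instance profile credentials that boto3 picks up automatically:

# Sketch: run the same call that the Celery executor makes, from the
# scheduler (or webserver) instance, to see whether the policy is effective.
import boto3
from botocore.exceptions import ClientError

sqs = boto3.client('sqs', region_name='eu-central-1')
try:
    print(sqs.list_queues().get('QueueUrls', []))
except ClientError as exc:
    # AccessDenied here means the instance role is missing sqs:ListQueues
    print('ListQueues failed:', exc)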

ilyanoskov commented 4 years ago

@villasv, no, the error came from the web server, I believe. I checked, and none of the cluster roles have ListQueues permissions in my stack.

villasv commented 4 years ago

That's pretty weird. Do you see any TurbineAirflowSchedulerQueueRWPolicy-* policies? Those should have been created and associated correctly. Maybe you can use CFN drift detection to check what happened.
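
For reference, a minimal drift-detection sketch with boto3 (the stack name is a placeholder):

# Sketch: trigger drift detection and list resources whose live state no
# longer matches the template (e.g. a modified or deleted IAM policy).
import time
import boto3

cfn = boto3.client('cloudformation', region_name='eu-central-1')
stack_name = 'my-airflow-stack'  # placeholder

detection_id = cfn.detect_stack_drift(StackName=stack_name)['StackDriftDetectionId']
while True:
    status = cfn.describe_stack_drift_detection_status(StackDriftDetectionId=detection_id)
    if status['DetectionStatus'] != 'DETECTION_IN_PROGRESS':
        break
    time.sleep(5)

drifts = cfn.describe_stack_resource_drifts(
    StackName=stack_name,
    StackResourceDriftStatusFilters=['MODIFIED', 'DELETED'],
)
for drift in drifts['StackResourceDrifts']:
    print(drift['LogicalResourceId'], drift['StackResourceDriftStatus'])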

ilyanoskov commented 4 years ago

@villasv after checking again, I found the policies, so it seems to be a singular occurrence. I grepped for the error again in the webserver's journalctl. The error only appears once, so it could be a one-off error by Amazon? If I see it again I will reopen this issue.

sh-4.2$ sudo journalctl | grep ListQueues
May 24 19:52:38 ip-10-0-1-232.eu-central-1.compute.internal airflow[8682]: [2020-05-24 19:52:38,343] {celery_executor.py:226} ERROR - Error sending Celery task:An error occurred (AccessDenied) when calling the ListQueues operation: Access to the resource https://eu-central-1.queue.amazonaws.com/ is denied.
May 24 19:52:38 ip-10-0-1-232.eu-central-1.compute.internal airflow[8682]: botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the ListQueues operation: Access to the resource https://eu-central-1.queue.amazonaws.com/ is denied.

The original reason I started looking into the logs is that I always get RuntimeError: can't start new thread errors on the webserver, which makes it reboot. This happens when I press "start" on a DAG or try to view its logs.

Do you have any thoughts on where that issue could come from? Could it be solved by scaling up to a bigger instance for the webserver?

villasv commented 4 years ago

after checking again, I found the policies, so it seems to be a singular occurrence. I grepped for the error again in the webserver's journalctl. The error only appears once, so it could be a one-off error by Amazon? If I see it again I will reopen this issue.

Could be, if the service starts before IAM kicks in (it's eventually consistent so it might take a while).
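
If that's the case, a plain retry with backoff usually rides it out; a purely illustrative sketch, not part of the stack:

# Sketch: retry an AWS call while IAM permissions propagate after creation.
import time
import boto3
from botocore.exceptions import ClientError

def list_queues_with_retry(region, attempts=6, delay=10):
    sqs = boto3.client('sqs', region_name=region)
    for attempt in range(attempts):
        try:
            return sqs.list_queues().get('QueueUrls', [])
        except ClientError as exc:
            if exc.response['Error']['Code'] != 'AccessDenied':
                raise
            time.sleep(delay * (attempt + 1))  # back off while IAM propagates
    raise RuntimeError('ListQueues still denied after retries')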

The original reason I started looking into the logs is that I always get RuntimeError: can't start new thread errors on the webserver, which makes it reboot. This happens when I press "start" on a DAG or try to view its logs.

Do you have any thoughts on where that issue could come from? Could it be solved by scaling up to a bigger instance for the webserver?

If you have too many DAGs, maybe the default instance size isn't enough? I suggest you take a look at the resource utilization on the webserver instance and experiment with a larger instance. The error message isn't very informative and I'm not sure about it, but I suspect the same thing you do.

ilyanoskov commented 4 years ago

Resolved the RuntimeError: can't start new thread issue by scaling up to a t3.medium for the webserver.