coiled / feedback

A place to provide Coiled feedback

Airflow S3 Authentication issues #155

Closed: avriiil closed this issue 1 year ago

avriiil commented 2 years ago

When writing data to S3 (with a ddf.to_parquet() call) using a Coiled cluster spun up within an Airflow task, credentials are not being passed correctly to the workers, resulting in a PermissionError: Access Denied, full traceback below.

Running the same code with a local Dask cluster in place of the Coiled cluster works fine (see below). This seems to point to Airflow somehow getting in the way of Coiled passing the locally stored AWS credentials to the Dask workers.

Reproducer (run as Airflow DAG):

from datetime import datetime
import dask
import coiled
from dask.distributed import Client

from airflow.decorators import dag, task

# define path where we'll store the Parquet file
path = "s3://coiled-datasets/airflow-repro-coiled.parquet"

# define DAG as a function with the @dag decorator
@dag(
    schedule_interval=None, 
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['coiled-demo'],
    )
def reproducer_coiled():

    @task()
    def write_to_s3_coiled():
        # spin up a Coiled cluster and attach a client, making it the default Dask scheduler
        cluster = coiled.Cluster()
        client = Client(cluster)
        # create a toy dataset and write it to S3; this to_parquet call fails with Access Denied
        ddf = dask.datasets.timeseries("1990-01-01", "1990-02-01")
        ddf.to_parquet(path)
        return ddf

    ddf = write_to_s3_coiled()

# Call taskflow
demo = reproducer_coiled()
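
Not part of the original report, but a quick way to check what the workers actually see is to run a small probe on each worker with client.run, inside the task right after Client(cluster) is created. This is a hedged sketch: check_worker_credentials is a hypothetical helper, and it assumes boto3 is available in the cluster's software environment.

def check_worker_credentials():
    # runs on every worker and reports how (or whether) botocore resolved credentials
    import boto3
    creds = boto3.Session().get_credentials()
    return None if creds is None else creds.method

client.run(check_worker_credentials)  # dict keyed by worker address, e.g. 'sts-assume-role' or None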

Substituting the Coiled cluster above with a Local Dask Cluster runs fine.
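
For reference, the working substitution is just a local cluster in place of the Coiled one; a sketch of the changed lines only:

from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)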

My local machine is set up with an IAM user whose access key and secret access key are stored in the ~/.aws/credentials file under the [default] profile.
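
Not something tried in the thread, but one way to take the worker-side credential lookup out of the equation is to pass the keys explicitly through storage_options, which Dask forwards to s3fs on the workers. A sketch; reading the keys from environment variables in the Airflow process is an assumption.

import os

ddf.to_parquet(
    path,
    storage_options={
        "key": os.environ["AWS_ACCESS_KEY_ID"],         # assumed set in the Airflow environment
        "secret": os.environ["AWS_SECRET_ACCESS_KEY"],  # assumed set in the Airflow environment
    },
)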

Full traceback for the PermissionError from the Airflow logs:

[2021-12-13, 16:14:56 UTC] {taskinstance.py:1262} INFO - Executing <Task(_PythonDecoratedOperator): transform_github_data> on 2021-12-13 15:06:58.446355+00:00
[2021-12-13, 16:14:56 UTC] {standard_task_runner.py:52} INFO - Started process 95452 to run task
[2021-12-13, 16:14:56 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'json_to_parquet', 'transform_github_data', 'manual__2021-12-13T15:06:58.446355+00:00', '--job-id', '290', '--raw', '--subdir', 'DAGS_FOLDER/airflow-coiled-json-parquet.py', '--cfg-path', '/var/folders/ky/bqjn_gxn1xv0cn_8q5xvp3q40000gn/T/tmpfjpbk3wj', '--error-file', '/var/folders/ky/bqjn_gxn1xv0cn_8q5xvp3q40000gn/T/tmppzf_kky1']
[2021-12-13, 16:14:56 UTC] {standard_task_runner.py:77} INFO - Job 290: Subtask transform_github_data
[2021-12-13, 16:14:56 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: json_to_parquet.transform_github_data manual__2021-12-13T15:06:58.446355+00:00 [running]> on host 1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.ip6.arpa
[2021-12-13, 16:14:56 UTC] {taskinstance.py:1427} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=rpelgrim@coiled.io
AIRFLOW_CTX_DAG_OWNER=rpelgrim
AIRFLOW_CTX_DAG_ID=json_to_parquet
AIRFLOW_CTX_TASK_ID=transform_github_data
AIRFLOW_CTX_EXECUTION_DATE=2021-12-13T15:06:58.446355+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-13T15:06:58.446355+00:00
[2021-12-13, 16:14:57 UTC] {logging_mixin.py:109} INFO - Using existing cluster: 'airflow-task-json-parquet'
[2021-12-13, 16:14:59 UTC] {logging_mixin.py:109} INFO - 
[2021-12-13, 16:14:59 UTC] {logging_mixin.py:109} WARNING - /Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/distributed/client.py:1131 VersionMismatchWarning: Mismatched versions found

+-------------+---------------+---------------+---------------+
| Package     | client        | scheduler     | workers       |
+-------------+---------------+---------------+---------------+
| dask        | 2021.11.2     | 2021.11.1     | 2021.11.1     |
| distributed | 2021.11.2     | 2021.11.1     | 2021.11.1     |
| msgpack     | 1.0.3         | 1.0.2         | 1.0.2         |
| numpy       | 1.20.3        | 1.21.4        | 1.21.4        |
| python      | 3.9.7.final.0 | 3.9.0.final.0 | 3.9.0.final.0 |
+-------------+---------------+---------------+---------------+
Notes: 
-  msgpack: Variation is ok, as long as everything is above 0.6
[2021-12-13, 16:14:59 UTC] {logging_mixin.py:109} INFO - Dashboard: http://3.208.73.91:8787
[2021-12-13, 16:15:01 UTC] {credentials.py:1224} INFO - Found credentials in shared credentials file: ~/.aws/credentials
[2021-12-13, 16:15:34 UTC] {taskinstance.py:1703} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/s3fs/core.py", line 246, in _call_s3
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/aiobotocore/client.py", line 155, in _make_api_call
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/airflow/decorators/base.py", line 134, in execute
    return_value = super().execute(context)
  File "/Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/airflow/operators/python.py", line 151, in execute
    return_value = self.execute_callable()
  File "/Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/airflow/operators/python.py", line 162, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/Users/rpelgrim/airflow/dags/airflow-coiled-json-parquet.py", line 94, in transform_github_data
    df.to_parquet(
  File "/Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/dask/dataframe/core.py", line 4607, in to_parquet
    return to_parquet(self, path, *args, **kwargs)
  File "/Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py", line 775, in to_parquet
    return compute_as_if_collection(
  File "/Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/dask/base.py", line 315, in compute_as_if_collection
    return schedule(dsk2, keys, **kwargs)
  File "/Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/distributed/client.py", line 2725, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/distributed/client.py", line 1980, in gather
    return self.sync(
  File "/Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/distributed/client.py", line 868, in sync
    return sync(
  File "/Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/distributed/utils.py", line 332, in sync
    raise exc.with_traceback(tb)
  File "/Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/distributed/utils.py", line 315, in f
    result[0] = yield future
  File "/Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/Users/rpelgrim/mambaforge/envs/airflow-new/lib/python3.9/site-packages/distributed/client.py", line 1845, in _gather
    raise exception.with_traceback(traceback)
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 1176, in write_partition
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/fastparquet/writer.py", line 699, in make_part_file
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/fsspec/spec.py", line 1602, in __exit__
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/fsspec/spec.py", line 1569, in close
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/fsspec/spec.py", line 1440, in flush
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/s3fs/core.py", line 1977, in _upload_chunk
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/s3fs/core.py", line 1992, in commit
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/s3fs/core.py", line 1844, in _call_s3
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/fsspec/asyn.py", line 88, in wrapper
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/fsspec/asyn.py", line 69, in sync
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/fsspec/asyn.py", line 25, in _runner
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/s3fs/core.py", line 265, in _call_s3
PermissionError: Access Denied
matthiasdv commented 2 years ago

Also note that a Python call to the boto3 STS get-caller-identity API seemed to return the right credentials when running inside the Airflow runtime, but the Coiled client still failed.
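
A minimal sketch of that kind of check, run inside the Airflow task before the Coiled cluster is created (assumes boto3 is installed in the Airflow environment):

import boto3

# resolves credentials the same way any botocore-based client would
identity = boto3.client("sts").get_caller_identity()
print(identity["Arn"])  # shows which IAM identity the Airflow process is using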

shughes-uk commented 1 year ago

The way we pass AWS credentials has been reworked a bit, and I'm hopeful this is fixed.