dask / dask-cloudprovider

Cloud provider cluster managers for Dask. Supports AWS, Google Cloud, Azure and more...
https://cloudprovider.dask.org
BSD 3-Clause "New" or "Revised" License

Getting TypeError for vanilla ECSCluster/FargateCluster example #122

Open deeplook opened 4 years ago

deeplook commented 4 years ago

I'm trying to create an AWS cluster for using Dask temporarily. In former times there was "dask-ec2", but that is no more, so I thought dask-cloudprovider is the new way to go. I've created a fresh AWS access key ID and secret and ran the vanilla example on https://cloudprovider.dask.org/en/latest/.

What happened:

[…]
/srv/conda/envs/notebook/lib/python3.7/site-packages/aiobotocore/credentials.py in load_credentials(self)
    785         for provider in self.providers:
    786             logger.debug("Looking for credentials via: %s", provider.METHOD)
--> 787             creds = await provider.load()
    788             if creds is not None:
    789                 return creds

TypeError: object NoneType can't be used in 'await' expression

What I expected to happen:

I don't expect this to work, but I'm getting an error I don't know how to deal with. So I guess I had expected some other error. ;)

Minimal Complete Verifiable Example:

from dask_cloudprovider import ECSCluster, FargateCluster

cluster = FargateCluster(  # or ECSCluster
    aws_access_key_id="******",
    aws_secret_access_key="******",
    region_name="******"
)

Environment:

Linux, Python 3.7.8, plus (pip-installed):


$ pip list | egrep awscli\|boto\|dask
aiobotocore                          1.0.7
awscli                               1.18.120
boto3                                1.14.43
botocore                             1.15.32
dask                                 2.23.0
dask-cloudprovider                   0.4.0

jacobtomlinson commented 4 years ago

Thanks for raising this. It looks like the aws_access_key_id and aws_secret_access_key kwargs are not being passed through correctly. Would you mind sharing the full traceback to help me hunt down where this is happening?

Also, in the meantime you may want to configure your AWS credentials with aws configure on the command line instead of passing them directly.
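
If keyword arguments are not reaching every AWS client, another possible stopgap is to expose the same values through the environment variables that botocore's default credential chain reads (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_DEFAULT_REGION`). This is only a sketch under that assumption: `export_aws_credentials` is a hypothetical helper, not part of dask-cloudprovider, and nothing in this thread confirms that it avoids the `TypeError`.

```python
import os


def export_aws_credentials(access_key_id, secret_access_key, region_name, environ=None):
    """Export AWS credentials as environment variables for this process.

    Hypothetical helper, not part of dask-cloudprovider. The variable names
    are the standard ones read by botocore's default credential chain.
    """
    # Default to the real process environment; a plain dict can be passed for testing.
    environ = os.environ if environ is None else environ
    environ["AWS_ACCESS_KEY_ID"] = access_key_id
    environ["AWS_SECRET_ACCESS_KEY"] = secret_access_key
    environ["AWS_DEFAULT_REGION"] = region_name
    return environ


# Usage (not executed here; needs dask-cloudprovider and a real AWS account):
#
# export_aws_credentials("<access key id>", "<secret access key>", "<region>")
# from dask_cloudprovider import FargateCluster
# cluster = FargateCluster()
```

Setting the variables before the cluster is created means any client built without explicit credentials can still find them, at the cost of keeping the secret in the process environment.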

deeplook commented 4 years ago

See the traceback below. Ironically, I get the same after running "aws configure" and removing the credentials parameters.

[...]
/srv/conda/envs/notebook/lib/python3.7/site-packages/dask_cloudprovider/providers/aws/ecs.py in __init__(self, **kwargs)
   1209 
   1210     def __init__(self, **kwargs):
-> 1211         super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
   1212 
   1213 

/srv/conda/envs/notebook/lib/python3.7/site-packages/dask_cloudprovider/providers/aws/ecs.py in __init__(self, fargate_scheduler, fargate_workers, image, scheduler_cpu, scheduler_mem, scheduler_timeout, scheduler_extra_args, worker_cpu, worker_mem, worker_gpu, worker_extra_args, n_workers, cluster_arn, cluster_name_template, execution_role_arn, task_role_arn, task_role_policies, cloudwatch_logs_group, cloudwatch_logs_stream_prefix, cloudwatch_logs_default_retention, vpc, subnets, security_groups, environment, tags, find_address_timeout, skip_cleanup, aws_access_key_id, aws_secret_access_key, region_name, platform_version, fargate_use_private_ip, mount_points, volumes, mount_volumes_on_scheduler, **kwargs)
    661         self._lock = asyncio.Lock()
    662         self.session = aiobotocore.get_session()
--> 663         super().__init__(**kwargs)
    664 
    665     def _client(self, name: str):

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/deploy/spec.py in __init__(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name)
    274         if not self.asynchronous:
    275             self._loop_runner.start()
--> 276             self.sync(self._start)
    277             self.sync(self._correct_state)
    278 

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/deploy/cluster.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    169             return future
    170         else:
--> 171             return sync(self.loop, func, *args, **kwargs)
    172 
    173     async def _get_logs(self, scheduler=True, workers=True):

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    337     if error[0]:
    338         typ, exc, tb = error[0]
--> 339         raise exc.with_traceback(tb)
    340     else:
    341         return result[0]

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/utils.py in f()
    321             if callback_timeout is not None:
    322                 future = asyncio.wait_for(future, callback_timeout)
--> 323             result[0] = yield future
    324         except Exception as exc:
    325             error[0] = sys.exc_info()

/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/srv/conda/envs/notebook/lib/python3.7/site-packages/dask_cloudprovider/providers/aws/ecs.py in _start(self)
    685             self._skip_cleanup = self.config.get("skip_cleanup")
    686         if not self._skip_cleanup:
--> 687             await _cleanup_stale_resources()
    688 
    689         if self._fargate_scheduler is None:

/srv/conda/envs/notebook/lib/python3.7/site-packages/dask_cloudprovider/providers/aws/ecs.py in _cleanup_stale_resources()
   1228     # Clean up clusters (clusters with no running tasks)
   1229     session = aiobotocore.get_session()
-> 1230     async with session.create_client("ecs") as ecs:
   1231         active_clusters = []
   1232         clusters_to_delete = []

/srv/conda/envs/notebook/lib/python3.7/site-packages/aiobotocore/session.py in __aenter__(self)
     18 
     19     async def __aenter__(self) -> AioBaseClient:
---> 20         self._client = await self._coro
     21         return await self._client.__aenter__()
     22 

/srv/conda/envs/notebook/lib/python3.7/site-packages/aiobotocore/session.py in _create_client(self, service_name, region_name, api_version, use_ssl, verify, endpoint_url, aws_access_key_id, aws_secret_access_key, aws_session_token, config)
     94                                                  aws_secret_access_key))
     95         else:
---> 96             credentials = await self.get_credentials()
     97         endpoint_resolver = self._get_internal_component('endpoint_resolver')
     98         exceptions_factory = self._get_internal_component('exceptions_factory')

/srv/conda/envs/notebook/lib/python3.7/site-packages/aiobotocore/session.py in get_credentials(self)
    119         if self._credentials is None:
    120             self._credentials = await (self._components.get_component(
--> 121                 'credential_provider').load_credentials())
    122         return self._credentials
    123 

/srv/conda/envs/notebook/lib/python3.7/site-packages/aiobotocore/credentials.py in load_credentials(self)
    785         for provider in self.providers:
    786             logger.debug("Looking for credentials via: %s", provider.METHOD)
--> 787             creds = await provider.load()
    788             if creds is not None:
    789                 return creds

TypeError: object NoneType can't be used in 'await' expression

jacobtomlinson commented 4 years ago

Ah, this appears to be happening during cleanup. Credentials not being passed through to the cleanup step is a known issue.

I'm surprised it does not work after running aws configure though as it should always be able to read credentials from config files.
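
To narrow down whether credentials are discoverable at all, a quick check of two common sources can help. The sketch below is a simplified stand-in, not botocore's actual lookup logic: `find_static_credentials` is a hypothetical helper that only looks at the standard environment variables and at the shared credentials file (`~/.aws/credentials`, which `aws configure` writes), and ignores every other provider in the chain.

```python
import configparser
import os
from pathlib import Path


def find_static_credentials(environ=None, credentials_file=None, profile="default"):
    """Return a label for where static AWS credentials were found, or None.

    Hypothetical diagnostic helper. It covers only environment variables and
    the shared credentials file; the real credential chain has more sources.
    """
    environ = os.environ if environ is None else environ

    # 1. Environment variables take precedence in the default chain.
    if environ.get("AWS_ACCESS_KEY_ID") and environ.get("AWS_SECRET_ACCESS_KEY"):
        return "environment"

    # 2. Shared credentials file, overridable via AWS_SHARED_CREDENTIALS_FILE.
    if credentials_file is None:
        credentials_file = environ.get(
            "AWS_SHARED_CREDENTIALS_FILE",
            str(Path.home() / ".aws" / "credentials"),
        )
    parser = configparser.ConfigParser()
    parser.read(credentials_file)  # a missing file is silently ignored
    if (
        parser.has_section(profile)
        and parser.has_option(profile, "aws_access_key_id")
        and parser.has_option(profile, "aws_secret_access_key")
    ):
        return f"file:{credentials_file}"

    return None
```

Calling `find_static_credentials()` in the same notebook kernel that creates the cluster shows whether that process sees the result of `aws configure`; a `None` result would point at the environment rather than at dask-cloudprovider.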

deeplook commented 4 years ago

I suggest reproducing it quickly from any binder environment. ;)

jacobtomlinson commented 4 years ago

I am unable to reproduce in Binder.

Steps:

from dask_cloudprovider import FargateCluster
cluster = FargateCluster()

deeplook commented 4 years ago

Hmm. I'm getting timeouts now. Much better. ;) How did you install awscli, and which version? I've used pip install and it gave me 1.18.120. I'm reading about a version 2, but can't find it via pip. Ok, will retry with v. 2...

deeplook commented 4 years ago

But I can see a cluster in the AWS console, I just don't have a handle on it from its creation.

jacobtomlinson commented 4 years ago

I installed using v2 with the instructions that I linked.

If you're getting timeouts then Fargate may be taking too long to start your tasks. You can extend the timeout with the find_address_timeout kwarg.

The stale resources in AWS will time out on their own, so you will stop paying for them after 5 minutes of inactivity, and they should be cleaned up automatically the next time you use dask-cloudprovider.

deeplook commented 4 years ago

Cool. I've already lost some money on dask-ec2 before. ;) Looking at https://formulae.brew.sh/formula/awscli#default I see that awscli 2 requires Python 3.8. Is that so? You haven't mentioned it above.

jacobtomlinson commented 4 years ago

> I've already lost some money on dask-ec2 before.

Yeah, this is a hard balance to strike and we've been discussing it recently. I appreciate this is a bit of a tangent, but I'd be interested in your view on what the Dask maintainers' responsibility is here.

We create tools to make spinning up cloud resources easy. The drawback is that this can cost folks money, and if the tool breaks the user has to quickly understand what was created and how to make it stop. Fargate pricing means that if all tasks stop the cost stops, so we can add timeouts and hopefully folks won't run up a bill.

The downside is that if you're working on a train and lose your connection, the resources may get cleaned up prematurely. The workaround for this is making timeouts configurable.

However, other cloud services (like EC2) are less easy to time out. So some other cluster managers we create will attempt to close stuff out, but if your session dies then it is your responsibility to clear out resources.

I wonder how we could best communicate this. Do we raise warnings when resources are created? Do we document and leave folks to figure it out? I'd love to hear your thoughts.

> I see that awscli 2 requires Python 3.8

Honestly I have no idea. I started binder, followed the install docs and used it. I didn't check my Python version.

deeplook commented 4 years ago

> We create tools to make spinning up cloud resources easy. The drawback of this is that this can cost folks money [...]

That's surely an important topic and I might have a thought or two, but I also feel this should be under a different headline. Happy to go there if it exists already...

> Honestly I have no idea. I started binder, followed the install docs and used it. I didn't check my Python version.

I guess most binder repos are still on Python 3.7 and so is (the awesome) Dask tutorial repo. AWSCLI2 (installed the Amazon way described above) gives no complaints. It will be interesting to see a pip/conda-based installation once available.

In any case I'm always getting timeouts. Even when passing a threshold of 10 minutes like in cluster = FargateCluster(find_address_timeout=600) I get a timeout much earlier with a message indicating that the passed value is not respected:

OSError: Timed out trying to connect to 'tcp://3.236.190.132:8786' after 10 s:
Timed out trying to connect to 'tcp://3.236.190.132:8786' after 10 s: connect() didn't finish in time

Up to there, I can see a cluster being created (set to active), with one scheduler task (set to running) but no worker tasks or ECS instances... Passing the cluster ARN (taken from the AWS console) to ECSCluster makes no difference, BTW.
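
Since the scheduler task is reported as running, one thing worth ruling out is whether its port is reachable from the client at all, for instance because a security group or network rule blocks port 8786. That cause is an assumption, not something established in this thread. The following minimal sketch uses only the standard library; `can_connect` is a hypothetical helper that attempts a plain TCP connection to an address in the `tcp://host:port` form shown in the error message.

```python
import socket
from urllib.parse import urlsplit


def can_connect(address, timeout=10.0):
    """Return True if a TCP connection to e.g. 'tcp://host:8786' succeeds.

    Hypothetical diagnostic helper; it only tests raw TCP reachability and
    says nothing about whether a Dask scheduler is actually listening.
    """
    parts = urlsplit(address)
    if parts.hostname is None or parts.port is None:
        raise ValueError(f"expected an address like 'tcp://host:port', got {address!r}")
    try:
        # create_connection raises OSError (including timeouts) on failure.
        with socket.create_connection((parts.hostname, parts.port), timeout=timeout):
            return True
    except OSError:
        return False


# Usage, with the address taken from the error message:
#
# can_connect("tcp://3.236.190.132:8786")
```

If this returns `False` while the AWS console shows the scheduler task running, the problem is more likely in networking (public versus private IP, security group rules) than in the value passed for `find_address_timeout`.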