dask / dask-cloudprovider

Cloud provider cluster managers for Dask. Supports AWS, Google Cloud, Azure, and more.
https://cloudprovider.dask.org
BSD 3-Clause "New" or "Revised" License

Dask config size limitation in EC2Cluster #249

Open jacobtomlinson opened 3 years ago

jacobtomlinson commented 3 years ago

It seems there is a 16 KiB (16384 byte) limit on the amount of user_data that can be passed to an EC2 instance when starting it up.

We serialize the local Dask config and pass it to the scheduler and workers via the user_data.

https://github.com/dask/dask-cloudprovider/blob/da454827b88c2f0f0d06af07e02d5d1580a4c366/dask_cloudprovider/generic/vmcluster.py#L33-L35

Depending on what config the user has locally, this can tip us over the limit and result in the AWS API rejecting the instance creation call.

botocore.exceptions.ClientError: An error occurred (InvalidParameterValue) when calling the RunInstances operation: User data is limited to 16384 bytes
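
For anyone wanting to check how close their own config gets to that limit, something along these lines gives a rough estimate (a sketch only; the exact serialization used at the linked lines in vmcluster.py may differ):

import base64

import yaml
import dask.config

# Dump the in-memory Dask config and measure how large it would be once
# encoded for user_data; the EC2 limit is 16384 bytes.
raw = yaml.dump(dask.config.config).encode()
encoded = base64.b64encode(raw)
print(len(raw), "bytes raw,", len(encoded), "bytes base64-encoded")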
douglasdavis commented 3 years ago

I've run into this. After digging around a bit with @martindurant, we noticed that dask.config.config is modified (and grows quite a bit even with only default settings) upon importing dask_cloudprovider or dask_cloudprovider.{aws,gcp} (reproducer below).

Would it be reasonable to drop the keys in the config dict that still have their default values? Especially the keys that exist only to spin up cloud resources, since the cloud instances themselves don't need to know how to start things on the cloud. (A rough sketch of what that could look like follows the reproducer below.)

In [1]: import dask.config

In [2]: dask.config.config
Out[2]: 
{'temporary-directory': None,
 'dataframe': {'shuffle-compression': None},
 'array': {'svg': {'size': 120}, 'slicing': {'split-large-chunks': None}},
 'optimization': {'fuse': {'active': True,
   'ave-width': 1,
   'max-width': None,
   'max-height': inf,
   'max-depth-new-edges': None,
   'subgraphs': None,
   'rename-keys': True}}}

In [3]: import dask_cloudprovider.aws

In [4]: dask.config.config
Out[4]: 
{'temporary-directory': None,
 'dataframe': {'shuffle-compression': None},
 'array': {'svg': {'size': 120}, 'slicing': {'split-large-chunks': None}},
 'optimization': {'fuse': {'active': True,
   'ave-width': 1,
   'max-width': None,
   'max-height': inf,
   'max-depth-new-edges': None,
   'subgraphs': None,
   'rename-keys': True}},
 'cloudprovider': {'ecs': {'fargate_scheduler': False,
   'fargate_workers': False,
   'scheduler_cpu': 1024,
   'scheduler_mem': 4096,
   'worker_cpu': 4096,
   'worker_mem': 16384,
   'worker_gpu': 0,
   'n_workers': 0,
   'scheduler_timeout': '5 minutes',
   'image': 'daskdev/dask:latest',
   'gpu_image': 'rapidsai/rapidsai:latest',
   'cluster_name_template': 'dask-{uuid}',
   'cluster_arn': '',
   'execution_role_arn': '',
   'task_role_arn': '',
   'task_role_policies': [],
   'cloudwatch_logs_group': '',
   'cloudwatch_logs_stream_prefix': '{cluster_name}',
   'cloudwatch_logs_default_retention': 30,
   'vpc': 'default',
   'subnets': [],
   'security_groups': [],
   'tags': {},
   'environment': {},
   'find_address_timeout': 60,
   'skip_cleanup': False},
  'ec2': {'region': None,
   'availability_zone': None,
   'bootstrap': True,
   'auto_shutdown': True,
   'instance_type': 't2.micro',
   'docker_image': 'daskdev/dask:latest',
   'filesystem_size': 40,
   'key_name': None,
   'iam_instance_profile': {}},
  'azure': {'location': None,
   'resource_group': None,
   'azurevm': {'vnet': None,
    'security_group': None,
    'public_ingress': True,
    'vm_size': 'Standard_DS1_v2',
    'disk_size': 50,
    'scheduler_vm_size': None,
    'docker_image': 'daskdev/dask:latest',
    'vm_image': {'publisher': 'Canonical',
     'offer': 'UbuntuServer',
     'sku': '18.04-LTS',
     'version': 'latest'},
    'bootstrap': True,
    'auto_shutdown': True}},
  'digitalocean': {'token': None,
   'region': 'nyc3',
   'size': 's-1vcpu-1gb',
   'image': 'ubuntu-20-04-x64'},
  'gcp': {'source_image': 'projects/ubuntu-os-cloud/global/images/ubuntu-minimal-1804-bionic-v20201014',
   'zone': 'us-east1-c',
   'network': 'default',
   'projectid': '',
   'on_host_maintenance': 'TERMINATE',
   'machine_type': 'n1-standard-1',
   'filesystem_size': 50,
   'ngpus': '',
   'gpu_type': '',
   'disk_type': 'pd-standard',
   'docker_image': 'daskdev/dask:latest',
   'auto_shutdown': True,
   'public_ingress': True}},
 'distributed': {'version': 2,
  'scheduler': {'allowed-failures': 3,
   'bandwidth': 100000000,
   'blocked-handlers': [],
   'default-data-size': '1kiB',
   'events-cleanup-delay': '1h',
   'idle-timeout': None,
   'transition-log-length': 100000,
   'events-log-length': 100000,
   'work-stealing': True,
   'work-stealing-interval': '100ms',
   'worker-ttl': None,
   'pickle': True,
   'preload': [],
   'preload-argv': [],
   'unknown-task-duration': '500ms',
   'default-task-durations': {'rechunk-split': '1us', 'shuffle-split': '1us'},
   'validate': False,
   'dashboard': {'status': {'task-stream-length': 1000},
    'tasks': {'task-stream-length': 100000},
    'tls': {'ca-file': None, 'key': None, 'cert': None},
    'bokeh-application': {'allow_websocket_origin': ['*'],
     'keep_alive_milliseconds': 500,
     'check_unused_sessions_milliseconds': 500}},
   'locks': {'lease-validation-interval': '10s', 'lease-timeout': '30s'},
   'http': {'routes': ['distributed.http.scheduler.prometheus',
     'distributed.http.scheduler.info',
     'distributed.http.scheduler.json',
     'distributed.http.health',
     'distributed.http.proxy',
     'distributed.http.statics']},
   'allowed-imports': ['dask', 'distributed']},
  'worker': {'blocked-handlers': [],
   'multiprocessing-method': 'spawn',
   'use-file-locking': True,
   'connections': {'outgoing': 50, 'incoming': 10},
   'preload': [],
   'preload-argv': [],
   'daemon': True,
   'validate': False,
   'resources': {},
   'lifetime': {'duration': None, 'stagger': '0 seconds', 'restart': False},
   'profile': {'interval': '10ms', 'cycle': '1000ms', 'low-level': False},
   'memory': {'recent_to_old_time': '30s',
    'target': 0.6,
    'spill': 0.7,
    'pause': 0.8,
    'terminate': 0.95},
   'http': {'routes': ['distributed.http.worker.prometheus',
     'distributed.http.health',
     'distributed.http.statics']}},
  'nanny': {'preload': [], 'preload-argv': []},
  'client': {'heartbeat': '5s', 'scheduler-info-interval': '2s'},
  'deploy': {'lost-worker-timeout': '15s', 'cluster-repr-interval': '500ms'},
  'adaptive': {'interval': '1s',
   'target-duration': '5s',
   'minimum': 0,
   'maximum': inf,
   'wait-count': 3},
  'comm': {'retry': {'count': 0, 'delay': {'min': '1s', 'max': '20s'}},
   'compression': 'auto',
   'offload': '10MiB',
   'default-scheme': 'tcp',
   'socket-backlog': 2048,
   'recent-messages-log-length': 0,
   'zstd': {'level': 3, 'threads': 0},
   'timeouts': {'connect': '10s', 'tcp': '30s'},
   'require-encryption': None,
   'tls': {'ciphers': None,
    'ca-file': None,
    'scheduler': {'cert': None, 'key': None},
    'worker': {'key': None, 'cert': None},
    'client': {'key': None, 'cert': None}}},
  'dashboard': {'link': '{scheme}://{host}:{port}/status',
   'export-tool': False,
   'graph-max-items': 5000,
   'prometheus': {'namespace': 'dask'}},
  'admin': {'tick': {'interval': '20ms', 'limit': '3s'},
   'max-error-length': 10000,
   'log-length': 10000,
   'log-format': '%(name)s - %(levelname)s - %(message)s',
   'pdb-on-err': False,
   'system-monitor': {'interval': '500ms'},
   'event-loop': 'tornado'}},
 'rmm': {'pool-size': None},
 'ucx': {'cuda_copy': False,
  'tcp': False,
  'nvlink': False,
  'infiniband': False,
  'rdmacm': False,
  'net-devices': None,
  'reuse-endpoints': True}}
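
A rough sketch of what dropping default-valued keys could look like, assuming dask.config.defaults and dask.config.merge behave as they do in current Dask (an illustration, not existing dask-cloudprovider code):

import dask.config

def strip_defaults(config, defaults):
    # Recursively keep only entries whose values differ from the defaults
    out = {}
    for key, value in config.items():
        if key not in defaults:
            out[key] = value
        elif isinstance(value, dict) and isinstance(defaults[key], dict):
            nested = strip_defaults(value, defaults[key])
            if nested:
                out[key] = nested
        elif value != defaults[key]:
            out[key] = value
    return out

# dask.config.defaults holds the default mappings registered by each package
# via dask.config.update_defaults
merged_defaults = dask.config.merge(*dask.config.defaults)
trimmed = strip_defaults(dask.config.config, merged_defaults)
print(len(str(dask.config.config)), "->", len(str(trimmed)))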
eric-valente commented 3 years ago

The only way I got things to work currently was to add security=False to the EC2Cluster instantiation; this removes the massive TLS strings and brings the size under the EC2 limit. The zlib change from @jacobtomlinson fixed it when I tested, but it's not in a release yet.
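
To get a feel for how much compression buys you on your own config, a quick check like this works (just a sketch, not the code from the actual change):

import base64
import zlib

import yaml
import dask.config

raw = yaml.dump(dask.config.config).encode()
plain = base64.b64encode(raw)
compressed = base64.b64encode(zlib.compress(raw, 9))
# Compare both against the 16384-byte user_data limit
print(len(plain), "bytes plain,", len(compressed), "bytes zlib-compressed")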

martindurant commented 3 years ago

I would expect TLS certs to compress very poorly

jacobtomlinson commented 3 years ago

Yeah, it's the TLS certs that really cause the size inflation. The zlib PR works but isn't really a long-term solution; it just kicks the problem down the road.

tiagoantao commented 3 years ago

Just a note for anyone trying to work around this: after you try to create a cluster with security=True, the certificates are appended to the config, so if you then try to create a new cluster with security=False the config will still be too big. You need to clean up the certs from the config after the failed security=True attempt.
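
Something like the following works as a rough in-session cleanup (the behaviour of refresh() here is my assumption; restarting the kernel is the simpler option):

import dask.config

# Rebuild the in-memory config from defaults, environment variables and
# config files, dropping anything that was only set in memory, which
# should include the cached temporary certs.
dask.config.refresh()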

jacobtomlinson commented 3 years ago

To add to what @tiagoantao said: only the in-memory config is updated, so restarting your Python process or notebook kernel will clear out any cached certs.

shireenrao commented 3 years ago

Hello - I ran into this issue trying to set up an EC2Cluster:

>>> from dask_cloudprovider.aws import EC2Cluster
>>> cluster = EC2Cluster(
...     ami="ami-some-ami-built-with-packer",
...     bootstrap=False,
...     vpc="vpc-some-vpc",
...     security_groups=["sg-some-security-grp"],
...     security=False,
... )
Creating scheduler instance
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/srao/.pyenv/versions/cloudprovider/lib/python3.7/site-packages/dask_cloudprovider/aws/ec2.py", line 478, in __init__
    super().__init__(debug=debug, **kwargs)
  File "/home/srao/.pyenv/versions/cloudprovider/lib/python3.7/site-packages/dask_cloudprovider/generic/vmcluster.py", line 289, in __init__
    super().__init__(**kwargs, security=self.security)
  File "/home/srao/.pyenv/versions/cloudprovider/lib/python3.7/site-packages/distributed/deploy/spec.py", line 283, in __init__
    self.sync(self._start)
  File "/home/srao/.pyenv/versions/cloudprovider/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 214, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/home/srao/.pyenv/versions/cloudprovider/lib/python3.7/site-packages/distributed/utils.py", line 326, in sync
    raise exc.with_traceback(tb)
  File "/home/srao/.pyenv/versions/cloudprovider/lib/python3.7/site-packages/distributed/utils.py", line 309, in f
    result[0] = yield future
  File "/home/srao/.pyenv/versions/cloudprovider/lib/python3.7/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/srao/.pyenv/versions/cloudprovider/lib/python3.7/site-packages/dask_cloudprovider/generic/vmcluster.py", line 329, in _start
    await super()._start()
  File "/home/srao/.pyenv/versions/cloudprovider/lib/python3.7/site-packages/distributed/deploy/spec.py", line 312, in _start
    self.scheduler = await self.scheduler
  File "/home/srao/.pyenv/versions/cloudprovider/lib/python3.7/site-packages/distributed/deploy/spec.py", line 67, in _
    await self.start()
  File "/home/srao/.pyenv/versions/cloudprovider/lib/python3.7/site-packages/dask_cloudprovider/generic/vmcluster.py", line 86, in start
    ip = await self.create_vm()
  File "/home/srao/.pyenv/versions/cloudprovider/lib/python3.7/site-packages/dask_cloudprovider/aws/ec2.py", line 139, in create_vm
    response = await client.run_instances(**vm_kwargs)
  File "/home/srao/.pyenv/versions/cloudprovider/lib/python3.7/site-packages/aiobotocore/client.py", line 155, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (InvalidParameterValue) when calling the RunInstances operation: User data is limited to 16384 bytes

I tried passing security=False to EC2Cluster and I am still seeing the error. Is there any workaround to get this working?

I am using the following versions: dask 2021.9.1 and dask-cloudprovider 2021.9.0.

Thank you

jacobtomlinson commented 3 years ago

I'm surprised that security=False doesn't resolve this for you; that is the current workaround for this problem.

Could you check your Dask config to see if you can identify why it is so large?

import dask.config

print(dask.config.config)
shireenrao commented 3 years ago

@jacobtomlinson - I see what mistake I made; you pointed me in the right direction. When I first saw this error I found this ticket, and in the same REPL session I tried passing security=False and saw the error again. In a fresh session, starting with security=False works! Your comment above about restarting the Python process or notebook kernel to clear out any cached certs makes sense now.

Thank you!

filpano commented 2 years ago

Is there a longer-term solution that does not include setting security=False? IMO, that's not a production-grade configuration since it leaves connections to the scheduler and workers susceptible to MITM attacks.

It's been quite a while since there was any discussion in this issue, and https://github.com/dask/distributed/pull/4465 does not seem to be getting any traction for a problem that seems quite important to me. Unless I'm overlooking something and the recommended setup (w.r.t. security best practices) looks different...

jacobtomlinson commented 2 years ago

We enabled security=True by default because other default behaviour can leave your cluster exposed to the internet. However, Dask is typically deployed with security=False, with folks relying on network-level security to protect their clusters, so I'd push back on the idea that this isn't a production-grade workaround. For example, on Kubernetes you would use a service mesh like Istio to handle this at the network layer.

I totally agree though that it's an unpleasant workaround, and if there is a strong desire from the community to resolve this then I'm all for it. Do you have thoughts on a long-term solution?