dask / dask-cloudprovider

Cloud provider cluster managers for Dask. Supports AWS, Google Cloud, Azure and more.
https://cloudprovider.dask.org
BSD 3-Clause "New" or "Revised" License

Better logging/debug/ssh feature for AWS EC2Cluster #199

Closed: roarjn closed this issue 3 years ago

roarjn commented 4 years ago

Currently it does not seem possible to SSH into the worker or scheduler nodes to inspect logs, error traces, or CPU/GPU utilization. Without these, it's really hard to debug what could be causing an issue with the EC2Cluster creation command. For SSH, it appears the instances have no key pair, and AWS does not allow SSH without an associated key pair.

jacobtomlinson commented 4 years ago

Thanks for raising this @roarjn.

Typically we access logs via the cluster.get_logs() method.
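As a rough sketch of working with those logs, the helper below filters the output of cluster.get_logs() down to lines that look like errors. It assumes get_logs() returns a mapping from component name (cluster, scheduler, each worker address) to that component's log text, which is how recent distributed versions behave; grep_logs and its default patterns are hypothetical names chosen for illustration, not part of Dask.

```python
from typing import Dict, Iterable, List, Mapping


def grep_logs(
    logs: Mapping[str, str],
    patterns: Iterable[str] = ("Error", "Exception", "Traceback"),
) -> Dict[str, List[str]]:
    """Keep only the log lines that contain any of ``patterns``.

    ``logs`` is assumed to map a component name (cluster, scheduler,
    worker address) to that component's log text.
    """
    patterns = tuple(patterns)
    matches: Dict[str, List[str]] = {}
    for component, text in logs.items():
        hits = [line for line in str(text).splitlines() if any(p in line for p in patterns)]
        if hits:  # skip components with nothing suspicious
            matches[component] = hits
    return matches


# Example with made-up log text; on a real cluster you would pass cluster.get_logs().
sample = {
    "Scheduler": "distributed.scheduler - INFO - Clear task state\n",
    "tcp://10.0.0.5:40000": "botocore.exceptions.NoCredentialsError: Unable to locate credentials\n",
}
for component, lines in grep_logs(sample).items():
    print(component)
    for line in lines:
        print("   ", line)
```

Only processes that actually started and connected to the scheduler have logs to return, so this does not replace SSH access when a VM fails before its worker starts.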

CPU/GPU utilization is also available via the Dask dashboard.

I agree it would be good to optionally allow configuration of an SSH key pair though.

roarjn commented 3 years ago

It has been challenging to debug this NoCredentialsError: Unable to locate credentials error, which occurs while reading data from S3. Below are the steps I carry out to read a CSV file from S3. I have run the aws configure step, adding my access key, secret access key and region, on the node from which I launch this job. I'd appreciate any suggestions.

# Create dask-cloudprovider EC2Cluster
from dask_cloudprovider.aws import EC2Cluster
from dask.distributed import wait

cluster = EC2Cluster(ami="ami-06d62f645899df7de",
                     docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04",
                     instance_type="g4dn.xlarge",
                     worker_class="dask_cuda.CUDAWorker",
                     n_workers=1,
                     bootstrap=False,
                     filesystem_size=120)
Creating scheduler instance
Created instance i-03f977a565ebbab83 as dask-a667aaf5-scheduler
Waiting for scheduler to run
Scheduler is running
/home/dgxuser/anaconda3/envs/daskcp/lib/python3.7/contextlib.py:119: UserWarning: Creating your cluster is taking a surprisingly long time. This is likely due to pending resources. Hang tight! 
  next(self.gen)
Creating worker instance
Created instance i-0510e98641f334675 as dask-a667aaf5-worker-2932460c
from dask.distributed import Client, wait
client = Client(cluster)
/home/dgxuser/anaconda3/envs/daskcp/lib/python3.7/site-packages/distributed/client.py:1129: VersionMismatchWarning: Mismatched versions found

+-------------+--------+-----------+---------+
| Package     | client | scheduler | workers |
+-------------+--------+-----------+---------+
| distributed | 2.30.1 | 2.30.0    | None    |
| toolz       | 0.10.0 | 0.11.1    | None    |
| tornado     | 6.1    | 6.0.4     | None    |
+-------------+--------+-----------+---------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
# Read a CSV file from S3
import dask.dataframe as dd
ddf = dd.read_csv("s3://airlines-2019/airlines-2019.csv")

ddf.head()

---------------------------------------------------------------------------
NoCredentialsError                        Traceback (most recent call last)
<ipython-input-6-4e0da72f8a21> in <module>
----> 1 ddf.head()

~/anaconda3/envs/daskcp/lib/python3.7/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
   1004             Whether to compute the result, default is True.
   1005         """
-> 1006         return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
   1007 
   1008     def _head(self, n, npartitions, compute, safe):

~/anaconda3/envs/daskcp/lib/python3.7/site-packages/dask/dataframe/core.py in _head(self, n, npartitions, compute, safe)
   1037 
   1038         if compute:
-> 1039             result = result.compute()
   1040         return result
   1041 

~/anaconda3/envs/daskcp/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    165         dask.base.compute
    166         """
--> 167         (result,) = compute(self, traverse=False, **kwargs)
    168         return result
    169 

~/anaconda3/envs/daskcp/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    450         postcomputes.append(x.__dask_postcompute__())
    451 
--> 452     results = schedule(dsk, keys, **kwargs)
    453     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    454 

~/anaconda3/envs/daskcp/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2723                     should_rejoin = False
   2724             try:
-> 2725                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2726             finally:
   2727                 for f in futures.values():

~/anaconda3/envs/daskcp/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1990                 direct=direct,
   1991                 local_worker=local_worker,
-> 1992                 asynchronous=asynchronous,
   1993             )
   1994 

~/anaconda3/envs/daskcp/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    831         else:
    832             return sync(
--> 833                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    834             )
    835 

~/anaconda3/envs/daskcp/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

~/anaconda3/envs/daskcp/lib/python3.7/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

~/anaconda3/envs/daskcp/lib/python3.7/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/anaconda3/envs/daskcp/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1849                             exc = CancelledError(key)
   1850                         else:
-> 1851                             raise exception.with_traceback(traceback)
   1852                         raise exc
   1853                     if errors == "skip":

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/bytes/core.py in read_block_from_file()

/opt/conda/envs/rapids/lib/python3.7/site-packages/fsspec/core.py in __enter__()

/opt/conda/envs/rapids/lib/python3.7/site-packages/fsspec/spec.py in open()

/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py in _open()

/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py in __init__()

/opt/conda/envs/rapids/lib/python3.7/site-packages/fsspec/spec.py in __init__()

/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py in info()

/opt/conda/envs/rapids/lib/python3.7/site-packages/s3fs/core.py in _call_s3()

/opt/conda/envs/rapids/lib/python3.7/site-packages/botocore/client.py in _api_call()

/opt/conda/envs/rapids/lib/python3.7/site-packages/botocore/client.py in _make_api_call()

/opt/conda/envs/rapids/lib/python3.7/site-packages/botocore/client.py in _make_request()

/opt/conda/envs/rapids/lib/python3.7/site-packages/botocore/endpoint.py in make_request()

/opt/conda/envs/rapids/lib/python3.7/site-packages/botocore/endpoint.py in _send_request()

/opt/conda/envs/rapids/lib/python3.7/site-packages/botocore/endpoint.py in create_request()

/opt/conda/envs/rapids/lib/python3.7/site-packages/botocore/hooks.py in emit()

/opt/conda/envs/rapids/lib/python3.7/site-packages/botocore/hooks.py in emit()

/opt/conda/envs/rapids/lib/python3.7/site-packages/botocore/hooks.py in _emit()

/opt/conda/envs/rapids/lib/python3.7/site-packages/botocore/signers.py in handler()

/opt/conda/envs/rapids/lib/python3.7/site-packages/botocore/signers.py in sign()

/opt/conda/envs/rapids/lib/python3.7/site-packages/botocore/auth.py in add_auth()

NoCredentialsError: Unable to locate credentials

jacobtomlinson commented 3 years ago

Thanks for the example @roarjn, it's very helpful.

In this case you have set your credentials on the node where you launch the job, but not on the workers, and it is the workers that make the connection to S3.
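One way to confirm this diagnosis is to ask each worker what it can see. The sketch below is a hypothetical helper, credential_sources, that reports whether the usual AWS credential environment variables and a shared credentials file are visible in the current process; with a connected Client, client.run(credential_sources) runs it once on every worker and returns the results keyed by worker address. It deliberately checks only those two sources and does not probe the rest of botocore's credential chain, such as an EC2 instance profile.

```python
import os
from typing import Dict, Mapping, Optional


def credential_sources(environ: Optional[Mapping[str, str]] = None) -> Dict[str, bool]:
    """Report which common AWS credential sources are visible to this process.

    Only environment variables and the shared credentials file are checked;
    other sources (e.g. an EC2 instance profile) are not probed.
    """
    env = os.environ if environ is None else environ
    shared_file = env.get(
        "AWS_SHARED_CREDENTIALS_FILE",
        os.path.join(os.path.expanduser("~"), ".aws", "credentials"),
    )
    return {
        "AWS_ACCESS_KEY_ID": "AWS_ACCESS_KEY_ID" in env,
        "AWS_SECRET_ACCESS_KEY": "AWS_SECRET_ACCESS_KEY" in env,
        "shared_credentials_file": os.path.isfile(os.path.expanduser(shared_file)),
    }


# Locally this checks the current process; with a distributed Client,
# client.run(credential_sources) would run it on every worker instead.
print(credential_sources())
```

If every worker reports False for all three entries and no instance profile is attached, the S3 request has no credentials to sign with, which is consistent with the NoCredentialsError above.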

Here's a code snippet to work around this:

cluster = EC2Cluster(ami="ami-06d62f645899df7de",  
                     docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04",
                     instance_type="g4dn.xlarge",
                     worker_class="dask_cuda.CUDAWorker", 
                     n_workers=1,
                     bootstrap=False,
                     filesystem_size=120,
                     env_vars={
                         "AWS_ACCESS_KEY_ID": "<access key>",
                         "AWS_SECRET_ACCESS_KEY": "<secret key>",
                         "AWS_REGION": "<region>",
                     })
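To avoid pasting keys into the notebook, the env_vars dict could instead be built from the files that aws configure already wrote on the launching node. The function below is a hypothetical sketch using only the standard library's configparser; it assumes the default file locations (~/.aws/credentials and ~/.aws/config) and the standard layout in which named profiles in the config file use a [profile NAME] header. It sets AWS_DEFAULT_REGION alongside AWS_REGION because AWS_DEFAULT_REGION is botocore's documented region variable.

```python
import configparser
import os
from typing import Dict, Optional


def aws_env_vars_from_profile(
    profile: str = "default",
    credentials_path: Optional[str] = None,
    config_path: Optional[str] = None,
) -> Dict[str, str]:
    """Build an env_vars dict from the files written by ``aws configure``."""
    credentials_path = credentials_path or os.path.expanduser("~/.aws/credentials")
    config_path = config_path or os.path.expanduser("~/.aws/config")

    # interpolation=None so a '%' in a secret is taken literally.
    creds = configparser.ConfigParser(interpolation=None)
    creds.read(credentials_path)  # a missing file is silently skipped
    if not creds.has_section(profile):
        raise KeyError(f"profile {profile!r} not found in {credentials_path}")
    section = creds[profile]
    env_vars = {
        "AWS_ACCESS_KEY_ID": section["aws_access_key_id"],
        "AWS_SECRET_ACCESS_KEY": section["aws_secret_access_key"],
    }
    if "aws_session_token" in section:  # present for temporary credentials
        env_vars["AWS_SESSION_TOKEN"] = section["aws_session_token"]

    # In ~/.aws/config, named profiles use a "[profile NAME]" header.
    config = configparser.ConfigParser(interpolation=None)
    config.read(config_path)
    config_section = profile if profile == "default" else f"profile {profile}"
    if config.has_option(config_section, "region"):
        region = config.get(config_section, "region")
        env_vars["AWS_REGION"] = region
        env_vars["AWS_DEFAULT_REGION"] = region
    return env_vars
```

Usage would be EC2Cluster(..., env_vars=aws_env_vars_from_profile()). Keep in mind that the cloud-init template quoted later in this thread passes each entry as a -e KEY=VALUE flag on the docker run command, so the keys still travel to the instances in plaintext; this only keeps them out of your code.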

In the long term, I can think of a few options that would make this easier and would not require you to put credentials in your code. I'd appreciate your feedback if you have a preference.

roarjn commented 3 years ago

Hi @jacobtomlinson, thanks again for the help. For my current use case, option 2 or 3 would be more useful, although option 1 may be more relevant for production use cases.

I tried the above suggestion and am running into this error: TemplateSyntaxError: tag name expected. I tried searching and debugging but found no clear leads on what could be causing it. Any suggestions?

cluster = EC2Cluster(ami="ami-06d62f645899df7de",  # Example Deep Learning AMI (Ubuntu 18.04) # AWS AMI id can be region specific
                             docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04",
                             instance_type="g4dn.xlarge",
                             worker_class="dask_cuda.CUDAWorker", 
                             n_workers=1,
                             bootstrap=False,
                             filesystem_size=120,
                             env_vars={
                                 "AWS_ACCESS_KEY_ID": "my access key....",
                                 "AWS_SECRET_ACCESS_KEY": "my secret access key ...",
                                 "AWS_REGION": "us-east-2",})
Creating scheduler instance
---------------------------------------------------------------------------
TemplateSyntaxError                       Traceback (most recent call last)
<ipython-input-5-45b28d3da310> in <module>
     16                                  "AWS_ACCESS_KEY_ID": "my access key...",
     17                                  "AWS_SECRET_ACCESS_KEY": "my secret access key ..",
---> 18                                  "AWS_REGION": "us-east-2",})

~/anaconda3/lib/python3.7/site-packages/dask_cloudprovider/aws/ec2.py in __init__(self, region, bootstrap, auto_shutdown, ami, instance_type, vpc, subnet_id, security_groups, filesystem_size, docker_image, **kwargs)
    371         self.scheduler_options = {**self.options}
    372         self.worker_options = {**self.options}
--> 373         super().__init__(**kwargs)

~/anaconda3/lib/python3.7/site-packages/dask_cloudprovider/generic/vmcluster.py in __init__(self, n_workers, worker_class, worker_options, scheduler_options, docker_image, env_vars, **kwargs)
    233         self.uuid = str(uuid.uuid4())[:8]
    234 
--> 235         super().__init__(**kwargs)
    236 
    237     async def _start(

~/anaconda3/lib/python3.7/site-packages/distributed/deploy/spec.py in __init__(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name)
    274         if not self.asynchronous:
    275             self._loop_runner.start()
--> 276             self.sync(self._start)
    277             self.sync(self._correct_state)
    278 

~/anaconda3/lib/python3.7/site-packages/distributed/deploy/cluster.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    181             return future
    182         else:
--> 183             return sync(self.loop, func, *args, **kwargs)
    184 
    185     def _log(self, log):

~/anaconda3/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

~/anaconda3/lib/python3.7/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

~/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/anaconda3/lib/python3.7/site-packages/dask_cloudprovider/generic/vmcluster.py in _start(self)
    258             "Hang tight! ",
    259         ):
--> 260             await super()._start()
    261 
    262     def render_cloud_init(self, *args, **kwargs):

~/anaconda3/lib/python3.7/site-packages/distributed/deploy/spec.py in _start(self)
    302 
    303         self.status = Status.starting
--> 304         self.scheduler = await self.scheduler
    305         self.scheduler_comm = rpc(
    306             getattr(self.scheduler, "external_address", None) or self.scheduler.address,

~/anaconda3/lib/python3.7/site-packages/distributed/deploy/spec.py in _()
     69             async with self.lock:
     70                 if self.status == Status.created:
---> 71                     await self.start()
     72                     assert self.status == Status.running
     73             return self

~/anaconda3/lib/python3.7/site-packages/dask_cloudprovider/generic/vmcluster.py in start(self)
     81     async def start(self):
     82         self.cluster._log("Creating scheduler instance")
---> 83         ip = await self.create_vm()
     84         self.address = f"tcp://{ip}:8786"
     85         await self.wait_for_scheduler()

~/anaconda3/lib/python3.7/site-packages/dask_cloudprovider/aws/ec2.py in create_vm(self)
    108                     gpu_instance=self.gpu_instance,
    109                     bootstrap=self.bootstrap,
--> 110                     env_vars=self.env_vars,
    111                 ),
    112                 InstanceInitiatedShutdownBehavior="terminate",

~/anaconda3/lib/python3.7/site-packages/dask_cloudprovider/generic/vmcluster.py in render_cloud_init(self, *args, **kwargs)
    263         loader = FileSystemLoader([os.path.dirname(os.path.abspath(__file__))])
    264         environment = Environment(loader=loader)
--> 265         template = environment.get_template("cloud-init.yaml.j2")
    266         return template.render(**kwargs)
    267 

~/anaconda3/lib/python3.7/site-packages/jinja2/environment.py in get_template(self, name, parent, globals)
    828         if parent is not None:
    829             name = self.join_path(name, parent)
--> 830         return self._load_template(name, self.make_globals(globals))
    831 
    832     @internalcode

~/anaconda3/lib/python3.7/site-packages/jinja2/environment.py in _load_template(self, name, globals)
    802                                          template.is_up_to_date):
    803                 return template
--> 804         template = self.loader.load(self, name, globals)
    805         if self.cache is not None:
    806             self.cache[cache_key] = template

~/anaconda3/lib/python3.7/site-packages/jinja2/loaders.py in load(self, environment, name, globals)
    123         # date) etc. we compile the template
    124         if code is None:
--> 125             code = environment.compile(source, name, filename)
    126 
    127         # if the bytecode cache is available and the bucket doesn't

~/anaconda3/lib/python3.7/site-packages/jinja2/environment.py in compile(self, source, name, filename, raw, defer_init)
    589         except TemplateSyntaxError:
    590             exc_info = sys.exc_info()
--> 591         self.handle_exception(exc_info, source_hint=source_hint)
    592 
    593     def compile_expression(self, source, undefined_to_none=True):

~/anaconda3/lib/python3.7/site-packages/jinja2/environment.py in handle_exception(self, exc_info, rendered, source_hint)
    778             self.exception_handler(traceback)
    779         exc_type, exc_value, tb = traceback.standard_exc_info
--> 780         reraise(exc_type, exc_value, tb)
    781 
    782     def join_path(self, template, parent):

~/anaconda3/lib/python3.7/site-packages/jinja2/_compat.py in reraise(tp, value, tb)
     35     def reraise(tp, value, tb=None):
     36         if value.__traceback__ is not tb:
---> 37             raise value.with_traceback(tb)
     38         raise value
     39 

~/anaconda3/lib/python3.7/site-packages/dask_cloudprovider/generic/cloud-init.yaml.j2 in template()
     51 
     52   # Run container
---> 53   - 'docker run --net=host {%+ if gpu_instance %}--gpus=all{% endif %} {% for key in env_vars %} -e {{key}}={{env_vars[key]}} {% endfor %}{{image}} {{ command }}'
     54 
     55   {% if auto_shutdown %}

~/anaconda3/lib/python3.7/site-packages/jinja2/environment.py in _parse(self, source, name, filename)
    495     def _parse(self, source, name, filename):
    496         """Internal parsing function used by `parse` and `compile`."""
--> 497         return Parser(self, source, name, encode_filename(filename)).parse()
    498 
    499     def lex(self, source, name=None, filename=None):

~/anaconda3/lib/python3.7/site-packages/jinja2/parser.py in parse(self)
    899     def parse(self):
    900         """Parse the whole template into a `Template` node."""
--> 901         result = nodes.Template(self.subparse(), lineno=1)
    902         result.set_environment(self.environment)
    903         return result

~/anaconda3/lib/python3.7/site-packages/jinja2/parser.py in subparse(self, end_tokens)
    881                        self.stream.current.test_any(*end_tokens):
    882                         return body
--> 883                     rv = self.parse_statement()
    884                     if isinstance(rv, list):
    885                         body.extend(rv)

~/anaconda3/lib/python3.7/site-packages/jinja2/parser.py in parse_statement(self)
    123         token = self.stream.current
    124         if token.type != 'name':
--> 125             self.fail('tag name expected', token.lineno)
    126         self._tag_stack.append(token.value)
    127         pop_tag = True

~/anaconda3/lib/python3.7/site-packages/jinja2/parser.py in fail(self, msg, lineno, exc)
     57         if lineno is None:
     58             lineno = self.stream.current.lineno
---> 59         raise exc(msg, lineno, self.name, self.filename)
     60 
     61     def _fail_ut_eof(self, name, end_token_stack, lineno):

TemplateSyntaxError: tag name expected

quasiben commented 3 years ago

The error describes a problem with rendering the Jinja template, which is odd. I just tried this same cluster configuration and did not see this error. What version of dask-cloudprovider are you using? The latest release is 0.5.1.

Note: @jacobtomlinson was only recently able to push recent releases to conda-forge.
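Since the question is about versions, note that the two tracebacks above come from different Python environments (~/anaconda3/envs/daskcp in the first, the base ~/anaconda3 in the second), so they may have different dask-cloudprovider and Jinja2 versions installed. A small sketch like the following, run in the same kernel that creates the cluster, prints the interpreter path and the installed versions. package_versions is a hypothetical helper; it uses importlib.metadata (Python 3.8+) and falls back to the importlib-metadata backport on older Pythons such as the 3.7 environments shown here.

```python
import sys
from typing import Dict, Iterable, Optional

try:
    from importlib import metadata  # Python 3.8+
except ImportError:  # Python 3.7: pip install importlib-metadata
    import importlib_metadata as metadata


def package_versions(names: Iterable[str]) -> Dict[str, Optional[str]]:
    """Return the installed version of each distribution, or None if absent."""
    versions: Dict[str, Optional[str]] = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


print("Python:", sys.executable)
for name, version in package_versions(["dask-cloudprovider", "distributed", "Jinja2"]).items():
    print(f"{name}: {version or 'not installed'}")
```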

jacobtomlinson commented 3 years ago

I tried reproducing this locally but was unable to. Here's a notebook example of it working for me.

I've also raised #208, which adds a key_name option for specifying an SSH key and an iam_instance_profile option for specifying an IAM role to assign to instances.
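As a sketch of how the two options from #208 might be passed once they are in a release you have installed: key_name takes the name of an existing EC2 key pair, and iam_instance_profile is assumed here to take the same shape as the IamInstanceProfile argument of EC2's RunInstances call (a dict with "Name" or "Arn"). access_options is a hypothetical helper that only adds the options you actually set.

```python
from typing import Any, Dict, Optional


def access_options(
    key_name: Optional[str] = None,
    instance_profile_name: Optional[str] = None,
) -> Dict[str, Any]:
    """Collect optional EC2Cluster keyword arguments for SSH and IAM access.

    Assumes iam_instance_profile follows the RunInstances IamInstanceProfile
    shape, i.e. {"Name": ...} or {"Arn": ...}.
    """
    options: Dict[str, Any] = {}
    if key_name:
        options["key_name"] = key_name  # name of an existing EC2 key pair
    if instance_profile_name:
        options["iam_instance_profile"] = {"Name": instance_profile_name}
    return options


# Placeholder names; pass the result as EC2Cluster(..., **options).
options = access_options(key_name="my-keypair", instance_profile_name="my-s3-read-profile")
print(options)
```

To SSH in, the instances' security group must also allow inbound traffic on port 22. With an instance profile whose role grants S3 read access, botocore's default credential chain should be able to obtain credentials from the instance metadata service, which would make the env_vars workaround unnecessary.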

roarjn commented 3 years ago

Thanks for the help. The example in your notebook works great for EC2Cluster.