dask / dask-cloudprovider

Cloud provider cluster managers for Dask. Supports AWS, Google Cloud, Azure, and more.
https://cloudprovider.dask.org
BSD 3-Clause "New" or "Revised" License

Spot instance support for EC2 #366

Open · martinedgefocus opened this issue 2 years ago

martinedgefocus commented 2 years ago

This works in a quick smoke test; I'll keep working with it here, but I'm interested in any feedback / collaboration. Spot support seems critical, since it should give a 4-5x reduction in cost. Once I finally figured out what to change, the patch was fairly simple. My concerns are:

  1. We don't want a spot instance for the scheduler. Checking whether worker_class is present seems to work, but it is not elegant.
  2. It's unclear what the behaviour will be when spot instances are reclaimed. I'm hoping the cluster will gracefully recover and delete the worker.

For reference: https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_SpotMarketOptions.html

```diff
diff --git a/dask_cloudprovider/aws/ec2.py b/dask_cloudprovider/aws/ec2.py
index 1a4f27d..486eeb6 100644
--- a/dask_cloudprovider/aws/ec2.py
+++ b/dask_cloudprovider/aws/ec2.py
@@ -85,7 +85,7 @@ class EC2Instance(VMInterface):
         """
         https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html#EC2.Client.run_instances
         """
-        # TODO Enable Spot support
+        spot = hasattr(self, "worker_class")
@@ -152,6 +152,15 @@ class EC2Instance(VMInterface):
                 ],
             }
 
+            if spot:
+                vm_kwargs["InstanceMarketOptions"] = {
+                    "MarketType": "spot",
+                    "SpotOptions": {
+                        "SpotInstanceType": "one-time",
+                    },
+                }
+
             if self.key_name:
                 vm_kwargs["KeyName"] = self.key_name
```
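For anyone who wants to try this standalone, here is a minimal boto3 sketch of the request shape the patch produces (the AMI ID and instance type are placeholders, not values from dask-cloudprovider):

```python
import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

# Placeholder AMI and instance type; the relevant piece is
# InstanceMarketOptions, which mirrors what the patch adds to vm_kwargs.
response = ec2.run_instances(
    ImageId="ami-0123456789abcdef0",  # placeholder AMI
    InstanceType="m5.large",
    MinCount=1,
    MaxCount=1,
    InstanceMarketOptions={
        "MarketType": "spot",
        "SpotOptions": {
            # "one-time" requests are not reopened after an interruption,
            # which suits disposable Dask workers.
            "SpotInstanceType": "one-time",
        },
    },
)
print(response["Instances"][0]["InstanceId"])
```

Leaving SpotOptions["MaxPrice"] unset caps the bid at the on-demand price by default, which is usually what you want here.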
jacobtomlinson commented 2 years ago

It would be great to support this! We already support spot in the ECSCluster manager and it would be awesome to have it here too.

I would expect that when the VM is terminated it will be shut down gracefully: the worker process gets a SIGTERM, and it will attempt to notify the scheduler and push any of its memory off onto other workers.

Depending on how much memory is on the worker, the time allowed to shut down may not be long enough, so we might want the workers to watch for the termination notification and shut down preemptively. #48 covers this for ECSCluster, and it would likely be reusable here too.

We already have something similar for the Azure preemptible instances. https://github.com/dask/dask-cloudprovider/blob/75fc8eca21b599d2cf7ec07a5441954c8deb660a/dask_cloudprovider/azure/utils.py#L17-L22
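For the EC2 analogue, here is a minimal sketch of how a worker could detect an impending spot interruption by polling the instance metadata service (the function name and the idea of wiring it into a worker-side poll loop are illustrative, not existing dask-cloudprovider code):

```python
import requests

# EC2 exposes spot interruption notices via the instance metadata service.
# This endpoint returns 404 until an interruption is scheduled, then JSON
# like {"action": "terminate", "time": "2015-01-05T18:02:00Z"}.
SPOT_ACTION_URL = "http://169.254.169.254/latest/meta-data/spot/instance-action"


def spot_termination_scheduled() -> bool:
    """Return True if this EC2 spot instance is marked for interruption."""
    try:
        response = requests.get(SPOT_ACTION_URL, timeout=1)
    except requests.exceptions.RequestException:
        # Not on EC2, or the metadata service is unreachable.
        return False
    return response.status_code == 200
```

A background task on the worker could poll this every few seconds and, on a hit, trigger the worker's graceful shutdown so tasks and data migrate off before the two-minute interruption deadline.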

@martinedgefocus your patch looks like a great first pass at this. It should definitely be configurable and opt-in via the Dask config, along the lines sketched below. Do you have any interest in raising a PR?
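One possible shape for that opt-in, assuming a hypothetical cloudprovider.ec2.spot config key (the key name and helper function are illustrative, not existing API):

```python
import dask.config


def resolve_spot_setting(spot=None):
    """Decide whether a VM should be requested as a spot instance.

    An explicit ``spot=`` keyword wins; otherwise fall back to a hypothetical
    ``cloudprovider.ec2.spot`` config key, defaulting to on-demand so spot
    usage stays strictly opt-in.
    """
    if spot is not None:
        return spot
    return dask.config.get("cloudprovider.ec2.spot", default=False)
```

The scheduler instance could then pass spot=False explicitly, which avoids inferring the role from hasattr(self, "worker_class").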