dask / dask-cloudprovider

Cloud provider cluster managers for Dask. Supports AWS, Google Cloud, Azure and more...
https://cloudprovider.dask.org
BSD 3-Clause "New" or "Revised" License
134 stars, 110 forks

Add support for Fargate Spot #47

Closed: jacobtomlinson closed this issue 2 years ago

jacobtomlinson commented 4 years ago

Fargate Spot is a new capability on AWS Fargate that can run interruption tolerant Amazon Elastic Container Service (Amazon ECS) Tasks at up to a 70% discount off the Fargate price.

https://aws.amazon.com/blogs/aws/aws-fargate-spot-now-generally-available/

Looks like AWS have introduced "capacity providers" which allow you to assign tasks to different capacity modes. Current modes are regular and spot.

Spot tasks can be interrupted at any time with a two minute warning in exchange for an up to 70% saving.

rsignell-usgs commented 4 years ago

The blog post shows use of the console, but it looks like, using the API, we first need to create a capacity provider and then set capacityProviderStrategy to FARGATE_SPOT.

@jacobtomlinson does that look about right?

jacobtomlinson commented 4 years ago

That sounds about right to me.

evamaxfield commented 4 years ago

Just curious if there were any updates on this?

TomAugspurger commented 4 years ago

I don't believe so. @jacobtomlinson (the primary dask-cloudprovider maintainer) is out for a couple months.

jthielen commented 3 years ago

@tracek Would you happen to have your own implementation for fargate spot instance use mentioned in https://github.com/dask/dask-cloudprovider/pull/97#issuecomment-707161685 up somewhere or a PR ready? I thought I'd reach out to see what you may happen to already have before I try coming up with something myself (as I have need of this for a current project). Thanks!

tracek commented 3 years ago

@jthielen Sorry for the lag, busy week. Yes, I do, and I am preparing a PR with this feature. There were some changes in the meantime, so I need to figure out how to merge them properly. If you need something here and now, I can send you the package I built locally. Otherwise I hope to have something next week.

rpanai commented 3 years ago

Hi @tracek do you need any help for this PR?

tracek commented 3 years ago

Hi @rpanai - don't mind if I do. Sorry for the lag with answering, I missed the notification initially. I got the spot instances working, but then when I finally made the PR, I could not figure out how to deal with the delta to the present code. I will take another stab at it and see if I can get it to work. Are you familiar with the cloudprovider code? Asking since I am not sure how to deal with some of the elements. I could make a half-witted PR that works, but with a caveat or two - and then could use some help / advice.

jacobtomlinson commented 3 years ago

Please don't hesitate to raise a PR, even if things aren't in a great state. We can always help you push it forwards but need something to start with.

anovv commented 3 years ago

@jacobtomlinson I did some research on this and this is what I found:

As mentioned previously, to support running tasks on Fargate Spot, we need to set appropriate capacity providers for the cluster. Let's assume we want the Scheduler task to run on FARGATE provider (no interruptions) and Workers on FARGATE_SPOT.

The current implementation of FargateCluster launches standalone tasks for both the Scheduler and the Workers using the ECS Tasks API directly, without delegating scheduling to ECS Services. It uses launchType=FARGATE, which tells ECS which infrastructure to provide (note there is no launchType=FARGATE_SPOT).

The problem is: you can't specify capacity provider for each task directly, you need to create a service and specify provider for the service, which should manage tasks scheduling (see this https://stackoverflow.com/questions/69313867/map-specific-aws-ecs-tasks-to-either-fargate-or-fargate-spot-capacity-providers/69314851?noredirect=1#comment122533732_69314851). So, as I understand, since we need to run only Workers on spot, we can keep Scheduler task as it is now (with launchType=FARGATE), but for workers we need to create a Service with according capacity provider (and also remove launchType, since if it is specified ECS ignores capacity providers).

Now what is not clear to me is how to combine ECS Service API with Dask scheduling logic. An ECS Service manages the number of running tasks itself; we simply specify a min/max/desired count. FargateCluster is a SpecCluster and, as I understand it, the Scheduler manages the state of all workers internally (the scale method uses the scheduler_comm API to create/retire workers: https://github.com/dask/distributed/blob/main/distributed/deploy/spec.py#L472). How would we plug ECS in here? Wouldn't this duplicate logic?

Another question is how to handle interruption notice. If a Worker silently fails, does Scheduler know about it and just recreates a new one to keep desired count? If so, we should keep in mind that (afaik) ECS Service does the same and we may have duplication here as well (if we did not integrate ECS API in Scheduler's logic).

Let me know your thoughts, thanks!

jacobtomlinson commented 3 years ago

> but for workers we need to create a Service with according capacity provider

We need to manage the worker tasks manually, so do you think we could do this the other way round and run the scheduler as a service with the FARGATE launch type?

> Now what is not clear to me is how to combine ECS Service API with Dask scheduling logic.

Dask workers are stateful, they store tasks and futures on behalf of the user, so they generally do not play well with scalers that treat them as fungible. Therefore the Dask scheduler needs to be in control of selecting which workers can be safely removed and migrating any memory.

> Another question is how to handle interruption notice.

For Azure we have a plugin that listens for the interruption warning and attempts to migrate state away from a worker before it is removed. We could do with something similar for AWS.

mreferre commented 3 years ago

@dirtyValera

> The problem is: you can't specify capacity provider for each task directly, you need to create a service and specify provider for the service, which should manage tasks scheduling

You can actually run a task that is not part of a service and still point it at a capacity provider. You are right that there is no launchType=FARGATE_SPOT option; instead, you can use the --capacity-provider-strategy flag to point to the FARGATE_SPOT capacity provider. This is the CLI version of the runTask documentation, but it's the same for the corresponding API.
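
As a rough illustration of the API equivalent, the sketch below builds the arguments for boto3's ECS `run_task` call with a `capacityProviderStrategy` instead of a `launchType`. The helper names, the subnet and security group inputs, and the choice of `assignPublicIp` are assumptions made for this example, not taken from dask-cloudprovider.

```python
from typing import Any, Dict, List


def build_run_task_kwargs(
    cluster: str,
    task_definition: str,
    subnets: List[str],
    security_groups: List[str],
    spot: bool = True,
) -> Dict[str, Any]:
    """Build keyword arguments for the ECS RunTask call (hypothetical helper).

    launchType is deliberately left out: it must be omitted when a
    capacityProviderStrategy is given in the same request.
    """
    provider = "FARGATE_SPOT" if spot else "FARGATE"
    return {
        "cluster": cluster,
        "taskDefinition": task_definition,
        "count": 1,
        "capacityProviderStrategy": [
            {"capacityProvider": provider, "weight": 1},
        ],
        "networkConfiguration": {
            "awsvpcConfiguration": {
                "subnets": subnets,
                "securityGroups": security_groups,
                # Assumption for this sketch; use "DISABLED" in private subnets.
                "assignPublicIp": "ENABLED",
            }
        },
    }


def run_worker_task(**kwargs: Any) -> Dict[str, Any]:
    """Submit the task. Needs boto3 and AWS credentials, so import lazily."""
    import boto3

    return boto3.client("ecs").run_task(**build_run_task_kwargs(**kwargs))
```

Keeping the argument construction separate from the network call makes it easy to check the request shape without touching AWS. The capacity provider still has to be associated with the cluster for the call to succeed.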

anovv commented 3 years ago

Thanks for your input @mreferre!

Can you also help with spot termination notice? What AWS service/api do we monitor for Fargate task interruption events?

mreferre commented 3 years ago

> Thanks for your input @mreferre!
>
> Can you also help with spot termination notice? What AWS service/api do we monitor for Fargate task interruption events?

The Handling Fargate Spot termination notices in this documentation page should put you on the right track.

There are two components to the interruption: handling the SIGTERM/SIGKILL sequence within the container and monitoring when the termination occurs. The former is business as usual as it's what you should be doing regardless of infrastructure. The latter is specific to Fargate Spot and the service/api you need to use is EventBridge (which is the hub for all events occurring on AWS). The doc has an example of the rule you'd need to configure to trap spot interruptions.
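
A minimal sketch of the monitoring side: an EventBridge event pattern that matches ECS task state changes for one cluster, plus a local filter for interruptions. The function names are hypothetical, and the `stopCode` value `"SpotInterruption"` is an assumption that should be checked against the example event in the AWS documentation referred to above.

```python
import json
from typing import Any, Dict


def task_state_change_pattern(cluster_arn: str) -> str:
    """Return an EventBridge event pattern (JSON) for ECS task state changes."""
    pattern = {
        "source": ["aws.ecs"],
        "detail-type": ["ECS Task State Change"],
        "detail": {"clusterArn": [cluster_arn]},
    }
    return json.dumps(pattern)


def is_spot_interruption(event: Dict[str, Any]) -> bool:
    """Check whether a task state change event reports a Spot interruption.

    Assumption: interrupted tasks carry stopCode == "SpotInterruption".
    """
    detail = event.get("detail", {})
    return detail.get("stopCode") == "SpotInterruption"
```

The pattern string is what would be passed as the rule's event pattern; the filter would run in whatever target receives the matched events.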

anovv commented 3 years ago

@jacobtomlinson @mreferre

From the docs:

> When tasks using Fargate Spot capacity are stopped due to a Spot interruption, a two-minute warning is sent before a task is stopped. The warning is sent as a task state change event to Amazon EventBridge and a SIGTERM signal to the running task

Do I understand correctly that, if the container handles SIGTERM, we don't really need to handle the termination notice from outside the container (since ECS sends both a SIGTERM to the container and a termination notice to EventBridge)? @jacobtomlinson do we need a worker plugin in this case? How do workers react to SIGTERM?

mreferre commented 3 years ago

@dirtyValera yes, your understanding is correct. To be clear, we send the SIGTERM (and then SIGKILL after 2 minutes) to allow the container to shut down properly. If it doesn't, we will kill it anyway. We do this so that the interruption can be as graceful as possible. We also send the event to EventBridge so that your higher level external scheduling algorithm can take action (if it needs to). If you use an ECS service, the service scheduling logic will try to re-schedule the tasks to meet the desired state, but if you launched the tasks with the runTask API (i.e. without an ECS service) then the tasks are gone and there is no attempt to restart them (within ECS).
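
For the in-container half, a generic sketch of SIGTERM handling in a Python process might look like the following. It is not Dask's own shutdown code; `run`, `do_work` and `cleanup` are hypothetical names used only to show the pattern of finishing the current unit of work and cleaning up before the SIGKILL arrives.

```python
import signal
import threading

# Set by the signal handler; checked by the main loop.
shutdown_requested = threading.Event()


def _on_sigterm(signum, frame):
    """Record the termination request; keep the handler itself tiny."""
    shutdown_requested.set()


# Must be registered from the main thread.
signal.signal(signal.SIGTERM, _on_sigterm)


def run(do_work, cleanup, poll_seconds: float = 1.0) -> None:
    """Call do_work repeatedly until SIGTERM arrives, then call cleanup once."""
    while not shutdown_requested.is_set():
        do_work()
        # Sleep between iterations, but wake early if shutdown is requested.
        shutdown_requested.wait(poll_seconds)
    cleanup()
```

The cleanup step has to fit inside the two-minute window described above, since the process is killed afterwards regardless.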

anovv commented 3 years ago

On a different note, I was able to launch worker tasks on the FARGATE_SPOT provider (thanks to @mreferre's comment), but I'm seeing this error:

future: <Task finished name='Task-444' coro=<_wrap_awaitable() done, defined at /usr/local/Cellar/python@3.9/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:683> exception=TimeoutException('Failed to find worker ip address after 60 seconds.')>
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.9/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 690, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.9/site-packages/dask_cloudprovider/aws/ecs.py", line 163, in _
    await self.start()
  File "/usr/local/lib/python3.9/site-packages/dask_cloudprovider/aws/ecs.py", line 309, in start
    await self._set_address_from_logs()
  File "/usr/local/lib/python3.9/site-packages/dask_cloudprovider/aws/ecs.py", line 197, in _set_address_from_logs
    while timeout.run():
  File "/usr/local/lib/python3.9/site-packages/dask_cloudprovider/utils/timeout.py", line 74, in run
    raise self.exception
dask_cloudprovider.utils.timeout.TimeoutException: Failed to find worker ip address after 60 seconds.

This comes down to the _set_address_from_logs function. I'm not really sure what it does; @jacobtomlinson, could you give some context? For reference, I'm able to see workers on the Dask dashboard and all of them have public IPs.

mreferre commented 3 years ago

If "find" means "reach" in this case, it could be that the layer 2/3 networking is misconfigured. Each task is connected to the VPC and has a security group (SG) associated with it. Routing is guaranteed inside the VPC by default across all subnets, but SGs need to be configured so that traffic is allowed and not blocked.

Unless this is more of an application problem you are seeing in which case I'll shut up :)

jacobtomlinson commented 3 years ago

We parse the logs from each task to find out what IP address the worker is running at. It looks like that is timing out. Can you verify that you are able to access the logs for these worker tasks?

anovv commented 3 years ago

@jacobtomlinson thanks, I'll investigate and get back to you. Can you also explain whether workers handle SIGTERM and whether the worker plugin is needed?

jacobtomlinson commented 3 years ago

Yeah workers should handle SIGTERM so there is no need to watch for the termination notice.

anovv commented 3 years ago

> We parse the logs from each task to find out what IP address the worker is running at. It looks like that is timing out. Can you verify that you are able to access the logs for these worker tasks?

@jacobtomlinson I think I found the reason: the log line with the address takes longer to appear than the default timeout. This seems to happen on FARGATE_SPOT only (and for workers only) because, I assume, the infrastructure is slower (although I use the same resource spec and image for workers and scheduler). I also install additional pip packages on image initialisation (via the EXTRA_PIP_PACKAGES env var), so it takes longer than expected. Setting find_address_timeout=90 seemed to fix this.
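
The workaround can be sketched as below. `find_address_timeout` is the parameter named in this comment and `fargate_spot` is the option that appears later in this thread; the `make_cluster` helper and the dictionary are hypothetical, and the value 90 is simply the one that worked here.

```python
# Keyword arguments discussed in this thread; the values are illustrative.
cluster_kwargs = {
    "fargate_spot": True,        # needs a dask-cloudprovider release with Fargate Spot support
    "find_address_timeout": 90,  # seconds to wait for the address log line
}


def make_cluster(**overrides):
    """Create the cluster. Needs dask-cloudprovider and AWS credentials."""
    from dask_cloudprovider.aws import FargateCluster

    return FargateCluster(**{**cluster_kwargs, **overrides})
```

A larger timeout only hides slow start-up; images that install extra packages at boot may need a value well above 90.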

Not related to the main thread, but what is the rationale behind parsing logs to find the address? This looks a bit hacky: if a user adds extra initialisation steps (as I did), the logs will take longer to appear and the cluster won't start. From what I saw, the address from the logs is always the same as the private_ip returned from AWS here, both for scheduler and workers. Why not use this value?
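
To illustrate the alternative, the sketch below reads the private IP out of a task description shaped like an entry of the ECS DescribeTasks response, where awsvpc tasks list their network interface under `attachments`. The function name is hypothetical and this is not how dask-cloudprovider currently resolves addresses.

```python
from typing import Any, Dict, Optional


def private_ip_from_task(task: Dict[str, Any]) -> Optional[str]:
    """Return the task's private IPv4 address, or None if not attached yet."""
    for attachment in task.get("attachments", []):
        if attachment.get("type") != "ElasticNetworkInterface":
            continue
        # Details are a list of {"name": ..., "value": ...} pairs.
        for detail in attachment.get("details", []):
            if detail.get("name") == "privateIPv4Address":
                return detail.get("value")
    return None
```

The address is only present once the network interface has been attached, so a caller would still need to poll the task until the function returns a value.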

mreferre commented 3 years ago

[ tangential ] @dirtyValera I don't think this is related to SPOT specifically but rather to the fact that Fargate in general sources capacity from a mixed pool of hardware over which (for now) you don't have control. I can't say whether this is more likely to occur on SPOT, but I just wanted to call out that the dimension that allows you to save that much with SPOT isn't "bad performance" but rather "interruptibility". The fact that you could solve it by tweaking the timeout from 60 to 90 seconds makes me think you are landing on slightly lower spec hardware that can't conclude your routines in 60 secs.

jacobtomlinson commented 3 years ago

@dirtyValera I'm very much up for changing things here, this cluster manager is a little old now. Most cluster managers don't parse logs for this info anymore. But here's the rationale:

anovv commented 3 years ago

> @dirtyValera I'm very much up for changing things here, this cluster manager is a little old now. Most cluster managers don't parse logs for this info anymore. But here's the rationale:
>
>   • Historically when we create the scheduler we had to block until the scheduler is running. One way to do this is wait for the log message that shows the IP. I think things are robust enough now that we can just grab the IP from the API and move on, this is how the EC2 cluster manager works.
>   • Workers just reuse the logic from the scheduler, hence the same behaviour.
>   • We need the IP of the scheduler for the EC2Cluster/FargateCluster object to connect to. At one point I think the only way to get this was via the log, but now we can probably just grab it from the API.

I see, I'll make a separate issue to track this

anovv commented 3 years ago

@jacobtomlinson in the meantime, I made https://github.com/dask/dask-cloudprovider/pull/312 to add support for Fargate Spot, can you take a look?

st33v commented 3 years ago

I'm testing out the fargate_spot option and I get:

InvalidParameterException: An error occurred (InvalidParameterException) when calling the RunTask operation: The specified capacity provider strategy cannot contain a capacity provider that is not associated with the cluster. Associate the capacity provider with the cluster or specify a valid capacity provider and try again.

The Dask cloud provider did not seem to create a capacity provider on the cluster.

My code looks like this:

from dask_cloudprovider.aws import FargateCluster
cluster = FargateCluster(
    fargate_spot=True, # supported in newer dask cloud code... but doesn't work. "capacity providers?"
    image=image
)

cluster.adapt(minimum=1, maximum=20)

davidvhill commented 2 years ago

InvalidParameterException has been corrected with issue #328
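
Independently of that fix, for an ECS cluster created outside dask-cloudprovider, one way to associate the Fargate capacity providers is boto3's `put_cluster_capacity_providers` call. This is a sketch under assumptions: the helper names are hypothetical and the default strategy shown is just one reasonable choice.

```python
from typing import Any, Dict


def build_capacity_provider_kwargs(cluster: str) -> Dict[str, Any]:
    """Arguments that associate FARGATE and FARGATE_SPOT with a cluster."""
    return {
        "cluster": cluster,
        # This call replaces the cluster's existing list of capacity providers.
        "capacityProviders": ["FARGATE", "FARGATE_SPOT"],
        # Assumption: tasks without an explicit strategy default to on-demand.
        "defaultCapacityProviderStrategy": [
            {"capacityProvider": "FARGATE", "weight": 1},
        ],
    }


def associate_capacity_providers(cluster: str) -> Dict[str, Any]:
    """Apply the association. Needs boto3 and AWS credentials."""
    import boto3

    ecs = boto3.client("ecs")
    return ecs.put_cluster_capacity_providers(
        **build_capacity_provider_kwargs(cluster)
    )
```

Once FARGATE_SPOT is associated with the cluster, a RunTask request that names it in its capacity provider strategy should no longer be rejected with the error quoted above.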

jacobtomlinson commented 2 years ago

I think we can close this out now.