dask / dask-cloudprovider

Cloud provider cluster managers for Dask. Supports AWS, Google Cloud, Azure, and more.
https://cloudprovider.dask.org
BSD 3-Clause "New" or "Revised" License

AWS - ECS/Fargate Cluster - When providing a `scheduler_address`, calling `cluster.close()` terminates the remote scheduler via tcp #375

Open BadAsstronaut opened 2 years ago

BadAsstronaut commented 2 years ago

What happened: Calling `cluster.close()` on a `FargateCluster` instance that was given an external `scheduler_address` terminates the remote scheduler process.

2022-09-01 17:35:21,106 - distributed.scheduler - INFO - Scheduler closing...
2022-09-01 17:35:21,107 - distributed.scheduler - INFO - Scheduler closing all comms
2022-09-01 17:35:21,108 - distributed.scheduler - INFO - Stopped scheduler at 'tcp://172.27.23.183:8786'
2022-09-01 17:35:21,108 - distributed.scheduler - INFO - End scheduler

What you expected to happen: Only the resources that the `FargateCluster` itself created should be cleaned up; the externally managed scheduler at `scheduler_address` should be left running.

Minimal Complete Verifiable Example:

import asyncio
import logging

from dask_cloudprovider.aws import FargateCluster

logger = logging.getLogger(__name__)

# image, scheduler_address, cluster_arn, execution_role_arn, task_role_arn,
# cloudwatch_logs_group, vpc, subnets, and security_groups are configured elsewhere.


def main():
    async def run():
        logger.info('-' * 47)
        cluster = FargateCluster(asynchronous=True,
                                 image=image,
                                 fargate_spot=True,
                                 scheduler_address=scheduler_address,
                                 cluster_arn=cluster_arn,
                                 execution_role_arn=execution_role_arn,
                                 task_role_arn=task_role_arn,
                                 cloudwatch_logs_group=cloudwatch_logs_group,
                                 vpc=vpc,
                                 subnets=subnets,
                                 security_groups=security_groups,
                                 fargate_use_private_ip=True)
        cluster.adapt(minimum=1, maximum=5)
        await cluster._start()
        logger.info('-' * 47)
        await asyncio.sleep(120)
        await cluster.close()
    try:
        asyncio.run(run())
    finally:
        logger.info('Adaptive scaler has ended')

Anything else we need to know?: This was a bugaboo to debug. Some breadcrumbs:

`distributed.Scheduler.close`, `distributed.SpecCluster._close`, `distributed.SpecCluster._start`

It looks to me like `scheduler_address` in dask-cloudprovider is setting `scheduler_comm` on the `SpecCluster`; so when `cluster.close()` gets called, `SpecCluster._close()` gets invoked and sends the terminate command to the remote scheduler via RPC.

(EDIT: I tested with cluster._close as well and observed the same behavior.)
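
Roughly, the path I'm describing looks like this. This is a simplified sketch of the mechanism, not the actual `SpecCluster` source; `rpc` here is `distributed.core.rpc`:

from distributed.core import rpc


class CloseSketch:
    """Illustration: a cluster object holding an RPC handle to an
    externally started scheduler shuts that scheduler down on close."""

    def __init__(self, scheduler_address: str):
        # Passing scheduler_address ends up roughly here: the cluster now
        # holds a comm to a scheduler it did not start.
        self.scheduler_comm = rpc(scheduler_address)

    async def _close(self):
        # On close, a terminate request goes out over that comm before the
        # cluster tears down its own resources.
        await self.scheduler_comm.terminate()
        await self.scheduler_comm.close_rpc()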

Environment:

jacobtomlinson commented 2 years ago

Ouch, that does sound painful to debug; sorry you had a bad time!

You're right, it looks like the scheduler gets a terminate call when `ECSCluster._close()` calls `super()._close()`:

https://github.com/dask/distributed/blob/acf607832c7191cc496a9b4a81760170de85062c/distributed/deploy/spec.py#L429
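
Roughly the shape of that call chain, with illustrative stand-ins rather than the real classes:

class SpecClusterLike:
    """Stand-in for distributed's SpecCluster (illustration only)."""

    scheduler_comm = None  # RPC handle to the scheduler

    async def _close(self):
        if self.scheduler_comm is not None:
            # This is where the remote scheduler gets told to shut down.
            await self.scheduler_comm.terminate()


class ECSClusterLike(SpecClusterLike):
    """Stand-in for dask_cloudprovider's ECSCluster (illustration only)."""

    async def _close(self):
        # ECS/Fargate-specific teardown would happen here, then the generic
        # close path runs and terminates the scheduler, even when
        # scheduler_address pointed at one the cluster didn't start.
        await super()._close()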

I wonder if `SpecCluster._close` should have a `shutdown_scheduler` kwarg that we can set, or something along those lines. Would you mind opening an issue on distributed and pinging me there?
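
Purely as a sketch of the idea (the `shutdown_scheduler` name and the structure here are made up for illustration, not an existing `distributed` API):

from distributed.core import rpc


class SpecClusterProposal:
    """Hypothetical sketch of the kwarg suggested above."""

    def __init__(self, scheduler_address: str):
        self.scheduler_comm = rpc(scheduler_address)

    async def _close(self, shutdown_scheduler: bool = True):
        if shutdown_scheduler:
            # Current behaviour: the remote scheduler is asked to exit.
            await self.scheduler_comm.terminate()
        # With shutdown_scheduler=False we would only drop our RPC handle,
        # leaving an externally managed scheduler running.
        await self.scheduler_comm.close_rpc()

Then ECSCluster/FargateCluster could pass shutdown_scheduler=False whenever the user supplied scheduler_address themselves.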