drabastomek / dask-cloudprovider

Cloud provider cluster managers for dask distributed
https://cloudprovider.dask.org
BSD 3-Clause "New" or "Revised" License
2 stars · 2 forks · source link

Add `_cleanup_stale_resources()` method #6

Open · drabastomek opened this issue 4 years ago

drabastomek commented 4 years ago

Add the method and call it from the `__init__(self)` method, before calling `super()._start()`.

drabastomek commented 4 years ago

Do we need an equivalent of this?

    """
    # Clean up clusters (clusters with no running tasks)
    session = aiobotocore.get_session()
    async with session.create_client("ecs") as ecs:
        active_clusters = []
        clusters_to_delete = []
        async for page in ecs.get_paginator("list_clusters").paginate():
            clusters = (
                await ecs.describe_clusters(
                    clusters=page["clusterArns"], include=["TAGS"]
                )
            )["clusters"]
            for cluster in clusters:
                if DEFAULT_TAGS.items() <= aws_to_dict(cluster["tags"]).items():
                    if cluster["runningTasksCount"] == 0:
                        clusters_to_delete.append(cluster["clusterArn"])
                    else:
                        active_clusters.append(cluster["clusterName"])
        for cluster_arn in clusters_to_delete:
            await ecs.delete_cluster(cluster=cluster_arn)

        # Clean up task definitions (with no active clusters)
        async for page in ecs.get_paginator("list_task_definitions").paginate():
            for task_definition_arn in page["taskDefinitionArns"]:
                response = await ecs.describe_task_definition(
                    taskDefinition=task_definition_arn, include=["TAGS"]
                )
                task_definition = response["taskDefinition"]
                task_definition["tags"] = response["tags"]
                task_definition_cluster = aws_to_dict(task_definition["tags"]).get(
                    "cluster"
                )
                if (
                    task_definition_cluster is None
                    or task_definition_cluster not in active_clusters
                ):
                    await ecs.deregister_task_definition(
                        taskDefinition=task_definition_arn
                    )

    # Clean up security groups (with no active clusters)
    async with session.create_client("ec2") as ec2:
        async for page in ec2.get_paginator("describe_security_groups").paginate(
            Filters=[{"Name": "tag:createdBy", "Values": ["dask-cloudprovider"]}]
        ):
            for group in page["SecurityGroups"]:
                sg_cluster = aws_to_dict(group["Tags"]).get("cluster")
                if sg_cluster is None or sg_cluster not in active_clusters:
                    await ec2.delete_security_group(
                        GroupName=group["GroupName"], DryRun=False
                    )

    # Clean up roles (with no active clusters)
    async with session.create_client("iam") as iam:
        async for page in iam.get_paginator("list_roles").paginate():
            for role in page["Roles"]:
                role["Tags"] = (
                    await iam.list_role_tags(RoleName=role["RoleName"])
                ).get("Tags")
                if DEFAULT_TAGS.items() <= aws_to_dict(role["Tags"]).items():
                    role_cluster = aws_to_dict(role["Tags"]).get("cluster")
                    if role_cluster is None or role_cluster not in active_clusters:
                        attached_policies = (
                            await iam.list_attached_role_policies(
                                RoleName=role["RoleName"]
                            )
                        )["AttachedPolicies"]
                        for policy in attached_policies:
                            await iam.detach_role_policy(
                                RoleName=role["RoleName"], PolicyArn=policy["PolicyArn"]
                            )
                        await iam.delete_role(RoleName=role["RoleName"])