dask / dask-yarn

Deploy dask on YARN clusters
http://yarn.dask.org
BSD 3-Clause "New" or "Revised" License
69 stars 41 forks source link

distributed 2022.3.0 no more compatible with dask-yarn because of missing "status" attribute in YarnCluster #155

Open NHanser opened 2 years ago

NHanser commented 2 years ago

What happened: error while creating Client object

What you expected to happen: correct init

Minimal Complete Verifiable Example:

from dask_yarn import YarnCluster
from dask.distributed import Client
# Create a cluster
cluster = YarnCluster()
# Connect to the cluster
client = Client(cluster)

Anything else we need to know?:

Environment:

jcrist commented 2 years ago

Can you provide the traceback of the error you saw?

NHanser commented 2 years ago
AttributeError                            Traceback (most recent call last)
Input In [2], in <cell line: 7>()
      3 from dask import dataframe as dd
      6 cluster = YarnCluster()
----> 7 client = Client(cluster)
      8 cluster

File ~/dask_tests/daskenv/lib64/python3.8/site-packages/distributed/client.py:834, in Client.__init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    831 elif isinstance(getattr(address, "scheduler_address", None), str):
    832     # It's a LocalCluster or LocalCluster-compatible object
    833     self.cluster = address
--> 834     status = getattr(self.cluster, "status")
    835     if status and status in [Status.closed, Status.closing]:
    836         raise RuntimeError(
    837             f"Trying to connect to an already closed or closing Cluster {self.cluster}."
    838         )

AttributeError: 'YarnCluster' object has no attribute 'status'
jacobtomlinson commented 2 years ago

I've noticed this error popping up in other places too.

yup111 commented 2 years ago

I have same issue.

jtrive84 commented 2 years ago

Same issue using dask/distributed 2022.6.1 with dask-yarn 0.9.

santosh-d3vpl3x commented 2 years ago

As per newer implementations in distributed we need to extend SpecCluster to implement resource managers like yarn.

Minimal example I hacked together:

import skein
from distributed import SpecCluster
from distributed.deploy import ProcessInterface

class YarnProcess(ProcessInterface):
    def __init__(self, **kwargs):
        super().__init__()
        self.service_name = None
        self.cli: skein.ApplicationClient = skein.ApplicationClient.from_current()
        self.container = None
        _ = kwargs

    async def close(self):
        self.cli.kill_container(self.container)
        await super().close()

class DaskYarnScheduler(YarnProcess):
    def __init__(self, **kwargs):
        super().__init__()
        self.service_name: str = "dask.scheduler"
        self.container = None
        _ = kwargs

    async def start(self):
        self.cli.scale(self.service_name, count=1)
        self.address = self.cli.kv.wait("dask.scheduler").decode()
        self.container = self.cli.get_containers(services=[self.service_name])[0].id
        await super().start()

class DaskYarnWorker(YarnProcess):
    def __init__(self, address, **kwargs):
        super().__init__()
        self.service_name: str = "dask.worker"
        _ = kwargs, address

    async def start(self):
        self.container = self.cli.add_container(self.service_name).id
        await super().start()

class YarnCluster(SpecCluster):
    def __init__(self, security=None):
        super().__init__(
            scheduler={"cls": DaskYarnScheduler, "options": {}},
            worker={"cls": DaskYarnWorker, "options": {}},
            security=security,
        )
        self.spec = skein.ApplicationClient.from_current().get_specification()

cluster = YarnCluster()
client = Client(cluster.scheduler_address)
cjac commented 2 months ago

@santosh-d3vpl3x - can you please review #162 to make sure that I'm implementing correctly?

@bradmiro - I could use some help executing the tests and sanity checking these changes.