dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

[RFC] Worker groups and environments #495

Open jcrist opened 8 years ago

jcrist commented 8 years ago

This is a concrete proposal for solving #85. For preliminary discussion, see that issue.

What is an environment:

An environment is defined as an object of type Environment, which currently has the following design:

class Environment(object):
    def isinstance(self):
        """Return whether the worker this is run on is an instance of the environment"""
        pass

    def setup(self):
        """Set up the environment"""
        pass

    def teardown(self):
        """Tear down the environment"""
        pass

The reasons for making it a class are to group the methods together, provide a consistent place to store state, and keep the user-facing signature consistent (even if more optional methods are added to the class). Using the examples from the original issue, example Environment classes might be:

class MemoryGreaterThan(Environment):
    def __init__(self, threshold=30e9):
        self.threshold = threshold

    def isinstance(self):
        import psutil
        return psutil.virtual_memory().total > self.threshold

class DataBaseAccess(Environment):
    def __init__(self, credentials):
        self.credentials = credentials

    def isinstance(self):
        return can_connect_to_database(self.credentials)

    def setup(self):
        self.conn = connect_to_database(self.credentials)

    def teardown(self):
        self.conn.disconnect()

How environments are registered

Environments are registered by passing instances of the object to Executor.register_environment, along with a name. The decision to use instances instead of classes was made to allow environments to be parametrized (if needed), and contain internal state. An example of registering environments is:

# possibly use keywords instead (e.g. `env_name=env_object`)
exc.register_environment('medium', MemoryGreaterThan(16e9))
exc.register_environment('large', MemoryGreaterThan(32e9))
exc.register_environment('database', DataBaseAccess('mypassword'))

Changes to the scheduler/worker state and transitions

When an environment is registered

  1. After being registered, the environment is serialized and sent to the scheduler, where it is stored in a mapping {name: environment_object} called environments.
  2. The scheduler sends the environment to each worker. There the isinstance method is run to check if the worker is an instance of the environment. If so, the worker stores it in a mapping of {name: environment_object} called environments. The (optional) setup method on the environment is then run, which can initialize any additional needed state.
  3. The worker replies to the scheduler, letting it know if the environment was added or not. If it was added, the scheduler updates a mapping of {environment_name: {worker_ids}} to reflect this. Call this environment_workers.
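A rough sketch of the scheduler-side bookkeeping that steps 1-3 imply (class and method names here are hypothetical, for illustration only):

```python
class SchedulerEnvironments:
    """Hypothetical sketch of the scheduler state from steps 1-3."""

    def __init__(self):
        self.environments = {}         # {name: environment_object}
        self.environment_workers = {}  # {environment_name: {worker_ids}}

    def register(self, name, env):
        # Step 1: store the (deserialized) environment by name
        self.environments[name] = env
        self.environment_workers.setdefault(name, set())

    def handle_worker_reply(self, name, worker_id, added):
        # Step 3: record only the workers that reported the
        # environment was added (i.e. isinstance returned True)
        if added:
            self.environment_workers[name].add(worker_id)
```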

When a worker is added

  4. The scheduler iterates through all of the current environments and does the dance described above. There might be merit in batching these together (sending a list instead of many instances), but this is unknown.

When a worker is removed

  5. The worker runs the (optional) teardown method to clean up any necessary state. Note that this won't happen if the worker fails in a hard manner (segfault, etc.).
  6. The scheduler removes the worker from environment_workers as needed.

When a task is submitted

  7. The workers keyword to submit will be modified to take in environment names as well. These will be sent, in addition to restrictions and loose_restrictions, to the scheduler's update_graph method as an environments keyword.
  8. update_graph will update a mapping of {key: {environment_names}} called environment_restrictions on the state. The reason for not converting this immediately into restrictions is that the workers in the environment may change between task submission time and task run time (new workers may register, for example).

When selecting tasks to run

  9. ensure_occupied will be modified to also take environment_restrictions into account when picking tasks to run and where to run them.
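The selection in step 9 might reduce to a helper along these lines (hypothetical; the real change would live inside the scheduler's scheduling logic):

```python
def workers_for_task(key, all_workers, environment_restrictions,
                     environment_workers):
    """Hypothetical: intersect the worker sets of every environment
    a key requires, resolved at run time rather than submit time so
    that workers joining after submission are still eligible."""
    candidates = set(all_workers)
    for env_name in environment_restrictions.get(key, ()):
        candidates &= environment_workers.get(env_name, set())
    return candidates
```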

Summary of new state

New worker state

  - environments: a mapping of {name: environment_object} for each environment this worker is an instance of

New scheduler state

  - environments: a mapping of {name: environment_object} of all registered environments
  - environment_workers: a mapping of {environment_name: {worker_ids}}
  - environment_restrictions: a mapping of {key: {environment_names}}

Tasks can call the distributed.get_environment('name') function to get the environment object from inside the worker. The implementation of this might look like:

def get_environment(name):
    worker = get_worker_this_is_running_on()
    return worker.environments[name]

This allows tasks to access whatever state the environment may contain internally. For example, a user may do the following to pull data from a stored database connection:

def get_data(query):
    env = distributed.get_environment('database')
    return env.conn.run_query(query)

exc.submit(get_data, query, workers='database')
mrocklin commented 8 years ago

Some thoughts:

  1. I hadn't thought about changing a task's restrictions after it entered the scheduler, but this is a good idea. We fail to do this now when users enter hostnames rather than full addresses and workers from that host appear afterwards. It may be sufficient (and clean) to just keep all terms that the user has used in workers= in restrictions all the time, without ever translating down until we absolutely need to in decide_worker. This would make decide_worker slightly more complex because it would have to take in some function that takes a list of workers= terms and produces a list of actual addresses. This function exists and is currently in Scheduler.worker_list (which would be expanded). This would have the benefit of restricting logic to only one place and leaving this decision until the very end. It would also avoid having to add new {key: {environment_names}} state, which is great, because state requires maintenance. There are a few places where worker_list could be used but isn't, for example in broadcast.
  2. I think that the worker function should consume a list of environments. As you've mentioned this will have to be called both when a client registers an environment with the scheduler (this will require a new entry in Scheduler.handlers) and in Scheduler.add_worker. It would be nice to make the worker function idempotent so that the scheduler doesn't need to track which environments the worker has seen already and can just send the whole batch down without caring. (Idempotence is a core virtue in this project)
  3. See worker_client.py:get_worker, which should probably be moved to worker.py. I don't think we necessarily need this though unless we need to support setup/teardown, which I would caution against for a first go around.
  4. The term isinstance feels off here to me. Workers don't feel like instances of these environments, they feel like things that satisfy certain conditions. I would personally go for a generic term like condition, or satisfies but there are probably better things out there as well.
  5. I strongly prefer using a function input over a class input to start. I expect specifying only a condition to be the common case and for this case a function is sufficient and feels (to me) significantly lighter weight. I also think that this PR will be interesting enough without throwing in setup/teardown.
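The function-based variant suggested in point 5 might look like the following sketch. Parametrization still works via closures; the keyword name shown in the commented call is illustrative, not an existing API:

```python
def memory_greater_than(threshold):
    """Return a zero-argument predicate instead of an Environment
    subclass. The closure captures the threshold parameter."""
    def condition():
        import psutil  # evaluated on the worker, not the client
        return psutil.virtual_memory().total > threshold
    return condition

# Hypothetical registration call:
# exc.register_environment('large', condition=memory_greater_than(32e9))
```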
mrocklin commented 8 years ago

Hrm, people like @ogrisel might have a use for setup(). He often deploys software with something like the following:

def install():
    import os
    os.system('pip install x y z')

client.run(install)

This would be a good use case for setup: always installing certain libraries when the workers come online.
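Recast as an Environment, that pattern might look like the sketch below (a minimal stand-in base class is included so the snippet is self-contained; whether setup survives into the final design is the open question above):

```python
import subprocess
import sys

class Environment:
    """Minimal stand-in for the base class proposed above."""
    def isinstance(self):
        return False
    def setup(self):
        pass
    def teardown(self):
        pass

class PipInstall(Environment):
    def __init__(self, packages):
        self.packages = list(packages)

    def isinstance(self):
        # Every worker qualifies; this environment exists only for setup.
        return True

    def setup(self):
        # Runs once when the worker joins the environment.
        subprocess.check_call(
            [sys.executable, '-m', 'pip', 'install', *self.packages])
```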

TomAugspurger commented 7 years ago

Since the condition or setup methods may take a while to execute on the workers, would you expect client.register('env', environment) to block, or return a Future?

mrocklin commented 7 years ago

There are two options:

  1. Users don't need to wait on the action to finish. We just send the appropriate message up to the scheduler in a fire-and-forget manner. This is the case when we submit tasks like map or submit. We use the _send_to_scheduler method on the client.

  2. Users do want to wait on the action to finish. This is the case for operations like gather. We make two methods in this case, _gather, which is a tornado coroutine (and so technically returns a tornado Future), and gather, which blocks on that coroutine finishing.
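The coroutine/blocking pair in option 2 follows a pattern like this sketch (asyncio is used here purely for illustration; distributed used tornado coroutines at the time, and the real client dispatches onto a running event loop rather than calling a run function directly):

```python
import asyncio

class Client:
    """Sketch of the _gather/gather split described above."""

    async def _register(self, name, env):
        # Asynchronous version: awaitable from other coroutines.
        # The sleep stands in for the scheduler round-trip.
        await asyncio.sleep(0)
        return name

    def register(self, name, env):
        # Blocking version: waits for the coroutine to finish.
        return asyncio.run(self._register(name, env))
```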

olly-writes-code commented 5 years ago

Did we make any progress with this issue?

TomAugspurger commented 5 years ago

No, I don't think so.
