Parsl - a Python parallel scripting library
http://parsl-project.org
Apache License 2.0

Terminate Workers after Job is done #776

Open Olllom opened 5 years ago

Olllom commented 5 years ago

Is there a way to terminate workers after they have completed a single task, i.e. assign exactly one worker per task?

I am trying to set up a workflow on a Slurm Cluster using the following code:

import parsl
from parsl.config import Config
from parsl.providers import SlurmProvider
from parsl.launchers import SrunLauncher
from parsl.executors import IPyParallelExecutor, ThreadPoolExecutor

parsl.clear()

config = Config(
    executors=[IPyParallelExecutor(label="cluster",
                                   provider=SlurmProvider(
                                       'wmere',
                                       launcher=SrunLauncher(),
                                       walltime="00:01:00",
                                       init_blocks=0,
                                       max_blocks=10,
                                       nodes_per_block=1,
                                       parallelism=1,
                                   ),
                                   managed=False,
                                   workers_per_node=1,  # one worker (and hence one task at a time) per node
                                  ),
               ThreadPoolExecutor(label="local", max_threads=8)
              ]
)

kernel = parsl.load(config)

I am also a little confused that more than 10 jobs get submitted (although max_blocks=10).

benclifford commented 5 years ago

I am also a little confused that more than 10 jobs get submitted (although max_blocks=10).

... might be https://github.com/Parsl/parsl/issues/704

What is your higher goal in getting workers to terminate after one task? Is it because the task somehow pollutes the worker?

Olllom commented 5 years ago

I am going to run simulations that take somewhere between 4 and 8 hours, depending on the hardware on the node that gets allocated.

My goal is to

  1. prevent idle workers from blocking the cluster.
  2. prevent workers from being killed in the middle of a task (which happens when a task gets assigned to an existing, idle worker whose remaining walltime is too short).

yadudoc commented 5 years ago

Hi Andreas,

I'm working with an incomplete picture of your workflow requirements, so we might need a bit of back and forth before we arrive at a reasonable configuration for your needs. Parsl uses a pilot-job model: we request a large number of nodes in a single job, mostly because batch queues can be slow, and then schedule tasks onto those nodes with our own workers. If your queues give you nodes reasonably quickly and you can request many jobs at a time (schedulers often have job limits), you can reduce resource wastage by requesting single-node jobs.
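
To make that concrete, here is a rough sketch of the two shapes of request (the values are just placeholders, not a recommendation):

from parsl.providers import SlurmProvider
from parsl.launchers import SrunLauncher

# Pilot style: one large job that holds many nodes for the whole run.
pilot_style = SlurmProvider(
    'wmere',                 # partition, as in your config
    launcher=SrunLauncher(),
    nodes_per_block=10,      # 10 nodes inside a single Slurm job
    init_blocks=1,
    max_blocks=1,
    walltime="24:00:00",
)

# Single-node jobs: many small Slurm jobs with one node each, so other
# users can interleave and an idle node only ties up a one-node allocation.
single_node_jobs = SlurmProvider(
    'wmere',
    launcher=SrunLauncher(),
    nodes_per_block=1,
    init_blocks=0,
    max_blocks=10,           # up to 10 concurrent one-node jobs
    parallelism=1,
    walltime="08:00:00",
)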

To 1: There are some inefficiencies in Parsl at the moment: jobs are not de-allocated when they go idle. There are some beta features on a branch that address this. You generally run into this when the number of tasks is close to the number of workers; if you have many more tasks than workers, you could request fewer total resources to minimize wastage/idle time.

To 2: I do not understand how this happens. Can you clarify this scenario, please?

Here's a config that uses the newer executor that supports more efficient scaling down:

from parsl.config import Config
from parsl.providers import SlurmProvider
from parsl.launchers import SrunLauncher
from parsl.executors import ThreadPoolExecutor, HighThroughputExecutor

# In this config, we create a 'cluster' executor, that starts 10 jobs
# each with 1 worker, ensuring that one node can only execute 1 task
# at a time. We don't force the worker to quit after it completes a task.
config = Config(
    executors=[
        HighThroughputExecutor(
            label="cluster",
            max_workers = 1,                # Ensures each node only starts one worker
            provider= SlurmProvider(
                'wmere',
                launcher=SrunLauncher(),
                walltime="08:00:00",        # Updated to at least 8 hours, which is the max task duration
                init_blocks=10,             # Start with the number of tasks you have, 
                max_blocks=10,
                nodes_per_block=1,
                parallelism=1)
        ),            
        ThreadPoolExecutor(
            label="local", 
            max_threads=8
        )]
)

# parsl.load(config)

Olllom commented 5 years ago

Hi,

First of all, thank you for your help - it is very much appreciated. I am keen to use Parsl and it would be fantastic to get this up and running.

Regarding the HighThroughputExecutor:

I had tried that before and it did not work for me. See below for the Python script and a description of what happens (or does not happen). With the IPyParallelExecutor, I at least got the tasks to produce output.

Regarding my workflow:

I would like to start many (~2000) simulations, each 4-8 hours long, from a Jupyter Notebook that runs on the head node of a medium-sized Slurm cluster. Concurrently, I want to do some analysis of the output data on the head node and ultimately integrate the simulations into an asynchronous optimization workflow.
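
To make the intended pattern concrete, here is a rough sketch with trivial placeholder apps and a local-only config, just to show the shape of the workflow (in the real setup the simulations would run on the "cluster" executor):

import parsl
from parsl.app.app import python_app
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor
from concurrent.futures import as_completed

parsl.load(Config(executors=[ThreadPoolExecutor(label="local", max_threads=4)]))

@python_app(executors=["local"])
def run_simulation(i):
    return i * i                      # stands in for a 4-8 hour simulation

futures = [run_simulation(i) for i in range(20)]
for fut in as_completed(futures):     # analyse each result on the head node as it arrives
    print("analysed", fut.result())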

Problem 1:

Instead of requesting a lot of nodes for 24 hours each time (and basically blocking the whole queue for a long time), I would like to allow a separate worker for every task, which gives other users the chance to step in between my jobs. Most importantly, the nodes should not be blocked by idle jobs.

Problem 2 (Clarification):

It would be undesirable if workers stopped executing while a simulation is running. Say, I allow a walltime of 8 hours. One task finishes after 6 hours and the worker is assigned a new task. After 2 more hours, the job would be cancelled by the Slurm manager without the simulation having completed. In principle, this could be avoided by forcing each task to be executed on a separate worker.

If my application is out of the scope of what Parsl was designed for, my apologies for wasting your time.

Complete Script

import parsl
from parsl.app.app import python_app
from parsl.config import Config
from parsl.providers import SlurmProvider
from parsl.launchers import SrunLauncher
from parsl.executors import ThreadPoolExecutor, HighThroughputExecutor

print(parsl.__version__)

0.7.1

parsl.clear()

Your Snippet

config = Config(
    executors=[
        HighThroughputExecutor(
            label="cluster",
            max_workers = 1,                # Ensures each node only starts one worker
            provider= SlurmProvider(
                'wmere',
                launcher=SrunLauncher(),
                walltime="08:00:00",        # Updated to at least 8 hours, which is the max task duration
                init_blocks=10,             # Start with the number of tasks you have, 
                max_blocks=10,
                nodes_per_block=1,
                parallelism=1)
        ),            
        ThreadPoolExecutor(
            label="local", 
            max_threads=8
        )]
)

kernel = parsl.load(config)

Sample Apps for Testing

@python_app(executors=["cluster"]) #, walltime=60)
def hello_from_cluster_node():
    import socket
    import os
    return "Hello from CLUSTER node: " + socket.gethostname() + " " + os.environ["SLURM_JOB_ID"]

@python_app(executors=["local"]) #, walltime=60)
def hello_from_local_node():
    import socket
    import datetime
    return "Hello from  LOCAL  node: " + socket.gethostname() + " " + str(datetime.datetime.now())

futures = []
for i in range(10):
    futures.append(hello_from_cluster_node())
for i in range(5):
    futures.append(hello_from_local_node())
print('\n'.join([f.result() for f in futures]))

Behavior

The workers start up as expected, but there is no feedback to the script. The code gets stuck at the print statement in the last line, waiting for output indefinitely. When the jobs are cancelled manually, parsl submits new jobs (all idle, too). The parsl.log file keeps printing the following:

....
2019-02-15 19:20:18 parsl.executors.high_throughput.executor:408 [DEBUG]  Got outstanding count: 10
2019-02-15 19:20:18 parsl.executors.high_throughput.executor:414 [DEBUG]  Got managers: []
2019-02-15 19:20:18 parsl.dataflow.strategy:202 [DEBUG]  Executor cluster has 10 active tasks, 10/0/0 running/submitted/pending blocks, and 0 connected engines
2019-02-15 19:20:23 parsl.executors.high_throughput.executor:408 [DEBUG]  Got outstanding count: 10
2019-02-15 19:20:23 parsl.executors.high_throughput.executor:414 [DEBUG]  Got managers: []
2019-02-15 19:20:23 parsl.dataflow.strategy:202 [DEBUG]  Executor cluster has 10 active tasks, 10/0/0 running/submitted/pending blocks, and 0 connected engines
2019-02-15 19:20:28 parsl.executors.high_throughput.executor:408 [DEBUG]  Got outstanding count: 10
2019-02-15 19:20:28 parsl.executors.high_throughput.executor:414 [DEBUG]  Got managers: []
2019-02-15 19:20:28 parsl.dataflow.strategy:202 [DEBUG]  Executor cluster has 10 active tasks, 10/0/0 running/submitted/pending blocks, and 0 connected engines
2019-02-15 19:20:33 parsl.executors.high_throughput.executor:408 [DEBUG]  Got outstanding count: 10
2019-02-15 19:20:33 parsl.executors.high_throughput.executor:414 [DEBUG]  Got managers: []
2019-02-15 19:20:33 parsl.dataflow.strategy:202 [DEBUG]  Executor cluster has 10 active tasks, 10/0/0 running/submitted/pending blocks, and 0 connected engines
2019-02-15 19:20:38 parsl.executors.high_throughput.executor:408 [DEBUG]  Got outstanding count: 10
2019-02-15 19:20:38 parsl.executors.high_throughput.executor:414 [DEBUG]  Got managers: []
2019-02-15 19:20:38 parsl.dataflow.strategy:202 [DEBUG]  Executor cluster has 10 active tasks, 10/0/0 running/submitted/pending blocks, and 0 connected engines
2019-02-15 19:20:43 parsl.executors.high_throughput.executor:408 [DEBUG]  Got outstanding count: 10
2019-02-15 19:20:43 parsl.executors.high_throughput.executor:414 [DEBUG]  Got managers: []
2019-02-15 19:20:43 parsl.dataflow.strategy:202 [DEBUG]  Executor cluster has 10 active tasks, 10/0/0 running/submitted/pending blocks, and 0 connected engines
2019-02-15 19:20:48 parsl.executors.high_throughput.executor:408 [DEBUG]  Got outstanding count: 10
2019-02-15 19:20:48 parsl.executors.high_throughput.executor:414 [DEBUG]  Got managers: []
...

Olllom commented 5 years ago

Oh yes, and to answer your questions:

yadudoc commented 5 years ago

Hi Olllom, I think I understand this use case better now. We do not have a mechanism to terminate workers after executing a single task, but this is something I can add reasonably quickly to one of our executors. Since we are deprecating IPyParallel and switching over to the HighThroughputExecutor (HTEX) as a replacement, I'd recommend you switch over to HTEX. I can help debug the issue you had over Slack (the link to our Slack is on this page: http://parsl-project.org/support.html).
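
One thing worth checking before that: the repeated "Got managers: []" lines in your log mean that no worker pool ever connected back to the interchange on the head node. A common cause is that the address the executor advertises is not reachable from the compute nodes. A quick thing to try (sketch only; assumes the compute nodes can resolve the head node's hostname):

import socket
from parsl.executors import HighThroughputExecutor

htex = HighThroughputExecutor(
    label="cluster",
    address=socket.gethostname(),   # advertise an address the compute nodes can reach
    max_workers=1,
    # provider=SlurmProvider(...) as in the config above
)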

Once you've got HTEX working, I'll create a new branch with the feature you requested. I'm hoping you can help by testing it out on your cluster. Does that work for you?

Olllom commented 5 years ago

Sounds great. Thank you!