dask / dask-jobqueue

Deploy Dask on job schedulers like PBS, SLURM, and SGE
https://jobqueue.dask.org
BSD 3-Clause "New" or "Revised" License

Dask with jobqueue not using multiple nodes #206

Closed npaulson closed 5 years ago

npaulson commented 5 years ago

I am trying to use Dask to do parallel processing on multiple nodes on supercomputing resources, yet the Dask-distributed map only takes advantage of one of the nodes. Note that I posted this on Stack Overflow but it didn't get any attention, so now I'm giving it a go here.

Here is a test script I am using to set up the client and perform a simple operation:

import time
from distributed import Client
from dask_jobqueue import SLURMCluster
from socket import gethostname

def slow_increment(x):
    time.sleep(10)
    return [x + 1, gethostname(), time.time()]

cluster = SLURMCluster(
    queue='somequeue',
    cores=2,
    memory='128GB',
    project='someproject',
    walltime='00:05:00',
    job_extra=['-o myjob.%j.%N.out',
               '-e myjob.%j.%N.error'],
    env_extra=['export I_MPI_FABRICS=dapl',
               'source activate dask-jobqueue'])

cluster.scale(2)

client = Client(cluster)

A = client.map(slow_increment, range(8))
B = client.gather(A)

print(client)

for res in B:
    print(res)

client.close()

And here is the output:

<Client: scheduler='tcp://someip' processes=2 cores=4>
[1, 'bdw-0478', 1540477582.6744401]
[2, 'bdw-0478', 1540477582.67487]
[3, 'bdw-0478', 1540477592.68666]
[4, 'bdw-0478', 1540477592.6879778]
[5, 'bdw-0478', 1540477602.6986163]
[6, 'bdw-0478', 1540477602.6997452]
[7, 'bdw-0478', 1540477612.7100565]
[8, 'bdw-0478', 1540477612.711296]

While printing out the client info indicates that Dask has the correct number of nodes (processes) and tasks per node (cores), the socket.gethostname() output and time-stamps indicate that the second node isn't used. I do know that dask-jobqueue successfully requested two nodes, and that both jobs complete at the same time. I tried using different MPI Fabrics for inter- and intra-node communication (e.g. tcp, shm:tcp, shm:ofa, ofa, ofi, dapl) but this did not change the result. I also tried removing the "export I_MPI_FABRICS" command and using the "interface" option, but this caused the code to hang.
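A quick way to make the imbalance explicit is to count results per hostname. This is only a minimal sketch: `tasks_per_host` is a hypothetical helper, not part of Dask, and it assumes each result has the `[value, hostname, timestamp]` shape returned by `slow_increment`.

```python
from collections import Counter


def tasks_per_host(results):
    """Count how many task results came from each hostname.

    Each result is assumed to be [value, hostname, timestamp],
    the shape returned by slow_increment in the script above.
    """
    return Counter(hostname for _value, hostname, _timestamp in results)


# Sample copied from the first two lines of the output above.
sample = [
    [1, 'bdw-0478', 1540477582.6744401],
    [2, 'bdw-0478', 1540477582.67487],
]
print(dict(tasks_per_host(sample)))
```

With a live client this would be called as `tasks_per_host(client.gather(A))`; a single key in the result means only one node executed tasks.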

Thanks in advance for any assistance.

-Noah

guillaumeeb commented 5 years ago

Note that I posted this on Stack Overflow but it didn't get any attention, so now I'm giving it a go here.

Sorry we missed this question. I do not personally watch Stack Overflow, but I should. Your issue may also belong in dask-jobqueue, but that's not really important for the time being.

After a quick glance at your code, I cannot see anything wrong. So we will need to find a way to debug what's going on.

I tried using different MPI Fabrics for inter- and intra-node communication (e.g. tcp, shm:tcp, shm:ofa, ofa, ofi, dapl) but this did not change the result

Dask does not use MPI for communication, only plain TCP, so either Ethernet interfaces or IP over IB. Did you try without any fabrics, and without the interface option?

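Some other thoughts:

- You should print(client) before the map just to be sure, but I imagine the cluster was formed before the end of the gather, especially if jobs ended at the same time.
- Did you try opening the Dask Dashboard to watch the computation?
- Do you have something interesting in Slurm stdout or stderr?
- You can also look at http://jobqueue.dask.org/en/latest/debug.html to see if there is something relevant. Thinking about debug mode for instance.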

jhamman commented 5 years ago

I suggest we move this issue to dask-jobqueue. One of the dask devs may be able to actually move this issue there (new GH feature) or @npaulson can just cross post.

mrocklin commented 5 years ago

Transferred. Also gave all org members with the ability to delete issues, which I hope implies the ability to move them.

npaulson commented 5 years ago

Thank you for taking an interest in my issue!

@guillaumeeb, below I'm including short descriptions of my investigations into your various debugging suggestions, followed by the full code, its output, and stderr.

Dask does not use MPI for communication, only plain TCP, so either Ethernet interfaces or IP over IB. Did you try without any fabrics, and without the interface option?

When I don't explicitly define the fabrics and don't include an interface option the code runs similarly as before.

You should print(client) before the map just to be sure, but I imagine the cluster was formed before the end of the gather, especially if jobs ended at the same time.

To check this I printed the client before the gather operation and it appears to have formed properly.

Did you try opening the Dask Dashboard to watch the computation?

I haven't yet figured out how to do this, though I'm looking into it next.

Do you have something interesting in Slurm stdout or stderr?

The stdout is empty but there are details in stderr.

You can also look at http://jobqueue.dask.org/en/latest/debug.html to see if there is something relevant. Thinking about debug mode for instance.

I have enabled debugging mode and included the output below. Some of the other suggestions seem to deal with getting the jobs themselves through SLURM, though I don't seem to be having a problem in that regard.

Here is the updated version of my code:

import time
import logging
from distributed import Client
from dask_jobqueue import SLURMCluster
from socket import gethostname

logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)

def slow_increment(x):
    time.sleep(10)
    return [x + 1, gethostname(), time.time()]

cluster = SLURMCluster(
    queue='bdwall',
    cores=2,
    memory='128GB',
    project='STARTUP-NPAULSON',
    walltime='00:05:00',
    job_extra=['-o myjob.%j.%N.out',
               '-e myjob.%j.%N.error'])

cluster.scale(2)

client = Client(cluster)

A = client.map(slow_increment, range(8))

time.sleep(15)
print('\n::: start user output :::')
print(client)
print('::: end user output :::\n')

B = client.gather(A)

print('\n::: start user output :::')
print(client)
print('::: end user output :::\n')

for res in B:
    print(res)

client.close()

Here is the output of my code with logging enabled:

DEBUG:Using selector: EpollSelector
DEBUG:Using selector: EpollSelector
DEBUG:Using selector: EpollSelector
DEBUG:Job script:
 #!/bin/bash

#!/usr/bin/env bash
#SBATCH -J dask-worker
#SBATCH -p bdwall
#SBATCH -A STARTUP-NPAULSON
#SBATCH -n 1
#SBATCH --cpus-per-task=2
#SBATCH --mem=120G
#SBATCH -t 00:05:00
#SBATCH -o myjob.%j.%N.out
#SBATCH -e myjob.%j.%N.error
JOB_ID=${SLURM_JOB_ID%;*}

/home/npaulson/miniconda/envs/dask-jobqueue/bin/python -m distributed.cli.dask_worker tcp://140.221.70.5:38499 --nthreads 2 --memory-limit 128.00GB --name dask-worker--${JOB_ID}-- --death-timeout 60

DEBUG:Found 0 active/pending workers.
DEBUG:Scaling up to 2 workers.
DEBUG:Using selector: EpollSelector
DEBUG:Found 0 active/pending workers.
DEBUG:starting 2 workers
DEBUG:Using selector: EpollSelector
DEBUG:writing job script:
#!/bin/bash

#!/usr/bin/env bash
#SBATCH -J dask-worker
#SBATCH -p bdwall
#SBATCH -A STARTUP-NPAULSON
#SBATCH -n 1
#SBATCH --cpus-per-task=2
#SBATCH --mem=120G
#SBATCH -t 00:05:00
#SBATCH -o myjob.%j.%N.out
#SBATCH -e myjob.%j.%N.error
JOB_ID=${SLURM_JOB_ID%;*}

/home/npaulson/miniconda/envs/dask-jobqueue/bin/python -m distributed.cli.dask_worker tcp://140.221.70.5:38499 --nthreads 2 --memory-limit 128.00GB --name dask-worker--${JOB_ID}-- --death-timeout 60

DEBUG:Executing the following command to command line
sbatch --parsable /tmp/tmpl5nkitoz.sh
DEBUG:started job: 802327
DEBUG:writing job script:
#!/bin/bash

#!/usr/bin/env bash
#SBATCH -J dask-worker
#SBATCH -p bdwall
#SBATCH -A STARTUP-NPAULSON
#SBATCH -n 1
#SBATCH --cpus-per-task=2
#SBATCH --mem=120G
#SBATCH -t 00:05:00
#SBATCH -o myjob.%j.%N.out
#SBATCH -e myjob.%j.%N.error
JOB_ID=${SLURM_JOB_ID%;*}

/home/npaulson/miniconda/envs/dask-jobqueue/bin/python -m distributed.cli.dask_worker tcp://140.221.70.5:38499 --nthreads 2 --memory-limit 128.00GB --name dask-worker--${JOB_ID}-- --death-timeout 60

DEBUG:Executing the following command to command line
sbatch --parsable /tmp/tmp_8knxogn.sh
DEBUG:started job: 802328
DEBUG:adding worker tcp://10.70.129.58:32853
DEBUG:job id for new worker: 802328
DEBUG:802328 is a new job or restarting worker
DEBUG:802328 is a new job, adding to running_jobs
DEBUG:adding worker tcp://10.70.130.174:33998
DEBUG:job id for new worker: 802327
DEBUG:802327 is a new job or restarting worker
DEBUG:802327 is a new job, adding to running_jobs

::: start user output :::
<Client: scheduler='tcp://140.221.70.5:38499' processes=2 cores=4>
::: end user output :::

::: start user output :::
<Client: scheduler='tcp://140.221.70.5:38499' processes=2 cores=4>
::: end user output :::

[1, 'bdw-0285', 1543964816.1772363]
[2, 'bdw-0285', 1543964816.1834495]
[3, 'bdw-0285', 1543964826.187123]
[4, 'bdw-0285', 1543964826.1882021]
[5, 'bdw-0285', 1543964836.198275]
[6, 'bdw-0285', 1543964836.199521]
[7, 'bdw-0285', 1543964846.209588]
[8, 'bdw-0285', 1543964846.210915]

Here is the stderr for node 00048:

Warning: Your terminal does not set locales.

If you use unicode text inputs for command line options then this may cause
undesired behavior.  This is rare.

If you don't use unicode characters in command line options then you can safely
ignore this message.  This is the common case.

You can support unicode inputs by specifying encoding environment variables,
though exact solutions may depend on your system:

    $ export LC_ALL=C.UTF-8
    $ export LANG=C.UTF-8

For more information see: http://click.pocoo.org/5/python3/

distributed.nanny - INFO -         Start Nanny at: 'tcp://10.70.130.174:40144'
distributed.diskutils - INFO - Found stale lock file and directory '/blues/gpfs/home/npaulson/2018_10_24_dask/dask-worker-space/worker-bqgulcd7', purging
distributed.worker - INFO -       Start worker at:  tcp://10.70.130.174:33998
distributed.worker - INFO -          Listening to:  tcp://10.70.130.174:33998
distributed.worker - INFO -              nanny at:        10.70.130.174:40144
distributed.worker - INFO -              bokeh at:        10.70.130.174:36591
distributed.worker - INFO - Waiting to connect to:   tcp://140.221.70.5:38499
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -                Memory:                  128.00 GB
distributed.worker - INFO -       Local Directory: /blues/gpfs/home/npaulson/2018_10_24_dask/dask-worker-space/worker-si3_68ly
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:   tcp://140.221.70.5:38499
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Stopping worker at tcp://10.70.130.174:33998
distributed.worker - INFO - Connection to scheduler broken.  Reconnecting...
distributed.worker - INFO - -------------------------------------------------
distributed.nanny - INFO - Closing Nanny at 'tcp://10.70.130.174:40144'
distributed.dask_worker - INFO - End worker

Here is the stderr for node 00285:

Warning: Your terminal does not set locales.

If you use unicode text inputs for command line options then this may cause
undesired behavior.  This is rare.

If you don't use unicode characters in command line options then you can safely
ignore this message.  This is the common case.

You can support unicode inputs by specifying encoding environment variables,
though exact solutions may depend on your system:

    $ export LC_ALL=C.UTF-8
    $ export LANG=C.UTF-8

For more information see: http://click.pocoo.org/5/python3/

distributed.nanny - INFO -         Start Nanny at: 'tcp://10.70.129.58:43857'
distributed.worker - INFO -       Start worker at:   tcp://10.70.129.58:32853
distributed.worker - INFO -          Listening to:   tcp://10.70.129.58:32853
distributed.worker - INFO -              nanny at:         10.70.129.58:43857
distributed.worker - INFO -              bokeh at:         10.70.129.58:43568
distributed.worker - INFO - Waiting to connect to:   tcp://140.221.70.5:38499
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -                Memory:                  128.00 GB
distributed.worker - INFO -       Local Directory: /blues/gpfs/home/npaulson/2018_10_24_dask/dask-worker-space/worker-bqgulcd7
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:   tcp://140.221.70.5:38499
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Stopping worker at tcp://10.70.129.58:32853
distributed.diskutils - ERROR - Failed to remove '/blues/gpfs/home/npaulson/2018_10_24_dask/dask-worker-space/worker-bqgulcd7' (failed in <built-in function lstat>): [Errno 2] No such file or directory: '/blues/gpfs/home/npaulson/2018_10_24_dask/dask-worker-space/worker-bqgulcd7'
distributed.worker - INFO - Connection to scheduler broken.  Reconnecting...
distributed.worker - INFO - -------------------------------------------------
distributed.nanny - INFO - Closing Nanny at 'tcp://10.70.129.58:43857'
distributed.dask_worker - INFO - End worker

guillaumeeb commented 5 years ago

I don't think this would change the behavior you see, but you should set the local-directory kwarg to /tmp, /scratch or any other space local to your compute nodes.

Still no idea here; you should try to use the dashboard to see if anything looks strange.

npaulson commented 5 years ago

I've tried accessing the dashboard, which sometimes populates with the workers but other times does not. When it does populate with the workers, it only shows two - which I assume are the workers associated with the single node that reports back.

guillaumeeb commented 5 years ago

If it shows two workers, then it's showing both nodes you're using. You've got one worker process per node with your config. It is really strange that tasks are only executed on one node. You should be able to see worker activity with the dashboard, and see where your tasks are executed.

goraj commented 5 years ago

I believe I have the same issue using SGE/UGE. I can start multiple job scripts, but every task runs on the same node/PID.

from dask.distributed import Client
from dask_jobqueue import SGECluster
import os
import socket

def do_process():
    hostname = socket.gethostname()
    pid = os.getpid()
    return hostname, pid

if __name__ == '__main__':
    memory = '10'
    n_workers = 10
    use_cluster = True
    if use_cluster:
        cluster = SGECluster(#queue='default.q',
                            processes=1, # number of python processes per job
                            cores=24, # cores per job
                            memory='10GB',
                            walltime='1500',
                            resource_spec=f'm_mem_free=10G')
        cluster.scale(n_workers)
        print(cluster.job_script())
    with Client(cluster) as client:
        pool = []
        for x in range(n_workers):
            pool.append(client.submit(do_process, pure=False, ))

        result = client.gather(pool)
        print(f'result: {result}')
        cluster.close()
        import IPython
        IPython.embed()

Result:

result: [('node2114', 172018), ('node2114', 172018), ('node2114', 172018), ('node2114', 172018), ('node2114', 172018), ('node2114', 172018), ('node2114', 172018), ('node2114', 172018), ('node2114', 172018), ('node2114', 172018)]

qstat output:

Every 0.1s: qstat                                                                                                                    Tue Dec 11 17:15:16 2018

job-ID     prior   name       user         state submit/start at     queue                          jclass                         slots ja-task-ID
------------------------------------------------------------------------------------------------------------------------------------------------
  17250741 0.50203 dask-worke username      dr    12/11/2018 17:15:12 default.q@node1045.cm.cluste                                    1
  17250743 0.50089 dask-worke username      dr    12/11/2018 17:15:12 default.q@node2145.cm.cluste                                    1
  17250744 0.50051 dask-worke username      dr    12/11/2018 17:15:12 default.q@node1020.cm.cluste                                    1
  17250745 0.50032 dask-worke username      dr    12/11/2018 17:15:12 default.q@node1059.cm.cluste                                    1
  17250746 0.50021 dask-worke username      dr    12/11/2018 17:15:12 default.q@node2114.cm.cluste                                    1
  17250747 0.50013 dask-worke username      dr    12/11/2018 17:15:12 default.q@node2194.cm.cluste                                    1
  17250748 0.50008 dask-worke username      dr    12/11/2018 17:15:12 default.q@node2195.cm.cluste                                    1
  17250749 0.50004 dask-worke username      dr    12/11/2018 17:15:12 default.q@node2053.cm.cluste                                    1
  17250750 0.50001 dask-worke username      dr    12/11/2018 17:15:12 default.q@node2057.cm.cluste                                    1
  17250751 0.49998 dask-worke username      dr    12/11/2018 17:15:12 default.q@node1060.cm.cluste                                    1

mrocklin commented 5 years ago

My guess is that once the first worker arrives it gets all of the work, and that this work finishes well before the second worker arrives a fraction of a second later. Maybe try waiting until all of the workers have arrived?

goraj commented 5 years ago

@mrocklin Yes, that would make sense. However, the workers all spawn and run nearly instantly and at the same time. Even when submitting long-running jobs, they all still get submitted to the same node and run on the same PID.

Sleeping between scale and submit seems to solve this. I assume my idea of the scheduler is wrong. The workers all spawn and run well before the first job is processed, and still all of the jobs are pushed to that one node without the sleep between scale and submit.

For future reference: the following script is able to run all jobs on different nodes; unfortunately it requires all workers to be running before we can submit:

# -*- coding: utf-8 -*-
import os
import pandas as pd
from dask.distributed import Client
from dask_jobqueue import SGECluster
from distributed.deploy.local import LocalCluster
import time
import socket
from datetime import datetime

def do_process():
    hostname = socket.gethostname()
    pid = os.getpid()
    return hostname, pid, str(datetime.now())

if __name__ == '__main__':
    memory = '10'
    n_workers = 2
    use_cluster = True
    if use_cluster:
        cluster = SGECluster(queue='default.q',
                            processes=1,  # number of python processes per job
                            cores=24,  # cores per job
                            memory='10GB',
                            walltime='1500',
                            resource_spec='m_mem_free=10G')
        cluster.scale(n_workers)
    with Client(cluster) as client:
        pool = []

        while use_cluster and (client.status == 'running') and (len(client.scheduler_info()['workers']) < n_workers):
            time.sleep(1.0)

        print('start submitting jobs')
        for x in range(n_workers):
            print(x)
            pool.append(client.submit(do_process, pure=False, ))

        print('gathering results')
        result = client.gather(pool)
        print(f'result: {result}')
        cluster.close()
        import IPython
        IPython.embed()
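
The polling loop in the script above never gives up if a job stays stuck in the queue. A minimal sketch of the same idea with a timeout follows; `wait_for_workers` here is a hypothetical standalone helper that takes any zero-argument callable returning the current worker count, so it makes no assumptions about the Dask API.

```python
import time


def wait_for_workers(count_workers, n_workers, timeout=60.0, poll_interval=1.0):
    """Block until count_workers() reports at least n_workers.

    Raises TimeoutError if the workers have not arrived within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while count_workers() < n_workers:
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f'fewer than {n_workers} workers arrived within {timeout} seconds'
            )
        time.sleep(poll_interval)


# With a real client, the counter could be the expression used in the loop above:
# wait_for_workers(lambda: len(client.scheduler_info()['workers']), n_workers)
```

Newer releases of distributed also provide `Client.wait_for_workers`, which may make a hand-written helper like this unnecessary.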

npaulson commented 5 years ago

I modified my code to include @mrocklin's suggestion, but still only one of the nodes is doing any work. I verified this on the dashboard by having my task instantiate a large numpy array. On the dashboard I see both nodes being initialized, but processing is only done on one of the nodes.

@guillaumeeb, I also simplified the script so that only one process is started per node.

The updated script is included below:

import time
import logging
import numpy as np
from distributed import Client
from dask_jobqueue import SLURMCluster
from socket import gethostname

logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)

def slow_increment(x):
    m = np.random.random((100000, 10000))
    time.sleep(10)
    return [x + 1, gethostname(), time.time()]

cluster = SLURMCluster(
    queue='bdwall',
    cores=1,
    memory='128GB',
    project='BAYES_THERMO',
    walltime='00:05:00',
    job_extra=['-o myjob.%j.%N.out',
               '-e myjob.%j.%N.error'])

cluster.scale(2)

client = Client(cluster)

A = client.map(slow_increment, range(8))

time.sleep(10)

B = client.gather(A)

print('\n::: start user output :::')
print(client)
print('::: end user output :::\n')

for res in B:
    print(res)

client.close()

mrocklin commented 5 years ago

Again, if there are more than eight threads in your first worker that worker will get all of the tasks and probably won't share with the others when they come on.

guillaumeeb commented 5 years ago

There are two different situations here.

@goraj why do you want your tasks to be submitted to different workers? I think @mrocklin's comments are for you. If you want only one task per node, there are other solutions than sleeping and waiting for the whole cluster to be up.

@npaulson I still don't see where the problem can be. I will try to execute your code, but I don't expect to run into the same issue. There should be a scheduling error if no tasks can be sent to one of your workers. @mrocklin is there a way to see this kind of error?

goraj commented 5 years ago

@guillaumeeb so I have hundreds of very long running jobs that are not thread-safe. I also want each to run on its own cluster node and use all available cpus there.

Unfortunately I cannot figure out how to get each job to one cluster node using dask-jobqueue without using the wait like so:

while use_cluster and (client.status == 'running') and (len(client.scheduler_info()['workers']) < n_workers):
            time.sleep(1.0)

It is likely that my mental model is wrong and I would highly appreciate any tips or changes I should consider. Thank you all for the help and sorry for hijacking this issue, I thought I had the same problem as @npaulson.

guillaumeeb commented 5 years ago

I have hundreds of very long running jobs that are not thread-safe. I also want each to run on its own cluster node and use all available cpus there.

That's what I imagined. Please look at #181. Seems like we should definitely document this!

goraj commented 5 years ago

@guillaumeeb Thank you very much!

guillaumeeb commented 5 years ago

@npaulson any update on this? I tried your simple script and as expected everything looks fine:

::: start user output :::
<Client: scheduler='tcp://10.120.43.88:46417' processes=2 cores=2>
::: end user output :::

[1, 'node117.sis.cnes.fr', 1546521430.2696798]
[2, 'node118.sis.cnes.fr', 1546521430.313745]
[3, 'node117.sis.cnes.fr', 1546521451.3178716]
[4, 'node118.sis.cnes.fr', 1546521451.4473453]
[5, 'node117.sis.cnes.fr', 1546521472.3758643]
[6, 'node118.sis.cnes.fr', 1546521472.4892852]
[7, 'node117.sis.cnes.fr', 1546521493.4334419]
[8, 'node118.sis.cnes.fr', 1546521493.56492]

There's probably something weird with your network or system configuration. I cannot understand why the scheduler cannot address one of the workers.

npaulson commented 5 years ago

@guillaumeeb Thank you for running the script - your outputs look perfectly fine to me as well.

I did try specifying various interfaces as a keyword in SLURMCluster from a list of interface options I obtained from a node on my cluster, but these either gave a key error:

Traceback (most recent call last):
  File "/home/npaulson/miniconda/envs/dask-jobqueue/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/home/npaulson/miniconda/envs/dask-jobqueue/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/home/npaulson/miniconda/envs/dask-jobqueue/lib/python3.6/site-packages/distributed/cli/dask_worker.py", line 259, in <module>
    go()
  File "/home/npaulson/miniconda/envs/dask-jobqueue/lib/python3.6/site-packages/distributed/cli/dask_worker.py", line 255, in go
    main()
  File "/home/npaulson/miniconda/envs/dask-jobqueue/lib/python3.6/site-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/home/npaulson/miniconda/envs/dask-jobqueue/lib/python3.6/site-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/home/npaulson/miniconda/envs/dask-jobqueue/lib/python3.6/site-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/npaulson/miniconda/envs/dask-jobqueue/lib/python3.6/site-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/home/npaulson/miniconda/envs/dask-jobqueue/lib/python3.6/site-packages/distributed/cli/dask_worker.py", line 206, in main
    host = get_ip_interface(interface)
  File "/home/npaulson/miniconda/envs/dask-jobqueue/lib/python3.6/site-packages/distributed/utils.py", line 166, in get_ip_interface
    for info in psutil.net_if_addrs()[ifname]:
KeyError: 'ib1'

or this error:

Traceback (most recent call last):
  File "jobqueue_test1.py", line 25, in <module>
    '-e myjob.%j.%N.error'])
  File "/home/npaulson/miniconda/envs/dask-jobqueue/lib/python3.6/site-packages/dask_jobqueue/slurm.py", line 69, in __init__
    super(SLURMCluster, self).__init__(**kwargs)
  File "/home/npaulson/miniconda/envs/dask-jobqueue/lib/python3.6/site-packages/dask_jobqueue/core.py", line 223, in __init__
    kwargs.setdefault('ip', get_ip_interface(interface))
  File "/home/npaulson/miniconda/envs/dask-jobqueue/lib/python3.6/site-packages/distributed/utils.py", line 169, in get_ip_interface
    raise ValueError("interface %r doesn't have an IPv4 address" % (ifname,))
ValueError: interface 'ib0' doesn't have an IPv4 address
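
Going by the tracebacks above, the KeyError means the name was not among the keys of `psutil.net_if_addrs()` on the node where the worker started, and the ValueError means the interface exists but has no IPv4 address. A minimal sketch for listing usable candidates follows. `ipv4_interfaces` is a hypothetical helper, and the sample data is invented for illustration; on a real node one would pass `psutil.net_if_addrs()` instead.

```python
import socket
from collections import namedtuple


def ipv4_interfaces(if_addrs):
    """Map interface name -> IPv4 address for interfaces that have one.

    if_addrs is assumed to have the shape returned by psutil.net_if_addrs():
    a dict of name -> list of entries with .family and .address attributes.
    """
    usable = {}
    for name, entries in if_addrs.items():
        for entry in entries:
            if entry.family == socket.AF_INET:
                usable[name] = entry.address
                break
    return usable


# Hypothetical sample data, not taken from the cluster in this thread.
Addr = namedtuple('Addr', ['family', 'address'])
sample = {
    'eth0': [Addr(socket.AF_INET, '10.0.0.5')],
    'ib0': [Addr(socket.AF_INET6, 'fe80::1')],
}
print(ipv4_interfaces(sample))
```

Running the check on both the login node and a compute node would show whether an interface name is valid everywhere, since the scheduler and the workers each resolve it locally.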

guillaumeeb commented 5 years ago

This is probably not due to dask-jobqueue configuration, as the scheduler listens on all interfaces by default, and the Bokeh Dashboard too. See https://github.com/dask/dask-jobqueue/blob/master/dask_jobqueue/core.py#L227-L231.

So changing interface should not solve things... I'm not 100% sure for every network config though, not an expert here. You may want to try defining ip kwarg (scheduler ip) to an interface you're certain the worker nodes can see.

Another thing would be to try launching all this manually: https://docs.dask.org/en/latest/setup/hpc.html#using-a-shared-network-file-system-and-a-job-scheduler.

Yet another could be to start dask-jobqueue from an interactive job, so that both Scheduler and Workers are on the compute nodes of your cluster.

guillaumeeb commented 5 years ago

As workers seem to connect to your scheduler, interface is probably not the problem. Have you also tried setting the local-directory kwarg?

I would need the stderr outputs from all of your jobs to go deeper.

npaulson commented 5 years ago

The first thing I tried was setting the local-directory. This choice does register with Dask but doesn't fix my problem (still only one node does any processing).

I next tried to manually set up a cluster using the suggested method. Unfortunately this doesn't seem to work either. I can start the scheduler, but the scheduler.json file doesn't seem to recognize the new workers whether they are started on the same or different nodes. I'll try to include the outputs and commands from this process below:

First I log onto one of the allocated nodes and navigate to its scratch directory. I then start the scheduler as follows:

dask-scheduler --scheduler-file scheduler.json &

resulting in the following output:

Warning: Your terminal does not set locales.

If you use unicode text inputs for command line options then this may cause
undesired behavior.  This is rare.

If you don't use unicode characters in command line options then you can safely
ignore this message.  This is the common case.

You can support unicode inputs by specifying encoding environment variables,
though exact solutions may depend on your system:

    $ export LC_ALL=C.UTF-8
    $ export LANG=C.UTF-8

For more information see: http://click.pocoo.org/5/python3/

distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:  tcp://10.70.129.244:8786
distributed.scheduler - INFO -       bokeh at:                     :8787
distributed.scheduler - INFO - Local Directory: /scratch/scheduler-gt547cbi
distributed.scheduler - INFO - -----------------------------------------------

and a scheduler.json file with the following contents:

{
  "type": "Scheduler",
  "id": "Scheduler-082e55ff-1e4f-4dc0-b392-d3710025c81e",
  "address": "tcp://10.70.129.244:8786",
  "services": {
    "bokeh": 8787
  },
  "workers": {}
}

Then, on a separate node I start a worker using the following command:

dask-worker --scheduler-file tcp://10.70.129.244:8786

Resulting in the following output on the worker node:

Warning: Your terminal does not set locales.

If you use unicode text inputs for command line options then this may cause
undesired behavior.  This is rare.

If you don't use unicode characters in command line options then you can safely
ignore this message.  This is the common case.

You can support unicode inputs by specifying encoding environment variables,
though exact solutions may depend on your system:

    $ export LC_ALL=C.UTF-8
    $ export LANG=C.UTF-8

For more information see: http://click.pocoo.org/5/python3/

distributed.nanny - INFO -         Start Nanny at: 'tcp://10.70.129.245:43635'
distributed.worker - INFO -       Start worker at:  tcp://10.70.129.245:42121
distributed.worker - INFO -          Listening to:  tcp://10.70.129.245:42121
distributed.worker - INFO -              nanny at:        10.70.129.245:43635
distributed.worker - INFO -              bokeh at:        10.70.129.245:43097
distributed.worker - INFO - Waiting to connect to:   tcp://10.70.129.244:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                         36
distributed.worker - INFO -                Memory:                  135.08 GB
distributed.worker - INFO -       Local Directory: /blues/gpfs/home/npaulson/dask-worker-space/worker-d91hjc5b
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:   tcp://10.70.129.244:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection

as well as the following recognition on the scheduler node:

distributed.scheduler - INFO - Register tcp://10.70.129.245:42121
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.70.129.245:42121
distributed.core - INFO - Starting established connection

however, none of this shows up in the scheduler.json file

guillaumeeb commented 5 years ago

First I log onto one of the allocated nodes and navigate to it's scratch directory

Is this a directory local to the host? You should use a shared file system location, visible from all your cluster hosts.

dask-worker --scheduler-file tcp://10.70.129.244:8786

You need to start dask-worker with the file created by the dask-scheduler (visible from all your nodes):

dask-worker --scheduler-file scheduler.json

It looks like your worker is able to connect even with these options?

however, none of this shows up in the scheduler.json file

This is pretty normal, you won't have anything but the Scheduler information in the scheduler file.
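
To illustrate the point: the scheduler file is a small JSON document whose useful payload is the scheduler address, and, as observed above, registered workers do not appear in it. A minimal sketch, using the file contents posted earlier in this thread as an inline string (reading the file from disk with open() would work the same way; the variable names are only illustrative):

```python
import json

# Contents of scheduler.json as posted earlier in this thread.
SCHEDULER_FILE_TEXT = """
{
  "type": "Scheduler",
  "id": "Scheduler-082e55ff-1e4f-4dc0-b392-d3710025c81e",
  "address": "tcp://10.70.129.244:8786",
  "services": {
    "bokeh": 8787
  },
  "workers": {}
}
"""

info = json.loads(SCHEDULER_FILE_TEXT)

# The address is what workers and clients actually need from this file.
scheduler_address = info["address"]
# The "workers" entry is empty in the file, even after a worker registered.
workers_in_file = info["workers"]

print(scheduler_address)
print(workers_in_file)
```

To see which workers are currently connected, ask the running scheduler instead, for example with client.scheduler_info()["workers"].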

Once you've created your cluster on several nodes, you should connect to the Scheduler with the client, and launch your test:

client = Client(scheduler_file='scheduler.json')

npaulson commented 5 years ago

So I think you are right, I was putting the scheduler on the node's scratch space, which I thought was shared but I have to assume is actually not (I'll confirm with the sysadmins). I have no idea why it still recognized the workers, I couldn't get that to work this time.

In any case, when I put the scheduler in my working directory and the workers anywhere else (including the node's scratch space), I can successfully connect to the scheduler with the client (as long as I define the path to the scheduler when I initialize the workers). Furthermore, I'm able to use the workers on all the nodes. My next step will be to build a script to automate starting the scheduler and workers.

@guillaumeeb, thanks so much for your persistent help with this problem! While it would certainly be easier to work with dask-jobqueue, I think this will be a workable solution for me.

guillaumeeb commented 5 years ago

Hm, but if it works manually, it should work with dask-jobqueue... I wish I could test directly on your system!

npaulson commented 5 years ago

Haha, I wish you could as well.

In any case, I have what I need to move forward, but I'd be happy to test out other ideas if you think of anything.

guillaumeeb commented 5 years ago

Could you sum up the different steps you take to start the Dask cluster on your system?

npaulson commented 5 years ago

Yes, here is a step by step for my system:

1) allocate desired number of nodes

2) ssh into nodes and initialize python environments

3) on one of the nodes start the scheduler in the working directory:

$ dask-scheduler --scheduler-file scheduler.json &

4) start a worker on each node, using --nprocs to define the number of desired processes per node. You can also start workers in the node scratch space as long as you provide a path to the scheduler file:

dask-worker --scheduler-file scheduler.json --nprocs 2 --nthreads 1 &

5) get the client as follows

>>> from distributed import Client
>>> client = Client(scheduler_file='scheduler.json')

This can also be done in a SLURM job submission script using the srun command
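
As a starting point for automating the steps above, here is a minimal sketch that only assembles the two command lines from this comment. The helper name build_commands and the path /shared/path/scheduler.json are hypothetical placeholders; the flags are the ones used above. How the commands are launched (ssh, srun, subprocess) depends on the system and is left out.

```python
import shlex


def build_commands(scheduler_file, nprocs, nthreads):
    """Return (scheduler_cmd, worker_cmd) as argument lists.

    scheduler_file should live on a file system visible from all nodes.
    """
    scheduler_cmd = ["dask-scheduler", "--scheduler-file", scheduler_file]
    worker_cmd = [
        "dask-worker",
        "--scheduler-file", scheduler_file,
        "--nprocs", str(nprocs),
        "--nthreads", str(nthreads),
    ]
    return scheduler_cmd, worker_cmd


# Hypothetical shared location; replace with a real path on your cluster.
scheduler_cmd, worker_cmd = build_commands(
    "/shared/path/scheduler.json", nprocs=2, nthreads=1
)

# Print the commands as they would be typed in a shell.
print(shlex.join(scheduler_cmd))
print(shlex.join(worker_cmd))
```

The argument lists can be handed to subprocess.Popen as they are, or prefixed with the srun invocation appropriate for the site.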

guillaumeeb commented 5 years ago

Thanks, and what about when you're using dask-jobqueue? Do you launch it from a login node, or do you start an interactive job first?

npaulson commented 5 years ago

When I was trying to use dask-jobqueue I used a submit script from a login node.

lesteve commented 5 years ago

My main guess is that you are getting two workers on the same node just because SLURM is giving you two jobs on the same node. What I would suggest:

When I was trying to use dask-jobqueue I used a submit script from a login node.

For debugging, I find an interactive workflow quite nice. I would suggest looking at the 30 minute "Dask on HPC introduction" video by Matthew Rocklin.

guillaumeeb commented 5 years ago

My main guess is that you are getting two workers on the same node just because SLURM is giving you two jobs on the same node

@lesteve, the OP was that only one dask-worker was performing tasks, despite the fact that two workers were connected to the Scheduler. I don't think this could come from two workers on the same node, but I may have missed something. Did you spot a detail?

When I was trying to use dask-jobqueue I used a submit script from a login node.

@npaulson could you try using dask-jobqueue from an interactive session on a compute node to see if your original test works?

lesteve commented 5 years ago

@lesteve, the OP was that only one dask-worker was performing tasks, despite the fact that two workers were connected to the Scheduler. I don't think this could come from two workers on the same node, but I may have missed something. Did you spot a detail?

I was just looking at the top post. I have to say that this issue went in many different directions so this is a bit hard to follow ...

Maybe the main useful thing from my suggestions is to do this before client.map:

while len(client.scheduler_info()["workers"]) < 2:
    time.sleep(1)

npaulson commented 5 years ago

@guillaumeeb I recreated my script in an interactive job and it now appears to use both nodes.

@lesteve Your last suggestion also works! I guess my script wasn't giving SLURM enough time to spin up the nodes I requested?

guillaumeeb commented 5 years ago

Well, that's great! Guess we can finally close this one?

npaulson commented 5 years ago

Yes, thank you both for your help!

lesteve commented 5 years ago

@lesteve Your last suggestion also works! I guess my script wasn't giving SLURM enough time to spin up the nodes I requested?

I guess, this is what was happening indeed.
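
For reference, the polling loop suggested earlier can be given a timeout so that a script does not hang forever if the jobs never start. This is only a sketch: the helper name wait_for_worker_count and its default values are made up for illustration, and the only thing it assumes about the client is the scheduler_info()["workers"] call already used above.

```python
import time


def wait_for_worker_count(client, n_workers, timeout=60.0, poll_interval=1.0):
    """Poll until at least n_workers are registered with the scheduler.

    Returns the number of connected workers, or raises TimeoutError if
    fewer than n_workers have connected once `timeout` seconds have passed.
    """
    deadline = time.monotonic() + timeout
    while True:
        connected = len(client.scheduler_info()["workers"])
        if connected >= n_workers:
            return connected
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"only {connected} of {n_workers} workers connected "
                f"after {timeout} s"
            )
        time.sleep(poll_interval)
```

With the script from the opening post, calling wait_for_worker_count(client, 2) between Client(cluster) and client.map would play the same role as the two-line loop.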

fabiansvara commented 4 years ago

I don't think this should be closed as long as cluster.scale() doesn't wait for the workers to spin up.

This is super hard to debug for someone new to dask, and super easy to get wrong. The first assumption when facing this issue is that the scheduling works differently than expected (which wouldn't be surprising as the documentation is a bit terse).

Currently, there is another closed (!) issue (#304) probably due to the same problem, which doesn't contain a resolution but contains a misleading statement regarding scale():

I'll recommend to use scale instead of start_worker, but mainly to make sure all the workers are started before submitting the tasks.

... making it particularly hard to get to the bottom of this if one has the misfortune of reading this statement while trying to fix it.

lesteve commented 4 years ago

cluster.scale submits jobs to your job scheduler (SLURM, PBS, etc.) and indeed does not wait for the jobs to start. You can use client.wait_for_workers if you want to wait until some (or all) of your jobs actually start running.

You are probably right that this could be made more clear in the documentation. I invite you to open a PR to improve the documentation.

fabiansvara commented 4 years ago

The API looks analogous to the standard library's multiprocessing, i.e.

p = multiprocessing.Pool(num_workers)
p.map(...)

... which makes the current behavior quite counter-intuitive.

I could see how you might want the current behavior if you are starting many workers that may not be available immediately, but even then the scheduling is going to be unpredictable and may well not be what you want.

If you want to preserve this possibility, a good solution might be to have a wait_for_workers=False keyword argument to scale().
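
Until something like this exists, the proposed behavior can be approximated on the user side. The following is a sketch of such a wrapper, not part of the Dask API: the function name scale_and_wait is made up, and it assumes only that the cluster object has scale(n) and the client has wait_for_workers(n).

```python
def scale_and_wait(cluster, client, n_workers, wait_for_workers=False):
    """Request n_workers from the cluster, optionally blocking until ready.

    With wait_for_workers=False this behaves like a plain cluster.scale():
    jobs are submitted and the call returns immediately.
    With wait_for_workers=True the call returns only once n_workers workers
    have connected to the scheduler.
    """
    cluster.scale(n_workers)
    if wait_for_workers:
        client.wait_for_workers(n_workers)
```

Calling scale_and_wait(cluster, client, 2, wait_for_workers=True) before client.map would then give the multiprocessing.Pool-like behavior described above, at the cost of blocking for as long as the job scheduler takes to start the jobs.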

mrocklin commented 4 years ago

Thank you for your interest in the project @fabiansvara . If you'd like the community to change this behavior then I encourage you to raise a new issue at https://github.com/dask/distributed/issues/new . The scale behavior is shared among all Dask deployment projects (hpc, cloud, kubernetes, yarn) and so isn't a decision that will be made in the dask-jobqueue repository.

I understand that now, but is that really sensible behavior? .. but that assumption is going to waste you a day, sort of invalidating the whole idea of using dask as a clean wrapper around the shellscript hell of classic job schedulers

It sounds like you've had a frustrating experience, for which I apologize. Even with that though I encourage you to keep a positive tone. Most people who work on Dask are volunteers and they read dozens of messages a day (sometimes hundreds). Keeping a positive tone can make their volunteer activities a lot more pleasant, which reduces OSS maintainer fatigue.

fabiansvara commented 4 years ago

Apologies, I have edited my message to be more neutral and to the point. I really appreciate the work of the dask community!

lesteve commented 4 years ago

Thanks a lot for editing your message! Of course we all get frustrated, and we all wish things were simpler and that the documentation made the caveats we bump into more obvious, but yeah, maintainer fatigue is a real thing. On this topic, I really like "Setting expectations for open source participation" by Brett Cannon.

About Dask-Jobqueue in general and the change you are proposing, I am not really convinced that waiting for all jobs to start after .scale is an acceptable default. How long your job takes to start depends a lot on your cluster (also, you may be billed for hours when your job doesn't do any computation). In general I feel that in most cases people are fine with the computation starting before all the jobs have started and then continuing on the Dask workers as they connect to the scheduler (the Dask scheduler does this automatically). If that's not the case for you (or if you hit issues with this approach) I would be interested to hear more about your use case.

I am not an expert on this, but it feels like Dask-MPI takes more of an approach where you get all your workers running at the same time, so maybe that would be something for you to investigate (being familiar with MPI first would probably be a prerequisite).

fabiansvara commented 4 years ago

My use case is pretty simple: Every job (CNN inference) needs to run on a different cluster node, since they need a GPU and each node has exactly one. I know how many SLURM jobs (dask workers) I can request without having to wait. SLURM starts all the jobs (=dask workers) within less than a second.

However, when calling map right after scale, only one worker ever gets any jobs and all but one of the cluster nodes I requested are sitting idle.

As it is currently, doing anything other than waiting after calling scale is basically guaranteed to be incorrect. Even if you didn't want to wait for all the workers to be ready, you'd have to wait in a loop to monitor the number of available workers and submit the jobs in manually generated batches - basically rolling your own scheduling for the initial start-up phase.

An alternative to having scale wait for worker startup could be for map to schedule jobs to workers that are expected to start up soon and / or to reschedule jobs from busy workers to idle workers.

mrocklin commented 4 years ago

It sounds like you either want to use client.wait_for_workers or dask-mpi, as @lesteve suggested.

lesteve commented 4 years ago

(CNN inference)

You may be interested to look at: https://examples.dask.org/machine-learning/torch-prediction.html. It is with a LocalCluster but easy to translate to a SLURMCluster.

I know how many SLURM jobs (dask workers) I can request without having to wait.

I would say in a typical HPC cluster this is rather unusual but I may be biased by the HPC clusters I am familiar with ... generally you have no idea how long you have to wait for your one job to start (it can depend on how people are using the cluster, on your past usage and probably on other factors) and even less for many jobs to start.

reschedule jobs from busy workers to idle workers.

Work stealing does that in principle automatically for you. If all your jobs do start running (i.e. squeue shows them running) as you say, this does point towards work stealing not kicking in for your use case. It is hard to tell why, but maybe the doc can help.

ocaisa commented 4 years ago

I also think dask-mpi is what you want for your use case, the workers will start instantly and you can tweak your SLURM job requirements to give you the distribution you want (1 MPI task per node would mean one worker per node, no need for a map step)

fabiansvara commented 4 years ago

Work stealing does that in principle automatically for you. If all your jobs do start running (i.e. squeue shows them running) as you say, this does point towards work stealing not kicking in for your use case. It is hard to tell why, but maybe the doc can help.

If that worked it would make the whole issue moot. According to the work stealing docs:

If a task has been specifically restricted to run on particular workers (such as is the case when special hardware is required) then we do not steal.

... depending on how exactly I interpret this sentence, it might explain what's going on. I instantiate the SLURMCluster with extra=['--resources GPU=1'] and call map with resources={'GPU': 1}. This is because the GPU nodes also have a lot of CPUs, so they could also be used for running multiple CPU-bound jobs in other cases.

If I interpret the sentence from the doc to mean "jobs where resources were specified will not be stolen", then that's the problem. Here in my case, it would be perfectly fine for other workers that have the same resource available to steal the jobs.

Edit: It also explains the trouble that the person in #304 was having.

lesteve commented 4 years ago

If I interpret the sentence from the doc to mean "jobs where resources were specified will not be stolen", then that's the problem.

This was indeed a limitation, but a PR that is supposed to fix this (https://github.com/dask/distributed/pull/3069) has been merged. It seems like this is in 2.12, so maybe check that your distributed version is recent enough.

If that still does not work for you this could be a problem with the PR or work stealing that does not kick in for some reason in your particular case ... hard to tell ...

And of course the distributed doc may also need to be fixed as well ...

fabiansvara commented 4 years ago

Hm, I have 2.18.0. Anything I can do to narrow down what's going on?

lesteve commented 4 years ago

Yeah that was my guess that your version was recent enough.

Could you first post a snippet showing how you are creating your cluster and using it? In an ideal world, a stand-alone snippet reproducing the problem would be great.
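
When putting such a report together, a short summary of which host ran each task makes an imbalance easy to see. A minimal sketch, assuming each task returns [value, hostname, timestamp] the way slow_increment does in the opening post (the helper name tasks_per_host is made up for illustration):

```python
from collections import Counter


def tasks_per_host(results):
    """Count how many tasks ran on each host.

    Each result is expected to look like [value, hostname, timestamp].
    """
    return Counter(hostname for _value, hostname, _timestamp in results)


# First entries of the output shown in the opening post.
results = [
    [1, "bdw-0478", 1540477582.6744401],
    [2, "bdw-0478", 1540477582.67487],
    [3, "bdw-0478", 1540477592.68666],
]

# A single key here means every task ran on the same node.
print(tasks_per_host(results))
```

If the counter has fewer keys than the number of nodes requested, some nodes received no tasks at all.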

Here is a list of suggestions at this point: