dask / dask-yarn

Deploy dask on YARN clusters
http://yarn.dask.org
BSD 3-Clause "New" or "Revised" License

dask-yarn not requesting the exact number of workers I specified #59

Open fvannee opened 5 years ago

fvannee commented 5 years ago

I'm running the following very simple example with dask-yarn.

from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster(environment='venv:///home/florisvannee/my_envsd',
                      worker_vcores=1,
                      worker_memory="256MiB")
cluster.scale(1)
client = Client(cluster)

def inc(x):
    return x + 1

a = client.submit(inc, 20)
print(a)
print(a.result())

This never completes. If I change the call to scale with any number higher than 1, it does complete. E.g. with cluster.scale(2) I get the following output in the log:

19/04/19 14:07:05 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0
19/04/19 14:07:06 INFO skein.ApplicationMaster: Starting container_1555675382310_0003_01_000002...
19/04/19 14:07:06 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_1555675382310_0003_01_000002
19/04/19 14:07:07 INFO skein.ApplicationMaster: Scaling service 'dask.worker' to 2 instances, a delta of 2.
19/04/19 14:07:07 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0
19/04/19 14:07:07 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1
19/04/19 14:07:12 INFO skein.ApplicationMaster: Starting container_1555675382310_0003_01_000003...
19/04/19 14:07:12 INFO skein.ApplicationMaster: RUNNING: dask.worker_1 on container_1555675382310_0003_01_000003

Even though it looks like it is requesting worker_0, it never gets it. When I look in the overall resource manager log, I never see any request for worker_0; I only see requests for worker_1. Note that this is not because there is no work in this example: for more complex examples with more load I see exactly the same behavior, where the first worker never gets properly requested. Also, when running with cluster.scale(1), it never completes because that first worker is just never started up. Running scale() with any number of 2 or higher always results in all workers being started except worker_0.

Any ideas what could be causing this?

jcrist commented 5 years ago

Looks like the request for dask.worker_0 is never being fulfilled by YARN's resource manager (or, if it is, we're not accepting it). This works in our tests (and on several users' systems), so it's something specific to your system. Can you run the following (untested) script to debug? Also, what version of Hadoop are you using, and if it's provided by a distribution (e.g. Cloudera), which distribution?

import time
import dask_yarn
import skein

print(dask_yarn.__version__)
print(skein.__version__)

# Use an internal routine to make the spec for us easily.
spec = dask_yarn.core._make_specification(
    environment="venv:///home/florisvannee/my_envsd",
    worker_vcores=1,
    worker_memory="256MiB"
)
spec.master.log_level = 'debug'

cluster = dask_yarn.YarnCluster.from_specification(spec)

def running():
    return len([c for c in cluster.workers() if c.state == 'running'])

print("N workers == %d" % running())
print("Application id: %s" % cluster.app_id)

def wait_for_workers(timeout):
    for i in range(timeout):
        n = running()
        if n:
            print("%d workers have started" % n)
            break
        time.sleep(1)
    else:
        print("No workers after %d seconds" % timeout)

cluster.scale(1)
wait_for_workers(60)
cluster.scale(2)
wait_for_workers(60)
cluster.scale(0)
time.sleep(2)
cluster.close()

After it's completed, can you get the logs from the application and post them here, along with the output of that script?
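Once the application has finished, running yarn logs -applicationId <app id> should fetch the aggregated logs (assuming log aggregation is enabled on your cluster).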

fvannee commented 5 years ago

Thanks for your help! I'm running Hadoop v3.1.2 on a regular server that I set it up on (pseudo-distributed, everything running on one node). Below you can find the output of the script, as well as the output from the master process. Attached you can find the logs from the Hadoop YARN process. In these logs, you can see that the only container requests made were for the master and the dask scheduler; worker_0 is never requested. Then, one minute later, after the first one timed out, there's a request for a worker.

0.5.2
0.6.1
19/04/19 19:52:20 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/04/19 19:52:21 INFO skein.Driver: Driver started, listening on 37532
19/04/19 19:52:21 INFO conf.Configuration: resource-types.xml not found
19/04/19 19:52:21 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
19/04/19 19:52:21 INFO skein.Driver: Uploading application resources to hdfs://opamux0977:9000/user/florisvannee/.skein/application_1555696188572_0002
19/04/19 19:52:21 INFO skein.Driver: Submitting application...
19/04/19 19:52:21 INFO impl.YarnClientImpl: Submitted application application_1555696188572_0002
19/04/19 19:52:25 INFO skein.Driver: Driver shut down
N workers == 0
Application id: application_1555696188572_0002
No workers after 60 seconds
1 workers have started
2019-04-19 19:53:29,370 - distributed.comm.tcp - DEBUG - Setting TCP keepalive: nprobes=10, idle=10, interval=2
2019-04-19 19:53:29,370 - distributed.comm.tcp - DEBUG - Setting TCP user timeout: 30000 ms
2019-04-19 19:53:29,372 - distributed.comm.tcp - DEBUG - Setting TCP keepalive: nprobes=10, idle=10, interval=2
2019-04-19 19:53:29,373 - distributed.comm.tcp - DEBUG - Setting TCP user timeout: 30000 ms
2019-04-19 19:53:29,377 - distributed.client - DEBUG - Started scheduling coroutines. Synchronized

And this is the output of the master:

19/04/19 19:52:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/04/19 19:52:22 INFO skein.ApplicationMaster: Running as user florisvannee
19/04/19 19:52:22 INFO conf.Configuration: resource-types.xml not found
19/04/19 19:52:22 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
19/04/19 19:52:22 INFO skein.ApplicationMaster: Application specification successfully loaded
19/04/19 19:52:23 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8030
19/04/19 19:52:23 INFO skein.ApplicationMaster: gRPC server started at opamux0977.optiver.com:45245
19/04/19 19:52:23 DEBUG skein.WebUI: Serving resources from jar:file:/tmp/hadoop-florisvannee/nm-local-dir/usercache/florisvannee/appcache/application_1555696188572_0002/filecache/10/skein.jar!/META-INF/resources/
19/04/19 19:52:23 INFO skein.ApplicationMaster: WebUI server started at opamux0977.optiver.com:34639
19/04/19 19:52:23 INFO skein.ApplicationMaster: Registering application with resource manager
19/04/19 19:52:23 DEBUG skein.ApplicationMaster: Determining resources available for application master
19/04/19 19:52:23 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/04/19 19:52:23 INFO skein.ApplicationMaster: Initializing service 'dask.worker'.
19/04/19 19:52:23 INFO skein.ApplicationMaster: Initializing service 'dask.scheduler'.
19/04/19 19:52:23 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0
19/04/19 19:52:23 DEBUG skein.ApplicationMaster: Starting allocator thread
19/04/19 19:52:23 DEBUG skein.ApplicationMaster: Heartbeat intervals [idle: 5000 ms, pending: 1000 ms]
19/04/19 19:52:24 DEBUG skein.ApplicationMaster: New watch stream created [stream: 1577411635]
19/04/19 19:52:24 DEBUG skein.ApplicationMaster: Created watcher [stream: 1577411635, watcher: 0, start: 'dask.scheduler', end: 'dask.scheduler�', type: PUT]
19/04/19 19:52:25 DEBUG skein.ApplicationMaster: Received 1 new containers
19/04/19 19:52:25 INFO skein.ApplicationMaster: Starting container_1555696188572_0002_01_000002...
19/04/19 19:52:25 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_1555696188572_0002_01_000002
19/04/19 19:52:25 DEBUG skein.ApplicationMaster: Removed watcher [stream: 1577411635, watcher: 0]
19/04/19 19:52:26 INFO skein.ApplicationMaster: Scaling service 'dask.worker' to 1 instances, a delta of 1.
19/04/19 19:52:26 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0
19/04/19 19:53:26 INFO skein.ApplicationMaster: Scaling service 'dask.worker' to 2 instances, a delta of 1.
19/04/19 19:53:26 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1
19/04/19 19:53:28 DEBUG skein.ApplicationMaster: Received 1 new containers
19/04/19 19:53:28 INFO skein.ApplicationMaster: Starting container_1555696188572_0002_01_000003...
19/04/19 19:53:28 INFO skein.ApplicationMaster: RUNNING: dask.worker_1 on container_1555696188572_0002_01_000003
19/04/19 19:53:29 INFO skein.ApplicationMaster: KILLED: dask.worker_0 - Killed by user request.
19/04/19 19:53:29 DEBUG skein.ApplicationMaster: New watch stream created [stream: 232174643]
19/04/19 19:53:29 DEBUG skein.ApplicationMaster: Created watcher [stream: 232174643, watcher: 1, start: 'dask.scheduler', end: 'dask.scheduler�', type: PUT]
19/04/19 19:53:29 DEBUG skein.ApplicationMaster: Removed watcher [stream: 232174643, watcher: 1]
19/04/19 19:53:31 INFO skein.ApplicationMaster: Shutting down: Shutdown requested by user.
19/04/19 19:53:31 DEBUG skein.ApplicationMaster: Stopping allocator thread
19/04/19 19:53:31 INFO skein.ApplicationMaster: Unregistering application with status SUCCEEDED
19/04/19 19:53:31 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
19/04/19 19:53:31 DEBUG skein.ApplicationMaster: Watch stream canceled [stream: 1577411635]
19/04/19 19:53:31 INFO skein.ApplicationMaster: Deleted application directory hdfs://opamux0977:9000/user/florisvannee/.skein/application_1555696188572_0002
19/04/19 19:53:31 INFO skein.ApplicationMaster: WebUI server shut down
19/04/19 19:53:31 INFO skein.ApplicationMaster: gRPC server shut down

yarn_output.txt

jcrist commented 5 years ago

Huh, that's a new one. I'm a bit baffled here, as we're successfully calling rmClient.addContainerRequest for both worker 0 and worker 1, but the resource manager never receives the request for worker 0. We test on Hadoop 2.6 (CDH 5) and Hadoop 3.0 (CDH 6) and both of those seem to work fine (at least for the configurations we test with).

I'm not sure what's going on here, and likely won't have time to debug until next week at the earliest. If you're not set on a specific Hadoop version, you might try switching to a different one and seeing if the problem persists.

fvannee commented 5 years ago

Alright, I can check whether it occurs with a different version as well. Do you have any pointers for me if I wanted to try and debug it myself, though? I might give it a go if I have some time, even though it'd probably take me a while to get familiar with the code :-)

jcrist commented 5 years ago

Do you have any pointers for me if I wanted to try and debug it myself, though?

The application master is part of the Skein library - development instructions are here: https://jcrist.github.io/skein/develop.html. Given a Maven install, a pip install from git should work fine.
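Something like this should do it (untested):

pip install git+https://github.com/jcrist/skein.git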

I'd try removing the dask dependency and just submitting a bash-based service:

import time
import skein
client = skein.Client()

spec = skein.ApplicationSpec.from_yaml(
"""
name: test

services:
  sleeper:
    resources:
      memory: 512 MiB
      vcores: 1
    script: sleep infinity
""")

app = client.submit_and_connect(spec)
app.scale('sleeper', 1)
time.sleep(20)
app.scale('sleeper', 2)
time.sleep(20)
app.shutdown()
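While that runs, you can also inspect container states directly; a rough (untested) sketch, assuming the app handle from the script above:

# Print each container's service name, instance number, and state.
for c in app.get_containers():
    print(c.service_name, c.instance, c.state)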

The application master sends a container request to the resource manager here:

https://github.com/jcrist/skein/blob/95421a05fbaf765717b6cf9e0c7753dc11163195/java/src/main/java/com/anaconda/skein/ApplicationMaster.java#L1109

From the logs you can see this was called for all containers, but the resource manager never receives one of them. This seems specific to your setup, and may be a YARN bug (it wouldn't be the first one we've hacked around). If you do give it a try, feel free to post here with questions and I'll try to answer them pretty quickly - I'm just unlikely to get to this soon myself.
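One other note that may be relevant while debugging: addContainerRequest only queues the request locally in YARN's AMRMClient - the actual request goes out to the resource manager on a subsequent allocate() heartbeat, so timing between scale calls and heartbeats could plausibly matter here.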

fvannee commented 5 years ago

Ok, thanks. To start off, I tried several other versions (3.2.0, 3.0.3, 2.6.5). None of them worked for me (all of them suffer from the same issue of not getting worker_0), so I think it must be something with the config. I've attached my config, but I don't know what would be so unusual about it. I basically just followed the steps outlined on the Hadoop website to set up a simple one-node cluster in pseudo-distributed mode (https://hadoop.apache.org/docs/r3.1.2/hadoop-project-dist/hadoop-common/SingleCluster.html). I changed a few other parameters in the XML, but none that I could relate to such an error.

config.zip

jcrist commented 5 years ago

Hmmm, nothing stands out in your config. This does work on production systems as well as on our test clusters (https://github.com/jcrist/hadoop-test-cluster), so something is odd (not sure what, though). I'll try to take a closer look sometime next week.

fvannee commented 5 years ago

Ok, I appreciate you helping me out! I'll post updates here as I go along in testing it. I tried reproducing without Dask using the infinite-sleep script you provided; that script runs fine, however. I'm looking at the differences now. One difference is that Dask first spawns the scheduler in another container. I'll take a look at whether I can reproduce that kind of setup without actually spawning Dask.

19/04/20 12:26:23 INFO skein.ApplicationMaster: Initializing service 'sleeper'.
19/04/20 12:26:23 INFO skein.ApplicationMaster: REQUESTED: sleeper_0
19/04/20 12:26:24 INFO skein.ApplicationMaster: Scaling service 'sleeper' to 1 instances, a delta of 0.
19/04/20 12:26:24 INFO skein.ApplicationMaster: Starting container_1555717228394_0006_01_000002...
19/04/20 12:26:24 INFO skein.ApplicationMaster: RUNNING: sleeper_0 on container_1555717228394_0006_01_000002
19/04/20 12:26:44 INFO skein.ApplicationMaster: Scaling service 'sleeper' to 2 instances, a delta of 1.
19/04/20 12:26:44 INFO skein.ApplicationMaster: REQUESTED: sleeper_1
19/04/20 12:26:50 INFO skein.ApplicationMaster: Starting container_1555717228394_0006_01_000003...
19/04/20 12:26:50 INFO skein.ApplicationMaster: RUNNING: sleeper_1 on container_1555717228394_0006_01_000003
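
Something like this two-service spec might mimic that layout without Dask (an untested sketch; the service names are made up):

import time
import skein

client = skein.Client()

# A placeholder "scheduler" that starts right away, plus a "worker"
# service that starts with zero instances and is scaled up afterwards,
# mirroring how dask-yarn starts its services.
spec = skein.ApplicationSpec.from_yaml(
"""
name: test-two-services

services:
  fake-scheduler:
    resources:
      memory: 512 MiB
      vcores: 1
    script: sleep infinity
  fake-worker:
    instances: 0
    resources:
      memory: 512 MiB
      vcores: 1
    script: sleep infinity
""")

app = client.submit_and_connect(spec)
time.sleep(5)   # give the fake scheduler time to start
app.scale('fake-worker', 1)
time.sleep(20)
app.scale('fake-worker', 2)
time.sleep(20)
app.shutdown()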

jcrist commented 5 years ago

Hmmm, that is odd. Let me know if you can get a reproducible example (even just a Docker image I can test in would be nice, but not required).

fvannee commented 5 years ago

I'm afraid I don't have a Docker image, but I did manage to reproduce it on another machine by doing the exact same steps as on the first one. So I hope following these steps leads to a reproducible example on your machine as well.

python 3.6, java 1.8, dask_yarn 0.5.2, skein 0.6.1, hadoop 3.1.2

  1. Download the hadoop 3.1.2 binaries and extract them to some directory
  2. Copy the config files from the attached file to $HADOOP_HOME/etc/hadoop
  3. Set the environment variables correctly in .bashrc and in $HADOOP_HOME/etc/hadoop/hadoop-env.sh (to point to your Hadoop and Java directories)
  4. Make sure ssh to localhost is configured, etc.

bin/hdfs namenode -format
sbin/start-dfs.sh
sbin/start-yarn.sh
python3 test_yarn.py

The attached test_yarn.py is the file you provided earlier to test with. It first scales to 1 worker, then to 2. On both of my machines I see the same behavior as in my earlier post.

config.zip test_yarn.py.txt

jcrist commented 5 years ago

Thanks for the directions, I'll try to reproduce sometime early next week.

fvannee commented 5 years ago

I've got a couple more pointers that hopefully point in the right direction! I managed to work around the issue in two ways. I'll use a slightly modified version of your script above to show you what I mean. Judging from these results, it seems to be something timing-related. The file as displayed below only starts the second worker and thus doesn't work correctly. Now, as soon as I uncomment the line after (1) or (2), it works as expected.

  1. Explicitly specifying n_workers=1 starts all workers as expected
  2. Sleeping for a while before scaling up the first worker also fixes the issue

import time
import dask_yarn
import skein

spec = dask_yarn.core._make_specification(
    environment="venv:///home/florisvannee/my_envsd",
    worker_vcores=1,
    # (1)
    #n_workers=1,
    worker_memory="256MiB"
)
spec.master.log_level = 'debug'
cluster = dask_yarn.YarnCluster.from_specification(spec)

def running():
    return len([c for c in cluster.workers() if c.state == 'running'])

print("N workers == %d" % running())
print("Application id: %s" % cluster.app_id)

def wait_for_workers(wait_for, timeout):
    for i in range(timeout):
        n = running()
        if n == wait_for:
            print("%d workers have started" % n)
            break
        time.sleep(1)
    else:
        print("Only %d workers after %d seconds" % (n, timeout))

# (2)
#time.sleep(10)
cluster.scale(1)
wait_for_workers(1, 20)
cluster.scale(2)
wait_for_workers(2, 20)
cluster.scale(0)
time.sleep(3)
cluster.close()

Base output:

19/04/26 14:03:29 INFO skein.ApplicationMaster: Initializing service 'dask.worker'.
19/04/26 14:03:29 INFO skein.ApplicationMaster: Initializing service 'dask.scheduler'.
19/04/26 14:03:29 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0
19/04/26 14:03:29 DEBUG skein.ApplicationMaster: Starting allocator thread
19/04/26 14:03:29 DEBUG skein.ApplicationMaster: Heartbeat intervals [idle: 5000 ms, pending: 1000 ms]
19/04/26 14:03:29 DEBUG skein.ApplicationMaster: New watch stream created [stream: 1211172083]
19/04/26 14:03:29 DEBUG skein.ApplicationMaster: Created watcher [stream: 1211172083, watcher: 0, start: 'dask.scheduler', end: 'dask.scheduler�', type: PUT]
19/04/26 14:03:30 DEBUG skein.ApplicationMaster: Received 1 new containers
19/04/26 14:03:30 INFO skein.ApplicationMaster: Starting container_1556201211161_0025_01_000002...
19/04/26 14:03:30 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_1556201211161_0025_01_000002
19/04/26 14:03:31 DEBUG skein.ApplicationMaster: Removed watcher [stream: 1211172083, watcher: 0]
19/04/26 14:03:31 INFO skein.ApplicationMaster: Scaling service 'dask.worker' to 1 instances, a delta of 1.
19/04/26 14:03:31 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0
19/04/26 14:03:51 INFO skein.ApplicationMaster: Scaling service 'dask.worker' to 2 instances, a delta of 1.
19/04/26 14:03:51 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1
19/04/26 14:03:53 DEBUG skein.ApplicationMaster: Received 1 new containers
19/04/26 14:03:53 INFO skein.ApplicationMaster: Starting container_1556201211161_0025_01_000003...
19/04/26 14:03:53 INFO skein.ApplicationMaster: RUNNING: dask.worker_1 on container_1556201211161_0025_01_000003
19/04/26 14:03:54 DEBUG skein.ApplicationMaster: New watch stream created [stream: 1430154584]
19/04/26 14:03:54 DEBUG skein.ApplicationMaster: Created watcher [stream: 1430154584, watcher: 1, start: 'dask.scheduler', end: 'dask.scheduler�', type: PUT]
19/04/26 14:03:54 DEBUG skein.ApplicationMaster: Removed watcher [stream: 1430154584, watcher: 1]
19/04/26 14:04:12 INFO skein.ApplicationMaster: KILLED: dask.worker_0 - Killed by user request.
19/04/26 14:04:12 DEBUG skein.ApplicationMaster: Watch stream canceled [stream: 1430154584]
19/04/26 14:04:12 DEBUG skein.ApplicationMaster: Received 1 completed containers
19/04/26 14:04:12 INFO skein.ApplicationMaster: SUCCEEDED: dask.worker_1 - Completed successfully.
19/04/26 14:04:15 INFO skein.ApplicationMaster: Shutting down: Shutdown requested by user.

Fix #1 output:

19/04/26 14:04:30 INFO skein.ApplicationMaster: Initializing service 'dask.worker'.
19/04/26 14:04:30 INFO skein.ApplicationMaster: WAITING: dask.worker_0
19/04/26 14:04:30 INFO skein.ApplicationMaster: Initializing service 'dask.scheduler'.
19/04/26 14:04:30 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0
19/04/26 14:04:30 DEBUG skein.ApplicationMaster: Starting allocator thread
19/04/26 14:04:30 DEBUG skein.ApplicationMaster: Heartbeat intervals [idle: 5000 ms, pending: 1000 ms]
19/04/26 14:04:31 DEBUG skein.ApplicationMaster: New watch stream created [stream: 661896165]
19/04/26 14:04:31 DEBUG skein.ApplicationMaster: Created watcher [stream: 661896165, watcher: 0, start: 'dask.scheduler', end: 'dask.scheduler�', type: PUT]
19/04/26 14:04:31 DEBUG skein.ApplicationMaster: Received 1 new containers
19/04/26 14:04:31 INFO skein.ApplicationMaster: Starting container_1556201211161_0026_01_000002...
19/04/26 14:04:31 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_1556201211161_0026_01_000002
19/04/26 14:04:31 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0
19/04/26 14:04:32 DEBUG skein.ApplicationMaster: Removed watcher [stream: 661896165, watcher: 0]
19/04/26 14:04:33 DEBUG skein.ApplicationMaster: Received 1 new containers
19/04/26 14:04:33 INFO skein.ApplicationMaster: Starting container_1556201211161_0026_01_000003...
19/04/26 14:04:33 INFO skein.ApplicationMaster: RUNNING: dask.worker_0 on container_1556201211161_0026_01_000003
19/04/26 14:04:34 INFO skein.ApplicationMaster: Scaling service 'dask.worker' to 2 instances, a delta of 1.
19/04/26 14:04:34 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1
19/04/26 14:04:34 DEBUG skein.ApplicationMaster: New watch stream created [stream: 818222217]
19/04/26 14:04:34 DEBUG skein.ApplicationMaster: Created watcher [stream: 818222217, watcher: 1, start: 'dask.scheduler', end: 'dask.scheduler�', type: PUT]
19/04/26 14:04:34 DEBUG skein.ApplicationMaster: Removed watcher [stream: 818222217, watcher: 1]
19/04/26 14:04:39 DEBUG skein.ApplicationMaster: Received 1 new containers
19/04/26 14:04:39 INFO skein.ApplicationMaster: Starting container_1556201211161_0026_01_000004...
19/04/26 14:04:39 INFO skein.ApplicationMaster: RUNNING: dask.worker_1 on container_1556201211161_0026_01_000004
19/04/26 14:04:40 DEBUG skein.ApplicationMaster: Watch stream canceled [stream: 818222217]
19/04/26 14:04:40 DEBUG skein.ApplicationMaster: New watch stream created [stream: 894459300]
19/04/26 14:04:40 DEBUG skein.ApplicationMaster: Created watcher [stream: 894459300, watcher: 2, start: 'dask.scheduler', end: 'dask.scheduler�', type: PUT]
19/04/26 14:04:40 DEBUG skein.ApplicationMaster: Removed watcher [stream: 894459300, watcher: 2]
19/04/26 14:04:43 INFO skein.ApplicationMaster: Shutting down: Shutdown requested by user.

Fix #2 output:

19/04/26 14:05:03 INFO skein.ApplicationMaster: Initializing service 'dask.worker'.
19/04/26 14:05:03 INFO skein.ApplicationMaster: Initializing service 'dask.scheduler'.
19/04/26 14:05:03 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0
19/04/26 14:05:03 DEBUG skein.ApplicationMaster: Starting allocator thread
19/04/26 14:05:03 DEBUG skein.ApplicationMaster: Heartbeat intervals [idle: 5000 ms, pending: 1000 ms]
19/04/26 14:05:04 DEBUG skein.ApplicationMaster: New watch stream created [stream: 747946710]
19/04/26 14:05:04 DEBUG skein.ApplicationMaster: Created watcher [stream: 747946710, watcher: 0, start: 'dask.scheduler', end: 'dask.scheduler�', type: PUT]
19/04/26 14:05:05 DEBUG skein.ApplicationMaster: Received 1 new containers
19/04/26 14:05:05 INFO skein.ApplicationMaster: Starting container_1556201211161_0027_01_000002...
19/04/26 14:05:05 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_1556201211161_0027_01_000002
19/04/26 14:05:05 DEBUG skein.ApplicationMaster: Removed watcher [stream: 747946710, watcher: 0]
19/04/26 14:05:16 INFO skein.ApplicationMaster: Scaling service 'dask.worker' to 1 instances, a delta of 1.
19/04/26 14:05:16 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0
19/04/26 14:05:20 DEBUG skein.ApplicationMaster: Received 1 new containers
19/04/26 14:05:20 INFO skein.ApplicationMaster: Starting container_1556201211161_0027_01_000003...
19/04/26 14:05:20 INFO skein.ApplicationMaster: RUNNING: dask.worker_0 on container_1556201211161_0027_01_000003
19/04/26 14:05:21 INFO skein.ApplicationMaster: Scaling service 'dask.worker' to 2 instances, a delta of 1.
19/04/26 14:05:21 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1
19/04/26 14:05:21 DEBUG skein.ApplicationMaster: New watch stream created [stream: 327770522]
19/04/26 14:05:21 DEBUG skein.ApplicationMaster: Created watcher [stream: 327770522, watcher: 1, start: 'dask.scheduler', end: 'dask.scheduler�', type: PUT]
19/04/26 14:05:21 DEBUG skein.ApplicationMaster: Removed watcher [stream: 327770522, watcher: 1]
19/04/26 14:05:26 DEBUG skein.ApplicationMaster: Received 1 new containers
19/04/26 14:05:26 INFO skein.ApplicationMaster: Starting container_1556201211161_0027_01_000004...
19/04/26 14:05:26 INFO skein.ApplicationMaster: RUNNING: dask.worker_1 on container_1556201211161_0027_01_000004
19/04/26 14:05:27 DEBUG skein.ApplicationMaster: Watch stream canceled [stream: 327770522]
19/04/26 14:05:27 DEBUG skein.ApplicationMaster: New watch stream created [stream: 268457619]
19/04/26 14:05:27 DEBUG skein.ApplicationMaster: Created watcher [stream: 268457619, watcher: 2, start: 'dask.scheduler', end: 'dask.scheduler�', type: PUT]
19/04/26 14:05:27 DEBUG skein.ApplicationMaster: Removed watcher [stream: 268457619, watcher: 2]
19/04/26 14:05:30 INFO skein.ApplicationMaster: Shutting down: Shutdown requested by user.
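
For reference, workaround (1) through the public API would presumably just be passing n_workers when constructing the cluster (untested beyond my setup):

from dask_yarn import YarnCluster

# Ask for the first worker at construction time rather than with a
# later scale() call.
cluster = YarnCluster(environment='venv:///home/florisvannee/my_envsd',
                      n_workers=1,
                      worker_vcores=1,
                      worker_memory="256MiB")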

jcrist commented 5 years ago

Hi, apologies for the long delay here. The debugging you've done above makes it look like an issue with pseudo-distributed Hadoop (or your setup of it), and not a bug in our code.

I created a Docker test setup for pseudo-distributed mode, and am unable to reproduce. Perhaps this has to do with a patch Cloudera made (possible, but unlikely), or my test case isn't sufficient to trigger it. If you feel up for it, you might try things out here: https://gist.github.com/jcrist/75e81f6792e610f81ffa86b4474a9b1f. The Dockerfile there follows Cloudera's pseudo-distributed install instructions found here: https://www.cloudera.com/documentation/enterprise/5-6-x/topics/cdh_qs_yarn_pseudo.html

Did you run into this issue on a production cluster, or are you only trying things out in pseudo-distributed mode?

manugarri commented 5 years ago

I'm actually having the same issue when deploying on EMR. I can't seem to get to full worker utilization either.