JoachimMoe opened 2 years ago
Is this something you experience during the workflow or at the end when the cluster is shutting down?
The program I run is a loop in which the input size increases as follows:

from subprocess import check_output, CalledProcessError

# min, max, WORK_SIZE and the measure_watt / watts_to_joules / write_* helpers
# are defined elsewhere in the same script.
def run_multiplication():
    for s in range(min, max):
        size = 2 ** s
        print_size = size * WORK_SIZE
        try:
            # Run multiplication.py as a subprocess and capture its timing output
            CMD_EXE = ["python3", "./multiplication.py", str(size), str(WORK_SIZE)]
            delay = check_output(CMD_EXE).decode("ascii")
            avg_watt = measure_watt(CMD_EXE)
            total_joules = watts_to_joules(avg_watt, float(delay))
            write_throughput_results("multiplicationGPU", size=print_size, wattage=total_joules, time=delay)
        except CalledProcessError as e:
            print(e)
            write_error_output("multiplication", print_size, e)
The program never terminates, so the problem occurs during the workflow. Since I write my output stats to a CSV, I can tell that the hang happens exactly after executing the workflow for (2^20, 1000): the program prints that it is dying, yet it never actually terminates.
What I understand is that the program containing run_multiplication runs the code from the original description, which creates a LocalCUDACluster, etc. Is that right? If so, you're starting a new cluster for each iteration of run_multiplication, which brings me back to the question of whether this happens during the execution of the actual Dask workflow as opposed to the cluster shutdown.
The way I understand it, you could completely disregard run_multiplication and just run the workflow on its own with the parameters that fail, and you would still experience the same issue. However, it makes a difference whether this only happens at the end (when the work has completed and the Dask cluster is shutting down) or in the middle of the workflow, which is exactly what I'm trying to understand.
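For reference, a rough sketch of the structure being discussed, assuming multiplication.py creates its own LocalCUDACluster on every invocation; the dask.array computation is only a stand-in for the real workload:

# multiplication.py, invoked once per iteration of run_multiplication
import sys
import dask.array as da
from dask_cuda import LocalCUDACluster
from distributed import Client

if __name__ == "__main__":
    size, work_size = int(sys.argv[1]), int(sys.argv[2])
    cluster = LocalCUDACluster()   # a brand-new cluster for every subprocess call
    client = Client(cluster)
    x = da.random.random((size, work_size), chunks="auto")
    print((x * x).sum().compute())  # stand-in for the real multiplication workflow
    # Nothing closes the client/cluster here, so teardown only happens
    # implicitly at interpreter exit; that is where the hang is suspected.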
You are right. I have two separate programs: one calls the scripts where I implement the multiplication and so forth. So the loop I posted is a .py script which runs the multiplication.py script for all integers in the range [5:22]. So yes, there is a top-level program running the cluster, which means a new cluster is started for every iteration in the main script.
The problem originates from the cluster not shutting down, which is why I now realize my original post may be mistitled; I apologize for that. I can run multiplication.py on its own for arbitrary sizes, including (2^21, 1000), with no problems, so the problem must be the cluster shutdown. The reason for my doubt is that I added client.shutdown() to multiplication.py in order to run it from the master script, and this instantly resulted in the following error:
2022-11-23 12:32:12,869 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2022-11-23 12:32:12,869 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
0.2419048309326172
2022-11-23 12:32:13,744 - distributed.client - ERROR -
ConnectionRefusedError: [Errno 111] Connection refused
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/comm/core.py", line 291, in connect
comm = await asyncio.wait_for(
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
return fut.result()
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/comm/tcp.py", line 511, in connect
convert_stream_closed_error(self, e)
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TCPConnector object at 0x7fa777db4b50>: ConnectionRefusedError: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/utils.py", line 742, in wrapper
return await func(*args, **kwargs)
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/client.py", line 1298, in _reconnect
await self._ensure_connected(timeout=timeout)
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/client.py", line 1328, in _ensure_connected
comm = await connect(
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/comm/core.py", line 315, in connect
await asyncio.sleep(backoff)
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/asyncio/tasks.py", line 652, in sleep
return await future
asyncio.exceptions.CancelledError
Traceback (most recent call last):
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/comm/tcp.py", line 225, in read
frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/client.py", line 1500, in _handle_report
msgs = await self.scheduler_comm.comm.read()
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/comm/tcp.py", line 241, in read
convert_stream_closed_error(self, e)
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:40494 remote=tcp://127.0.0.1:34111>: Stream is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/utils.py", line 742, in wrapper
return await func(*args, **kwargs)
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/client.py", line 1508, in _handle_report
await self._reconnect()
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/utils.py", line 742, in wrapper
return await func(*args, **kwargs)
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/client.py", line 1298, in _reconnect
await self._ensure_connected(timeout=timeout)
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/client.py", line 1328, in _ensure_connected
comm = await connect(
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/site-packages/distributed/comm/core.py", line 315, in connect
await asyncio.sleep(backoff)
File "/home/joachim/anaconda3/envs/rapids-22.12/lib/python3.9/asyncio/tasks.py", line 652, in sleep
return await future
asyncio.exceptions.CancelledError
Which is why I thought I would let Dask handle the cluster startups and shutdowns. However, I now realize that I will have to find a way to shut down the cluster after executing the workload, which I will investigate.
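One possible way to make the shutdown explicit is the context-manager form of LocalCUDACluster and Client; a minimal sketch, with the actual workload elided:

from dask_cuda import LocalCUDACluster
from distributed import Client

def run_workflow(size, work_size):
    # Entering the context managers starts the cluster and client; leaving them
    # closes the client first and then tears the cluster down, even if the
    # workload raises, instead of relying on interpreter shutdown.
    with LocalCUDACluster() as cluster:
        with Client(cluster) as client:
            ...  # the actual multiplication workload goes here
    # By this point all worker processes and the scheduler are gone.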
I don't mean that you're necessarily doing anything wrong, I'm just trying to understand where the issue occurs. Unfortunately, ensuring everything closes correctly is more challenging than it may seem, so it's possible that for some reason your cluster is actually taking longer than expected to terminate, and that effect may propagate down the stack.
So in this case, are you sure that the work has actually completed? In other words, are you able to verify that somehow? What could still be happening is that the cluster is not shutting down because some of the processes have deadlocked or died, making Dask unable to continue the job (for example, if a process stopped responding and can't be killed). So the next step here is to determine whether the cluster is attempting to shut down but failing, or whether it's actually stuck for some other reason.
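A hedged sketch of one way to check this, assuming a reference to the workflow's Client is still available (running py-spy dump --pid against a suspect worker PID is another option for inspecting a stuck process):

import os
from distributed import Client

def check_workers(client: Client) -> None:
    # Ask every worker for its PID; if this call hangs or a worker is missing,
    # a dead or deadlocked worker process is more likely than a shutdown problem.
    print(client.run(os.getpid))
    # Worker addresses the scheduler still considers alive.
    print(list(client.scheduler_info()["workers"]))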
The following program:
Crashes when input sizes get too big, i.e. an input size of (2^22, 1000). The normal Dask solution is to pass processes=False to the Client; however, this is not an option when using the GPU.
Set up:
Rig:
What causes this error?
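For context, a minimal sketch of the distinction mentioned above: processes=False only applies to the CPU-only local cluster created behind Client, whereas dask_cuda's LocalCUDACluster needs one worker process per GPU, so that workaround does not apply here.

from distributed import Client
from dask_cuda import LocalCUDACluster

# CPU-only workaround: an in-process, threads-only local cluster.
cpu_client = Client(processes=False)

# GPU path: LocalCUDACluster always launches one worker process per visible GPU,
# so there is no processes=False equivalent.
gpu_cluster = LocalCUDACluster()
gpu_client = Client(gpu_cluster)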