dask / dask-jobqueue

Deploy Dask on job schedulers like PBS, SLURM, and SGE
https://jobqueue.dask.org
BSD 3-Clause "New" or "Revised" License

Error in re with 0.8.5 #637

Closed andersonjacob closed 3 months ago

andersonjacob commented 8 months ago

Describe the issue: When using LSF with version 0.8.5, an error is raised from re; I didn't have this issue with 0.8.2, so for the time being I've just downgraded. The error is TypeError: expected string or bytes-like object.

Minimal Complete Verifiable Example:

from dask_jobqueue import LSFCluster
from distributed import Client

mem_str = "1G"
cluster = LSFCluster(cores=1, memory=mem_str)
all_data = [1000.0]*100
try:
    cluster.adapt(minimum=2, maximum=4)
    print(cluster.dashboard_link)
    with Client(cluster) as rc:
        labeler_future = rc.scatter(all_data, broadcast=True)
    cluster.scale(0)
finally:
    cluster.close()

Anything else we need to know?:

Environment:

The full error is:

2024-03-19 15:09:45,056 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7f799d524700>>, <Task finished name='Task-90' coro=<SpecCluster._correct_state_internal() done, defined at /group/sw/slc/wafer_maps/janderson/envs/wafersig39/lib/python3.9/site-packages/distributed/deploy/spec.py:346> exception=TypeError('expected string or bytes-like object')>)
Traceback (most recent call last):
  File "/group/sw/slc/wafer_maps/janderson/envs/wafersig39/lib/python3.9/site-packages/tornado/ioloop.py", line 750, in _run_callback
    ret = callback()
  File "/group/sw/slc/wafer_maps/janderson/envs/wafersig39/lib/python3.9/site-packages/tornado/ioloop.py", line 774, in _discard_future_result
    future.result()
  File "/group/sw/slc/wafer_maps/janderson/envs/wafersig39/lib/python3.9/site-packages/distributed/deploy/spec.py", line 390, in _correct_state_internal
    await asyncio.gather(*worker_futs)
  File "/group/sw/slc/wafer_maps/janderson/envs/wafersig39/lib/python3.9/asyncio/tasks.py", line 688, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/group/sw/slc/wafer_maps/janderson/envs/wafersig39/lib/python3.9/site-packages/distributed/deploy/spec.py", line 74, in _
    await self.start()
  File "/group/sw/slc/wafer_maps/janderson/envs/wafersig39/lib/python3.9/site-packages/dask_jobqueue/core.py", line 427, in start
    self.job_id = self._job_id_from_submit_output(out)
  File "/group/sw/slc/wafer_maps/janderson/envs/wafersig39/lib/python3.9/site-packages/dask_jobqueue/core.py", line 433, in _job_id_from_submit_output
    match = re.search(self.job_id_regexp, out)
  File "/group/sw/slc/wafer_maps/janderson/envs/wafersig39/lib/python3.9/re.py", line 201, in search
    return _compile(pattern, flags).search(string)
TypeError: expected string or bytes-like object
/group/sw/slc/wafer_maps/janderson/envs/wafersig39/lib/python3.9/site-packages/distributed/utils.py:837: RuntimeWarning: coroutine 'Job._call' was never awaited
  with self:
2024-03-19 15:10:14,470 - distributed.core - ERROR - Exception while handling op scatter
Traceback (most recent call last):
  File "/group/sw/slc/wafer_maps/janderson/envs/wafersig39/lib/python3.9/site-packages/distributed/core.py", line 970, in _handle_comm
    result = await result
  File "/group/sw/slc/wafer_maps/janderson/envs/wafersig39/lib/python3.9/site-packages/distributed/scheduler.py", line 6022, in scatter
    raise TimeoutError("No valid workers found")
asyncio.exceptions.TimeoutError: No valid workers found
troubadour-hell commented 7 months ago

I'm hitting the same error.

guillaumeeb commented 7 months ago

Hi @andersonjacob @troubadour-hell,

@troubadour-hell are you also using LSFCluster?

I just tried to reproduce the issue, but I only have access to a SLURM HPC center, and I did not encounter it there.

I've investigated the code changes a bit, and I'm not sure what could cause this... Would either of you be able to debug a little more? What type is the output of the job submission command? Why is it not recognized as a string?

andersonjacob commented 7 months ago

I can probably do some more debugging. I wouldn't know where to start, but I'm happy to run some code and send back logs/outputs if I can.

troubadour-hell commented 7 months ago

It appears that the function "_submit_job" does not work as expected. I've inserted a print statement before "return self._call()", but no output is being displayed, which suggests that "_submit_job" might not be getting called properly.

The out from "out = await self._submit_job(fn)" is <coroutine object Job._call at 0x7fc3be72c3a0>, i.e. out is a coroutine object rather than a string.

bsub file content:

#!/usr/bin/env bash

#BSUB -J Pal
#BSUB -q normal
#BSUB -n 12
#BSUB -R "span[hosts=1]"
#BSUB -M 36000
#BSUB -W 01:00

/share/home/xxx/.conda/envs/involutionary/bin/python -m distributed.cli.dask_worker tcp://x.x.x.x:46491 --name LSFCluster-17 --nthreads 24 --memory-limit 33.53GiB --nanny --death-timeout 60 --local-directory ~/.cache/dask --interface ib0

I hope this information will be helpful in resolving the issue. I am ready to perform additional tests if necessary.
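
To make this concrete, here is a minimal standalone sketch (a hypothetical Job class, not the real dask-jobqueue code) that reproduces the same TypeError when an async _call is returned without await:

import asyncio
import re

class Job:
    # hypothetical stand-in for the dask_jobqueue Job class
    job_id_regexp = r"<(?P<job_id>\d+)>"

    async def _call(self, cmd):
        # pretend this ran bsub and captured its output
        return "Job <12345> is submitted to default queue <normal>."

    async def _submit_job(self, script_filename):
        # the suspected bug: _call became a coroutine function in 0.8.5,
        # but its result is returned without "await"
        return self._call(script_filename)

async def start():
    job = Job()
    out = await job._submit_job("job_script.sh")
    print(type(out))                   # <class 'coroutine'>
    re.search(job.job_id_regexp, out)  # TypeError: expected string or bytes-like object

asyncio.run(start())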

troubadour-hell commented 7 months ago

The "_call" function uses async and await in version 0.8.5, but not in version 0.8.2.

Additionally, I encountered the following error message:

python3.11/site-packages/jinja2/nodes.py:199: RuntimeWarning: coroutine 'Job._call' was never awaited
  for child in self.iter_child_nodes():

I believe this could be the root cause of the issue.

troubadour-hell commented 6 months ago

@guillaumeeb https://github.com/dask/dask-jobqueue/blob/8713202488c664452bf0883bcd4f776536644676/dask_jobqueue/lsf.py#L116 Perhaps the issue is due to the absence of 'await' here.

troubadour-hell commented 6 months ago

I managed to get this to work by modifying the code in two places. First, change: https://github.com/dask/dask-jobqueue/blob/8713202488c664452bf0883bcd4f776536644676/dask_jobqueue/lsf.py#L116

return await self._call(piped_cmd)

Then, change: https://github.com/dask/dask-jobqueue/blob/8713202488c664452bf0883bcd4f776536644676/dask_jobqueue/core.py#L494

proc = await asyncio.create_subprocess_shell(

In my experience, "asyncio.create_subprocess_exec" only works properly when "shell=False" is set. However, this setup resulted in a 'file not found' error when the code attempted to execute the 'bsub' command. As a solution, I used "asyncio.create_subprocess_shell", which seemed to work. Although I'm not an expert in asyncio programming and wasn't able to completely resolve this issue, I hope the solution I found can provide useful information for others. After implementing this change, 'dask.compute' executed successfully, but it led to a new error:

  File "/share/home/xxx/.conda/envs/py312/lib/python3.12/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/share/home/xxx/.conda/envs/py312/lib/python3.12/site-packages/distributed/deploy/adaptive.py", line 204, in scale_down
    await f
  File "/share/home/xxx/.conda/envs/py312/lib/python3.12/site-packages/distributed/deploy/spec.py", line 580, in scale_down
    await self
  File "/share/home/xxx/.conda/envs/py312/lib/python3.12/site-packages/distributed/deploy/spec.py", line 420, in _
    await self._correct_state()
  File "/share/home/xxx/.conda/envs/py312/lib/python3.12/site-packages/distributed/deploy/spec.py", line 359, in _correct_state_internal
    await asyncio.gather(*tasks)
  File "/share/home/xxx/.conda/envs/py312/lib/python3.12/site-packages/dask_jobqueue/core.py", line 455, in close
    await self._close_job(self.job_id, self.cancel_command)
  File "/share/home/xxx/.conda/envs/py312/lib/python3.12/site-packages/dask_jobqueue/core.py", line 461, in _close_job
    await cls._call(shlex.split(cancel_command) + [job_id])
  File "/share/home/xxx/.conda/envs/py312/lib/python3.12/site-packages/dask_jobqueue/core.py", line 494, in _call
    proc = await asyncio.create_subprocess_shell(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/share/home/xxx/.conda/envs/py312/lib/python3.12/asyncio/subprocess.py", line 211, in create_subprocess_shell
    transport, protocol = await loop.subprocess_shell(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/share/home/xxx/.conda/envs/py312/lib/python3.12/asyncio/base_events.py", line 1711, in subprocess_shell
    transport = await self._make_subprocess_transport(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/share/home/xxx/.conda/envs/py312/lib/python3.12/asyncio/unix_events.py", line 211, in _make_subprocess_transport
    transp = _UnixSubprocessTransport(self, protocol, args, shell,
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/share/home/xxx/.conda/envs/py312/lib/python3.12/asyncio/base_subprocess.py", line 36, in __init__
    self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
  File "/share/home/xxx/.conda/envs/py312/lib/python3.12/asyncio/unix_events.py", line 820, in _start
    self._proc = subprocess.Popen(
                 ^^^^^^^^^^^^^^^^^
  File "/share/home/xxx/.conda/envs/py312/lib/python3.12/subprocess.py", line 992, in __init__
    errread, errwrite) = self._get_handles(stdin, stdout, stderr)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/share/home/xxx/.conda/envs/py312/lib/python3.12/subprocess.py", line 1710, in _get_handles
    p2cread = stdin.fileno()
              ^^^^^^^^^^^^
AttributeError: 'str' object has no attribute 'fileno'
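
For reference, here is the shell-vs-exec distinction in a standalone sketch (using cat as a stand-in for bsub, since the redirection is what matters; this is not dask-jobqueue code):

import asyncio

async def main():
    script = "/etc/hostname"  # stand-in for the generated job script

    # create_subprocess_shell passes the whole string to /bin/sh, so the "<"
    # stdin redirection is interpreted by the shell, which is what the
    # "bsub < script.sh 2> /dev/null" submission form needs.
    proc = await asyncio.create_subprocess_shell(
        f"cat < {script}",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, _ = await proc.communicate()
    print(out.decode().strip())

asyncio.run(main())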
guillaumeeb commented 6 months ago

Thanks for the investigation @troubadour-hell! Indeed, with LSF the _submit_job function is overridden, and the new async nature of _call is not taken into account there.

I'm no async programming expert either, so I'm not sure what should be done to properly handle LSF's need to submit through a shell...

cc @jacobtomlinson

jacobtomlinson commented 6 months ago

Thanks for all the effort here so far. I've also had trouble with the asyncio subprocess module. I've had better experience with anyio, which should already be a dependency here because it is used elsewhere in Dask. That may behave more as you would expect.
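
Something along these lines might work (a rough sketch, assuming anyio.run_process passes a string command through the shell as its docs describe; cat stands in for bsub):

import anyio

async def submit(script_path: str) -> str:
    # A string command is run through the shell, so stdin redirection works;
    # a list of arguments would be executed directly without a shell.
    result = await anyio.run_process(f"cat < {script_path}")
    return result.stdout.decode()

print(anyio.run(submit, "/etc/hostname"))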

aiudirog commented 3 months ago

I just ran into this today and found that https://github.com/dask/dask-jobqueue/commit/19856184b01ee36665725aa74c129fb0a453bbd4 partially implements @troubadour-hell's suggestions. However, it drops shell=True without changing the subprocess creation call, breaking the cluster setup when stdin=True.

Without ensuring the LSF submission runs in a shell, the redirect command is treated as a single executable filename, which results in FileNotFoundError: [Errno 2] No such file or directory: 'bsub< /tmp/tmpfile.sh 2> /dev/null' when it tries to set up the redirect.
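
To illustrate with a standalone sketch (not the actual dask-jobqueue call):

import asyncio

async def main():
    try:
        # Without a shell, the entire command string is treated as the name of
        # a single executable, so exec() fails before any redirection happens.
        await asyncio.create_subprocess_exec("bsub< /tmp/tmpfile.sh 2> /dev/null")
    except FileNotFoundError as err:
        print(err)  # [Errno 2] No such file or directory: 'bsub< /tmp/tmpfile.sh 2> /dev/null'

asyncio.run(main())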

I've submitted #661 to resolve this and tested on a local LSF cluster that uses stdin bsub.