albertz opened 11 months ago
Ah, I just see that all calls to `for_all_nodes` share a common thread pool. So if this thread pool queue is full via the `JobCleaner`, that would explain it, right?
Some possible suggestions:
The `JobCleaner` has this code:

```python
while not self.stopped:
    self.sis_graph.for_all_nodes(f)
    time.sleep(gs.JOB_CLEANER_INTERVAL)
```
- I think we maybe could swap the `sleep` with the `for_all_nodes`? (See the sketch right after this list.)
- Can we extend `for_all_nodes` to have some low-priority mode, and if the thread pool is full, wait a bit before we add more to it, so that once the main thread also gets to `for_all_nodes`, it would be able to run without having to wait until the job cleaner has finished?
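A minimal sketch of the first suggestion, using only the `JobCleaner` loop quoted above (nothing new assumed, just the existing statements reordered):

```python
# Sketch: sleep first, so that right after startup the manager's own
# for_all_nodes call reaches the shared thread pool before the cleaner
# floods it with its graph-traversal tasks.
while not self.stopped:
    time.sleep(gs.JOB_CLEANER_INTERVAL)
    if self.stopped:
        break
    self.sis_graph.for_all_nodes(f)
```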
The hang sometimes can also be longer, here 8 minutes:

```
[2024-01-04 11:31:49,989] INFO: error(8) queue(4) runnable(1) running(12) waiting(1484)
Clear jobs in error state? [y/N]
Print verbose overview (v), update aliases and outputs (u), start manager (y), or exit (n)? y
[2024-01-04 11:40:38,732] INFO: Submit to queue: work/i6_core/returnn/training/ReturnnTrainingJob.LgJ1IEE3O1g8 create_files [1]
```
> I think we maybe could swap the `sleep` with the `for_all_nodes`?
Sounds like a good idea. Alternatively, we could also introduce a longer cleaner startup wait period to make sure Sisyphus had enough time to handle everything else before starting the cleanup process.
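A rough sketch of such a startup wait period (the setting name `JOB_CLEANER_STARTUP_DELAY` is made up here purely for illustration; it is not an existing Sisyphus option):

```python
# Hypothetical startup delay inside JobCleaner.run: give the manager time to
# finish its first update_jobs pass before the cleaner starts traversing the graph.
startup_delay = getattr(gs, "JOB_CLEANER_STARTUP_DELAY", 600)  # invented name
deadline = time.time() + startup_delay
while not self.stopped and time.time() < deadline:
    time.sleep(1)
while not self.stopped:
    self.sis_graph.for_all_nodes(f)
    time.sleep(gs.JOB_CLEANER_INTERVAL)
```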
> - Can we extend `for_all_nodes` to have some low-priority mode, and if the thread pool is full, wait a bit before we add more to it, so that once the main thread also gets to `for_all_nodes`, it would be able to run without having to wait until the job cleaner has finished?
Sounds good as well. The cleanup jobs are IO-heavy and not time-critical, so they are the perfect jobs to make wait.

Maybe first run a sanity check whether the problem really comes from the cleaner jobs, by just disabling the cleaner completely and checking if the problem goes away?
> make sure Sisyphus had enough time to handle everything else before starting the cleanup process.
Yes, this would probably solve the problem at startup time, but the problem could still occur with whatever else the manager is currently doing at that point in time. I think it's never good if the cleaner completely blocks the manager while it does its job.
> ... to have some low-priority mode, and if the thread pool is full, ...

> Sounds good as well.
Hm, I was checking a bit how to do that, and it's unclear. I'm not sure I can check how full the queue of the `ThreadPool` currently is, or give tasks low priority when putting them onto the queue.
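For what it's worth, `multiprocessing.pool.ThreadPool` does not officially expose its backlog; the closest I found is the private `_taskqueue` attribute, so any check would have to rely on CPython internals. A best-effort sketch (not a stable API):

```python
from multiprocessing.pool import ThreadPool


def pool_backlog(pool: ThreadPool) -> int:
    """Approximate number of task batches still queued in the pool.

    Relies on the private CPython attribute ``_taskqueue`` (a queue.SimpleQueue),
    so treat the result as a heuristic only.
    """
    task_queue = getattr(pool, "_taskqueue", None)
    if task_queue is None:
        return 0
    try:
        return task_queue.qsize()
    except NotImplementedError:
        # qsize() can be unavailable for some queue implementations
        return 0
```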
> Maybe first run a sanity check whether the problem really comes from the cleaner jobs, by just disabling the cleaner completely and checking if the problem goes away?
Hm, good that you asked for that. I still have the hang. See the log: https://gist.github.com/albertz/1f845ffebf78277abb5b034afc667cb2
Main thread:

```
Thread 2543720 (idle): "MainThread"
    for_all_nodes (sisyphus/graph.py:579)
    jobs (sisyphus/graph.py:345)
    startup (sisyphus/manager.py:548)
    run (sisyphus/manager.py:574)
    wrapped_func (sisyphus/tools.py:311)
    manager (sisyphus/manager.py:138)
    main (sisyphus/__main__.py:234)
    <module> (sis:14)
```
And there are a lot of other threads looking like this:
```
Thread 2543833 (idle): "Thread-3 (worker)"
    isfile (<frozen genericpath>:30)
    job_finished (sisyphus/job.py:59)
    cached_call (sisyphus/tools.py:564)
    _sis_finished (sisyphus/job.py:468)
    path_available (sisyphus/job.py:1066)
    available (sisyphus/job_path.py:153)
    cached_call (sisyphus/tools.py:564)
    _sis_all_path_available (sisyphus/job.py:682)
    _sis_runnable (sisyphus/job.py:691)
    runner_helper (sisyphus/graph.py:556)
    wrapped_func (sisyphus/tools.py:311)
    worker (multiprocessing/pool.py:125)
    run (threading.py:975)
    _bootstrap_inner (threading.py:1038)
    _bootstrap (threading.py:995)
```
Looks to me like it's busy checking the state of all jobs. I think the only way to avoid this checking would be to introduce a separate database to keep track of the last known states. I have tried to avoid this so far since it adds another layer to worry about and might get out of sync with the actual state on the filesystem... Is the file system currently slower than usual? Was this a new problem or was the startup always that slow for you?
> introduce a separate database to keep track of the last known states
Yeah see #168.
> Is the file system currently slower than usual?
No, it's just slow as usual.
> Was this a new problem or was the startup always that slow for you?
I think it has always been the same. The graph size has increased more and more over time, and now that I have a larger setup again, I notice it much more. Probably NFS also caches some amount of files, so up to some point it is still reasonably fast, but then it becomes slow.
A problem which just came to my mind is that the status checks start from the outputs and work their way back to the inputs. Once a finished job is found, the search is stopped.

Just caching the finished jobs would therefore barely speed up the whole process; you would have to cache the finished jobs, find which jobs could potentially finish next, and then only check these jobs. This makes everything of course more complicated...
Hm, my current situation is that most of the jobs are already finished. So in that case, such a DB would speed it up a lot, if it can skip the checks for all the finished jobs.

This is because my workflow is mostly like this: I started with an empty setup, added experiment after experiment, waited until they were finished, then added more experiments. So the graph grows over time, but only the most recent experiments are not finished yet.

The logic you suggest is indeed complicated, potentially also problematic, and also not really needed for my situation.
Good point. I was thinking about a large and deep dependency tree; if you have a flat tree with many outputs, you would probably save a lot of I/O by avoiding the checks on the finished jobs.
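To illustrate the flat-tree case, here is a sketch of what caching only the finished state could look like (purely illustrative; `FinishedJobCache`, the JSON file, and keying by the job's path are my assumptions, not the #168 design):

```python
import json
import os


class FinishedJobCache:
    """Sketch: remember jobs once seen as finished so later graph traversals
    can skip the filesystem check for them. Caching only the finished state is
    comparatively safe, since a finished job normally never becomes unfinished
    again (unless it is deleted manually)."""

    def __init__(self, filename="finished_jobs_cache.json"):
        self.filename = filename
        self.finished = set()
        if os.path.exists(filename):
            with open(filename) as f:
                self.finished = set(json.load(f))

    def is_finished(self, job_path, check_fn):
        # job_path: unique job identifier (e.g. its work dir path)
        # check_fn: the expensive filesystem check, only run on a cache miss
        if job_path in self.finished:
            return True
        if check_fn(job_path):
            self.finished.add(job_path)
            return True
        return False

    def save(self):
        with open(self.filename, "w") as f:
            json.dump(sorted(self.finished), f)
```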
I still have this problem. A lot. Startup time is now often 30 minutes.
I forgot that I had reported this before, but now, after some profiling, I found this again, and it is exactly the same problem. I was also trying out #214, but that doesn't really help with the issue here. It still hangs. Specifically (via `py-spy`), I see:
```
Thread 1009468 (idle): "MainThread"
    for_all_nodes (sisyphus/graph.py:589)
    get_jobs_by_status (sisyphus/graph.py:488)
    update_jobs (sisyphus/manager.py:242)
    __init__ (sisyphus/manager.py:225)
    manager (sisyphus/manager.py:103)
    main (sisyphus/__main__.py:234)
    <module> (sis:14)
```
It hangs in `for_all_nodes`. Looking at other threads: my current graph also expands a lot on `_sis_update_inputs` -> `update`. For example, via `py-spy`, I see many threads like this:
```
Thread 1009762 (idle): "Thread-3 (worker)"
    exists (<frozen genericpath>:19)
    _chkpt_exists (i6_experiments/users/zeyer/returnn/training.py:255)
    get_relevant_epochs_from_training_learning_rate_scores (i6_experiments/users/zeyer/returnn/training.py:236)
    update (i6_experiments/users/zeyer/recog.py:729)
    _sis_update_inputs (sisyphus/job.py:369)
    _sis_runnable (sisyphus/job.py:695)
    runner_helper (sisyphus/graph.py:566)
    wrapped_func (sisyphus/tools.py:303)
    worker (multiprocessing/pool.py:125)
    run (threading.py:975)
    _bootstrap_inner (threading.py:1038)
    _bootstrap (threading.py:995)
```
Then I thought, why is the `_sis_runnable` call needed at all? If the job is already finished, I don't think we need to call this? So I changed `for_all_nodes.runner_helper` to do a `_sis_finished` check first:
```python
def runner_helper(job):
    """
    :param Job job:
    """
    # make sure all inputs are updated
    if not job._sis_finished():
        job._sis_runnable()
```
Now, when running again, I see many threads like this:
```
isfile (<frozen genericpath>:30)
job_finished (sisyphus/job.py:59)
cached_call (sisyphus/tools.py:570)
_sis_finished (sisyphus/job.py:470)
runner_helper (sisyphus/graph.py:566)
wrapped_func (sisyphus/tools.py:303)
worker (multiprocessing/pool.py:125)
run (threading.py:975)
_bootstrap_inner (threading.py:1038)
_bootstrap (threading.py:995)
```
After a while, I also see that the cleanup manager kicks in again, and I see many threads like:
```
Thread 1012596 (idle): "Thread-13 (worker)"
    islink (<frozen posixpath>:167)
    _sis_cleanable (sisyphus/job.py:502)
    f (sisyphus/manager.py:36)
    runner_helper (sisyphus/graph.py:576)
    wrapped_func (sisyphus/tools.py:303)
    worker (multiprocessing/pool.py:125)
    run (threading.py:975)
    _bootstrap_inner (threading.py:1038)
    _bootstrap (threading.py:995)
```
What I wonder: As said before, it already printed:
```
...
[2024-11-27 11:01:26,444] INFO: running: Job<alias/lm/trafo-n32-d1024-noAbsPos-rmsNorm-ffGated-rope-noBias-drop0-b32_1k/train work/i6_core/returnn/training/ReturnnTrainingJob.qyNzTeaNI3UN> {ep 64/100}
[2024-11-27 11:01:26,444] INFO: runnable: Job<work/i6_core/returnn/search/SearchCollapseRepeatedLabelsJob.RSx1whDm9gF8>
[2024-11-27 11:01:26,444] INFO: error(2) queue(6) runnable(1) running(8) waiting(2037)
Clear jobs in error state? [y/N]
Print verbose overview (v), update aliases and outputs (u), start manager (y), or exit (n)? y
```
And there it hangs, in all the `job_finished` checks. But how does it know the job status that it has already printed before (`error(2) queue(6) runnable(1) running(8)`)? When it knows which jobs are runnable, running, or in the queue, then it must have already checked which are finished, or not? So, looking at the code, it already did the `print_state_overview`. It already finished a previous run of `update_jobs`. So, only the new call to `update_jobs` causes the issues? (Maybe there have anyway been some `_sis_update_inputs` on finished jobs, so now it checks more than before?)
While again looking at the code (I already forgot all the details), I see that there is the option `CACHE_FINISHED_RESULTS`, which is disabled by default (why?). I will also try that.
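In case someone wants to try the same: enabling it should just be a matter of overriding the global in the usual Sisyphus `settings.py` (this assumes the standard global-settings override mechanism; see `sisyphus/global_settings.py` for the exact semantics of the option):

```python
# settings.py
# Enable the CACHE_FINISHED_RESULTS option mentioned above (disabled by default);
# see sisyphus/global_settings.py for its exact semantics.
CACHE_FINISHED_RESULTS = True
```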
(cc @Atticus1806)
So, I thought there are probably other `_sis_runnable` calls, and I want to make that faster when the job is already finished. I'm now trying this:
```diff
diff --git a/sisyphus/job.py b/sisyphus/job.py
index 5243bdb..a446ea1 100644
--- a/sisyphus/job.py
+++ b/sisyphus/job.py
@@ -18,7 +18,7 @@ import subprocess
 import sys
 import time
 import traceback
-from typing import List, Iterator, Type, TypeVar
+from typing import List, Iterator, Type, TypeVar, Optional
 
 from sisyphus import block, tools
 from sisyphus.task import Task
@@ -472,7 +472,7 @@ class Job(metaclass=JobSingleton):
                 self._sis_is_finished = True
                 return True
             else:
-                if self._sis_setup() and self._sis_runnable():
+                if self._sis_setup() and self._sis_runnable(return_when_finished=None):
                     # check all task if they are finished
                     for task in self._sis_tasks():
                         # job is only finished if all sub tasks are finished
@@ -684,8 +684,15 @@ class Job(metaclass=JobSingleton):
                 return False
         return True
 
-    def _sis_runnable(self):
+    def _sis_runnable(self, *, return_when_finished: Optional[bool] = True):
         """True if all inputs are available, also checks if new inputs are requested"""
+        if return_when_finished is not None:
+            # Avoid _sis_finished call due to potential recursive calls.
+            if not self._sis_is_finished and job_finished(self._sis_path()):
+                # Job is already marked as finished, skip check next time
+                self._sis_is_finished = True
+            if self._sis_is_finished:
+                return return_when_finished
         if not self._sis_update_possible():
             # Short cut used for most jobs
```
This again seems to resolve the hang in `job_finished`. Now I just see many workers which call `_sis_cleanable`, i.e. via the `JobCleaner`. But those have spammed the thread pool, and that is why `update_jobs` hangs.

One potential solution: the `JobCleaner` could use its own small thread pool, with maybe only 1 or 2 workers, so the main proc can still do its work, and `update_jobs` can finish faster. (Edit: It already has its own thread pool, but that is only used for the cleanup itself, not for the `for_all_nodes` graph traversal. So I am now trying to let it use the same pool also for the graph traversal.)
> One potential solution: the `JobCleaner` could use its own small thread pool, with maybe only 1 or 2 workers, so the main proc can still do its work, and `update_jobs` can finish faster. (Edit: It already has its own thread pool, but that is only used for the cleanup itself, not for the `for_all_nodes` graph traversal. So I am now trying to let it use the same pool also for the graph traversal.)
I tried that:
```diff
diff --git a/sisyphus/graph.py b/sisyphus/graph.py
index d9c7e70..4a4bed4 100644
--- a/sisyphus/graph.py
+++ b/sisyphus/graph.py
@@ -488,7 +488,7 @@ class SISGraph(object):
         self.for_all_nodes(get_unfinished_jobs, nodes=nodes)
         return states
 
-    def for_all_nodes(self, f, nodes=None, bottom_up=False):
+    def for_all_nodes(self, f, nodes=None, bottom_up=False, *, pool: Optional[ThreadPool] = None):
         """
         Run function f for each node and ancestor for `nodes` from top down,
         stop expanding tree branch if functions returns False. Does not stop on None to allow functions with no
@@ -497,6 +497,7 @@ class SISGraph(object):
         :param (Job)->bool f: function will be executed for all nodes
         :param nodes: all nodes that will be checked, defaults to all output nodes in graph
         :param bool bottom_up: start with deepest nodes first, ignore return value of f
+        :param pool: use custom thread pool
         :return: set with all visited nodes
         """
 
@@ -544,7 +545,8 @@ class SISGraph(object):
         pool_lock = threading.Lock()
         finished_lock = threading.Lock()
-        pool = self.pool
+        if not pool:
+            pool = self.pool
 
         # recursive function to run through tree
         def runner(job):
diff --git a/sisyphus/manager.py b/sisyphus/manager.py
index 4d17c60..e174741 100644
--- a/sisyphus/manager.py
+++ b/sisyphus/manager.py
@@ -38,7 +38,7 @@ class JobCleaner(threading.Thread):
             return True
 
         while not self.stopped:
-            self.sis_graph.for_all_nodes(f)
+            self.sis_graph.for_all_nodes(f, pool=self.thread_pool)
             time.sleep(gs.JOB_CLEANER_INTERVAL)
 
     def close(self):
```
But now, again I see many threads doing this:
```
Thread 1039869 (idle): "Thread-6 (worker)"
    isfile (<frozen genericpath>:30)
    job_finished (sisyphus/job.py:59)
    cached_call (sisyphus/tools.py:576)
    _sis_runnable (sisyphus/job.py:691)
    get_unfinished_jobs (sisyphus/graph.py:450)
    runner_helper (sisyphus/graph.py:577)
    wrapped_func (sisyphus/tools.py:303)
    worker (multiprocessing/pool.py:125)
    run (threading.py:975)
    _bootstrap_inner (threading.py:1038)
    _bootstrap (threading.py:995)
```
It still hangs for more than 5 minutes at least (and this is with a warm NFS cache).
I still wonder why it now hangs with so many `job_finished` calls, when it has already done `update_jobs` and `print_state_overview` before that.
I see this:

> Using py-spy, I see that the main thread hangs here:

- Looking at the code, in the other threads, I should see something related to the `for_all_nodes` with the `get_unfinished_jobs`, but I don't see anything like that.
- In the other threads, I see lots of instances in `for_all_nodes` via the `JobCleaner`, though. That job cleanup check seems to take a while. Maybe that blocks the other `for_all_nodes`? But shouldn't I see at least something in the other threads about anything `get_jobs_by_status` related?

See the full py-spy dump log here: https://gist.github.com/albertz/b4a3fa7c5140dfecb5d67ebe30bf0eff