mxmlnkn opened 2 weeks ago
fsspec might find this useful. We have an unreliable guard against fork in general, but perhaps a function `prefork()` which makes sure nothing unsafe (threads, event loops, open files) is copied out of the main process?
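To make the suggestion concrete, here is a minimal sketch of what such a check could look like. The name `prefork()` and its behavior (returning a list of problems rather than raising) are assumptions for illustration, not fsspec API; a real implementation might warn, raise, or try to shut the offenders down.

```python
import threading


def prefork():
    """Hypothetical pre-fork safety check (name and behavior assumed).

    Returns a list of conditions that would be unsafe to copy into a
    forked child: extra live threads, or a running asyncio event loop.
    """
    problems = []
    for t in threading.enumerate():
        if t is not threading.current_thread():
            problems.append(f"live thread: {t.name}")
    try:
        import asyncio
        asyncio.get_running_loop()
    except RuntimeError:
        pass  # no event loop running in this thread: fine
    else:
        problems.append("running asyncio event loop")
    return problems
```

A caller would invoke it right before forking and only proceed if the returned list is empty.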
Sounds good.
I don't think that open files are a problem, at least not as long as they are not asynchronous; see `man fork`. I have not had problems with those: I open some before the fork and they still work after the fork.
> The child inherits copies of the parent's set of open file descriptors. Each file descriptor in the child refers to the same open file description (see open(2)) as the corresponding file descriptor in the parent. This means that the two file descriptors share open file status flags, file offset, and signal-driven I/O attributes (see the description of F_SETOWN and F_SETSIG in fcntl(2)).
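The shared file offset described in the man page excerpt can be demonstrated directly (Unix-only, since it uses `os.fork`):

```python
import os
import tempfile

# After fork(), parent and child file descriptors refer to the same
# open file description, so they share a single file offset.
fd, path = tempfile.mkstemp()
try:
    os.write(fd, b"0123456789")
    os.lseek(fd, 0, os.SEEK_SET)

    pid = os.fork()
    if pid == 0:              # child: read 4 bytes, advancing the offset
        os.read(fd, 4)
        os._exit(0)
    os.waitpid(pid, 0)

    # The child's read moved the shared offset, as seen by the parent.
    offset_after_child_read = os.lseek(fd, 0, os.SEEK_CUR)
    print(offset_after_child_read)  # 4
finally:
    os.close(fd)
    os.unlink(path)
```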
Here is the libfuse wiki mentioning the problem with threads and fork, and the recommendation to start those threads in the `init` callback.
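The libfuse recommendation boils down to: don't start worker threads in the constructor (which runs before FUSE daemonizes via fork), start them lazily in the `init` callback, which runs in the daemon process after the fork. A structural sketch, with the `init(path)`/`destroy(path)` callback names assumed to follow fusepy's `Operations` interface (no FUSE binding is imported here, so this only shows the pattern):

```python
import queue
import threading


class ForkSafeFilesystem:
    """Pattern sketch: defer thread startup until after FUSE's fork."""

    def __init__(self):
        self.tasks = queue.Queue()
        self.workers = []  # intentionally empty before the mount

    def init(self, path="/"):
        # Called by FUSE in the (possibly forked) daemon process,
        # so these threads are created on the correct side of the fork.
        for i in range(2):
            t = threading.Thread(
                target=self._worker, daemon=True, name=f"worker-{i}"
            )
            t.start()
            self.workers.append(t)

    def _worker(self):
        while True:
            task = self.tasks.get()
            if task is None:  # sentinel: shut this worker down
                break
            task()

    def destroy(self, path="/"):
        for _ in self.workers:
            self.tasks.put(None)
        for t in self.workers:
            t.join()
```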
Btw, currently, I simply have this check to avoid hangs. (Argh, and while linking this, I noticed that the `printDebug >= 1` test shouldn't be in the `if`. It was a simple warning before, but now it is an error. That's how this happened.)
It is good practice, I suppose, but you still need remediation for the case where you have threads and then want to fork. Note that even a console IPython session has extra threads, which is only a problem if you try to do anything with them; they don't exist in the child.
```python
In [1]: import multiprocess

In [2]: def f(_):
   ...:     import threading
   ...:     return str(threading.enumerate())
   ...:

In [3]: f(0)
Out[3]: '[<_MainThread(MainThread, started 8383734016)>, <HistorySavingThread(IPythonHistorySavingThread, started 6113292288)>]'

In [4]: pool = multiprocess.Pool()

In [5]: pool.map(f, [0])
Out[5]: ['[<_MainThread(MainThread, started 8383734016)>]']
```
(If you pass a thread object to the child, you can still check whether it is in `threading.enumerate()`, which works by ID, or call `.is_alive()` on it.)
I am trying to use this library with FUSE. However, I have noticed: `SSHFileSystem` seems to start background threads. I have this problem myself in rapidgzip with the thread pool, for which I added a method to join all threads, so that the process can be forked into the background; the thread pool then gets restarted automatically on first usage. It would be nice to have a similar API for `SSHFileSystem`.

The running threads according to `import threading; threading.enumerate()` are: `MainThread`, `fsspecIO`, `asyncio_0`. Maybe I am running into https://github.com/fsspec/sshfs/issues/42 Then again, looking at the code, it does not seem like there is any logic for joining or canceling that thread at all!?

On further inspection, the program also hangs inside `sshfs.file.SSHFile.close` when calling it manually, which itself hangs in `SSHFile._close`, which itself hangs in `fsspec.asyn.sync_wrapper`, which hangs in this loop.

It would already be helpful if `SSHFile.close` would forward kwargs like all the other methods that are initialized with `__mirror_method`, so that an argument such as `timeout=3` could be specified. Some kind of timeout should then also be used in `__exit__`.
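Until `close` grows such a timeout parameter, a caller-side stopgap is to push the possibly hanging `close()` onto a daemon thread and give up after a deadline. This is a generic workaround sketch, not sshfs API; it only unblocks the caller and leaks the stuck thread rather than cancelling the underlying operation:

```python
import threading


def close_with_timeout(file_obj, timeout=3):
    """Call file_obj.close() but return False if it hangs past `timeout`.

    A stopgap for close() implementations that can block forever; the
    abandoned daemon thread keeps running if the close never returns.
    """
    errors = []

    def _close():
        try:
            file_obj.close()
        except Exception as exc:  # surface worker-side errors to the caller
            errors.append(exc)

    worker = threading.Thread(target=_close, daemon=True)
    worker.start()
    worker.join(timeout)
    if worker.is_alive():
        return False  # timed out; the close may still be pending
    if errors:
        raise errors[0]
    return True
```

A timeout like this in `__exit__` would at least let `with`-blocks terminate instead of hanging indefinitely.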