Open JSKenyon opened 2 years ago
Just a note that I'm still trying to ensure minimum thread safety is achieved on the table system itself.
After spending some time yesterday digging through the table system code I have decided the best way forward is to serialize access to table and column pointers by making TableProxy a threadsafe object, locking on the internal Table::Table object.
Not only are there bucket caches, but column caches and table caches and making the entire C++ table system threadsafe is a massive undertaking for which I do not have time due to much higher priorities porting Meqtrees and supporting eMeerKAT.
This will not be optimal in terms of processing when using a single Table (inside TableProxy.h) object (it will give similar lockup issues to relying on the GIL, but at bare minimum it will make the codebase safe to access from multiple threads through the TableProxy.h/cc.
From discussion with @JSKenyon the correct way to use CC is then Correct:
import numpy as np
import threading
import pyrap.tables as pt
import os
import concurrent.futures as cf
import multiprocessing
import time
import sys
def threaded_getcolnp(ms_path, colname, startrow, nrow):
tid = threading.get_ident()
with pt.table(ms_path, lockoptions="user", ack=False) as ms:
ref_row = ms.getcol(colname, nrow=1) # Read an example row.
ref_dims = ref_row.shape[1:]
ref_dtype = ref_row.dtype
out = np.empty((nrow, *ref_dims), dtype=ref_dtype) # Preallocate
output.
ms.getcolnp(colname, out, startrow=startrow, nrow=nrow)
return # We don't want to profile the cost of returning the result.
if __name__ == "__main__":
ms_path = sys.argv[1]
no_times = int(sys.argv[2])
column = "DATA"
nchunk = 40 # Partition the read into this many chunks.
nworker = 4 # Number of threads/processes.
table = pt.table(ms_path, ack=False)
total_nrow = table.nrows()
table.close()
nrow = int(np.ceil(total_nrow / nchunk)) # nrow per chunk.
starts = np.repeat(np.arange(0, total_nrow, nrow), no_times)
np.random.shuffle(starts)
t0 = time.time()
with cf.ThreadPoolExecutor(max_workers=nworker) as tpe:
futures = [
tpe.submit(
threaded_getcolnp,
ms_path,
column,
row,
nrow
)
for row in starts
]
print(f"Reading MS using threads and pid multiton TableCache:
{time.time() - t0:.3f}s")
Wrong:
import numpy as np
import threading
import pyrap.tables as pt
import os
import concurrent.futures as cf
import multiprocessing
import time
import sys
def threaded_getcolnp(ms, colname, startrow, nrow):
tid = threading.get_ident()
ref_row = ms.getcol(colname, nrow=1) # Read an example row.
ref_dims = ref_row.shape[1:]
ref_dtype = ref_row.dtype
out = np.empty((nrow, *ref_dims), dtype=ref_dtype) # Preallocate output.
ms.getcolnp(colname, out, startrow=startrow, nrow=nrow)
return # We don't want to profile the cost of returning the result.
if __name__ == "__main__":
ms_path = sys.argv[1]
no_times = int(sys.argv[2])
column = "DATA"
nchunk = 40 # Partition the read into this many chunks.
nworker = 4 # Number of threads/processes.
table = pt.table(ms_path, ack=False)
total_nrow = table.nrows()
table.close()
nrow = int(np.ceil(total_nrow / nchunk)) # nrow per chunk.
starts = np.repeat(np.arange(0, total_nrow, nrow), no_times)
np.random.shuffle(starts)
with pt.table(ms_path, lockoptions="user", ack=False) as ms
t0 = time.time()
with cf.ThreadPoolExecutor(max_workers=nworker) as tpe:
futures = [
tpe.submit(
threaded_getcolnp,
ms,
column,
row,
nrow)
for row in starts
]
print(f"Reading MS using threads and pid multiton TableCache:
{time.time() - t0:.3f}s")
I know this requires substantial changes to the way dask-ms works, but in my view file type descriptor objects should never be shared between threads and one should rely on POSIX to ensure that locking is done correctly at file system layer.
The minimum workaround in the mean time would perhaps be to pass a proxy object around like simon showed which contains not a file / table descriptor but only the args and kwargs needed for a thread to open readonly read and close as needed. This has some overheads to it but on large enough chunk sizes those overheads should be amortized.
I am also wary of rushing to a solution on the casacore end @bennahugo. Ger may have a different solution in mind. I am just experimenting on the dask-ms end and trying to get something working. This is all moot if we just end up with a different scenario in which everything locks up.
Sure I'm just noting down our discussion -- it won't help to to put the cart in front of the horses here, but it would be good to have a draft solution for this. Thanks for the work on dask-ms front
[ ] Tests added / passed
If the pep8 tests fail, the quickest way to correct this is to run
autopep8
and thenflake8
andpycodestyle
to fix the remaining issues.[ ] Fully documented, including
HISTORY.rst
for all changes and one of thedocs/*-api.rst
files for new APITo build the docs locally:
This PR is a WIP demonstrating a possible approach for parallel reads from threads. This approach is reliant on https://github.com/casacore/casacore/pull/1167, which allows me to avoid using soft links. Instead, the changes in that PR mean that when a table is opened from multiple threads, it does not share its underlying plain table object.
The approach that I am attempting here is almost certainly imperfect but it is very simple. It defines a
ParallelTable
class which inherits frompyrap.tables.table
. This, unfortunately, introduces some limitations as the base class is defined in C++. That said, doing this allows us to create aParallelTable
object which masquerades as a normal table - the only difference is that when a read method is called, it first checks if the thread has an open instance of the table. If not, the table is opened in the thread and added to the cache. I make use ofweakref
to ensure that all tables are closed when the ParallelTable object is GCed.The changes in this PR seem to work although some tests are broken - I suspect this may have to do with subtables, but I have yet to investigate. Note that there is plenty of ugly debugging code in the PR. I will remove it if this coalesces into a stable approach.
One important thing to note is the fact that the
cf.ThreadPoolExecutor
has been dummied out with aDummyThreadPoolExecutor
andDummyFuture
. This seems to work for a simple read case, though further testing is needed. This would be a nice simplification as it suggests that we could get away without internal threadpools. That said, the changes in the PR also work with the internal threadpools with the caveat that those threadpools need more than one thread (as otherwise we serialise).Finally, one thing to note is that using the
processes
scheduler does not function optimally for both this PR andmaster
. Both will repeatedly open tables for reasons I don't fully understand. I suspect that the caching mechanism on theTableProxy
doesn't function as expected in this specific case. What is particularly confusing is that it does seem to operate correctly in the distributed case using aLocalCluster
with multiple workers.