casacore / python-casacore

Python bindings for casacore, a library used in radio astronomy
http://casacore.github.io/python-casacore
GNU Lesser General Public License v3.0
35 stars 22 forks source link

Add code to drop the GIL. Demostrate usefulness on reads/writes. #209

Open JSKenyon opened 4 years ago

JSKenyon commented 4 years ago

This is part of ongoing experiments to show the need for parallel reads (and possibly writes) when using measurement sets. This PR is a draft and should not be merged - its purpose is to demonstrate a proof of concept and to encourage discussion. The following issue on casacore is related.

Motivation

Instruments such as MeerKAT and LOFAR produce huge quantities of data. Currently, limitations of the casacore table system prevent reading/writing from multiple threads. As algorithms and implementations become more massively parallel (the direction in which most development is heading), this limitation (particularly for reads) becomes a nasty bottleneck.

Alternatives

Problems

Dropping the GIL

This PR adds the guards.h file. This, plus a small modification to pytable.cc, allows python-casacore to drop the GIL on reads. This code was adapted from the following Stack Overflow. Note that I am not a C++ developer so I might not know all the consequences of a change like this. The value of this will be demonstrated shortly.

Experiment 1

The first experiment involves reading a single, conventional MS using a single thread, multiple threads and multiple processes both with and without the GIL. The code for this experiment is as follows:

Code for Experiment 1

```python import pyrap.tables as pt import numpy as np from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait import time def get_data(ms_name, colname, startrow, nrow): ms = pt.table(ms_name, lockoptions="autonoread", ack=False) ms.setmaxcachesize(colname, 1) ref_row = ms.getcol(colname, nrow=1) ref_dims = ref_row.shape[1:] ref_dtype = ref_row.dtype out = np.empty((nrow, *ref_dims), dtype=ref_dtype) ms.getcolnp(colname, out, startrow=startrow, nrow=nrow) ms.close() return if __name__ == "__main__": ms_name = "/path/to/ms" column = "DATA" ms = pt.table(ms_name, lockoptions="autonoread", ack=False) scan_ids, scan_counts = np.unique(ms.getcol("SCAN_NUMBER"), return_counts=True) startrows = [0] startrows.extend(np.cumsum(scan_counts)) numrows = scan_counts inds = list(zip(startrows, numrows)) ms.close() t0 = time.time() with ProcessPoolExecutor(max_workers=len(startrows)) as executor: futures = [executor.submit(get_data, ms_name, column, sr, nr) for sr, nr in inds] wait(futures) print("Reading MS using processes: {:.3f}s".format(time.time() - t0)) t0 = time.time() with ThreadPoolExecutor(max_workers=len(startrows)) as executor: futures = [executor.submit(get_data, ms_name, column, sr, nr) for sr, nr in inds] wait(futures) print("Reading MS using threads: {:.3f}s".format(time.time() - t0)) t0 = time.time() get_data(ms_name, column, 0, sum(numrows)) print("Reading MS using a single thread: {:.3f}s".format(time.time() - t0)) ```

The results of this experiment with casacore 3.3.1 (the GIL is not released):

Reading MS using processes: 4.712s
Reading MS using threads: 41.956s
Reading MS using a single thread: 42.183s

The results of this experiment with this PR (the GIL is released):

Reading MS using processes: 4.275s
Segmentation fault (core dumped)

This segmentation fault is not python-casacore's fault - this is the thread-safety issue alluded to above. Ideally this will be fixed upstream. This does however reveal behaviour which I believe is undesirable: python-casacore should not be relying on the GIL to ensure thread safety in the underlying casacore routines.

Experiment 2

The second experiment involves reading a multi-MS (split by scan into 12 sub-measurement sets) using a single thread, multiple threads and multiple processes both with and without the GIL. The code for this experiment is as follows:

Code for Experiment 2

```python import pyrap.tables as pt import numpy as np from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait import time def get_data(ms_name, colname): ms = pt.table(ms_name, lockoptions="autonoread", ack=False) ms.setmaxcachesize(colname, 1) ref_row = ms.getcol(colname, nrow=1) ref_dims = ref_row.shape[1:] ref_dtype = ref_row.dtype out = np.empty((ms.nrows(), *ref_dims), dtype=ref_dtype) ms.getcolnp(colname, out) ms.close() return True if __name__ == "__main__": ms_name = "/path/to/mms" column = "DATA" ms = pt.table(ms_name, lockoptions="autonoread", ack=False) subms_names = list(ms.partnames()) n_subms = len(subms_names) ms.close() t0 = time.time() with ProcessPoolExecutor(max_workers=n_subms) as executor: futures = [executor.submit(get_data, subms_name, column) for subms_name in subms_names] wait(futures) print("Reading multi-MS using processes: {:.3f}s".format(time.time() - t0)) t0 = time.time() with ThreadPoolExecutor(max_workers=n_subms) as executor: futures = [executor.submit(get_data, subms_name, column) for subms_name in subms_names] wait(futures) print("Reading multi-MS using threads: {:.3f}s".format(time.time() - t0)) t0 = time.time() get_data(ms_name, column) print("Reading multi-MS using a single thread: {:.3f}s".format(time.time() - t0)) ```

The results of this experiment with casacore 3.3.1 (the GIL is not released):

Reading multi-MS using processes: 4.537s
Reading multi-MS using threads: 43.657s
Reading multi-MS using a single thread: 45.974s

The results of this experiment with this PR (the GIL is released):

Reading multi-MS using processes: 4.601s
Reading multi-MS using threads: 4.685s
Reading multi-MS using a single thread: 47.493s

This experiment shows the value and potential of releasing the GIL in python-casacore, as the multi-MS was successfully read from multiple threads in a similar amount of time as using processes.

Conclusions

I believe this PR demonstrates both the value and danger of releasing the GIL in the python-casacore layer. Having the ability to read from multiple threads can massively accelerate parallel processing as shown in experiment 2. At the same time, experiment 1 exposes thread-safety issues in the underlying casacore routines. I believe that the ultimate solution is to drop the GIL and fix the cause of the segmentation faults in casacore, allowing conventional (non-multi) measurement sets to be read from multiple threads.

JSKenyon commented 4 years ago

As an additional note, the data I used for the experiments was a MeerKAT MS/MMS with around 72GB of data in the data column.

JSKenyon commented 4 years ago

Any thoughts?

gijzelaerr commented 3 years ago

i love it!

JSKenyon commented 2 years ago

@tammojan I have confirmed that the results of Experiment 1 are unchanged. There is still a segfault once the GIL is disabled. I will try to experiment with user locking next.

JSKenyon commented 2 years ago

I can also confirm that user locking in conjunction with setting table.nolocking does not resolve the problem.

gervandiepen commented 2 years ago

The problem is that a Table object uses internal buffers and caches which are not thread-safe. So when using the same Table object in multiple threads, you'll be in trouble. But it's a bit more than that. Table objects on the same file share the same underlying PlainTable object, thus have the same problem as above. This was done to ensure that writing from different Table objects to the same file works fine. You can circumvent the latter problem by opening the same file with different names by using links (hard or soft). It will work, but not that nice.

I can think of two solutions:

What do you think?

On Mon, Jan 24, 2022 at 9:17 AM JSKenyon @.***> wrote:

I can also confirm that user locking in conjunction with setting table.nolocking does not resolve the problem.

— Reply to this email directly, view it on GitHub https://github.com/casacore/python-casacore/pull/209#issuecomment-1019828179, or unsubscribe https://github.com/notifications/unsubscribe-auth/AB34QPBO3QEND4ZKUWH77HDUXUDIPANCNFSM4P233HKA . Triage notifications on the go with GitHub Mobile for iOS https://apps.apple.com/app/apple-store/id1477376905?ct=notification-email&mt=8&pt=524675 or Android https://play.google.com/store/apps/details?id=com.github.android&referrer=utm_campaign%3Dnotification-email%26utm_medium%3Demail%26utm_source%3Dgithub.

You are receiving this because you are subscribed to this thread.Message ID: @.***>

JSKenyon commented 2 years ago

Thanks for the response @gervandiepen! I will be discussing this a little further with @sjperkins tomorrow, after which I will likely have an opinion.

JSKenyon commented 2 years ago

Apologies for not replying sooner @gervandiepen. The second of the two options you suggested seems the more appealing. That said, I don't believe that it will address the fact that python-casacore is implicitly relying on the GIL for thread safely. If thread safely is an issue, I believe it should be handled in the C++ layer, rather than using an implementation detail of CPython.

We are currently experimenting with various approaches to circumnavigate the various issues, and your reply has given me a few more ideas (thanks for that!).

Out of interest, would any of these problems become less challenging if it was possible to fully disable caching? We are looking at distributed systems where caching may not be desirable. I appreciate that the cache can be limited to an arbitrarily small size, but that doesn't fundamentally disable the caching (and the thread safety issues associated with it).

gervandiepen commented 2 years ago

I fully agree that thread-safety issues have to be solved at the C++ level. The cache size can be set to 1 which basically disables caching. But that does not make a shared Table object and related classes thread-safe. However, the symlink trick should work because you'll get disjoint Table objects for each thread (although I do not give guarantees :-) I'm interested in your results.

On Mon, Feb 14, 2022 at 9:51 AM JSKenyon @.***> wrote:

Apologies for not replying sooner @gervandiepen https://github.com/gervandiepen. The second of the two options you suggested seems the more appealing. That said, I don't believe that it will address the fact that python-casacore is implicitly relying on the GIL for thread safely. If thread safely is an issue, I believe it should be handled in the C++ layer, rather than using an implementation detail of CPython.

We are currently experimenting with various approaches to circumnavigate the various issues, and your reply has given me a few more ideas (thanks for that!).

Out of interest, would any of these problems become less challenging if it was possible to fully disable caching? We are looking at distributed systems where caching may not be desirable. I appreciate that the cache can be limited to an arbitrarily small size, but that doesn't fundamentally disable the caching (and the thread safety issues associated with it).

— Reply to this email directly, view it on GitHub https://github.com/casacore/python-casacore/pull/209#issuecomment-1038813871, or unsubscribe https://github.com/notifications/unsubscribe-auth/AB34QPD7ADWVYITGT323WX3U3C7ADANCNFSM4P233HKA . Triage notifications on the go with GitHub Mobile for iOS https://apps.apple.com/app/apple-store/id1477376905?ct=notification-email&mt=8&pt=524675 or Android https://play.google.com/store/apps/details?id=com.github.android&referrer=utm_campaign%3Dnotification-email%26utm_medium%3Demail%26utm_source%3Dgithub.

You are receiving this because you were mentioned.Message ID: @.***>

JSKenyon commented 2 years ago

I have been fiddling away at this problem for most of the week and finally have something worth discussing. @gervandiepen, your mention of soft links got me thinking and led me to implement the following (very rough) proof of concept:

import numpy as np
import threading
import pyrap.tables as pt
import os
import concurrent.futures as cf
import multiprocessing
import time

def linked_getcolnp(ms_path, colname, startrow, nrow):

    tid = threading.get_ident()

    ms_name = ms_path.split("/")[-1]

    link_path = f"/tmp/{tid}-{ms_name}"  # Name based on thread ID.

    try:
        os.symlink(ms_path, link_path)
    except FileExistsError:  # This is just a precaution for now.
        os.unlink(link_path)
        os.symlink(ms_path, link_path)

    ms = pt.table(link_path, lockoptions="user", ack=False)
    ms.setmaxcachesize(colname, 1)  # Disable caching.

    ref_row = ms.getcol(colname, nrow=1)  # Read an example row.
    ref_dims = ref_row.shape[1:]
    ref_dtype = ref_row.dtype

    out = np.empty((nrow, *ref_dims), dtype=ref_dtype)  # Preallocate output.

    ms.getcolnp(colname, out, startrow=startrow, nrow=nrow)

    ms.close()
    os.unlink(link_path)  # Remove the soft link.

    return  # We don't want to profile the cost of returning the result.

if __name__ == "__main__":

    ms_path = "path/to/ms"
    column = "DATA"
    nchunk = 20  # Partition the read into this many chunks.
    nworker = 6  # Number of threads/processes.

    table = pt.table(ms_path, ack=False)
    total_nrow = table.nrows()
    table.close()

    nrow = int(np.ceil(total_nrow / nchunk))  # nrow per chunk.

    starts = np.arange(0, total_nrow, nrow)

    t0 = time.time()
    result = [linked_getcolnp(ms_path, column, row, nrow) for row in starts]
    print(f"Reading MS using a single thread: {time.time() - t0:.3f}s")

    t0 = time.time()
    with cf.ThreadPoolExecutor(max_workers=nworker) as tpe:
        futures = [
            tpe.submit(
                linked_getcolnp,
                ms_path,
                column,
                row,
                nrow
            )
            for row in starts
        ]
    print(f"Reading MS using threads and symlinks: {time.time() - t0:.3f}s")

    mp_context = multiprocessing.get_context("spawn")  # Forking breaks things.

    t0 = time.time()
    with cf.ProcessPoolExecutor(max_workers=nworker,
                                mp_context=mp_context) as ppe:
        futures = [
            ppe.submit(
                linked_getcolnp,
                ms_path,
                column,
                row,
                nrow
            )
            for row in starts
        ]
    print(f"Reading MS using processes and symlinks: {time.time() - t0:.3f}s")

If I run this example on an 80GB column using the latest python-casacore, I get the following output:

Reading MS using a single thread: 45.991s
Reading MS using threads and symlinks: 45.103s
Reading MS using processes and symlinks: 9.621s

As you can see, threading still fails to improve performance. This is completely expected as the soft linking doesn't get around the GIL i.e. the reads will still be serialised. However, if I use my version of python-casacore (this PR), which drops the GIL, I get the following:

Reading MS using a single thread: 44.503s
Reading MS using threads and symlinks: 9.418s
Reading MS using processes and symlinks: 9.514s

Threads and processes perform equivalently and no more segfaults! This is really compelling to me and I will definitely try and refine this approach. However, we still come back to the fact that the GIL needs too be dropped in order for this to work.

@gervandiepen, @tammojan I would really appreciate any thoughts/opinions/advice you may have. :-)

gervandiepen commented 2 years ago

It's good to hear that the symlink trick works fine. Thanks for trying that. It means that having separate Table objects for the same table will work as well. I'll see if I can implement that in the coming month. I'll create a casacore issue for it. I have no opinion on the GIL problem. I leave it to Tammo Jan to review your PR.

On Fri, Feb 18, 2022 at 10:41 AM JSKenyon @.***> wrote:

I have been fiddling away at this problem for most of the week and finally have something worth discussing. @gervandiepen https://github.com/gervandiepen, your mention of soft links got me thinking and led me to implement the following (very rough) proof of concept:

import numpy as npimport threadingimport pyrap.tables as ptimport osimport concurrent.futures as cfimport multiprocessingimport time

def linked_getcolnp(ms_path, colname, startrow, nrow):

tid = threading.get_ident()

ms_name = ms_path.split("/")[-1]

link_path = f"/tmp/{tid}-{ms_name}"  # Name based on thread ID.

try:
    os.symlink(ms_path, link_path)
except FileExistsError:  # This is just a precaution for now.
    os.unlink(link_path)
    os.symlink(ms_path, link_path)

ms = pt.table(link_path, lockoptions="user", ack=False)
ms.setmaxcachesize(colname, 1)  # Disable caching.

ref_row = ms.getcol(colname, nrow=1)  # Read an example row.
ref_dims = ref_row.shape[1:]
ref_dtype = ref_row.dtype

out = np.empty((nrow, *ref_dims), dtype=ref_dtype)  # Preallocate output.

ms.getcolnp(colname, out, startrow=startrow, nrow=nrow)

ms.close()
os.unlink(link_path)  # Remove the soft link.

return  # We don't want to profile the cost of returning the result.

if name == "main":

ms_path = "path/to/ms"
column = "DATA"
nchunk = 20  # Partition the read into this many chunks.
nworker = 6  # Number of threads/processes.

table = pt.table(ms_path, ack=False)
total_nrow = table.nrows()
table.close()

nrow = int(np.ceil(total_nrow / nchunk))  # nrow per chunk.

starts = np.arange(0, total_nrow, nrow)

t0 = time.time()
result = [linked_getcolnp(ms_path, column, row, nrow) for row in starts]
print(f"Reading MS using a single thread: {time.time() - t0:.3f}s")

t0 = time.time()
with cf.ThreadPoolExecutor(max_workers=nworker) as tpe:
    futures = [
        tpe.submit(
            linked_getcolnp,
            ms_path,
            column,
            row,
            nrow
        )
        for row in starts
    ]
print(f"Reading MS using threads and symlinks: {time.time() - t0:.3f}s")

mp_context = multiprocessing.get_context("spawn")  # Forking breaks things.

t0 = time.time()
with cf.ProcessPoolExecutor(max_workers=nworker,
                            mp_context=mp_context) as ppe:
    futures = [
        ppe.submit(
            linked_getcolnp,
            ms_path,
            column,
            row,
            nrow
        )
        for row in starts
    ]
print(f"Reading MS using processes and symlinks: {time.time() - t0:.3f}s")

If I run this example on an 80GB column using the latest python-casacore, I get the following output:

Reading MS using a single thread: 45.991s Reading MS using threads and symlinks: 45.103s Reading MS using processes and symlinks: 9.621s

As you can see, threading still fails to improve performance. This is completely expected as the soft linking doesn't get around the GIL i.e. the reads will still be serialised. However, if I use my version of python-casacore (this PR), which drops the GIL, I get the following:

Reading MS using a single thread: 44.503s Reading MS using threads and symlinks: 9.418s Reading MS using processes and symlinks: 9.514s

Threads and processes perform equivalently and no more segfaults! This is really compelling to me and I will definitely try and refine this approach. However, we still come back to the fact that the GIL needs too be dropped in order for this to work.

@gervandiepen https://github.com/gervandiepen, @tammojan https://github.com/tammojan I would really appreciate any thoughts/opinions/advice you may have. :-)

— Reply to this email directly, view it on GitHub https://github.com/casacore/python-casacore/pull/209#issuecomment-1044225957, or unsubscribe https://github.com/notifications/unsubscribe-auth/AB34QPC34PHORSXW6EXVQTLU3YH35ANCNFSM4P233HKA . Triage notifications on the go with GitHub Mobile for iOS https://apps.apple.com/app/apple-store/id1477376905?ct=notification-email&mt=8&pt=524675 or Android https://play.google.com/store/apps/details?id=com.github.android&referrer=utm_campaign%3Dnotification-email%26utm_medium%3Demail%26utm_source%3Dgithub.

You are receiving this because you were mentioned.Message ID: @.***>

bennahugo commented 2 years ago

Alright @JSKenyon I've added the necessary fix (https://github.com/casacore/casacore/pull/1167) to casacore to force the plaintables to be cached separately for separate threads or fork(). I cannot immediately see any issues with this (it forces the buckets etc. that are allocated inside the tables to be different between the different threads.

Easiest to get these things to play along is to compile casacore into a clean virtualenv (ie. things will be in venv/bin venv/lib and venv/include). You then need to do export C_INCLUDE_PATH=<venvpathabs>/include; export CPLUS_INCLUDE_PATH=<venvpathabs>/include; export LD_LIBRARY_PATH=<venvpathabs>/lib before running pip install -e . on python-casacore.

I would be curious to see if we get similar performance to the symlink kludge you mentioned in https://github.com/casacore/python-casacore/pull/209#issuecomment-1044225957 on your dataset.

bennahugo commented 2 years ago

@JSKenyon please apply the following patch to this branch. I cannot commit to it

Unfortunately I don't know how to make boost capture the exception --- it is definitely being raised in C++ (as tested for in tables/Tables/test/tTableReadThreaded.cc in 52b83f2), but it looks like it is swallowing the exception -- it even swallows std::runtime_error. I tried all the solutions I can find online but none works

From c1a9f9ba23cfed1f6c87a2d5c93c61151f3506a6 Mon Sep 17 00:00:00 2001
From: bennahugo <bennahugo@gmail.com>
Date: Sun, 27 Feb 2022 17:43:29 +0200
Subject: [PATCH] Fixes missing pthreadlib linkage for casacore#1163

---
 README.rst | 1 +
 setup.py   | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/README.rst b/README.rst
index ed27754..ca5a409 100644
--- a/README.rst
+++ b/README.rst
@@ -76,6 +76,7 @@ On ubuntu you can install these with::
   this::

     CFLAGS="-std=c++11 \
+            -lpthread \
             -I/usr/local/Cellar/boost/1.68.0/include/ \
             -I/usr/local/include/  \
             -L/usr/local/Cellar/boost/1.68.0/lib \
diff --git a/setup.py b/setup.py
index def6ec5..52dc2c1 100755
--- a/setup.py
+++ b/setup.py
@@ -233,7 +233,7 @@ def get_extensions():
                                     library_dirs=library_dirs,
                                     include_dirs=include_dirs,
                                     # Since casacore 3.0.0 we have to be C++11
-                                    extra_compile_args=['-std=c++11']))
+                                    extra_compile_args=['-std=c++11', '-lpthread']))
     return extensions

-- 
2.17.1
bennahugo commented 2 years ago

@JSKenyon please apply the following patch. I finally found a working solution

From fc19b4b6e04c93716664c27818920cd8a9928467 Mon Sep 17 00:00:00 2001
From: bennahugo <bennahugo@gmail.com>
Date: Sun, 27 Feb 2022 20:42:26 +0200
Subject: [PATCH] Add converter routines for nonthreadsafe exception

---
 src/tables.cc | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/tables.cc b/src/tables.cc
index 37fb8dd..92733e3 100755
--- a/src/tables.cc
+++ b/src/tables.cc
@@ -32,15 +32,17 @@
 #include <casacore/python/Converters/PycValueHolder.h>
 #include <casacore/python/Converters/PycRecord.h>
 #include <casacore/python/Converters/PycArray.h>
+#include <casacore/python/Converters/PyNotThreadSafe.h>
 #include <casacore/tables/Tables/TableProxy.h>

 #include <casacore/meas/MeasUDF/Register.h>
 #include <casacore/derivedmscal/DerivedMC/Register.h>
-
 #include <boost/python.hpp>
+#include <string>

 BOOST_PYTHON_MODULE(_tables)
 {
+  casacore::python::registerNotThreadSafeException();
   casacore::python::register_convert_excp();
   casacore::python::register_convert_basicdata();
   casacore::python::register_convert_casa_valueholder();
-- 
2.17.1
bennahugo commented 2 years ago

The following use case works (essentially repeats the c++ test at the time of writing -- Note reading and writing from threads are still an open issue

import numpy as np
import threading
import pyrap.tables as pt
import os
import concurrent.futures as cf
import multiprocessing
import time
import sys
import hashlib

def threaded_getcolnp(ms_path, colname, startrow, nrow):

    tid = threading.get_ident()

    with pt.table(ms_path, lockoptions="user", ack=False) as ms:

        ref_row = ms.getcol(colname, nrow=1)  # Read an example row.
        ref_dims = ref_row.shape[1:]
        ref_dtype = ref_row.dtype

        out = np.empty((nrow, *ref_dims), dtype=ref_dtype)  # Preallocate output.

        ms.getcolnp(colname, out, startrow=startrow, nrow=nrow)

    return hashlib.md5(out).hexdigest() # We don't want to profile the cost of returning the result.

def threaded_getcolnp_sharetable(ms, colname, startrow, nrow):
    try:
        ms.nrows()
    except:
        return True

    return False

if __name__ == "__main__":

    ms_path = sys.argv[1]
    no_times = int(sys.argv[2])
    column = "DATA"
    nchunk = 10  # Partition the read into this many chunks.
    nworker = 8  # Number of threads/processes.

    table = pt.table(ms_path, ack=False)
    total_nrow = table.nrows()
    table.close()
    # skip last few rows if needed
    nrow = int(np.floor(total_nrow / nchunk))  # nrow per chunk.

    starts = np.repeat(np.arange(0, total_nrow, nrow), no_times)
    np.random.shuffle(starts)
    t0 = time.time()
    resultRef = [threaded_getcolnp(ms_path, column, row, nrow) for row in starts]
    print(f"Reading MS using a single thread: {time.time() - t0:.3f}s")

    t0 = time.time()
    result = [threaded_getcolnp(ms_path, column, row, nrow) for row in starts]
    assert(all(np.array(result) == np.array(resultRef)))
    print(f"Again: Reading MS using a single thread: {time.time() - t0:.3f}s")

    t0 = time.time()
    with cf.ThreadPoolExecutor(max_workers=nworker) as tpe:
        futures = [
            tpe.submit(
                threaded_getcolnp,
                ms_path,
                column,
                row,
                nrow
            )
            for row in starts
        ]
    assert(all(np.array(list(map(lambda x: x.result(), futures))) == np.array(resultRef)))
    print(f"Reading MS using threads and pooled table cache: {time.time() - t0:.3f}s")

    mp_context = multiprocessing.get_context("spawn")  # Forking breaks things.

    t0 = time.time()
    with cf.ProcessPoolExecutor(max_workers=nworker) as ppe:
        futures = [
            ppe.submit(
                threaded_getcolnp,
                ms_path,
                column,
                row,
                nrow
            )
            for row in starts
        ]
    assert(all(np.array(list(map(lambda x: x.result(), futures))) == np.array(resultRef)))
    print(f"Reading MS using processes and pooled table cache: {time.time() - t0:.3f}s")

    t0 = time.time()

    doExcept=False
    with pt.table(ms_path, lockoptions="user", ack=False) as ms:
          with cf.ThreadPoolExecutor(max_workers=nworker) as tpe:
                futures = [
                      tpe.submit(
                      threaded_getcolnp_sharetable,
                      ms,
                      column,
                      row,
                      nrow
                    )
                    for row in starts
                ]
    assert(all(list(map(lambda x: x.result(), futures))))
    print(f"Usecase failed successfully")

    with cf.ThreadPoolExecutor(max_workers=nworker) as tpe:
        futures = [
            tpe.submit(
                threaded_getcolnp,
                ms_path,
                column,
                row,
                nrow
            )
            for row in starts
        ]
    print(f"Reading MS using threads and pooled table cache: {time.time() - t0:.3f}s")
    assert(all(np.array(list(map(lambda x: x.result(), futures))) == np.array(resultRef)))

Outputs

(venvcc) > $ python threaded_tableread.py msdir/1491291289.1ghz.1.1ghz.4hrs.ms 10                                                                       
Reading MS using a single thread: 8.575s
Again: Reading MS using a single thread: 8.449s
Reading MS using threads and pooled table cache: 2.939s
Reading MS using processes and pooled table cache: 2.898s
Usecase failed successfully
Reading MS using threads and pooled table cache: 2.969s
pep8speaks commented 2 years ago

Hello @JSKenyon! Thanks for updating this PR. We checked the lines you've touched for PEP 8 issues, and found:

There are currently no PEP 8 issues detected in this Pull Request. Cheers! :beers:

Comment last updated at 2022-05-18 08:48:19 UTC
JSKenyon commented 2 years ago

@bennahugo I have applied both patches. Let me know if that is incorrect.

bennahugo commented 2 years ago

Ok thanks. That is all for now

As mentioned on casacore the next big job is to make TableProxy.h/cc threadsafe. I'm steadily working through that.

Note: ReadWrite access with multiple tables is still undefined -- there is another cache or static variable somewhere in casacore that gives a problem. Please let me know if you run into segfaults on reading in the dask-ms test suite -- it should not happen (it should give you a runtime error bound error from boost-python in that case (as demonstrated above)

bennahugo commented 2 years ago

Please apply the following patches to link up with the casacore just put up github.com/bennahugo/casacore#5859a780a7867a184cff44c389d44c4e83bcbbec

From c1a9f9ba23cfed1f6c87a2d5c93c61151f3506a6 Mon Sep 17 00:00:00 2001
From: bennahugo <bennahugo@gmail.com>
Date: Sun, 27 Feb 2022 17:43:29 +0200
Subject: [PATCH 1/4] Fixes missing pthreadlib linkage for casacore#1163

---
 README.rst | 1 +
 setup.py   | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/README.rst b/README.rst
index ed27754..ca5a409 100644
--- a/README.rst
+++ b/README.rst
@@ -76,6 +76,7 @@ On ubuntu you can install these with::
   this::

     CFLAGS="-std=c++11 \
+            -lpthread \
             -I/usr/local/Cellar/boost/1.68.0/include/ \
             -I/usr/local/include/  \
             -L/usr/local/Cellar/boost/1.68.0/lib \
diff --git a/setup.py b/setup.py
index def6ec5..52dc2c1 100755
--- a/setup.py
+++ b/setup.py
@@ -233,7 +233,7 @@ def get_extensions():
                                     library_dirs=library_dirs,
                                     include_dirs=include_dirs,
                                     # Since casacore 3.0.0 we have to be C++11
-                                    extra_compile_args=['-std=c++11']))
+                                    extra_compile_args=['-std=c++11', '-lpthread']))
     return extensions

-- 
2.17.1
From fc19b4b6e04c93716664c27818920cd8a9928467 Mon Sep 17 00:00:00 2001
From: bennahugo <bennahugo@gmail.com>
Date: Sun, 27 Feb 2022 20:42:26 +0200
Subject: [PATCH 2/4] Add converter routines for nonthreadsafe exception

---
 src/tables.cc | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/tables.cc b/src/tables.cc
index 37fb8dd..92733e3 100755
--- a/src/tables.cc
+++ b/src/tables.cc
@@ -32,15 +32,17 @@
 #include <casacore/python/Converters/PycValueHolder.h>
 #include <casacore/python/Converters/PycRecord.h>
 #include <casacore/python/Converters/PycArray.h>
+#include <casacore/python/Converters/PyNotThreadSafe.h>
 #include <casacore/tables/Tables/TableProxy.h>

 #include <casacore/meas/MeasUDF/Register.h>
 #include <casacore/derivedmscal/DerivedMC/Register.h>
-
 #include <boost/python.hpp>
+#include <string>

 BOOST_PYTHON_MODULE(_tables)
 {
+  casacore::python::registerNotThreadSafeException();
   casacore::python::register_convert_excp();
   casacore::python::register_convert_basicdata();
   casacore::python::register_convert_casa_valueholder();
-- 
2.17.1
From 1e26b184dd5815df0ce0ca040e9897d97853037e Mon Sep 17 00:00:00 2001
From: bennahugo <bennahugo@gmail.com>
Date: Fri, 4 Mar 2022 08:38:09 +0200
Subject: [PATCH 3/4] Add wrapper for casacore::PlainTable system methods

---
 casacore/tables/__init__.py |  1 +
 casacore/tables/table.py    |  1 -
 src/pytable.cc              | 19 ++++++++++++++++++-
 src/tables.cc               |  2 +-
 4 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/casacore/tables/__init__.py b/casacore/tables/__init__.py
index c2e86ae..a20619e 100755
--- a/casacore/tables/__init__.py
+++ b/casacore/tables/__init__.py
@@ -59,6 +59,7 @@ submodule `msutil <#measurementset-utility-functions>`_

 from .msutil import *
 from .table import table
+from .tablesubsystem import *
 from .table import default_ms
 from .table import default_ms_subtable
 from .table import tablecommand
diff --git a/casacore/tables/table.py b/casacore/tables/table.py
index 1f924c8..521acd2 100755
--- a/casacore/tables/table.py
+++ b/casacore/tables/table.py
@@ -163,7 +163,6 @@ def taql(command, style='Python', tables=[], globals={}, locals={}):
 # alias
 tablecommand = taql

-
 class table(Table):
     """The Python interface to Casacore tables.

diff --git a/src/pytable.cc b/src/pytable.cc
index bdacfb7..c9a7797 100755
--- a/src/pytable.cc
+++ b/src/pytable.cc
@@ -26,6 +26,7 @@
 //# $Id: pytable.cc,v 1.5 2006/11/08 00:12:55 gvandiep Exp $

 #include <casacore/tables/Tables/TableProxy.h>
+#include <casacore/tables/Tables/PlainTable.h>

 #include <casacore/python/Converters/PycBasicData.h>
 #include <casacore/python/Converters/PycValueHolder.h>
@@ -38,9 +39,25 @@
 using namespace boost::python;

 namespace casacore { namespace python {
-
+  
+  // Not going to do the full PlainTable interface
+  // just the "System-behavioural" routines
+  class TableSubSystem {
+  public:
+    TableSubSystem() {}
+    void useTableCachePerThread() { PlainTable::useTableCachePerThread(); }
+    void useProcessWideTableCache() { PlainTable::useProcessWideTableCache(); }
+    bool isUsingTableCachePerProcess() { return PlainTable::isUsingTableCachePerProcess(); }
+  };
+  
   void pytable()
   {
+    class_<TableSubSystem> ("TableSubSystem",
+            init<>())
+      .def ("_useTableCachePerThread", &PlainTable::useTableCachePerThread)
+      .def ("_useProcessWideTableCache", &PlainTable::useProcessWideTableCache)
+      .def ("_isUsingTableCachePerProcess", &PlainTable::isUsingTableCachePerProcess);
+
     // Note that all constructors must have a different number of arguments.
     class_<TableProxy> ("Table",
             init<>())
diff --git a/src/tables.cc b/src/tables.cc
index 92733e3..68331c1 100755
--- a/src/tables.cc
+++ b/src/tables.cc
@@ -48,7 +48,7 @@ BOOST_PYTHON_MODULE(_tables)
   casacore::python::register_convert_casa_valueholder();
   casacore::python::register_convert_casa_record();
   casacore::python::register_convert_std_vector<casacore::TableProxy>();
-
+  
   casacore::python::pytable();
   casacore::python::pytablerow();
   casacore::python::pytableiter();
-- 
2.17.1
From 9cf73b118c1d701d9224b0c5272f603091eeaf83 Mon Sep 17 00:00:00 2001
From: bennahugo <bennahugo@gmail.com>
Date: Sun, 6 Mar 2022 14:52:02 +0200
Subject: [PATCH 4/4] forgot to add tablesubsystem wrapper

---
 casacore/tables/tablesubsystem.py | 73 +++++++++++++++++++++++++++++++
 1 file changed, 73 insertions(+)
 create mode 100644 casacore/tables/tablesubsystem.py

diff --git a/casacore/tables/tablesubsystem.py b/casacore/tables/tablesubsystem.py
new file mode 100644
index 0000000..c718990
--- /dev/null
+++ b/casacore/tables/tablesubsystem.py
@@ -0,0 +1,73 @@
+# table.py: Python PlainTable functions
+# Copyright (C) 2022
+# South African Radio Astronomy Observatory
+#
+# This library is free software; you can redistribute it and/or modify it
+# under the terms of the GNU Library General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This library is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
+# License for more details.
+#
+# You should have received a copy of the GNU Library General Public License
+# along with this library; if not, write to the Free Software Foundation,
+# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
+#
+# Correspondence concerning AIPS++ should be addressed as follows:
+#        Internet email: aips2-request@nrao.edu.
+#        Postal address: AIPS++ Project Office
+#                        National Radio Astronomy Observatory
+#                        520 Edgemont Road
+#                        Charlottesville, VA 22903-2475 USA
+
+from ._tables import TableSubSystem
+def systemAwareOfTablesProcessWide():
+    """
+    Switch the CC API to use a table cache over the entire
+    process. This is the default behaviour of the table system,
+    meaning that the process is aware of all tables open on the
+    process. Only one PlainTable object per table may then
+    be open at any point in time. The calling process is aware of all
+    such open tables across all threads. At present the table
+    system itself is not threadsafe, but this method is provided to ensure
+    the traditionally assumed behaviour of the TableCache
+    system. This globally-aware caching has the side-effect
+    of synchronizing IO to a PlainTable (or derived object) on the process.
+    When a PlainTable is reopened in e.g. update mode any existing
+    table object within the process pointing to the same disk 
+    location is upgraded to writable as well through this system
+    WARNING :-: calling this closes all open tables across
+    Has no effect if already on a cache per process mode
+    the **process**.
+    """
+    TableSubSystem._useProcessWideTableCache()
+
+def systemAwareOfTablesPerThread():
+    """
+    New: switch the CC API to use a table cache per thread.
+    Each thread is treated as sole-custodian of the tables
+    it creates in auto locking and is unaware of tables created
+    in other threads (regardless of auto-locking mode). If autolocking
+    is used it is restricted to automatically apply within the current
+    thread.
+    This mode requires all locks allocated to be readonly to ensure
+    database consistency
+    Has no effect if already on a cache per thread mode
+    WARNING :-: calling this closes all open tables across
+    the **process*
+    """
+    TableSubSystem._useTableCachePerThread()
+
+def isSystemAwareOfTablesProcessWide():
+    """
+    Check if the table system is currently set to cache process wide
+    See also:: PlainTable::useProcessWideTableCache() and
+                PlainTable::useTableCachePerThread()
+    Warning:: boolean returned may become stale while you are checking
+    if running in threaded mode. It is strongly suggested that you set this
+    behaviour only in the main thread
+    """
+    return TableSubSystem._isUsingTableCachePerProcess()
\ No newline at end of file
-- 
2.17.1
bennahugo commented 2 years ago

Updated test case to show that failures are successfully induced for non-supported uses

import numpy as np
import threading
import pyrap.tables as pt
from pyrap.tables import tablesubsystem as tss
import os
import concurrent.futures as cf
import multiprocessing
import time
import sys
import hashlib

def threaded_getcolnp(ms_path, colname, startrow, nrow):

    tid = threading.get_ident()

    with pt.table(ms_path, lockoptions="usernoread", ack=False) as ms:
        ms.nrows()
        ref_row = ms.getcol(colname, nrow=1)  # Read an example row.
        ref_dims = ref_row.shape[1:]
        ref_dtype = ref_row.dtype

        out = np.empty((nrow, *ref_dims), dtype=ref_dtype)  # Preallocate output.

        ms.getcolnp(colname, out, startrow=startrow, nrow=nrow)

    return hashlib.md5(out).hexdigest() # We don't want to profile the cost of returning the result.

def threaded_get_and_put_colnp(ms_path, colname, startrow, nrow):
    # each thread puts into the array its own chunk which should not overlap (usernoread lock!!!!)
    tid = threading.get_ident()
    try:
        with pt.table(ms_path, lockoptions="usernoread", ack=False, readonly=False) as ms:
            ms.nrows()
            ref_row = ms.getcol(colname, nrow=1)  # Read an example row.
            ref_dims = ref_row.shape[1:]
            ref_dtype = ref_row.dtype

            out = np.empty((nrow, *ref_dims), dtype=ref_dtype)  # Preallocate output.
            testvals = np.random.randint(0, 10000, nrow)[:, None, None]
            out[...] = testvals
            ms.lock(True)
            ms.putcol(colname, out, startrow=startrow, nrow=nrow)
            ms.getcolnp(colname, out, startrow=startrow, nrow=nrow)
            ms.unlock()
            return (np.abs(out - testvals) < 1e-10).all()
    except Exception as e:
        return str(e)
    return False

def threaded_getcolnp_sharetable(ms, colname, startrow, nrow):
    try:
        ms.nrows()
        ref_row = ms.getcol(colname, nrow=1)  # Read an example row.
        ref_dims = ref_row.shape[1:]
        ref_dtype = ref_row.dtype

        out = np.empty((nrow, *ref_dims), dtype=ref_dtype)  # Preallocate output.

        ms.getcolnp(colname, out, startrow=startrow, nrow=nrow)
    except Exception as e:
        return str(e)
    return False

if __name__ == "__main__":
    tss.systemAwareOfTablesPerThread()

    ms_path = sys.argv[1]
    no_times = int(sys.argv[2])
    column = "DATA"
    nchunk = 15  # Partition the read into this many chunks.
    nworker = 8  # Number of threads/processes.

    table = pt.table(ms_path, ack=False)
    total_nrow = table.nrows()
    table.close()
    # skip last few rows if needed
    nrow = int(np.floor(total_nrow / nchunk))  # nrow per chunk.

    starts = np.repeat(np.arange(0, total_nrow, nrow), no_times)
    t0 = time.time()
    resultRef = [threaded_getcolnp(ms_path, column, row, nrow) for row in starts]
    print(f"Reading MS using a single thread: {time.time() - t0:.3f}s")

    t0 = time.time()
    result = [threaded_getcolnp(ms_path, column, row, nrow) for row in starts]
    assert(all(np.array(result) == np.array(resultRef)))
    print(f"Again: Reading MS using a single thread: {time.time() - t0:.3f}s")

    t0 = time.time()
    with cf.ThreadPoolExecutor(max_workers=nworker) as tpe:
        futures = [
            tpe.submit(
                threaded_getcolnp,
                ms_path,
                column,
                row,
                nrow
            )
            for row in starts
        ]
    assert(all(np.array(list(map(lambda x: x.result(), futures))) == np.array(resultRef)))
    print(f"Reading MS using threads and pooled table cache: {time.time() - t0:.3f}s")

    mp_context = multiprocessing.get_context("spawn")  # Forking breaks things.

    t0 = time.time()
    with cf.ProcessPoolExecutor(max_workers=nworker) as ppe:
        futures = [
            ppe.submit(
                threaded_getcolnp,
                ms_path,
                column,
                row,
                nrow
            )
            for row in starts
        ]
    assert(all(np.array(list(map(lambda x: x.result(), futures))) == np.array(resultRef)))
    print(f"Reading MS using processes and pooled table cache: {time.time() - t0:.3f}s")

    t0 = time.time()

    with pt.table(ms_path, lockoptions="usernoread", ack=False) as ms:
          with cf.ThreadPoolExecutor(max_workers=nworker) as tpe:
                futures = [
                      tpe.submit(
                      threaded_getcolnp_sharetable,
                      ms,
                      column,
                      row,
                      nrow
                    )
                    for row in starts
                ]
    errshouldbe = "casacore operation not threadsafe - object may not be shared between threads/processes"
    assert(all(list(map(lambda x: x.result() == errshouldbe, futures))))
    print(f"Usecase failed successfully: {errshouldbe}")

    tss.systemAwareOfTablesProcessWide()
    table = pt.table(ms_path, ack=False)
    t0 = time.time()
    result = [threaded_get_and_put_colnp(ms_path, column, row, nrow) 
              for row in starts[:len(starts)//int(sys.argv[2])]]
    assert(all(np.array(list(map(lambda x: x.result(), futures)))))
    print(f"Reading and writing MS using a single thread: {time.time() - t0:.3f}s")

    tss.systemAwareOfTablesPerThread()
    t0 = time.time()
    with cf.ThreadPoolExecutor(max_workers=nworker) as ppe:
        futures = [
            ppe.submit(
                threaded_get_and_put_colnp,
                ms_path,
                column,
                row,
                nrow
            )
            for row in starts[:len(starts)//int(sys.argv[2])]
        ]
    assert(all(np.array(list(map(lambda x: x.result(), futures)))))
    errshouldbe = "Error -- Table system in Threaded readonly mode. You may not request write locks at this time"
    assert(all(list(map(lambda x: x.result().strip() == errshouldbe, futures))))
    print(f"Usecase failed successfully: {errshouldbe}")
JSKenyon commented 2 years ago

@bennahugo I am going to add you to my fork - that way you can implement these patches yourself. Will make your life easier and means that I don't need to do a load of manual patching.