FRBs / sigpyproc3

Python3 version of Ewan Barr's sigpyproc library
https://sigpyproc3.readthedocs.io
MIT License

Feature: buffered file reads #33

Closed (ewanbarr closed this issue 5 months ago)

ewanbarr commented 6 months ago

Issue

Currently the file I/O infrastructure in sigpyproc is limited by the fact that each read of the file creates a new buffer. A call to the sigpyproc.io.fileio.FileReader.cread(...) function can result in two allocations: one for the data read from file and another for the unpacking buffer used in the case of 1-, 2- and 4-bit data. While Python may elide some of the performance cost of these buffer allocations via caching, the behaviour is unpredictable.
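
For context, the current pattern looks roughly like the sketch below (a minimal sketch, not verbatim sigpyproc code: the file name and gulp size are placeholders, and the yielded tuple follows the read_plan semantics described further down). The point is simply that every gulp hands back a freshly allocated array.

from sigpyproc.readers import FilReader

fil = FilReader("observation.fil")  # placeholder file name

# With the current read_plan, every iteration yields a newly allocated
# ndarray; nothing is reused between gulps, and low-bitwidth data also
# incurs a separate unpacking buffer internally.
blocks = []
for nsamps, ii, data in fil.read_plan(gulp=16384):
    blocks.append(data)  # each `data` is a distinct array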

To reduce the memory and performance cost of these re-allocations, and to open up interoperability between sigpyproc and tools like pycuda and torch, it would be useful to have more fine-grained control over some of the buffer allocations.

Say we wish to build a sigpyproc pipeline that uses torch. To enable asynchronous memory copies between the host and GPU it is necessary to page-align, lock and register each memory buffer ("pinning" in CUDA parlance). Currently, to do this with cread() we would need to pin a new buffer on each loop iteration. Pinning has a very heavy overhead, so repeated pinning should be avoided. The general strategy is to pin a buffer once at the beginning of a program and reuse that buffer.
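
As a rough illustration of the pin-once-and-reuse strategy (a sketch only, independent of sigpyproc; the buffer size and dtype are placeholders), a pinned host buffer can be allocated up front and refilled on every iteration, with asynchronous copies to the device:

import torch

nbytes = 16384 * 1024  # placeholder gulp size in bytes

# Allocate and pin the host staging buffer once, before the main loop.
host_buf = torch.empty(nbytes, dtype=torch.uint8, pin_memory=True)
dev_buf = torch.empty(nbytes, dtype=torch.uint8, device="cuda")

for _ in range(10):
    # ... refill host_buf with the next gulp of data from file ...
    # Because host_buf is pinned, this copy can run asynchronously.
    dev_buf.copy_(host_buf, non_blocking=True)
    # ... launch GPU work on dev_buf ...
torch.cuda.synchronize()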

Feature request

I suggest that the read_plan interface (at least on FilReader, but maybe elsewhere) be updated to take an allocator method. The allocator method should take a number of bytes as an argument and return an object that exports the Python buffer protocol interface (PEP 3118), e.g.

import torch
from collections.abc import Buffer  # buffer-protocol ABC (Python 3.12+), per PEP 3118

# Simple bytearray allocator (probably the default allocator)
def bytearray_allocator(nbytes: int) -> Buffer:
    return bytearray(nbytes)

# A torch pinned-memory allocator: wrap the bytearray in a tensor and
# register ("pin") its pages with the CUDA runtime
def pinned_allocator(nbytes: int) -> Buffer:
    buffer = bytearray(nbytes)
    cudart = torch.cuda.cudart()
    tensor = torch.frombuffer(buffer, dtype=torch.int8)
    r = cudart.cudaHostRegister(tensor.data_ptr(), tensor.numel() * tensor.element_size(), 0)
    if not r.success:
        raise RuntimeError(f"Unable to pin memory buffer: {r}")
    return buffer

The new call signature for read_plan would look like:

    def read_plan_buffered(
        self,
        gulp: int = 16384,
        start: int = 0,
        nsamps: int | None = None,
        skipback: int = 0,
        description: str | None = None,
        quiet: bool = False,
        allocator: Callable[[int], Buffer] | None = None,
    ) -> Iterator[tuple[int, int, np.ndarray]]:
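
To make the intended usage concrete, a call might look like the sketch below (read_plan_buffered does not exist yet, so this is illustrative only; the file name and gulp size are placeholders, pinned_allocator is the allocator defined above, and consume() stands in for whatever downstream processing the pipeline does):

from sigpyproc.readers import FilReader

fil = FilReader("observation.fil")  # placeholder file name

# The buffer is allocated (and pinned) exactly once via the allocator;
# each iteration refills the same backing memory.
for nsamps, ii, data in fil.read_plan_buffered(gulp=16384, allocator=pinned_allocator):
    # `data` is backed by the reused buffer, so consume or copy it
    # before the next iteration overwrites it.
    consume(data)  # hypothetical downstream processing step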

The semantics of the call would remain mostly the same. The main difference is that the ndarray returned on each iteration is the same ndarray, just containing different data. This could have side effects if the behaviour is not understood: for example, appending the array from each loop iteration to a list produces a list of references to the same object, where updating one updates them all.
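
The aliasing pitfall can be emulated with plain numpy (a minimal sketch; the small buffer stands in for the reused gulp buffer):

import numpy as np

buf = np.zeros(4, dtype=np.int8)    # stands in for the reused gulp buffer

aliased, snapshots = [], []
for value in (1, 2, 3):
    buf[:] = value                  # each "iteration" overwrites the same buffer
    aliased.append(buf)             # stores a reference: all entries track the buffer
    snapshots.append(buf.copy())    # explicit copy preserves this iteration's data

print(aliased)    # every entry now shows the last value, 3
print(snapshots)  # entries show 1, 2, 3 as expected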

Other parts of the codebase that would need to change:

pravirkr commented 5 months ago

Hi @ewanbarr, thanks for the PR. This is a cool feature to have.

Merging the PR now. I will also make read_plan_buffered() the default plan.