ebonnal / streamable

[Python] Stream-like manipulation of iterables.
Apache License 2.0
131 stars, 0 forks

Adding support for Process based parallelism? #23

Closed: matthewghgriffiths closed this issue 2 months ago

matthewghgriffiths commented 2 months ago

This is a cool library! The way you've set it up made it fairly straightforward to add process-based parallelism via monkey patching (see the code snippet below). Are there likely to be any issues if a pmap method were added to streamable.Stream?

from streamable import Stream
from streamable import iters, visitors, stream
from concurrent.futures import ProcessPoolExecutor

## streamable.iters

class ProcessConcurrentMappingIterable(
    iters.ThreadConcurrentMappingIterable
):
    def _context_manager(self) -> iters.ContextManager:
        self.executor = ProcessPoolExecutor(max_workers=self.concurrency)
        return self.executor

## streamable.functions

def pmap(
    transformation: iters.Callable[[iters.T], iters.U],
    iterator: iters.Iterator[iters.T],
    concurrency: int = 1, 
    # ordered: bool = True
):
    return iters.RaisingIterator(
        iter(
            ProcessConcurrentMappingIterable(
                iterator,
                transformation,
                concurrency=concurrency,
                buffer_size=concurrency,
                # ordered=ordered,
            )
        )
    )

## streamable.stream

class PMapStream(stream.DownStream[iters.T, iters.U]):
    def __init__(
        self,
        upstream: Stream[iters.T],
        transformation: iters.Callable[[iters.T], iters.U],
        concurrency: int,
        # ordered: bool,
    ) -> None:
        super().__init__(upstream)
        self._transformation = transformation
        self._concurrency = concurrency
        # self._ordered = ordered

    def accept(self, visitor: visitors.Visitor):
        return visitor.visit_pmap_stream(self)

## streamable.stream.Stream monkey patch

def _pmap(
    self, 
    transformation: iters.Callable[[iters.T], iters.U],
    concurrency: int = 1,
    # ordered: bool = True,
) -> "Stream[U]":
    return PMapStream(self, transformation, concurrency)

Stream.pmap = _pmap

## streamable.visitors.iterator.Visitor monkey patch

def _visit_pmap_stream(self, stream: PMapStream[iters.U, iters.T]) -> iters.Iterator[iters.T]:
    from streamable.visitors.iterator import IteratorVisitor

    return pmap(
        stream._transformation,
        stream.upstream.accept(IteratorVisitor[iters.U]()),
        concurrency=stream._concurrency,
        # ordered=stream._ordered,
    )

visitors.Visitor.visit_pmap_stream = _visit_pmap_stream
ebonnal commented 2 months ago

Hi @matthewghgriffiths! Thank you, and great monkey-patching job; happy to see that 😄!

At some point the .map operation had a concurrency_mode parameter, either THREAD or PROCESS. I removed it to simplify testing (code that works with threads can fail with serialization errors under processes, because the function and its arguments have to be pickled to reach the worker processes), but I am definitely with you on adding it back!
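To illustrate the serialization gap with the standard library only (no streamable code involved): a thread pool happily runs a lambda, but ProcessPoolExecutor has to pickle each task, and pickle serializes functions by importable name, which a lambda does not have. The is_picklable helper below is a hypothetical name used for this sketch.

```python
import math
import pickle
from concurrent.futures import ThreadPoolExecutor


def is_picklable(obj: object) -> bool:
    """Hypothetical helper: True if obj survives pickle.dumps, as process-pool tasks must."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, ImportError, TypeError):
        return False


square = lambda x: x * x  # noqa: E731 (a lambda on purpose: its __qualname__ is "<lambda>")

# Threads share the interpreter's memory, so nothing gets pickled and the lambda works.
with ThreadPoolExecutor(max_workers=2) as pool:
    thread_results = list(pool.map(square, [1, 2, 3]))

print(thread_results)                # [1, 4, 9]
print(is_picklable(math.factorial))  # True: pickled by reference as math.factorial
print(is_picklable(square))          # False: no importable name to pickle it by
```

This is why a transformation that passes every thread-based test can still break once the same stream runs its tasks in processes.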

Questions:

Relatedly, it would be interesting to check how the lib behaves under the free-threaded mode coming with Python 3.13, right?
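As a starting point for such a check, CPython exposes whether it is a free-threaded build and whether the GIL is actually enabled at runtime. A minimal sketch, assuming CPython: sysconfig's Py_GIL_DISABLED config variable and sys._is_gil_enabled() (new in 3.13) are standard-library checks, not part of streamable, and older interpreters simply lack the latter.

```python
import sys
import sysconfig

# Build-time flag: 1 on a free-threaded CPython build (3.13+), otherwise 0 or None.
free_threaded_build = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))

# Runtime flag: sys._is_gil_enabled() only exists from 3.13; before that the GIL is always on.
is_gil_enabled = getattr(sys, "_is_gil_enabled", lambda: True)
gil_enabled = bool(is_gil_enabled())

print(
    f"Python {sys.version_info.major}.{sys.version_info.minor}: "
    f"free-threaded build={free_threaded_build}, GIL enabled={gil_enabled}"
)
```

On a free-threaded build the GIL can still be re-enabled at runtime (for example by an extension module that does not declare support), which is why both flags are worth logging when benchmarking thread-based concurrency.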

matthewghgriffiths commented 2 months ago

I'm calling thread-unsafe libraries (e.g. pyensembl and sqlite), so I have to resort to process-based parallelism.
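For context on the sqlite case: Python's sqlite3 module guards against cross-thread use by default (check_same_thread=True), so a connection created in one thread raises sqlite3.ProgrammingError when used from another. A minimal standard-library demonstration (pyensembl is not shown; the query helper is a hypothetical name):

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor

# Connection created in the main thread; check_same_thread defaults to True.
conn = sqlite3.connect(":memory:")


def query(_: int) -> int:
    # Runs a trivial query through the shared connection.
    return conn.execute("SELECT 1").fetchone()[0]


with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(query, 0)
    try:
        future.result()
        cross_thread_ok = True
    except sqlite3.ProgrammingError:
        # "SQLite objects created in a thread can only be used in that same thread..."
        cross_thread_ok = False

same_thread_result = query(0)  # same-thread use is fine
conn.close()

print(cross_thread_ok)     # False
print(same_thread_result)  # 1
```

With processes, each worker would instead open its own connection, for example in a function passed as ProcessPoolExecutor's initializer argument.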

matthewghgriffiths commented 2 months ago

I'm using the PyPI version, which doesn't have the ordered keyword yet.

ebonnal commented 2 months ago

OK, makes perfect sense. Issue flagged as TODO. I will ping you on the future draft PR for review/co-authoring 🙏🏻

ebonnal commented 2 months ago

Heya @matthewghgriffiths, I fixed a few serialization issues and added a unit-tested experimental hook (commit):

from streamable.iters import OSConcurrentMappingIterable
from concurrent.futures import ProcessPoolExecutor
OSConcurrentMappingIterable.EXECUTOR_CLASS = ProcessPoolExecutor

It applies to .map and .foreach with concurrency > 1, for both ordered=True (first in, first out) and ordered=False (first done, first out).
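To make the two orderings concrete, here is a minimal sketch built only on concurrent.futures. concurrent_map, _next_result and executor_class are hypothetical names for illustration, not streamable's actual implementation; executor_class plays the role that EXECUTOR_CLASS plays in the hook above, and at most `concurrency` tasks are kept in flight so the input is consumed lazily.

```python
import time
from collections import deque
from concurrent.futures import FIRST_COMPLETED, Executor, Future, ThreadPoolExecutor, wait
from typing import Callable, Deque, Iterable, Iterator, Type, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def _next_result(in_flight: "Deque[Future[U]]", ordered: bool) -> U:
    if ordered:
        # First in, first out: block on the oldest submitted task.
        future = in_flight.popleft()
    else:
        # First done, first out: block until any in-flight task finishes.
        done, _ = wait(in_flight, return_when=FIRST_COMPLETED)
        future = done.pop()
        in_flight.remove(future)
    return future.result()


def concurrent_map(
    fn: Callable[[T], U],
    items: Iterable[T],
    concurrency: int,
    ordered: bool = True,
    executor_class: Type[Executor] = ThreadPoolExecutor,  # hypothetical stand-in for EXECUTOR_CLASS
) -> Iterator[U]:
    """Hypothetical helper: lazily map fn over items with at most `concurrency` tasks in flight."""
    with executor_class(max_workers=concurrency) as executor:
        in_flight: "Deque[Future[U]]" = deque()
        for item in items:
            if len(in_flight) >= concurrency:
                yield _next_result(in_flight, ordered)
            in_flight.append(executor.submit(fn, item))
        while in_flight:
            yield _next_result(in_flight, ordered)


def slow_identity(delay: float) -> float:
    time.sleep(delay)
    return delay


delays = [0.3, 0.2, 0.1]
fifo = list(concurrent_map(slow_identity, delays, concurrency=3, ordered=True))
fdfo = list(concurrent_map(slow_identity, delays, concurrency=3, ordered=False))
print(fifo)  # [0.3, 0.2, 0.1]
print(fdfo)  # typically [0.1, 0.2, 0.3], since shorter sleeps finish first
```

Passing executor_class=ProcessPoolExecutor would run the tasks in worker processes instead, with the caveat raised earlier in the thread: fn and its arguments must then be picklable.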

If it makes sense to you, I will add it to the next release so that you can try it out for us 🙏🏻

(We can then think about adding it properly to the Stream interface.)

ebonnal commented 2 months ago

FYI @matthewghgriffiths, you can try it out after running: pip install streamable==1.1.0-rc.1

ebonnal commented 2 months ago

FYI, #25 adds the within_processes: bool param to .map/.foreach

ebonnal commented 1 month ago

Since 1.2.0, use via_processes.

ebonnal commented 1 month ago

BTW @matthewghgriffiths, if the implementation looks good to you, I would like to add you as a co-author on this!

matthewghgriffiths commented 1 month ago

Looks good to me, though I only made a small request!

ebonnal commented 1 month ago

You brought the idea and motivation, explored the code to see how feasible it was, and reviewed: that's definitely co-authoring imo 🤝 If you agree, I just need an email associated with your GitHub account.