goodboy / tractor

A distributed, structured concurrent runtime for Python (and friends)
GNU Affero General Public License v3.0

Parallel generator pool thing #90

Open ali1234 opened 4 years ago

ali1234 commented 4 years ago

I would like to be able to parallelize generators (across processes). Not async generators - just normal ones.

This concept is a bit difficult to explain, but bear with me.

Consider a function like this:

def f(item):
    return item*3

This is not a generator, and it can be trivially parallelized with map semantics, eg:

for result in multiprocessing.Pool(...).imap(f, itertools.count()):
    print(result)

Now consider a function like this:

def g(iterable):
    for item in iterable:
        yield item*3

We call a generator like this directly on an iterable, eg:

for result in g(itertools.count()):
    print(result)

But in order to parallelize it with map style, we have to do something pretty ugly:

def helper(item):
    return next(g([item]))

for result in pool.imap(helper, iterable):
    print(result)

This will split the iterable into single item iterables, and call g on each one. This becomes a problem if g() is of the form:

def g(iterable):
    setup()
    for item in iterable:
        yield item*3
    teardown()

Here setup() and teardown() may take a very long time, or they may crash if run more than once in the same process. Running g() once for every item in iterable is undesirable as it may be many times slower than calling it directly on the original iterable, even if it is distributed over a pool of processes.

So I'm looking for a way to parallelize generator functions like g(), whereby g() is run once in each worker process and passed an iterable which asynchronously receives work items from the main process's iterable, so it can process items indefinitely without ever having to be restarted.
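For illustration, the worker side of that might look roughly like this (a sketch with hypothetical names, not my actual mp.py code):

def worker_main(g, inq, outq):
    def incoming():
        while True:
            item = inq.get()
            if item is None:  # sentinel: no more work
                return
            yield item
    # g runs exactly once in this process, so setup()/teardown() run once too
    for result in g(incoming()):
        outq.put(result)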

This is not possible for all generator functions. It is only possible for generators of the form:

def g(iterable):
    setup()
    for item in iterable:
        yield f(item)
    teardown()

where f() is a pure function (setup() and teardown() do not have to be). I'm only interested in parallelizing generators of this form. (I call generators of this form "pure generators", in reference to the inner loop being pure, which also implies the generator yields exactly as many results as there are items in the input.)

The function which handles all this should itself not be async, ie it should just look like a normal generator from outside:

for result in generator_map(g, iterable, nprocs):
    print(result)

I have an implementation of this concept at:

https://github.com/ali1234/vhs-teletext/blob/master/teletext/mp.py

The highest-level API is called itermap() for historical reasons, but it works just like what I have described above. The lower-level pools also allow the worker pool to be reused without recreating the workers. There are both parallel and single-process implementations of the pool. In particular, ctrl-c should be correctly handled everywhere, and there are also unit tests (including ctrl-c tests):

https://github.com/ali1234/vhs-teletext/blob/master/teletext/tests/test_mp.py

We also discussed the concept here: https://gitter.im/python-trio/general?at=5dd70e3aac81632e65ddb9fa

My implementation uses multiprocessing and has some weak points and ugly things in it:

(Note: I've fixed or have a path to fixing all of these in the latest version, but it still has some ugly workarounds.)

  1. Multiprocessing has an atexit handler which kills any leftover daemon subprocesses. Due to the extensive use of yield, it is possible for SIGINT to happen outside the pool's context, meaning the workers aren't shut down cleanly. Instead the atexit handler runs first and terminates them, and then the pool tries to shut down afterwards. I have to make the workers daemonic, otherwise they can be left running if the main process crashes, due to similar race conditions. This is one source of deadlocks which I think I have finally managed to handle, but it is all quite ugly.

  2. The pool can detect when workers crash, but it doesn't know why (it can't access the exception.) It would be really nice if it could re-raise it on the main thread as this would further hide the fact that this is multiprocess code.

  3. There are problems with mp.Queue: if you put an unpicklable item onto one, it will raise an exception. But that exception happens on a background thread where you can't catch it, so I have to manually check everything before putting it on the queue. If you don't do that, you will think the item has been sent when it really hasn't, the sent and received counts go out of sync, and you deadlock. A minimal demonstration is sketched below.
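For example (assuming CPython's multiprocessing, where the pickling happens on a background feeder thread):

import multiprocessing as mp
import time

if __name__ == '__main__':
    q = mp.Queue()
    q.put(lambda x: x)  # lambdas are unpicklable, yet put() itself "succeeds"
    time.sleep(1)       # a PicklingError traceback is printed from the feeder thread
    # q.get() would now block forever: the item was never actually sent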

Note that multiprocessing.Pool() itself suffers from all these problems, so simply refactoring my code to not use "pure generators" and instead just use Pool.imap() wouldn't actually help at all.

So for these reasons I am searching for something better.

It is perhaps worth noting that the codebase where I intend to use this is already asynchronous - but I never use the async keyword. Instead, everything is done with generators. It handles streams of packets, and pipelines are composed from generators, eg:

iterable = file_source(filename)
iterable = deconvolve(iterable)
iterable = filter(iterable, pattern)
iterable = paginate(iterable)
iterable = file_sink(iterable, filename)
for result in iterable:
    pass

Some of these generators are pure, others are not. I need to be able to insert the generator_map() function transparently into a pipeline like the above at the appropriate place, which depends on what the pipeline does.

For example, here's a case where I use itermap() inline with a complex pipeline (it looks more complicated than the simple example above, but it is really doing the same thing):

https://github.com/ali1234/vhs-teletext/blob/master/teletext/cli.py#L292

Nearly all the subcommands of the program do this to some extent.

goodboy commented 4 years ago

@ali1234 I was thinking that this is a really interesting idea for my own purposes as well, since I'm working towards building out a stream-based analytics system for financial data. The idea has interesting implications for processing real-time frame-oriented data (like audio), where you could just keep inter-frame state in a generator's local scope as opposed to managing it with more "global" variables. Also neat if the switch from parallel <-> non-parallel is simple as well.

I want to respond to a bunch of this stuff in terms of what tractor can already do to handle a lot of your missing requirements but I want to do a deeper re-read through.

I'll report back soon!

ali1234 commented 4 years ago

I'm using it to process video data. Yes - you can keep some state in the generators, but only in as much as it does not affect the output. For example, the state I set up and tear down is mostly setting up a CUDA context, compiling kernels, and loading pattern data into GPU memory. That can and must only be done once in any given process. Putting it into the generator instead of a separate setup function makes it far easier to only do the setup when it is necessary (ie it only happens if the pipeline contains that particular generator - the pipeline builder does not need to keep track of it).

As soon as you try to parallelize a generator like this, its results can't be influenced by previous results, because operations will happen in a different order (and, from the point of view of each subprocess, some will not happen at all, since they happen in a different process).

That said, if the inter-frame state is something like "count the number of frames with property x" then you could add a thing to sum the results from each subprocess fairly easily.

ali1234 commented 4 years ago

More concretely, if you have a generator like:

def cumulative_sum(it):
    total = 0
    for x in it:
        total += x
        yield total

This generator isn't pure according to my definition, and isn't a case that I need to handle, but if the pool is aware of it, it can still be parallelized by just adding together the final values yielded by each subprocess.
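For example, a sketch of that merge step (merge_cumulative is a hypothetical helper; it assumes each worker returns the cumulative sums of its own chunk, with chunks in input order):

def merge_cumulative(chunks):
    offset = 0
    for chunk in chunks:
        for value in chunk:
            yield value + offset
        if chunk:
            offset += chunk[-1]

# per-chunk cumulative sums of [1, 2, 3] and [4, 5]...
assert list(merge_cumulative([[1, 3, 6], [4, 9]])) == [1, 3, 6, 10, 15]
# ...merge into the cumulative sum of [1, 2, 3, 4, 5]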

parity3 commented 4 years ago

Have you considered switching from the generator paradigm to the synchronous coroutine paradigm? The outer layer would be simpler and not require as many queues (at the expense of the slightly weird worker function style). So you could just:


from contextlib import closing

def do_the_work():
    some_context = init()  # placeholder for the expensive one-time setup
    result = None
    with some_context:  # I heard somewhere that we should beware of context managers in generators/coroutines but I don't exactly understand in which cases it's ok / not ok
        while True:
            task = yield result
            result = task + 1  # replace with actual code to produce a result

coro = do_the_work()
with closing(coro):  # make sure the context inside the coroutine is finalized by throwing GeneratorExit into it
    next(coro)  # prime it (run the part of the coroutine up to the first yield)
    for task_num, task_obj in iter(work_queue.get, None):
        response = coro.send(task_obj)
        done_queue.put((task_num, response))

ali1234 commented 4 years ago

@parity3 that seems to be pretty much what I am looking for, but I need it to also be multiprocess to leverage multiple CPUs and GPUs. I don't see any benefit to making the rest of my codebase work this way as it just adds extra complexity, so I would like it all wrapped in a function that looks like a normal generator from outside. Then it can be applied to any existing (pure) generator without having to refactor it and additionally the codebase can still work (slowly) even if some particular async library isn't available.

goodboy commented 4 years ago

Just quickly addressing the pitfalls section of your current solution:

My implementation uses multiprocessing and has some weak points and ugly things in it

Responding in order:

  1. in tractor SIGINT is handled by the top-level trio.run scheduler, which in turn propagates cancel signals down the task and process trees using nursery / cancel scopes. Cancellation and error handling are deterministic, so there should be no "weird teardown state" problems. All sub-processes (aka actors) are joined prior to any shutdown, and thus the multiprocessing atexit handler is irrelevant and should not interfere with anything by the time it runs (if using mp at all, of course).

  2. In tractor all errors propagate, and each layer of the process tree must expect to handle such errors, or (depending on the supervisor strategy - coming soon) the entire tree will systematically error at each ActorNursery and tear down in a deterministic way, just like tasks in trio. So there is no such thing as an "un-handled error happening in the background", ever (at least for local actor trees). In terms of relaying error "types" across IPC boundaries, #5 is a task to begin solving that. One other note: cancelling streams mid-iteration is something I need to test more stringently as per #87, but cancellation should work as expected even when a bunch of streams break all over the place.

  3. Errors like this can't go hidden in tractor due to point 2. above. That being said tractor has no intention to include a built-in object "pickling" or marshalling system as part of its principal design; there is only a minimal system in place to support the inter-actor-streaming protocol (see #58). An object passing system would be something you build on top of tractor primitives.

Regarding,

It handles streams of packets, and pipelines are composed from generators eg

iterable = file_source(filename)
iterable = deconvolve(iterable)
iterable = filter(iterable, pattern)
iterable = paginate(iterable)
iterable = file_sink(iterable, filename)
for result in iterable:
    pass

The generator composition here is why I think a reactive style api for building so called "pipelines" (really in the functional world these are just plain old compositions) is probably the best approach. There's good motivation (streamz and rxpy) for why such systems exist and how generators / iterators are insufficient for more complex flows. I want to dig into this more later on but for now just know support for a declarative style streaming system is definitely something tractor will be addressing in the future.

ali1234 commented 4 years ago

Sounds good. Re 3., my streams are basically pure data - ints, strings, tuples, and numpy arrays. Nothing that should be particularly difficult to serialize. I do need it to be fast though.

And Re streamz, as I mentioned, I looked at it the other day. It looks like a fine API, but they have a rationale that says "generators quickly become cumbersome" without actually explaining why, and I don't think they are cumbersome at all.

The benefit of using generators is that you can use them with a large number of other libraries, eg tqdm by just inserting a tqdm object anywhere in the pipeline:

iterable = file_source(filename)
iterable = tqdm(iterable)
iterable = deconvolve(iterable)
iterable = filter(iterable, pattern)
iterable = paginate(iterable)
iterable = file_sink(iterable, filename)
for result in iterable:
    pass

... and now I've got a progress bar with no other changes. Notice that file_source could return an object with or without __len__ depending on whether it is a file on disk or standard input. tqdm will handle it appropriately, meaning I get an estimated time-to-complete if it can be calculated. How can I do that with streamz or rxpy or some other API?

ali1234 commented 4 years ago

I just completely rewrote mp.py to use a pipe per worker instead of a single pair of shared queues. This makes the code much simpler and removes pitfalls 1 and 3 entirely. I still can't tell why a worker crashed, but it is much easier to detect that it happened, since the pipe will die and raise an error. That means most of the shared events are no longer needed. It still has the same apply API.

The best part is it made my slowest pipeline 10% faster, and the "no op" demo at the bottom of mp.py runs about 2.5x as fast. So the shared queues must have had quite a lot of overhead, for apparently little benefit and a lot of headaches.

goodboy commented 4 years ago

@ali1234 sweet, yes this looks much easier to understand now.

The reactive stuff becomes useful when you want to do forking and merging of streams (obviously not used in your case) as well as time based operations.

I will take a look at implementing a version with tractor hopefully sometime today if not this week.

ali1234 commented 4 years ago

I think I've only used itertools.tee once anywhere in the program. I do merge streams (sort of) in service.py, but it should be done according to very specific rules (which I don't currently follow; that will be implemented later).

New mp.py turned out to be broken on Windows because multiprocessing.Pipe has an 8192-byte buffer by default there (vs 64K on Linux), which isn't big enough for more than 3 of my objects (and I try to keep a few in the buffer to avoid underruns). On Windows, Pipe.send blocks if the buffer is full, which means the pool can deadlock if the worker and the main process are both trying to write to the pipe at the same time. Luckily the buffer can be expanded, which seems to resolve the problem. It's a workaround though: for a general pool, someone will always want to send a bigger object.
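For reference, the expansion leans on an undocumented CPython implementation detail (as far as I can tell, multiprocessing.connection.BUFSIZE sets the Windows named-pipe buffer size), so roughly:

import multiprocessing.connection

# default is 8192; must be patched before Pipe() is called, and it only
# affects the Windows named-pipe transport, not the Unix one
multiprocessing.connection.BUFSIZE = 2 ** 20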

ali1234 commented 4 years ago

Got a proper fix for the blocking pipes: use threads. This allows everything to continue (slowly) even if the pipe blocks in both directions, since the reader thread will eventually unblock it. It also allows the pool to block on pipe.recv instead of polling, which is faster and also means workers use no CPU while waiting for work. It doesn't quite make up for the overhead of the extra queues, but it is still faster than multiprocessing.Queue. I even have a path to communicating worker exceptions now: I can put them in the sentinel object returned to the main process, which is how worker crashes are signalled now.
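Roughly, the shape of that fix (illustrative names, not the actual mp.py code): each connection gets a daemon thread that drains it into a local queue, so a full OS buffer can never wedge both directions at once.

import threading
import queue

def start_reader(conn, results):
    # drain conn on a background thread so the other side's send()
    # can always make progress, even while we are busy writing
    def drain():
        try:
            while True:
                results.put(conn.recv())  # blocking recv, no polling
        except EOFError:
            results.put(None)  # sentinel: the worker hung up
    t = threading.Thread(target=drain, daemon=True)
    t.start()
    return t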

parity3 commented 4 years ago

Not sure if this helps anything or not, but I remember seeing this project a while back:

https://github.com/KholdStare/generators-to-coroutines

It actually provides a decorator which takes any iterator function and AST-transforms it into a coroutine! It could help in certain scenarios, especially compared to using a heavy-weight construct like a thread/queue to get around this issue.
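The transform it performs, written out by hand for a trivial example (a sketch of the idea, not the library's actual output):

def doubler(it):             # iterator ("pull") style
    for x in it:
        yield x * 2

def doubler_coro(target):    # coroutine ("push") style equivalent
    while True:
        x = yield            # receive an item via .send()
        target.send(x * 2)   # push the result downstream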

goodboy commented 3 years ago

Though not for the same type of work (it targets IO-bound pipelines), asyncio-buffered-pipeline has a "composition" API for doing something similar on a single process.

goodboy commented 3 years ago

@ali1234 sorry to have left this for so long, but we do indeed need something like this for piker, and it will likely be implemented using a shared-memory system that is now in testing. I will ping you, if you'd like, as that progresses to a usable state; it will likely be published as a separate project.

ali1234 commented 3 years ago

Please do.

I ended up rewriting my code with ZMQ. Not sure if that was before or after we discussed it. It is slightly more portable but still has many of the same issues, mostly involving deadlocks and IPC.

goodboy commented 3 years ago

@ali1234 nice. Link to the new code if you don't mind (I'm sure I can also find it if need be).

I will definitely keep you in the loop. The new sub-system will hopefully get broken out into a new repo within the next few weeks.

ali1234 commented 3 years ago

New code is at the same link: https://github.com/ali1234/vhs-teletext/blob/master/teletext/mp.py

The API is the same. It just uses ZMQ sockets instead of multiprocessing shared memory objects.
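For reference, the kind of plumbing involved (a toy pyzmq PUSH/PULL sketch; the endpoint and socket pattern here are illustrative, not necessarily what mp.py uses):

import zmq

ctx = zmq.Context.instance()

push = ctx.socket(zmq.PUSH)           # main process: distribute work
push.bind("tcp://127.0.0.1:5555")

pull = ctx.socket(zmq.PULL)           # worker process: receive work
pull.connect("tcp://127.0.0.1:5555")

push.send_pyobj((0, "work item"))     # pickled under the hood
print(pull.recv_pyobj())              # -> (0, 'work item')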

goodboy commented 3 years ago

A bump for this issue after discussion in chat:

import time
from itertools import count

from multiprocessing import Pool
from functools import partial

already_called = False

def work(it, a, b):
    global already_called

    if already_called:
        raise AssertionError("You can only call work() once per process.")

    already_called = True

    time.sleep(1)

    for x in it:
        time.sleep(0.1)
        yield (a*x)**b

def helper(x, args):
    return next(work([x], *args))

def parallel(f, it, *args):
    with Pool(1) as p:
        # imap is lazy; map would try to consume the whole (infinite) iterable
        yield from p.imap(partial(helper, args=args), it)

def notparallel(f, it, *args):
    yield from f(it, *args)

if __name__ == '__main__':

    # note: these calls only create the generators; nothing runs until they
    # are iterated (e.g. with list(islice(...)) as in the later version)
    parallel(work, count(), 3, 2)
    notparallel(work, count(), 3, 2)

Further comments:

- Pipes work fine on Linux and fail on Windows; ZMQ seems to be the other way around.
- Oh yeah, also notice that itermap is supposed to be ordered, so you have to implement the denumerate/renumerate trick. That isn't shown in the problem statement; I should add it. It's easily done with a queue and a couple of wrappers; you can check mp.py for that. (A sketch is below.)
- Updated comments with links to my previous failed attempts (code is the same): https://paste.ubuntu.com/p/HSzbC2XJBY/
- Fixed links, oops: https://paste.ubuntu.com/p/nn5wtzZjC5/
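A sketch of that trick (the heapq-based reordering here is one way to do it, not necessarily how mp.py does it):

import heapq

def denumerate(iterable):
    # tag each work item with its position before scattering it to workers
    yield from enumerate(iterable)

def renumerate(numbered_results):
    # re-emit results in input order, buffering any that arrive early
    heap, wanted = [], 0
    for n, result in numbered_results:
        heapq.heappush(heap, (n, result))
        while heap and heap[0][0] == wanted:
            yield heapq.heappop(heap)[1]
            wanted += 1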

goodboy commented 3 years ago

Linking to @richardsheridan's map_concurrently_in_subthread_trio.py which may allow for keeping a sync api up front.

ali1234 commented 3 years ago

Best to paste the code from the third and final link, because the original one I shared was half finished and doesn't work. And those pastebins will expire in a month I think.

goodboy commented 3 years ago

@ali1234 ur wish is me command:

from itertools import count, islice
import time
import timeit

from multiprocessing import Pool
from functools import partial

already_called = False

def work(it, a, b):
    global already_called
    if already_called: # simulate not thread safe
        assert False, "You called work() more than once in the same process."
    already_called = True
    time.sleep(0.1) # simulate slow set up.
    for x in it:
        time.sleep(0.01) # simulate work per work item.
        yield (a*x)**b

def notparallel(procs, f, it, *args):
    # procs is ignored because this is notparallel
    global already_called
    already_called = False
    yield from f(it, *args)

def parallel(procs, f, it, *args):
    # only modify the body of this function
    yield from notparallel(procs, f, it, *args)

if __name__ == '__main__':

    procs = 2

    start = time.time()
    a = list(islice(notparallel(procs, work, count(), 3, 2), 100))
    atime = time.time() - start

    start = time.time()
    b = list(islice(parallel(procs, work, count(), 3, 2), 100))
    btime = time.time() - start

    # check we got the same result
    assert(all(x == y for x, y in zip(a, b)))

    print(atime, btime)

    # check it was actually faster
    assert(btime < atime)

# Task: Rewrite parallel() so that it runs faster
# using parallel processing. It must not be slower
# than notparallel() for any pool size:
#      1 < N < number of available processors

# You may not modify the interface of (not)parallel().

# Note: parallel() must return results in the same order as
# the original iterable.

# You may not modify work() or count().

# work() performs one-time setup which takes a long time.
# and may crash if run more than once in the same process.

# Work items come from an iterable stream of possibly
# infinite length. This is why count() has been chosen
# as a source of example data.

# You may use the multiprocessing library, unless you have
# previously been told not to use it for this task, in which
# case you have to use something else.

# Your implementation must work on Windows, Mac, and Linux.

# Your implementation must exit cleanly if the user presses
# ctrl-c exactly once.

# Optional extra: make it work over multiple machines
# in a cluster.

if False:

# Below: solutions which have previously been proposed
# but which fail to satisfy the requirements:

# 1. multiprocessing.Pool.imap:

    def helper(x, args):
        return next(work([x], *args))

    def parallel(procs, f, it, *args):
        with Pool(2) as p:
            yield from p.imap(partial(helper, args=args), it)

# It fails for at least three reasons: it calls work() more
# than once per process which causes worker processes to crash,
# it runs up to 10x slower than notparallel(), and it can
# deadlock when pressing ctrl-c.

# 2. multiprocessing with self-managed subprocesses:

# https://github.com/ali1234/vhs-teletext/blob/8eff82ba57cc8e7c2798cd7f9730456d61e60356/teletext/mp.py

# This sometimes deadlocks when user presses ctrl-c on windows.
# This seems to be due to use of OS pipes.

# 3. multiprocessing with zmq instead of OS pipes:

# https://github.com/ali1234/vhs-teletext/blob/446b93b6ebcfe011895e63ff53d40a7539519c4d/teletext/mp.py

# This sometimes deadlocks on linux when pressing ctrl-c.
# I have no idea why.
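For completeness, here is a minimal structural sketch of one way parallel() could satisfy the "pure generator" contract (one result per input item, in input order). Everything besides the (not)parallel()/work() interface is hypothetical, and it deliberately glosses over the hard parts this thread is about (clean ctrl-c handling and Windows pipe buffer behavior):

import heapq
import threading
from collections import deque
import multiprocessing as mp

def _worker(f, args, inq, outq):
    pending = deque()  # input indices whose results haven't been sent yet
    def items():
        while True:
            tagged = inq.get()
            if tagged is None:   # sentinel: input exhausted
                return
            pending.append(tagged[0])
            yield tagged[1]
    # f runs exactly once in this process, so its setup/teardown run once
    for result in f(items(), *args):
        outq.put((pending.popleft(), result))
    outq.put(None)  # tell the parent this worker finished cleanly

def parallel(procs, f, it, *args):
    ctx = mp.get_context('spawn')
    inqs = [ctx.Queue(maxsize=2) for _ in range(procs)]  # bounded => backpressure
    outq = ctx.Queue()
    workers = [
        ctx.Process(target=_worker, args=(f, args, q, outq), daemon=True)
        for q in inqs
    ]
    for w in workers:
        w.start()

    def feed():
        # round-robin numbered items across the workers, then send sentinels
        for n, x in enumerate(it):
            inqs[n % procs].put((n, x))
        for q in inqs:
            q.put(None)
    threading.Thread(target=feed, daemon=True).start()

    # reassemble results into input order (the denumerate/renumerate trick)
    heap, wanted, live = [], 0, procs
    while live:
        msg = outq.get()
        if msg is None:
            live -= 1
            continue
        heapq.heappush(heap, msg)
        while heap and heap[0][0] == wanted:
            yield heapq.heappop(heap)[1]
            wanted += 1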