gatoatigrado / vimap

Variations on imap, not in C

The vimap package is designed to provide a more flexible alternative to multiprocessing.Pool.imap_unordered. (You should read the multiprocessing documentation if you haven't already, or else this README won't make sense!) It aspires to support HTTP-like clients processing data, though it contains nothing client-specific.

What in particular makes it more flexible?

What do we aspire to do better than the regular multiprocessing library?

Other features / design decisions:

Defining worker functions

vimap handles custom initialization and similar setup through decorated functions. If your inputs are HTTP requests, and you want to get responses from any of a set of servers, you could express your program as follows (using the requests HTTP library; it's intuitive, so you probably don't need to read its documentation):

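import requests
import vimap.worker_process

@vimap.worker_process.worker
def send_requests_worker(request_iter, server):
    # The input iterator is named request_iter so it does not shadow the requests module.
    s = requests.Session()  # one session per worker process
    for request in request_iter:
        # Post each request and yield the response back to the parent process.
        yield s.post('http://{0}{1}'.format(server, request.uri), data=request.data)
    s.close()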

What is happening? When each worker process starts up, it opens a new session. Each request (some picklable object with a .uri and a .data attribute) sent by the parent process is posted to the server. The worker then yields the response, which is sent back to the parent process.
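The lifecycle described above (setup once, one output per input, teardown at the end) can be imitated with a plain generator, without vimap or any network access. This is only a sketch: tagging_worker, parent_inputs, the log list, and the a.example host are hypothetical names, and functools.partial merely stands in for however vimap binds per-worker arguments.

```python
from functools import partial


def tagging_worker(items, server, log):
    """Plain generator standing in for a decorated worker (hypothetical, not vimap code)."""
    log.append("setup")                      # runs once, like opening the session
    for item in items:                       # pauses here until an input is available
        yield "{0}{1}".format(server, item)  # one output per input
    log.append("teardown")                   # runs once the inputs are exhausted


def parent_inputs(log):
    """Stands in for the parent process handing over inputs one at a time."""
    for uri in ["/x", "/y"]:
        log.append("sent " + uri)
        yield uri


log = []
# Bind the per-worker arguments up front; only the input iterator is left open.
bound = partial(tagging_worker, server="a.example", log=log)
responses = bound(parent_inputs(log))  # nothing has run yet

first = next(responses)  # setup runs, then the first input is pulled
rest = list(responses)   # remaining inputs are processed, then teardown runs
```

Because the worker is a generator, its setup code does not run until the first output is requested, and its teardown runs only after the input iterator is exhausted.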

imapping data from the parent process

Let's continue the example,

import vimap.pool
pool = vimap.pool.fork(send_requests_worker.init_args(server=server) for server in my_servers)

This initializes a pool of workers. Each one gets a bound argument server. When the worker processes start up, they start running until they try to pull an element off of the requests iterator; then they must pause for the parent process to send data. The parent process can send data like so,

import fileinput, ujson
from collections import namedtuple
Request = namedtuple("Request", ["uri", "data"])
pool.imap(Request(**ujson.loads(line)) for line in fileinput.input()).block_ignore_output()

This reads lines from a file containing JSON input, and sends the loaded entries to the workers. In the real world, you'd probably want to make the workers do the JSON loading. The .block_ignore_output() will cause the entire iterable (input file) to be read, and [by default] close the pool after it's done.
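To make the input side concrete, here is a small sketch of how JSON lines turn into Request objects before they are handed to the pool. It swaps in the standard-library json module for ujson so that it runs without extra dependencies; parse_requests and sample_lines are hypothetical names, and skipping blank lines is an added assumption, not something the text above specifies.

```python
import json
from collections import namedtuple

Request = namedtuple("Request", ["uri", "data"])


def parse_requests(lines):
    """Lazily turn JSON lines into Request tuples, skipping blank lines."""
    for line in lines:
        line = line.strip()
        if line:
            # Each JSON object is assumed to have exactly the keys "uri" and "data".
            yield Request(**json.loads(line))


# Hypothetical input; in the example above the lines come from fileinput.input().
sample_lines = ['{"uri": "/a", "data": "1"}', "", '{"uri": "/b", "data": "2"}']
parsed = list(parse_requests(sample_lines))
```

Since parse_requests is a generator, the lines are parsed only as they are consumed, which keeps memory use flat for large input files.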

variations on imap

The input binder

The first Variation on Imap pairs each input with its output. So, you have some [lazy] iterable of inputs,

x1, x2, x3, x4, x5, ...

and when you vimap this with some function f, you get back tuples,

(x1, f(x1)), (x2, f(x2)), ...

possibly not in order. Since it's streaming, one shouldn't need to keep around inputs for long -- the input binder will keep around O(# processes) inputs, hence it's safe to iterate through large inputs. In code, this could look like,

results = {}
for inp, output in pool.imap(iterable).zip_in_out():
    results[inp] = output
# do some more processing
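The idea of pairing inputs with outputs while holding only a bounded number of inputs can be sketched with the standard library. This is not how vimap implements it: the sketch uses threads from concurrent.futures rather than worker processes, and zip_in_out_sketch and max_in_flight are hypothetical names chosen for illustration.

```python
import itertools
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def zip_in_out_sketch(func, iterable, max_in_flight=4):
    """Yield (input, func(input)) pairs, possibly out of order.

    At most max_in_flight inputs are held at any time, so the input
    iterable can be arbitrarily long.
    """
    it = iter(iterable)
    with ThreadPoolExecutor(max_workers=max_in_flight) as executor:
        pending = {}  # maps each running future to the input it was given
        for x in itertools.islice(it, max_in_flight):
            pending[executor.submit(func, x)] = x
        while pending:
            done, _ = wait(list(pending), return_when=FIRST_COMPLETED)
            for future in done:
                x = pending.pop(future)
                yield x, future.result()
                # Refill: pull one more input for each finished one.
                for nxt in itertools.islice(it, 1):
                    pending[executor.submit(func, nxt)] = nxt


squares = dict(zip_in_out_sketch(lambda x: x * x, range(10), max_in_flight=3))
```

The pending dictionary is the only place inputs are retained, which is what keeps the memory footprint proportional to the number of workers rather than to the size of the input.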

Handling exceptions

If you want to gracefully handle exceptions that bubble up to the main function of your worker processes, you can request that vimap yield back to you any exceptions it receives from the workers.

for input, output, typ in pool.imap(iterable).zip_in_out_typ():
    if typ == 'exception':
        print('Worker had an exception:')
        print(output.formatted_traceback)
    elif typ == 'output':
        print('I got some actual output from a worker!')
        print(output)

When typ is 'exception', output is an ExceptionContext namedtuple. It contains a value field holding the exception that was raised and a formatted_traceback field holding the traceback string that would otherwise have been printed to stderr.
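For intuition, the tagging of results as 'output' or 'exception' can be sketched in plain Python. The ExceptionContext below is a local stand-in with the two fields described above, not vimap's own definition, and call_and_tag is a hypothetical helper.

```python
import traceback
from collections import namedtuple

# Local stand-in mirroring the two fields described in the text.
ExceptionContext = namedtuple("ExceptionContext", ["value", "formatted_traceback"])


def call_and_tag(func, item):
    """Return (input, output, typ), capturing any exception instead of raising it."""
    try:
        return item, func(item), "output"
    except Exception as exc:
        # format_exc() captures the traceback text that would have gone to stderr.
        return item, ExceptionContext(exc, traceback.format_exc()), "exception"


tagged = [call_and_tag(lambda n: 10 // n, n) for n in (1, 0)]
for inp, output, typ in tagged:
    if typ == "exception":
        print("Input {0!r} failed:".format(inp))
        print(output.formatted_traceback)
    else:
        print("Input {0!r} gave {1!r}".format(inp, output))
```

Formatting the traceback where the exception is caught matters because traceback objects generally cannot be pickled, so a string is the practical form in which to send them back to a parent process.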