jgehrcke / gipc

gevent-cooperative child processes and inter-process communication
https://gehrcke.de/gipc
MIT License

Bounded process pool? #26

Open jgehrcke opened 7 years ago

jgehrcke commented 7 years ago

Originally reported by: jaddison (Bitbucket: jaddison, GitHub: jaddison)


Is gipc compatible with a pool scenario? I want to handle web requests that resize images - the resizing likely fits well into a separate process, given the GIL.

That said, I do not want uncontrolled process spawning. Is it possible to achieve an upper limit on pool size, as with gevent.pool.ThreadPool? Something along the lines of:

- On initialization, create a process pool of size 20.
- Handle an incoming web request, ask the pool to perform the image-resizing function, and cooperatively wait for the result.
- Once the result is returned from the process, return it to the web client.
- Handle many incoming requests, but do not perform more than 20 image resizes at a time, because the process pool is bounded.

If this is possible, can the processes be reused to save on process initialization overhead?

The docs make it seem like gipc is great for spawning processes and waiting, but in an uncontrolled fashion? Or am I misunderstanding? Thanks!


jgehrcke commented 6 years ago

This is now tracked here @jaddison (moved from BitBucket).

jaddison commented 6 years ago

@jgehrcke I believe the combination of ProcessPoolExecutor with asyncio does exactly this - of course, in the async/await style of asyncio - which is where I am focusing my attention.
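The pattern jaddison refers to can be sketched as follows: async code hands blocking/CPU work to an executor via `loop.run_in_executor` and awaits the results cooperatively. A `ThreadPoolExecutor` stands in here so the sketch runs anywhere; `ProcessPoolExecutor` is the drop-in replacement for genuinely CPU-bound work such as image resizing (it additionally requires picklable functions and an `if __name__ == "__main__"` guard). The `resize` function is a hypothetical placeholder.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def resize(name):
    """Hypothetical placeholder for a CPU-bound job."""
    return f"{name}-resized"

async def main():
    loop = asyncio.get_running_loop()
    # Swap in ProcessPoolExecutor for real CPU-bound work.
    with ThreadPoolExecutor(max_workers=4) as pool:
        jobs = [loop.run_in_executor(pool, resize, f"img{i}") for i in range(8)]
        # gather preserves submission order while jobs run concurrently.
        return await asyncio.gather(*jobs)

results = asyncio.run(main())
```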

So, at the moment, I do not need this particular feature in gipc, although I have another project that would benefit if this should get implemented.

Thanks for picking up the reins again! 🚀

ifiddes commented 4 years ago

Any update on this?

ifiddes commented 4 years ago

To add detail to my use case:

I am using gipc to implement a split-run-join paradigm. Within a gevent WSGI environment, I need to spawn some number of long-running jobs that all return their results to the original spawning greenlet. My current wrapper looks something like this:

import logging

import gipc

log = logging.getLogger(__name__)


def wrapper(cend):
    """
    Child-process entry point: receive (fn, args) over the pipe, run it,
    and send the result back.
    """
    fn, args = cend.get()
    cend.put(fn(*args))


def gipc_starmap(fn, arg_list):
    """
    Driver function that maps each args tuple in arg_list onto fn in its
    own child process, and waits for all of them to finish.
    """
    workers = []
    for args in arg_list:
        cend, pend = gipc.pipe(duplex=True)
        p = gipc.start_process(wrapper, (cend,))
        pend.put((fn, args))
        workers.append((p, pend))
        log.debug("Spawned job")
    results = []
    log.debug("Joining jobs")
    for worker, pend in workers:
        # Read the result before joining: a child blocked in put() on a
        # large payload would otherwise never exit, deadlocking join().
        results.append(pend.get())
        worker.join()
        pend.close()
    return results

So, gipc_starmap takes a function and a list of argument tuples, runs the function calls in parallel, and returns the collected return values once all jobs are done.

I am looking for a way to cap the number of simultaneous child processes (and the greenlets managing them) this would spawn, because as-is it is unbounded.
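One way to cap this is to gate each spawn on a bounded semaphore; in gevent the same effect would come from `gevent.lock.BoundedSemaphore` or `gevent.pool.Pool` around each `gipc.start_process` call. The sketch below uses plain stdlib threads to model the workers, so it is runnable without gevent/gipc installed; `bounded_starmap` and `limit` are names introduced here, not part of any library.

```python
import threading

def bounded_starmap(fn, arg_list, limit=4):
    """Run fn(*args) for each args tuple, with at most `limit` in flight."""
    sem = threading.BoundedSemaphore(limit)
    results = [None] * len(arg_list)

    def run(i, args):
        try:
            results[i] = fn(*args)
        finally:
            sem.release()  # free a slot for the next job

    threads = []
    for i, args in enumerate(arg_list):
        sem.acquire()  # blocks once `limit` jobs are running
        t = threading.Thread(target=run, args=(i, args))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    return results

# e.g. bounded_starmap(pow, [(2, n) for n in range(6)], limit=3)
```

In the gipc version, the body of `run` would be the pipe/start_process/join sequence from `gipc_starmap` above, with the semaphore released after the child exits.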