fusedio / fused-local

MIT License
1 stars 0 forks source link

Try a WorkerContext? #1

Open richardsheridan opened 4 months ago

richardsheridan commented 4 months ago

Hi, is there any reason that this wrapper can't be replaced with a WorkerContext? https://github.com/fusedio/fused-local/blob/9733b4aba2b0221f07d6c2c92762e0ea8df6af32/src/fused_local/workers.py#L24-L34 If there's something I could do to make my feature a little more usable or better documented, I'd like to hear it. :)

gjoseph92 commented 4 months ago

Hey @richardsheridan, impressed you noticed this! I'm traveling right now btw, so my replies may be somewhat limited until next week.

Short story is that this really is a wrapper around a WorkerContext. In order to "restart" the pool, we just close and release the current WorkerContext, then start a new one and point the reference to that. I found it sufficiently documented to figure that out at least :)

I would say that if the WorkerContext exposed public methods for manually controlling the scaling, that would have saved having to write this! Then we could keep the same context the whole time, and presumably await scale(0); await scale(n) to restart. That might also be helpful because I found that when workers crashed, there was no way to know that had happened, and to replace them.

On the subject of wishlists, it would also be great to have a way to cancel tasks on workers less violently than SIGKILL. I'd love for cancellation to propagate into the worker process as Cancelled or SIGINT, so workers could stop doing what they're doing and pick up new work without the latency of spinning up a new process.

Overall though, thanks for trio-parallel, I found it very helpful and usable!

richardsheridan commented 4 months ago

Cool! Let me break this down, in reverse order.

Actually, SIGINT can work as you describe by running signal.signal(signal.SIG_DFL) in your init function and keeping cancellable=False, as well as creating a trio signal handler so your parent program doesn't end. I considered the behavior a bug before, but maybe it was secretly a feature, as long as you remember to catch the exception... maybe I'll put this recipe in the docs.

I don't have a generic way to run code that respects trio-style cancellation. I don't want to maintain something that complex, so if you really need that, you should try tractor.

Worker scaling is "automatic" based on the way LIFO caching interacts with the idle_timeout, to support arbitrary concurrency patterns. Consider several independent "submit loops" using the same context, but with different limiters. If all the loops are suddenly running, it'll scale up to whatever their summed limit is, and if a few run out of work for several minutes, the idle workers will time out and scale down to only what is still in use.

If on the other hand you really want a specific, constant number of workers, you can just do partial(ctx.run_sync, limiter=limiter) and save yourself a class.

Finally, consider retiring workers instead of restarting the pool if what you need is fresh processes. Still, restarting the pool is also an intended use-case, but a trionic recipe that doesn't touch internal methods would look more like this:

def _worker_init():
    print(f"worker init {os.getpid()}")
    import_user_code(USER_CODE_PATH)

def _handle_changed_file(file):
    print(f"changed {file} in worker {os.getpid()}")

def _drain_and_merge_batches(batch):
    batch = set(batch)
    try:
         while True:
            batch |= change_recv.receive_nowait()
    except trio.WouldBlock:
        return batch

async def submit_batch(batch):
    async with trio_parallel.open_worker_context() as ctx, trio.open_nursery() as nursery:
        for changed_file in batch:
            nursery.start_soon(
                ctx.run_sync,
                _handle_changed_file,
                changed_file,
                limiter=limiter,
                ),
            )

if __name__ == "__main__":
    import trio
    import trio_parallel
    from fused_local.user_code import (
        import_user_code,
        watch_with_channel,   # highly recommend to make this
    )

    USER_CODE_PATH = Path.cwd() / "example.py"
    N_WORKERS = 2

    change_send, change_recv = trio.open_memory_channel(float("inf"))
    limiter = trio.CapacityLimiter(N_WORKERS)

    async def main():
        async with trio.open_nursery() as nursery:
              nursery.start_soon(
                  watch_with_channel, USER_CODE_PATH, change_send
              )

              for changed_file_batch in change_recv:
                  changed_file_batch = _drain_and_merge_batches(changed_file_batch)
                  await submit_batch(changed_file_batch)
                  # or maybe you're hasty
                  # something_to_cancel_previous_batch()
                  # nursery.start_soon(submit_batch, changed_file_batch)
    try:
        trio.run(main)
    except* KeyboardInterrupt:
        print("shut down")

Not tested or anything, but it gives you the shape of things. You could also wire up a cancelscope to handle early stopping for rapid changes if handling changes takes a long time.