nmlorg / nsync

Bridge code to let non-async programs use async-compatible libraries.

examples/async7.py: Handle background tasks and task groups #2

Open nmlorg opened 2 days ago

nmlorg commented 2 days ago

I think the last major pieces are background tasks and task groups, both of which can be thought of as adding an awaitable to a gather while it's running/after it's been constructed — that is:

async def main():
    create_task(token1)
    create_task(token2)

ret = run(main())

is essentially:

ret = run(gather(main(), token1, token2))[0]

just with token1 and token2 being added to the gather while main() is executing.

In async1–6, the "top level" run by the executor (sync_await) is a coroutine instance (in async5–6 it's wrapped in CoroutineWrapper): https://github.com/nmlorg/nsync/blob/9c88731e19283789f01708afd2ea4445c4a69bf7/examples/async6.py#L248-L257

  I think the executor should wrap the raw coroutine instance in a GatherToken, then wrap that in a GatherWrapper, and "run" that. So:

def sync_await(coroutine):
    coro = _BaseWrapper.wrap(None, coroutine)  # CoroutineWrapper(None, coroutine)

    while not coro.finalized:
        process_awaitables(coro.get_waiting_for())

    return coro.value

would become something like:

def sync_await(coroutine):
    dummy_gather = GatherToken(coroutine)
    top_level = _BaseWrapper.wrap(None, dummy_gather)  # GatherWrapper(None, [coroutine])

    while not top_level.finalized:
        process_awaitables(top_level.get_waiting_for())

    return top_level.value[0]

  That top-level GatherWrapper would be made available globally somehow, such that a global create_task(awaitable) would just append awaitable to top_level.awaitables.

Since you can (and often should!) await background tasks, this would mean awaitables might legitimately start showing up in multiple places:

async def f():
    read1 = ReadToken(sock1)
    create_task(read1)
    await something_else()
    await read1

being:

top_level = GatherWrapper():
    awaitables = [
        CoroutineWrapper(coroutine):
            _waiting_for = ReadWrapper(read1)
        ReadWrapper(read1)
    ]

during the second await.

So:

  1. we don't want to call (and block on) sock1.recv() twice, and
  2. we need to either:
    1. allow process_awaitables to finalize multiple awaitables for a single read operation, or
    2. both:
      1. allow _BaseWrapper.wrap(awaitable) to track and reuse wrappers, and
      2. allow _BaseWrapper.finalize to signal multiple parents.

top_level = GatherWrapper():
    awaitables = [
        CoroutineWrapper(coroutine):
            _coro = coroutine
            _waiting_for = None
    ]
    read1_token = ReadToken(sock1)
    read1_task1 = create_task(read1_token)
top_level = GatherWrapper():
    awaitables = [
        CoroutineWrapper(coroutine):
            _coro = coroutine
            _waiting_for = None
        read1_task1 = ReadWrapper(read1_token)
    ]
    await read1_token  # Is this allowed?
top_level = GatherWrapper():
    awaitables = [
        CoroutineWrapper(coroutine):
            _coro = coroutine
            _waiting_for = [read1_task2 = ReadWrapper(read1_token)]
        read1_task1 = ReadWrapper(read1_token)
    ]

in which case process_awaitables will have to figure out that two entries in waiting_for.readers point to the same socket, then call reader.finalize(data) for both of them. So you'd need to dedupe:

    rlist = [reader.sock for reader in waiting_for.readers]

and change:

        for reader in waiting_for.readers:
            if reader.sock in rlist:
                data = reader.sock.recv(65536)
                reader.finalize(data)

to something like:

        datas = {}
        for sock in set(rlist):  # One recv() per ready socket.
            datas[sock] = sock.recv(65536)
        for reader in waiting_for.readers:
            if (data := datas.get(reader.sock)) is not None:
                reader.finalize(data)
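As a self-contained sketch of this first approach (not the actual process_awaitables): `Reader` is a hypothetical stand-in for ReadWrapper, and `process_readers` is an illustrative name. It uses a `socket.socketpair()` so it can run without a network connection.

```python
import select
import socket


class Reader:
    """Hypothetical stand-in for ReadWrapper: one pending read on a socket."""

    def __init__(self, sock):
        self.sock = sock
        self.finalized = False
        self.value = None

    def finalize(self, data):
        self.finalized = True
        self.value = data


def process_readers(readers, timeout=1.0):
    # Dedupe: several readers may be waiting on the same socket.
    socks = list({reader.sock for reader in readers})
    rlist, _, _ = select.select(socks, [], [], timeout)
    # Call recv() exactly once per ready socket...
    datas = {sock: sock.recv(65536) for sock in rlist}
    # ...then hand the same bytes to every reader waiting on that socket.
    for reader in readers:
        if reader.sock in datas:
            reader.finalize(datas[reader.sock])


a, b = socket.socketpair()
b.sendall(b'hello')
readers = [Reader(a), Reader(a)]  # e.g. the task and the explicit await
process_readers(readers)
a.close()
b.close()
```

Both readers end up finalized with the same bytes from a single `recv()` call.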

― or ―

    await read1_task1  # Awaiting the task rather than awaiting the token a second time.
top_level = GatherWrapper():
    awaitables = [
        CoroutineWrapper(coroutine):
            _coro = coroutine
            _waiting_for = read1_task1
        read1_task1 = ReadWrapper(read1_token)
    ]

in which case all of the magic would be in _BaseWrapper.wrap, presumably crawling through top_level recursively looking for a wrapper that already points to awaitable and just adding the new parent to its _parents (a list, versus the current single _parent). Among other things, this means all of the …Wrapper subclasses need to actually hold on to references to their tokens! I think everything else would stay the same, except that finalize would need to call step on each parent, which might complicate things: we'd need to make sure a step doesn't cause an awaitable to finalize before it gets stepped a second time (I'm not sure if that's actually possible yet...).

  The second approach (_BaseWrapper.wrap deduping, with wrappers having multiple parents) feels more correct than the first (deduping sockets/calls to recv around select.select), but might end up being unmaintainably complicated.

(Note that await read1_task1 could be made to work like await read1_token by having _BaseWrapper.wrap just explicitly clone a wrapper if it's given one.)
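A rough sketch of the second approach, under some loud assumptions: instead of crawling top_level recursively, it keeps a hypothetical `_registry` dict keyed by `id(token)`, and `Token`, `Wrapper`, and `Parent` are illustrative stand-ins, not the nsync classes. It only shows wrapper reuse and multi-parent signaling, not the re-entrancy concern mentioned above.

```python
class Token:
    """Hypothetical awaitable token (stands in for ReadToken etc.)."""


class Wrapper:
    # id(token) -> live wrapper. Safe to key by id() because each wrapper
    # holds a reference to its token for as long as it is registered.
    _registry = {}

    def __init__(self, token):
        self.token = token
        self._parents = []  # a list, versus a single _parent
        self.finalized = False
        self.value = None

    @classmethod
    def wrap(cls, parent, awaitable):
        if isinstance(awaitable, Wrapper):
            wrapper = awaitable  # awaiting an existing task reuses it
        else:
            wrapper = cls._registry.get(id(awaitable))
            if wrapper is None:
                wrapper = cls._registry[id(awaitable)] = cls(awaitable)
        if parent is not None and parent not in wrapper._parents:
            wrapper._parents.append(parent)
        return wrapper

    def finalize(self, value):
        self.finalized = True
        self.value = value
        self._registry.pop(id(self.token), None)
        for parent in list(self._parents):  # signal every parent
            parent.step(self)


class Parent:
    """Hypothetical parent that just records what its children produced."""

    def __init__(self):
        self.seen = []

    def step(self, child):
        self.seen.append(child.value)


token = Token()
gather, coro = Parent(), Parent()
task = Wrapper.wrap(gather, token)  # like create_task(read1_token)
again = Wrapper.wrap(coro, token)   # like await read1_token
same = Wrapper.wrap(coro, task)     # like await read1_task1
task.finalize(b'data')
```

Here all three `wrap` calls return the same object, and a single `finalize` steps both the gather and the coroutine exactly once each.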

nmlorg commented 2 days ago

Initial implementation of the first approach (deduping in process_awaitables), running:

async def async_fetch(delay):
    await SleepToken(delay)
    sock = socket.create_connection(('httpbin.org', 80))
    sock.sendall(b'GET /status/200 HTTP/1.0\r\n\r\n')
    await ReadToken(sock)
    return f'async_fetch({delay})'

async def async_read():
    create_task(async_fetch(.1))
    create_task(async_fetch(.2))
    return 'async_read done!'

ret = sync_await(async_read())
log('ret =', ret)

using:

class GatherWrapper(_BaseWrapper):
    def add(self, awaitable):
        task = _BaseWrapper.wrap(self, awaitable)
        self._awaitables.append(task)
        return task

finalizes as:

0.758                         [finalize:88] self = <__main__.CoroutineWrapper object at 0x7881ad16d690> value = async_fetch(0.2)
0.758                             [step:206] self = <__main__.GatherWrapper object at 0x7881ad16d600>
0.758                                 [finalize:88] self = <__main__.GatherWrapper object at 0x7881ad16d600> value = ['async_fetch(0.1)', 'async_fetch(0.2)', 'async_read done!']
0.758 [main:285] ret = async_fetch(0.1)

because _BaseWrapper.wrap(self, coroutine) calls CoroutineWrapper(self, coroutine): https://github.com/nmlorg/nsync/blob/9c88731e19283789f01708afd2ea4445c4a69bf7/examples/async6.py#L153-L160

That initial manual step (self._send(None)) causes the two tasks to be scheduled before the actual top-level coroutine is added to top_level.

This initially also caused a problem in sync_await, where:

def sync_await(coroutine):
    dummy_gather = GatherToken(coroutine)
    top_level = _BaseWrapper.wrap(None, dummy_gather)
    create_task.add = top_level.add

didn't work because GatherWrapper wrapped coroutine, and CoroutineWrapper tried to immediately step it, so create_task(async_fetch(.1)) was evaluated before create_task.add was assigned. I ended up using:

def sync_await(coroutine):
    top_level = GatherWrapper(None, ())
    create_task.add = top_level.add
    create_task(coroutine)

which ensures coroutine is not wrapped (and so not initially stepped) before the create_task machinery is hooked into top_level, but I don't see an obvious analog for the misordering problem :/

  For what it's worth:

async def async_read():
    create_task(async_fetch(.1))
    create_task(async_fetch(.2))
    await SleepToken(.05)
    create_task(async_fetch(.3))
    return 'async_read done!'

finalizes as ['async_fetch(0.1)', 'async_fetch(0.2)', 'async_read done!', 'async_fetch(0.3)'], so it's not just changing return top_level.value[0] to return top_level.value[-1] or anything.

  I suspect it's going to end up being that we need to not step in CoroutineWrapper.__init__, and instead leave that initial step to the first pass of the process_awaitables loop. It's just... right now it feels like we're doing something special (step with a dummy value rather than the finalized value of an awaitable) in a special place (wrapper initialization), but moving that into the loop would be doing a one-time special case in a repeated area, which... feels wrong :slightly_frowning_face:.
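To make the ordering difference concrete, here is a toy comparison, assuming a much-simplified `CoroutineWrapper` and a hypothetical `TopLevel.add` that (like the `add` above) wraps first and appends second. The `eager` flag is invented for the sketch: it switches between stepping in `__init__` and leaving the first step to the loop.

```python
class CoroutineWrapper:
    def __init__(self, coro, eager):
        self.coro = coro
        self.finalized = False
        self.value = None
        if eager:
            self.step()  # mirrors the initial step inside __init__

    def step(self):
        try:
            self.coro.send(None)
        except StopIteration as e:
            self.finalized = True
            self.value = e.value


class TopLevel:
    def __init__(self, eager):
        self.eager = eager
        self.tasks = []

    def add(self, coro):
        task = CoroutineWrapper(coro, self.eager)  # wrap (maybe step) first...
        self.tasks.append(task)                    # ...then append
        return task


def run(eager):
    top = TopLevel(eager)

    async def child(name):
        return name

    async def main():
        top.add(child('child1'))
        top.add(child('child2'))
        return 'main'

    top.add(main())
    # Lazy mode: the very first step of each task happens here, in the loop.
    while not all(task.finalized for task in top.tasks):
        for task in list(top.tasks):
            if not task.finalized:
                task.step()
    return [task.value for task in top.tasks]


eager_order = run(eager=True)
lazy_order = run(eager=False)
```

With `eager=True` the children are appended before `main` (the misordering seen in the log above); with `eager=False` `main` lands at index 0, so `value[0]` is the top-level result again.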

nmlorg commented 2 days ago

(For what it's worth, I haven't come up with any way to make create_task work without some kind of global state, which gets undone/redone every time the executor is instantiated (so running multiple executors in different threads of the same interpreter would step on each other). This defies a part of Python I really liked: self is an explicit argument to method calls, and there is no magic this in the global namespace that means something different in different places.

Maybe that dummy step should have actually been used to pass the executor into the running coroutine, i.e. coro.step(None) should have been coro.step(loop), where that value was made available to the async function somehow, and all the functions like asyncio.create_task would be methods on whatever was passed in. So:

async def mycoro(a, b):
    asyncio.create_task(othercoro(a, b))

mycoro(1, 2).send(None)

would have been:

async def mycoro(loop, a, b):
    loop.create_task(othercoro(a, b))

mycoro(1, 2).send(asyncio)

or maybe:

async def mycoro(a, b, loop=None):
    loop.create_task(othercoro(a, b))

mycoro(1, 2).send(asyncio)

or even (since it is using coro.send):

async def mycoro(a, b):
    loop = await None
    loop.create_task(othercoro(a, b))

mycoro(1, 2).send(asyncio)

Anyway...)
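For illustration only, here is one way the last variant can be approximated in today's Python. Two caveats: the first `send` to a just-started coroutine must be `None`, and `await None` is not valid, so this sketch has the coroutine await a hypothetical `get_loop()` helper that yields a request, which the executor answers with `send(loop)`. `Loop`, `get_loop`, and `_GetLoop` are invented names, not asyncio or nsync APIs.

```python
import types


class _GetLoop:
    """Sentinel yielded to the executor to ask for a reference to it."""


@types.coroutine
def get_loop():
    loop = yield _GetLoop  # the executor answers with coro.send(loop)
    return loop


class Loop:
    def __init__(self):
        self.tasks = []

    def create_task(self, coro):
        self.tasks.append(coro)
        return coro

    def run(self, coro):
        self.create_task(coro)
        results = []
        for task in self.tasks:  # the list may grow while we iterate
            reply = None         # first send to a fresh coroutine must be None
            while True:
                try:
                    request = task.send(reply)
                except StopIteration as e:
                    results.append(e.value)
                    break
                reply = self if request is _GetLoop else None
        return results


async def othercoro(a, b):
    return a + b


async def mycoro(a, b):
    loop = await get_loop()  # no global state: the executor hands itself over
    loop.create_task(othercoro(a, b))
    return 'mycoro done'


results = Loop().run(mycoro(1, 2))
```

Because each coroutine learns about its executor through `send`, two `Loop` instances in the same interpreter never touch shared module state.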