vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0
801 stars 34 forks source link

Trio support #25

Closed miracle2k closed 4 years ago

miracle2k commented 6 years ago

I wanted this, so I largely ignored the discussions in #17 regarding context vars, performance etc, and tried to build a version that works.

The one part that is missing here is the StreamerManager, because I couldn't see a straightforward way of moving the event-loop specific parts out of it. Maybe each backend has to subclass it or something.

coveralls commented 6 years ago

Coverage Status

Coverage decreased (-3.009%) to 96.991% when pulling 05965d0ba7789deec4c117ffce487d09652ce718 on miracle2k:trio into 755ee1897cb3ac36c066c8251538f195435f93b2 on vxgmichel:master.

vxgmichel commented 6 years ago

Thanks for pushing this issue forward @miracle2k, that's great work!

The one part that is missing here is the StreamerManager, because I couldn't see a straightforward way of moving the event-loop specific parts out of it. Maybe each backend has to subclass it or something.

Yea this seems pretty tricky. I guess it requires some kind of nursery that has the ability to wait for the next task to complete.

Another tricky thing is to parameterize the tests to run with both asyncio and trio. I can have a look at that.

I was also waiting for the hyperio release, since it might implement most of the compatibility layer for us (see the discussion on the trio gitter).

How soon do you need this PR to be merged?

miracle2k commented 6 years ago

Yea this seems pretty tricky. I guess it requires some kind of nursery that has the ability to wait for the next task to complete.

I implemented this just now as well, though just as a proof of concept; the code is very unclean, und it will break the tests as well because it's kind of hardcoded into there. But I am unsure what is the best way to integrate this into the "backend" concept). As you can see, there is a lot of duplication in the base class. We might need to refactor the base class to provide better hooks. And is a custom per-backend subclass of StreamManager even the right approach?

For trio, we need to start a nursery. Would the compatibility layer need to recreate a trio-like object structure for asyncio, so that StreamManager can call, say, setup_concurrency_handler() which opens a nursery on trio and does nothing on asyncio, and returns a nursery-like interface on both?

Maybe that would be what hyperio is essentially? That's the open question.

I don't need it to be merged soon, I'll run off the branch for now.

vxgmichel commented 6 years ago

I implemented this just now as well, though just as a proof of concept; the code is very unclean, und it will break the tests as well because it's kind of hardcoded into there.

Great! I don't think I'll have the time to take care of the tests before the end of the next week though, sorry about that.

And is a custom per-backend subclass of StreamManager even the right approach?

Would the compatibility layer need to recreate a trio-like object structure for asyncio, so that StreamManager can call, say, setup_concurrency_handler() which opens a nursery on trio and does nothing on asyncio, and returns a nursery-like interface on both?

My first intuition would be to abstract and improve the task management instead, and build the stream management on top of it. Something like:

class TaskManager:
    def __init__(self, task_limit=None):
        ...

    @property
    def full(self):
        ...

    async def __aenter__(self):
        ...

    async def __aexit__(self, *args):
        ...

    def schedule(self, coro):
        ...
        return task

    async def cleanup(self):
        ...

    async def completed(self):
        yield task

and have separate implementations for each of the supported loop.

Maybe that would be what hyperio is essentially? That's the open question.

That's what I'm wondering about too, I guess we'll have the answer next week.

vxgmichel commented 6 years ago

Hi @miracle2k,

I just refactored the streamer manager, and I moved the asyncio-specific code to a dedicated class. This should help a lot with trio support. Feel free to review PR #32 if you want to.

I've also been playing with trio channels and after reading Select Statement Considered Harmful I realized that base_combine could be written in a much nicer way using nurseries and channels. I think I'll experiment with that once anyio supports channels.

I'm also considering pytest.mark.anyio for the tests, I'll let you know how it goes.

vxgmichel commented 4 years ago

@miracle2k Thanks for your work here, it definitely helped me find the motivation to move things forward. It took me a while but I finally came up with a cross-library implementation I'm satisfied with :)

It's happening here: #58