agronholm / anyio

High level asynchronous concurrency and networking framework that works on top of either trio or asyncio
MIT License
1.76k stars 134 forks source link

Write guide for porting asyncio libraries to AnyIO #255

Open agronholm opened 3 years ago

agronholm commented 3 years ago

This should cover at least the following:

Bluenix2 commented 2 years ago

Dealing with the lack of Event.clear()

Out of all the things this is actually what I am having the most issues with. I had to change the structure of the ratelimit system.

See I had a locking mechanism for rate limiting. This was essentially a weakly referenced dictionary where locks were lazily created.

When the locks were created they were given an event that was always set and before acquiring the underlying lock they would .wait() this event. This event represented a global lock. When a global ratelimit was hit this would be cleared, and then set when it was over.

Essentially it's an inverted event. But without the ability to clear it of course couldn't work.

ckuhlmann commented 2 years ago

I second the need for such a document, and would like to add the suggestion for discussing issues coming from

I have difficulties to convert code that uses asyncio's asyncio.Future() future objects, which can be manually created, started, cancelled, completed and failed and async awaited.

I would need a pattern that allows me to interface anyio with existing code, where the beginning and end of an async operation is in different threads. I know anyio returns a concurrent.futures.Future for tasks, which is similar, but I would need a pseudo-task that is not actually running in anyio, only monitored for completion/result, cancellation or exceptions. This is because the real operation is happening in separately managed threads.
I'm not sure whether this is at all compatible with anyio's style, but manually created futures are really useful when interfacing with low-level libraries (think compiled ones, where you cannot yourself change how they work). If this is not possible with anyio, an alternative approach should be documented.

The concept of a user managed future construct is also in other languages, e.g. c.f. C#'s TaskCompletionSource, meaning that people coming from those languages would likely also be interested in finding a substitute when they start to use anyio.

agronholm commented 2 years ago

If you could give me a concrete example of a task you're struggling with, I could possible offer some suggestions as for how to do things in the "AnyIO way".

ckuhlmann commented 2 years ago

Hi, thanks for the quick response and the offer to help.

I have posted the problem at stackoverflow Converting code using asyncio.Future futures to anyio. I'll admit that I'm not too experienced with either asyncio or anyio, so I'm sorry if this is kind of a stupid approach to create an async interface. However, I don't think I can get rid of the two manually managed threads, as they do hardware coordination necessary for the connection that must not be interrupted and has to be coordinated between them.

agronholm commented 2 years ago

I think it would be better to use tasks instead of long running threads, and only use worker threads briefly when doing something that requires blocking I/O. At least nothing in that SO post seems to actually require threads so I'm wondering what they're really needed for.

ckuhlmann commented 2 years ago

They are long running e.g. because the connection requires keep-alive messages and hardware monitoring that is coordinated with send/receive actions. You are probably right that in hindsight individual tasks could also work. But since this is existing code that I'm trying to interface with, that choice for threads has been made a long time ago. I think this is not an uncommon situation when dealing with existing code, where people would like to use async libraries to make things easier when using the high-level interface, yet also deal with lots of legacy code that uses manually created threads.

smurfix commented 2 years ago

You are probably right that in hindsight individual tasks could also work. But since this is existing code that I'm trying to interface with, that choice for threads has been made a long time ago.

Well then the first step should be to convert each of these threads into a task. This basically involves painting the call path between the task's run method and whichever code ultimately blocks in recvfrom (or similar calls) with async and await code words.

I'm serious. Threads are annoying. You can't stop a thread without closing the file descriptor it's waiting for (yes there are workarounds involving using another file descriptor, poll, and so on, but that is anything but programmer-friendly code and I can almost guarantee that there are error conditions which will cause it to deadlock). You need to deal with implicit concurrency and locking your data structures and all that, for no good reason. And so on.

While you're doing that, rewrite your keepalive handling. or rather, remove it, because that job is absurdly easy with anyio. Let me show you how.

Sending: you create a separate sender task which you talk to using a memory stream. The sending task basically does

while True:
    try:
        with anyio.fail_after(30):
            msg = await sendq.receive()
    except TimeoutError:
        await send_keepalive(socket)
    else:
        await socket.send(msg)

The receiver task is even shorter:

while True:
    with fail_after(65):
        msg = await socket.receive()
    await handle_message(msg)

The two pages of convoluted timeout handling and keeping-track-of-keepalive-timing in your legacy code base can now simply be deleted. I've been there, multiple times.

ckuhlmann commented 2 years ago

Thank you both for taking the time to point out a better way to do it. I want to stop hijacking this issue with my specific use case, but I believe that the answers show that the original idea of a pattern document is a good one, even if it is just pointers to doing things differently. I'm not certain I have the time to walk that road in this case, but I agree with you that it would pay off in the long run. Having at a lot of working code to rewrite, it's not always easy to justify to change the complete architecture. Just a quick last question: is anyio.create_memory_object_stream() thread safe? If I want to redo that code, I need an intermediate solution for some basic functionality and having a stream interface between the new tasks and the old code would help to test against the old and new backend with the same high level anyio code. Thanks again.

agronholm commented 2 years ago

AnyIO's memory object streams might work here. You just need to use from_thread.run() in the threads when sending something down the pipes.

ckuhlmann commented 2 years ago

I had the same idea when I saw that feature. Are the streams obtained from anyio.create_memory_object_stream() thread safe?

Edit: never mind the question, i just realized that you mentioned from_thread.run ...

waketzheng commented 2 years ago

How can I change the following code to be anyio @agronholm

import asyncio
from typing import Coroutine
from tortoise import Toirtoise

def run_async(coro: Coroutine) -> None:
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(coro)
    finally:
        loop.run_until_complete(Tortoise.close_connections())

https://github.com/tortoise/tortoise-orm/issues/1119

agronholm commented 2 years ago

You should open a separate discussion (not an issue) about that.

smurfix commented 2 years ago

Well, it's a good example for a guide anyway.

The anyio-ish solution is

async def close_when_done(main):
    try:
        await main()
    finally:
        await Tortoise.close_connections()
anyio.run(close_when_done, your_main_code)

asyncio's standard calling convention would look like asyncio.run(close_when_done, your_main_code()), and just write await main in the body. anyio and trio don't do that, for multiple reasons.