HuyaneMatsu / scarletio


scarletio

Asynchronous blackmagic & Witchcraft



Introduction

Scarletio is a coroutine-based concurrency library for Python that uses the modern async / await syntax. It was originally inspired by asyncio.

One of the core concepts of the library is that event loops should not interfere with synchronous code execution. Starting an event loop does not block the control flow; instead, the library provides various synchronization tools to start new asynchronous procedures and to retrieve their results across environments (for example, from another thread or another event loop).

You can experiment with scarletio in the REPL:

$ python3 -m scarletio
                     _      _   _
                    | |    | | (_)
  ___  ___ __ _ _ __| | ___| |_ _  ___
 / __|/ __/ _` | '__| |/ _ \ __| |/ _ \
 \__ \ (_| (_| | |  | |  __/ |_| | (_) |
 |___/\___\__,_|_|  |_|\___|\__|_|\___/

                                  1.0.56

Scarletio interactive console 3.8.10 (default, May 26 2023, 14:05:08) [GCC 9.4.0] on linux.
Use "await" directly.
Type "help", "copyright", "credits" or "license" for more information.
In [0]: 

Note: a great deal of Scarletio's features only work on Linux.

Coroutines

Coroutines are a special type of function that can be suspended and resumed, allowing other code to run in between. Coroutines enable developers to write asynchronous code in a more sequential and readable manner.

In traditional programming, functions are called, executed, and completed before control returns to the caller. Coroutines differ in that they can be suspended in the middle of their execution, allowing the program to switch to another task. When a coroutine is suspended, it yields control back to the event loop, which can then execute other code.

A coroutine function is declared by prefixing a function definition with the async keyword, like this:

In [0]: async def main():
   ...:     print('hello')
   ...:     await sleep(1.0)
   ...:     print('world')
   ...:

This function prints "hello", waits 1 second, and then prints "world".

Inside a coroutine function, you can use the await keyword to wait for the result of another coroutine or an asynchronous operation. When an await statement is encountered, the coroutine suspends its execution until the awaited task is complete, allowing other coroutines to run in the meantime.

Coroutines are scheduled and executed within an event loop, which is responsible for managing their execution and switching between them.

Note that simply calling a coroutine function will not schedule it to be executed:

In [1]: main()
<coroutine object main at 0x7fbb2615d340>

To run the coroutine we have to await it:

In [2]: await main()
hello
world
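
To make suspension and resumption concrete, the sketch below drives a coroutine by hand, without any event loop. It uses only plain Python: `Suspend` and `drive` are hypothetical names invented for this illustration and are not part of scarletio, and a real event loop does considerably more than this.

```python
class Suspend:
    """Hypothetical awaitable (not a scarletio API) that suspends its awaiter once."""

    def __await__(self):
        # Yielding hands control back to whoever is driving the coroutine.
        received = yield 'suspended'
        return received


async def main(log):
    log.append('hello')
    # Execution pauses here until the driver resumes the coroutine.
    word = await Suspend()
    log.append(word)
    return 'done'


def drive():
    log = []
    coroutine = main(log)

    # Calling the coroutine function has not run any of its body yet.
    assert log == []

    # Run the coroutine up to its first suspension point.
    signal = coroutine.send(None)
    assert log == ['hello']

    # Resume it; a coroutine reports completion by raising StopIteration.
    result = None
    try:
        coroutine.send('world')
    except StopIteration as exception:
        result = exception.value

    return log, signal, result


print(drive())
```

The driver plays the role that the event loop plays in the examples above: it decides when the coroutine starts and when it continues after each suspension.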

Running coroutines

The Scarletio REPL lets you use await directly, but normally await can only be used inside coroutine functions. Running the following as a script fails:

from scarletio import sleep

async def main():
    print('hello')
    await sleep(1.0)
    print('world')

await main()

# Produces:
#
#  File "file.py", line 8
#    await main()
#    ^
#SyntaxError: 'await' outside function

We can use the scarletio.run function to run our entry point, the main function.

from scarletio import run, sleep

async def main():
    print('hello')
    await sleep(1.0)
    print('world')

run(main())

# Produces:
#
# hello
# world

For more control over our application we want to access the event loop directly, since it handles the scheduling and execution of our tasks. To get the event loop we use the scarletio.get_or_create_event_loop function.

from scarletio import get_or_create_event_loop, sleep

async def main():
    print('hello')
    await sleep(1.0)
    print('world')

LOOP = get_or_create_event_loop()
LOOP.run(main())
LOOP.stop()

# Produces:
#
# hello
# world

While scarletio.run handles loop detection, creation, and stopping as required, loop.run does not stop the event loop after our coroutine finishes, so we stop it explicitly with loop.stop.

Tasks

Tasks are used to run coroutines concurrently. To schedule a task we wrap our coroutine with a function such as loop.create_task. It schedules the coroutine and returns the task.

from scarletio import get_or_create_event_loop, sleep

async def say_after(to_say, after):
    await sleep(after)
    print(to_say)

async def main():
    # Schedule `say_after` as a task.
    task = LOOP.create_task(say_after('hello', 1))

    # Let's wait for our task's completion.
    await task

LOOP = get_or_create_event_loop()
LOOP.run(main())
LOOP.stop()

# Produces:
#
# hello

To run coroutines concurrently we just have to create more tasks.

from scarletio import get_or_create_event_loop, sleep

async def say_after(to_say, after):
    await sleep(after)
    print(to_say)

async def main():
    # Schedule `say_after` twice. One finishes after 1 second and the other after 2 seconds.
    task_0 = LOOP.create_task(say_after('hello', 1))
    task_1 = LOOP.create_task(say_after('world', 2))

    # Let's wait for our tasks' completion.
    await task_0
    await task_1

LOOP = get_or_create_event_loop()
LOOP.run(main())
LOOP.stop()

# Produces:
#
# hello
# world
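
For readers coming from asyncio, which the introduction names as scarletio's inspiration, the same idea can be sketched with the standard library alone. This is a comparison, not scarletio code: `asyncio.create_task` and `asyncio.sleep` stand in for `LOOP.create_task` and scarletio's `sleep`, and `finish_after` is a hypothetical helper. Because both tasks sleep at the same time, the one with the shorter delay finishes first even though it was created second.

```python
import asyncio


async def finish_after(name, after, finished):
    await asyncio.sleep(after)
    finished.append(name)
    return name.upper()


async def main():
    finished = []

    # Both coroutines are scheduled right away and sleep concurrently.
    slow = asyncio.create_task(finish_after('slow', 0.2, finished))
    fast = asyncio.create_task(finish_after('fast', 0.05, finished))

    # Awaiting a task gives back the wrapped coroutine's return value.
    results = [await slow, await fast]
    return finished, results


def demo():
    return asyncio.run(main())


print(demo())
```

The order in which the tasks are awaited does not change the order in which they finish; awaiting only collects the result.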

Task groups

Task groups allow you to manage and coordinate a collection of tasks. They provide a convenient way to work with multiple tasks concurrently and to track their progress and results.

Task groups provide methods to create and add individual tasks to the group. They are particularly useful in scenarios where you have a set of related tasks that can be executed concurrently and need to be managed collectively. They make it easier to handle complex workflows and improve the readability of asynchronous code.

from scarletio import TaskGroup, get_or_create_event_loop, sleep

async def say_after(to_say, after):
    await sleep(after)
    print(to_say)

async def main():
    task_group = TaskGroup(LOOP)

    # Schedule `say_after` twice through the task group.
    task_group.create_task(say_after('hello', 1))
    task_group.create_task(say_after('world', 2))

    # Let's wait for our tasks' completion.
    await task_group.wait_all()

LOOP = get_or_create_event_loop()
LOOP.run(main())
LOOP.stop()

# Produces:
#
# hello
# world

Task cancellation

Tasks can be cancelled easily and safely. When a task is cancelled, a scarletio.CancelledError is raised into the task at the next opportunity.

from scarletio import TaskGroup, get_or_create_event_loop, sleep

async def say_after(to_say, after):
    await sleep(after)
    print(to_say)

async def main():
    task_group = TaskGroup(LOOP)

    # Schedule `say_after` twice. The `hello` task is cancelled right away, so it never prints.
    task_group.create_task(say_after('hello', 1)).cancel()
    task_group.create_task(say_after('world', 2))

    # Let's wait for our tasks' completion.
    await task_group.wait_all()

LOOP = get_or_create_event_loop()
LOOP.run(main())
LOOP.stop()

# Produces:
#
# world
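
In coroutine-based libraries, raising an exception "into" a task generally means raising it at the point where the coroutine is currently suspended. The sketch below illustrates that mechanism with plain Python by calling `throw` on a suspended coroutine. The local `CancelledError` and `Suspend` classes are stand-ins defined for this illustration, not scarletio's own, and this is not a description of how scarletio implements cancellation.

```python
class CancelledError(Exception):
    """Stand-in for an event loop's cancellation exception (hypothetical)."""


class Suspend:
    """Hypothetical awaitable that suspends its awaiter once."""

    def __await__(self):
        yield


async def worker(log):
    try:
        log.append('started')
        await Suspend()
        # Never reached when the coroutine is cancelled while suspended.
        log.append('finished')
    finally:
        # Cleanup still runs, because the exception surfaces at the `await`.
        log.append('cleaned up')


def cancel_while_suspended():
    log = []
    coroutine = worker(log)

    # Run the coroutine up to its suspension point.
    coroutine.send(None)

    # Raise the exception at the suspension point instead of resuming normally.
    try:
        coroutine.throw(CancelledError())
    except CancelledError:
        log.append('cancelled')

    return log


print(cancel_while_suspended())
```

Since the exception appears at an `await`, a coroutine can use `try` / `finally` around its suspension points to release resources when it is cancelled.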

Task suspension

Tasks can be suspended using sleep, skip_poll_cycle, or skip_ready_cycle.

sleep suspends the task for the given number of seconds.

from time import strftime
from scarletio import get_or_create_event_loop, sleep

async def main():
    # Print out the current time every 4 seconds.
    while True:
        print(strftime('%X'))
        await sleep(4)

LOOP = get_or_create_event_loop()
try:
    LOOP.run(main())
finally:
    LOOP.stop()

sleep returns a Future, so it can be cancelled or added to a task group as well.

from scarletio import TaskGroup, get_or_create_event_loop, sleep

async def say_after(to_say, after):
    await sleep(after)
    print(to_say)

async def main():
    task_group = TaskGroup(LOOP)

    task_group.create_task(say_after('hello', 1))
    task_group.add_future(sleep(2))

    # Let's wait for our tasks' completion.
    await task_group.wait_all()
    print('world')

LOOP = get_or_create_event_loop()
LOOP.run(main())
LOOP.stop()

# Produces:
#
# hello
# world

skip_ready_cycle suspends the current task until every task that is already scheduled and ready to run has had its turn. This can be used to synchronise with other tasks, or to wait for other scheduled callbacks to finish before our task continues.

This is particularly useful in event-driven programming when we know our event handlers will be run, but we do not know in what order.

from scarletio import TaskGroup, get_or_create_event_loop, skip_ready_cycle

async def say_first(to_say):
    print(to_say)

async def say_second(to_say):
    await skip_ready_cycle()
    print(to_say)

async def main():
    task_group = TaskGroup(LOOP)

    # `say_second` will always print after `say_first` if scheduled concurrently.
    task_group.create_task(say_second('world'))
    task_group.create_task(say_first('hello'))

    # Let's wait for our tasks' completion.
    await task_group.wait_all()

LOOP = get_or_create_event_loop()
LOOP.run(main())
LOOP.stop()

# Produces:
#
# hello
# world
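
The ordering above can be pictured with a toy ready queue. The sketch below is a simplified model written in plain Python, not scarletio's scheduler: `SkipReadyCycle` and `run_ready_queue` are hypothetical names, and the only rule modelled is that a task which suspends is put at the back of the queue, behind everything that was already ready.

```python
from collections import deque


class SkipReadyCycle:
    """Hypothetical awaitable: suspend once so other ready tasks can run first."""

    def __await__(self):
        yield


async def say_first(to_say, log):
    log.append(to_say)


async def say_second(to_say, log):
    await SkipReadyCycle()
    log.append(to_say)


def run_ready_queue(coroutines):
    """Toy scheduler: step each ready coroutine, re-queueing it if it suspends."""
    ready = deque(coroutines)
    while ready:
        coroutine = ready.popleft()
        try:
            coroutine.send(None)
        except StopIteration:
            # The coroutine finished; nothing to re-queue.
            continue
        # The coroutine suspended; it goes behind the other ready coroutines.
        ready.append(coroutine)


def demo():
    log = []
    # `say_second` is queued first, yet prints last.
    run_ready_queue([say_second('world', log), say_first('hello', log)])
    return log


print(demo())
```

Even though `say_second` is stepped first, it gives up its turn once, which is enough for `say_first` to complete before it.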

skip_poll_cycle is a more extreme version of skip_ready_cycle: it schedules the task back only after the next I/O polling.

from scarletio import TaskGroup, get_or_create_event_loop, skip_poll_cycle, skip_ready_cycle

async def skip_10_times_then_say(to_say):
    for _ in range(10):
        await skip_ready_cycle()

    print(to_say)

async def skip_io_poll_then_say(to_say):
    await skip_poll_cycle()
    print(to_say)

async def main():
    task_group = TaskGroup(LOOP)

    task_group.create_task(skip_io_poll_then_say('world'))
    task_group.create_task(skip_10_times_then_say('hello'))

    # Let's wait for our tasks' completion.
    await task_group.wait_all()

LOOP = get_or_create_event_loop()
LOOP.run(main())
LOOP.stop()

# Produces:
#
# hello
# world

Since tasks are usually scheduled after I/O operations, Scarletio always prefers to finish every already scheduled and ready-to-run task before polling for I/O again.

Timeouts

Timeouts can be applied to futures and tasks using their apply_timeout method. If the timeout occurs, the task is cancelled and a TimeoutError is propagated.

from scarletio import get_or_create_event_loop, sleep

async def wait_forever():
    await sleep(3600)

async def main():
    task = LOOP.create_task(wait_forever())
    task.apply_timeout(1.0)

    try:
        await task
    except TimeoutError:
        print('TIMEOUT!')

LOOP = get_or_create_event_loop()
LOOP.run(main())
LOOP.stop()

# Produces:
#
# TIMEOUT!
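
As a point of comparison for readers who know asyncio, the standard library expresses the same pattern with `asyncio.wait_for`. The sketch below is asyncio code, not scarletio code; it only illustrates the behaviour described above, namely that the timed-out task ends up cancelled and the waiter sees a timeout error.

```python
import asyncio


async def wait_forever():
    await asyncio.sleep(3600)


async def main():
    task = asyncio.create_task(wait_forever())

    try:
        # Give the task 50 milliseconds to finish.
        await asyncio.wait_for(task, timeout=0.05)
    except asyncio.TimeoutError:
        # The task that timed out has been cancelled for us.
        return 'TIMEOUT!', task.cancelled()

    return 'finished', task.cancelled()


def demo():
    return asyncio.run(main())


print(demo())
```

In both libraries the caller handles the timeout with an ordinary `try` / `except` around the `await`.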

We can use repeat_timeout when executing a loop where the timeout should be applied to each cycle.

from scarletio import get_or_create_event_loop, repeat_timeout, sleep

async def say_after(to_say, after):
    await sleep(after)
    print(to_say)

async def main():
    after = 2
    try:
        with repeat_timeout(5) as loop:
            for _ in loop: # The timeout is reapplied with every iteration.
                await say_after('hi', after)
                after += 2
    except TimeoutError:
        print('TIMEOUT!')

LOOP = get_or_create_event_loop()
LOOP.run(main())
LOOP.stop()

# Produces:
#
# hi
# hi
# TIMEOUT!

Running in threads

While tasks run inside an event loop, it is possible to move their execution into a separate thread or, to be more accurate, into an executor. This is useful when running blocking, IO-bound functions that would otherwise block the event loop.

from threading import current_thread
from time import sleep as blocking_sleep
from scarletio import enter_executor, get_or_create_event_loop

async def main():
    print(f'before entering: {current_thread().ident}')
    async with enter_executor():
        print(f'after entering: {current_thread().ident}')

        blocking_sleep(2)

        print(f'before exiting: {current_thread().ident}')
    print(f'after exiting: {current_thread().ident}')

LOOP = get_or_create_event_loop()
LOOP.run(main())
LOOP.stop()

# Produces: (example)
# 
# before entering: 140664167724800
# after entering: 140664159332096
# before exiting: 140664159332096
# after exiting: 140664167724800
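
For comparison, asyncio solves the same problem by handing a plain blocking function to an executor with `loop.run_in_executor`. The sketch below is asyncio code, not scarletio code, and `blocking_work` is a hypothetical helper. The difference in style is that scarletio's `enter_executor` moves the body of the `async with` block itself, as the example above shows, while asyncio runs a separate function and awaits its result.

```python
import asyncio
import threading
import time


def blocking_work():
    # A blocking call that would stall the event loop if run on its thread.
    time.sleep(0.05)
    return threading.get_ident()


async def main():
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()

    # `None` selects the loop's default thread pool executor.
    worker_thread = await loop.run_in_executor(None, blocking_work)

    return loop_thread, worker_thread


def demo():
    return asyncio.run(main())


loop_thread, worker_thread = demo()
print(f'event loop thread: {loop_thread}')
print(f'executor thread: {worker_thread}')
```

The two printed identifiers differ, which confirms that the blocking call did not run on the event loop's thread.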

Scheduling from other threads

We can create tasks from other threads by using the create_task_thread_safe method. If we want to retrieve their result we use task.sync_wrap().wait().

from scarletio import get_or_create_event_loop, sleep

async def say_after(to_say, after):
    await sleep(after)
    print(to_say)

LOOP = get_or_create_event_loop()

# Create our tasks from a different thread
task_0 = LOOP.create_task_thread_safe(say_after('hello', 1))
task_1 = LOOP.create_task_thread_safe(say_after('world', 2))

# Wait for their execution to finish.
task_0.sync_wrap().wait()
task_1.sync_wrap().wait()

LOOP.stop()

It is also possible to wait for a task's result from another event loop using await task.async_wrap(loop).
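
Readers familiar with asyncio may recognise the thread-safe scheduling pattern: its standard-library counterpart is `asyncio.run_coroutine_threadsafe`, which returns a future that a synchronous caller can block on. The sketch below is asyncio code offered only as a comparison, with the event loop started manually in a background thread; `add_after` and `run_from_other_thread` are hypothetical names.

```python
import asyncio
import threading


async def add_after(left, right, after):
    await asyncio.sleep(after)
    return left + right


def run_from_other_thread():
    # Run an event loop in a background thread.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()

    try:
        # Schedule the coroutine from this (non-loop) thread.
        future = asyncio.run_coroutine_threadsafe(add_after(1, 2, 0.05), loop)

        # Block this thread until the result is available.
        return future.result(timeout=5)
    finally:
        # Stop the loop from outside its thread, then clean up.
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()


print(run_from_other_thread())
```

Blocking on the future plays the same role here as `task.sync_wrap().wait()` in the scarletio example: the calling thread waits while the event loop thread does the work.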

Locks

A Scarletio Lock can be used to guarantee exclusive access to a shared resource. It should be used with the async with statement.

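In the example below the tasks hold the lock one at a time. Assuming the lock is granted in the order it was requested, "hello" is printed after about 1 second, "world" after about 2 seconds, and "hello world" after about 4 seconds.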

from scarletio import Lock, TaskGroup, get_or_create_event_loop, sleep

async def say_after(to_say, after, lock):
    async with lock:
        await sleep(after)
        print(to_say)

async def main():
    task_group = TaskGroup(LOOP)
    lock = Lock(LOOP)

    task_group.create_task(say_after('hello', 1, lock))
    task_group.create_task(say_after('world', 1, lock))
    task_group.create_task(say_after('hello world', 2, lock))

    # Let's wait for our tasks' completion.
    await task_group.wait_all()

LOOP = get_or_create_event_loop()
LOOP.run(main())
LOOP.stop()

A Scarletio ScarletLock can be used to limit access to a shared resource to n tasks at a time.

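In the example below the lock admits two tasks at once, so "hello" and "world" are both printed after about 1 second, while "hello world", which has to wait for a free slot, is printed after about 3 seconds.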

from scarletio import ScarletLock, TaskGroup, get_or_create_event_loop, sleep

async def say_after(to_say, after, lock):
    async with lock:
        await sleep(after)
        print(to_say)

async def main():
    task_group = TaskGroup(LOOP)
    lock = ScarletLock(LOOP, 2)

    task_group.create_task(say_after('hello', 1, lock))
    task_group.create_task(say_after('world', 1, lock))
    task_group.create_task(say_after('hello world', 2, lock))

    # Let's wait for our tasks' completion.
    await task_group.wait_all()

LOOP = get_or_create_event_loop()
LOOP.run(main())
LOOP.stop()
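
The difference between an exclusive lock and a lock that admits n holders can be checked by counting how many tasks are inside the guarded block at once. The sketch below does this with asyncio's `Lock` and `Semaphore`, used here as standard-library analogues; it is not scarletio code, and `hold` and `peak_holders` are hypothetical helpers.

```python
import asyncio


async def hold(guard, state):
    async with guard:
        # Track how many tasks are inside the guarded block right now.
        state['inside'] += 1
        state['peak'] = max(state['peak'], state['inside'])
        await asyncio.sleep(0.02)
        state['inside'] -= 1


async def peak_holders(make_guard):
    # Create the guard inside the running loop, then contend for it with 3 tasks.
    guard = make_guard()
    state = {'inside': 0, 'peak': 0}
    await asyncio.gather(*(hold(guard, state) for _ in range(3)))
    return state['peak']


def demo():
    exclusive = asyncio.run(peak_holders(asyncio.Lock))
    shared = asyncio.run(peak_holders(lambda: asyncio.Semaphore(2)))
    return exclusive, shared


print(demo())
```

With the exclusive lock the peak is 1, and with the two-slot semaphore it is 2, which mirrors the roles of `Lock` and `ScarletLock(LOOP, 2)` in the examples above.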

Events

A Scarletio Event can be used to notify multiple tasks that some event has happened.

from scarletio import Event, TaskGroup, get_or_create_event_loop, sleep

async def set_event_after(event, after):
    await sleep(after)
    event.set()

async def say_when_set(to_say, event):
    await event
    print(to_say)

async def main():
    task_group = TaskGroup(LOOP)
    event = Event(LOOP)

    task_group.create_task(set_event_after(event, 2))
    task_group.create_task(say_when_set('hello', event))
    task_group.create_task(say_when_set('world', event))

    # Let's wait for our tasks' completion.
    await task_group.wait_all()

LOOP = get_or_create_event_loop()
LOOP.run(main())
LOOP.stop()

# Produces:
#
# hello
# world
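
For comparison, the same notification pattern written against asyncio's `Event` looks as follows. This is asyncio code, not scarletio code; note that asyncio waits with `await event.wait()`, whereas the scarletio example above awaits the event object directly. The `log` list is a hypothetical addition that makes the result easy to inspect.

```python
import asyncio


async def set_event_after(event, after):
    await asyncio.sleep(after)
    event.set()


async def say_when_set(to_say, event, log):
    # Every waiter is released once the event is set.
    await event.wait()
    log.append(to_say)


async def main():
    event = asyncio.Event()
    log = []

    await asyncio.gather(
        set_event_after(event, 0.05),
        say_when_set('hello', event, log),
        say_when_set('world', event, log),
    )
    return log


def demo():
    return asyncio.run(main())


print(demo())
```

A single `set` call wakes both waiting tasks, which is what distinguishes an event from a lock.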