saltyrtc / saltyrtc-server-python

SaltyRTC signalling server implementation.
MIT License
59 stars 13 forks source link

Yet another task queue refactoring #110

Closed lgrahl closed 5 years ago

lgrahl commented 5 years ago

Yay... again... this time we try to simplify tasks and jobs by giving them hard constraints.

A task:

A job:

The job queue runner:

To do:

codecov-io commented 5 years ago

Codecov Report

Merging #110 into master will increase coverage by 0.47%. The diff coverage is 87.22%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master    #110      +/-   ##
=========================================
+ Coverage   82.33%   82.8%   +0.47%     
=========================================
  Files          10      11       +1     
  Lines        1698    1774      +76     
  Branches      198     202       +4     
=========================================
+ Hits         1398    1469      +71     
- Misses        234     241       +7     
+ Partials       66      64       -2
Impacted Files Coverage Δ
saltyrtc/server/exception.py 94.11% <ø> (ø) :arrow_up:
saltyrtc/server/common.py 91.6% <ø> (-0.06%) :arrow_down:
saltyrtc/server/__init__.py 100% <100%> (ø) :arrow_up:
saltyrtc/server/bin.py 39.86% <100%> (ø) :arrow_up:
saltyrtc/server/typing.py 88.37% <75%> (-4.32%) :arrow_down:
saltyrtc/server/util.py 75.63% <84.21%> (+3.55%) :arrow_up:
saltyrtc/server/task.py 86.22% <86.22%> (ø)
saltyrtc/server/server.py 86.78% <87.14%> (+0.89%) :arrow_up:
saltyrtc/server/protocol.py 91.31% <96.66%> (+0.18%) :arrow_up:
... and 4 more

Continue to review full report at Codecov.

Legend - Click here to learn more Δ = absolute <relative> (impact), ø = not affected, ? = missing data Powered by Codecov. Last update 442f5b2...00c6771. Read the comment docs.

lgrahl commented 5 years ago

Unfortunately, we will have to drop support for PyPy as it seems to suffer from race conditions in its task cancellation propagation.

Example:

import asyncio
import random
import sys

loop = asyncio.get_event_loop()
loop.set_debug(True)

async def cancels_waiting_task(task):
    await asyncio.sleep(0.01 + (0.5 - random.random()) / 100)
    task.cancel()

async def sets_exception(future):
    await asyncio.sleep(0.01)
    future.set_exception(ValueError('Huh?!'))

async def transforms_future_exception(future):
    try:
        await asyncio.shield(future)
    except ValueError as exc:
        raise ValueError('{} What is going on?!'.format(exc))

async def waits_for_future(future):
    try:
        result = await asyncio.wait_for(transforms_future_exception(future), timeout=0.1)
    except Exception:
        pass

async def main():
    while True:
        future = asyncio.Future()
        waits_for_future_task = loop.create_task(waits_for_future(future))
        await asyncio.wait([
            waits_for_future_task,
            sets_exception(future),
            cancels_waiting_task(waits_for_future_task),
        ])

exception_handler_called = False
main_task = loop.create_task(main())

def exception_handler(loop, context):
    global exception_handler_called
    if exception_handler_called:
        return
    loop.default_exception_handler(context)
    print('^^^^^^^^ WOOT?!')
    main_task.cancel()
    exception_handler_called = True

loop.set_exception_handler(exception_handler)
loop.run_until_complete(main_task)