Augmented listen_to decorator pool kwarg to support 'dynamic_single'. In this mode, each decorated handler will be associated with an exclusive ThreadPoolExecutor with a single worker, ensuring FIFO event processing, which can be suitable for non IO-bound handlers.
With this feature we can revert (part of) certain specific fixes that had to fix on NApps, for instance:
For future developers readers, if you can afford and don't have blocking dependencies, you can also achieve a similar result with alisten_to decorator for async functions since you have explicit control when the task yields, so you can also try to leverage alisten_to if you don't have threading.Lock, since a threading.Lock wasn't designed to be used with coroutines (unlike asyncio.Lock). This is what we've done on of_coreon_raw_in handler
Local Tests
Example using noviflow NApp with @listen_to("kytos.topology.updated", pool="dynamic_single"):
This is the case where out of order events can happen, since the time.sleep here is forcning a simulation of a thread being suspended and then resumed (and this has been a source of potential out of order event issues that we've been dealing with):
Closes #397
Summary
listen_to
decoratorpool
kwarg to support'dynamic_single'
. In this mode, each decorated handler will be associated with an exclusiveThreadPoolExecutor
with a single worker, ensuring FIFO event processing, which can be suitable for non IO-bound handlers.With this feature we can revert (part of) certain specific fixes that had to fix on NApps, for instance:
Side node
For future developers readers, if you can afford and don't have blocking dependencies, you can also achieve a similar result with
alisten_to
decorator for async functions since you have explicit control when the task yields, so you can also try to leveragealisten_to
if you don't havethreading.Lock
, since athreading.Lock
wasn't designed to be used with coroutines (unlikeasyncio.Lock
). This is what we've done onof_core
on_raw_in
handlerLocal Tests
noviflow
NApp with@listen_to("kytos.topology.updated", pool="dynamic_single")
:This example is forcing a random time sleep for 1-5 secs, if you check the logs you can see that
handling topo_update ts:
respects the FIFO ordernoviflow
NApp with@listen_to("kytos.topology.updated")
with the default"app"
pool:This is the case where out of order events can happen, since the
time.sleep
here is forcning a simulation of a thread being suspended and then resumed (and this has been a source of potential out of order event issues that we've been dealing with):End-to-End Tests
I'll run e2e tests with a related test on pathfinder, I'll post the results here.