kytos-ng / kytos

Kytos SDN Platform. Kytos is designed to be easy to install, use, develop and share Network Apps (NApps).
https://kytos-ng.github.io/
MIT License

Pacing Blueprint #454

Closed. Ktmi closed this issue 4 weeks ago

Ktmi commented 4 months ago

This is a refinement on #245 and kytos-ng/mef_eline#372, and created due to outside discussion regarding #453 and kytos-ng/mef_eline#432.

Rate limits within the kytos core are not sufficient for our use cases. Our use cases include:

#245 tried to address these use cases, but introduced a large amount of complexity with little gain.

Additionally, after speaking with the operations team, rate limits on the API would also be desirable. With the current direction we are heading with the Event Buffers rate limit, we should be able to make this configurable at the NApp level per API endpoint.

italovalcy commented 4 months ago

Great points and a good summary of the previous discussion, David! My half bit of contribution follows:

Ktmi commented 3 months ago

From my research, the best method available for API rate limiting is to add ASGI rate limiting middleware. For middleware implementations, there are two options: asgi-ratelimit and slowapi. Between the two, I would suggest asgi-ratelimit: it's configurable from the top level, removing the need to individually decorate functions, and it seems very versatile.
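
As a rough illustration of the "configurable from the top level" point, here is a minimal hand-rolled sketch of an ASGI rate limiting middleware driven by a single central table. This is not the asgi-ratelimit API (its actual configuration differs); the class and parameter names here are hypothetical.

import time
from collections import defaultdict

class SimpleRateLimitMiddleware:
    """Illustrative ASGI middleware: one central limit table, no per-endpoint decorators."""

    def __init__(self, app, limits_per_path):
        # limits_per_path maps a path prefix to (max_hits, window_seconds)
        self.app = app
        self.limits_per_path = limits_per_path
        self.hits = defaultdict(list)

    async def __call__(self, scope, receive, send):
        if scope["type"] == "http":
            for prefix, (max_hits, window) in self.limits_per_path.items():
                if scope["path"].startswith(prefix):
                    now = time.monotonic()
                    recent = [t for t in self.hits[prefix] if now - t < window]
                    if len(recent) >= max_hits:
                        await send({"type": "http.response.start", "status": 503, "headers": []})
                        await send({"type": "http.response.body", "body": b"rate limit exceeded"})
                        return
                    recent.append(now)
                    self.hits[prefix] = recent
                    break
        await self.app(scope, receive, send)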

Ktmi commented 3 months ago

For event rate limiting, this still feels a bit out of reach. The main issue is the mixture of async and non-async contexts. We could get around this by making the async code check the limit in a separate thread, but that feels like a hack. I think we first need to target moving everything over to async before we move forward with event rate limits.

Ktmi commented 3 months ago

@viniarck and @italovalcy Here's a blueprint for specifying API rate limits. It doesn't include any implementation details, beyond how the data should be interpreted.

API Rate Limit Specification Format

This document describes how rate limits shall be specified for API endpoints. These rate limits are global, shared across all users, and are meant to address technical constraints. Rate limits will be provided as a list[dict], with each enclosed dict containing match, limit, and method attributes.

The value of the limit attribute determines the rate limit for the matched URLs.

The value of the method attribute is a list of HTTP methods to rate limit for.

The value of the match attribute will be used to construct a regular expression which will attempt to match against a URL. Any subgroups of the regex will be used for providing finer rate limit controls, e.g. rate limiting per object ID.

The list of rate limits will be evaluated in sequential order. If a requested URL matches the regular expression and the HTTP method, it will generate a hit on that limit. If the rate limit is exceeded, the server will return status code 503 (Service Unavailable). If the rate limit is not exceeded, the request is allowed to go through, without evaluating any further possible matches.

api_rate_limits = [
    {
        "method": ["GET"],
        "match": r"^/switches/(?P<switch_id>\w*)/$",
        "limit": "amount/time unit"
    },
    ...
]
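
As a companion to the format above, here is a rough sketch (not part of the proposal; find_rate_limit_key and limit_exceeded are hypothetical names) of the sequential evaluation described earlier: the first rule matching both method and regex wins, regex subgroups become part of the limit key, and an exceeded limit results in a 503.

import re

def find_rate_limit_key(api_rate_limits, method, url):
    """Return (rule, key) for the first matching rule, or None if nothing matches."""
    for rule in api_rate_limits:
        if method not in rule["method"]:
            continue
        match = re.match(rule["match"], url)
        if match is None:
            continue
        # Subgroups (e.g. switch_id) provide finer-grained keys, e.g. per object ID.
        return rule, (rule["match"], *match.groups())
    return None

# Hypothetical usage inside the middleware:
# result = find_rate_limit_key(api_rate_limits, "GET", "/switches/00:01/")
# if result is not None:
#     rule, key = result
#     if limit_exceeded(rule["limit"], key):  # hypothetical helper
#         return a 503 response
#     # otherwise the request goes through without evaluating further rules
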
viniarck commented 3 months ago

@Ktmi,

I'll recommend an actual blueprint PR to officialize the requirements/proposal, but in the meantime here's my feedback:

Do we have an immediate use case for rate-limiting API endpoints that will be used in prod in this upcoming version 2024.1? What problem does it solve, and whom does it rate limit? As you know, we're also trying to eventually move away from requests between NApps (as we're discussing on this issue), so who are we trying to rate limit globally: the front-end and/or any authenticated user? (Many NApps still aren't using the @authenticated decorator.)

We need these answers first before deciding on a solution. Even if what you're proposing is reasonable and makes sense, it's coming before the actual requirements and the driving use case or problem being solved. We could also derive requirements from development and come up with a generalized rate limit solution, but will it be good enough, and for which problems? That goes back to the chicken-and-egg problem. The intention is great, but especially for rate limiting API endpoints, and also rate limiting on the consumer side, we do need to sort out and better map the requirements, the specific problems, and the use cases, and whether or not it's something that needs to be solved for 2024.1.

Now, the use cases for rate limit on queues with KytosEvents are clear to me:

That said, I'll recommend you think about it, map the clear use cases and problems, and eventually consolidate and open a blueprint PR, and we'll continue there. Things that are uncertain can stay marked as uncertain in the blueprint so they can be iterated on in a later prioritization, meaning out of scope for now (if any).

viniarck commented 3 months ago

Side note

Speaking of the core @authenticated decorator: it also required NApps to use a decorator to have authentication. So if one day we end up with API rate limiting (potentially per user or just global), it might be worth discussing whether it should be configured via decorators too, parametrized via NApp settings. The regex here is certainly very powerful, but there are trade-offs to consider in usability and in how this is configured.

Also, regarding the libraries asgi-ratelimit and slowapi (which also uses limits): I don't look forward to having different rate limit libraries for different parts. Not only would that make things harder to reason about and understand, but slightly different underlying algorithms can turn into complaints we'd have to keep dealing with. For instance, recall the discussions about the limits algorithm strategies you tried out and some of the surprises we've already had to deal with; it's crucial that we only expose strategies we're OK with and willing to support. So let's pick one library that is good enough for general rate limit use cases, see if it's suitable for the primary use cases on the producer/consumer side of queues, and keep in mind whether it could also be used for API endpoints if one day we need it. So, please continue your research and see the best proposal you can come up with, but let's keep these things in mind. Thanks, David.

Ktmi commented 3 months ago

The concrete general use case for rate limiting is to maintain system stability, particularly in the case of OpenFlow message out, where overuse of the endpoint can result in the connection dropping.

Message out, now that I think about it, is another one of these 1-to-1 calls that shouldn't be an event, but instead just a direct call. How about we start from the top of the stack of issues and first resolve our technical debt regarding the internal calling procedure? If we were to build rate limiting for events, but then change to a non-event-bus-driven internal call procedure, the work on rate limits would be for naught.

Ktmi commented 2 months ago

@viniarck @italovalcy Here's what I got for pacing. You should be able to try it out by checking out the three following branches:

This document is intended as a guide for both developers and network operators on how to implement and use pacing within Kytos. Pacing is intended as a mechanism to control how often certain actions will be executed. It differs from rate limiting in that the action is not cancelled; instead, the caller is temporarily paused until the action is allowed to execute again. For an action to be paced, it needs to be instrumented by developers to support pacing.

Setting A Pace

For network operators, setting the pace of actions can be done through the action_paces configuration option in kytos.conf. The action_paces option is a JSON object. In this JSON object, the keys represent the type of action to be paced, and the values represent the pace of those actions. Each value is itself another JSON object containing max_concurrent, which is the maximum number of executions of that action allowed in the pacing window, and refresh_period, which is the time until the next refresh of the pacing window.

action_paces =
  {
    "flow_manager.send_flow_mod": {"max_concurrent": 50, "refresh_period": 2.0}
  }

Pacing an Action

For developers to pace an action, all they need to do is get the pacing controller from the Kytos controller, located at controller.pacer. The pacing controller has two methods of interest for implementing action pacing: hit for threaded code, and ahit for asynchronous code.


class Pacer:
    ...
    def hit(self, action_name, *keys):
        """
        Pace execution, based on the pacing config for the given `action_name`.
        `keys` can be included to allow multiple objects
        to be paced separately on the same action.
        `keys` should all be hashable objects like strings.

        This should not be called from the same thread serving
        the pacing.
        """
        ...

    async def ahit(self, action_name, *keys):
        """
        Asynchronous variant of `hit`.

        This can be called from the serving thread safely.
        """
        ...

Here's an example of how to use the pacer. In this example, we are about to send out an event, but we want to pace how often we send it based on the switch ID.


class Main(KytosNApp):
    ...

    def send_switch_event(self, switch: Switch):
        """Send an event about the switch."""
        self.controller.pacer.hit("mynapp.switch_event", switch.dpid)
        self.controller.buffers.app.put(
            ...
        )

    async def asend_switch_event(self, switch: Switch):  # async variant; renamed so it doesn't shadow the sync method above
        """Asynchronously send an event about the switch."""
        await self.controller.pacer.ahit("mynapp.switch_event", switch.dpid)
        await self.controller.buffers.app.aput(
            ...
        )

As shown above, the hit or ahit method is called prior to the action we want to pace, and the pacer then handles when to let the action resume. It should be noted that the pacer is fair: actions will be resumed in the order in which they called the pacer.
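
For reference, here is a self-contained sketch of how a fair pacer with a fixed refresh window could behave. This is not the actual kytos.core.pacing implementation; FairWindowPacer is a hypothetical name, and it is simplified so that waiters arriving mid-window wait until the next refresh.

import asyncio

class FairWindowPacer:
    """Illustrative only: FIFO-fair pacing with a fixed refresh window."""

    def __init__(self, max_concurrent: int, refresh_period: float):
        self.max_concurrent = max_concurrent
        self.refresh_period = refresh_period
        self.waiters = asyncio.Queue()

    async def serve(self):
        """Wake up to max_concurrent waiters per refresh_period, in arrival order."""
        budget = self.max_concurrent
        while True:
            while budget > 0 and not self.waiters.empty():
                waiter = self.waiters.get_nowait()
                if not waiter.done():
                    waiter.set_result(None)
                    budget -= 1
            await asyncio.sleep(self.refresh_period)
            budget = self.max_concurrent

    async def ahit(self):
        """Pause the caller until the pacer allows the action to run."""
        waiter = asyncio.get_running_loop().create_future()
        await self.waiters.put(waiter)
        await waiter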

viniarck commented 2 months ago

@Ktmi, nicely done. Great abstraction for the existing problem. I liked the flexibility it provides to be used from either sync or async contexts; considering the existing codebase (although we're trying to move to async as much as we can), it's great for that, and I can see it being suitable for pretty much any place in our NApps codebase. The only minor drawback is that it's not as rich in terms of moving-window strategies compared to limits, but for the existing and foreseeable pacing/rate limiting problems it's a good trade-off, so let's proceed with it. A concurrency control with sleep is a good compromise for the foreseeable future, while also avoiding a dependency that we don't immediately need.

Points to document/consider/discuss when consolidating this proposal and sending the blueprint PR:

1) Potentially configure the pacers in NApp settings instead of kytos.conf (see the hypothetical settings.py sketch after these points). What you have here can work, but kytos.conf is usually best when we can also provide defaults, and usually for core settings; otherwise, during an upgrade a new kytos.conf isn't necessarily generated, and in those cases we'd always have to recommend that users regenerate it, which isn't too bad (and we do mention that in release notes). On settings.py that can be easier, since that file is always the latest and users usually only replace values in it, and then NApps could create or get an existing pacer managed in the core. Let me know if you see something simpler to maintain/use or have other suggestions.

2) Specify the time unit of refresh_period; reading the implementation, I see that it's in seconds.

3) Fairness makes a lot of sense as it's been implemented, but notice that sometimes another pacer might be needed "closer" to the producer source. For instance, in this flow_manager example, since flow_manager is responsible for sending the flow mods, it's a good point to control concurrency there; but notice that any potentially misbehaving NApp/client sending too many events to flow_manager would impact the share of the scheduling queue, as expected and by design. Let's explain that in the document too, so users/developers have more examples to understand how and where to use it.

4) Implementation-wise let's not allow a Pacer to crash if unhashable keys are used, and let's make sure it can be gracefully shut down.

Points to explore and share the results on this issue:

5) Since the pace is being applied to flow mods currently, let's take the opportunity to stress test this and share a Wireshark flow mods/sec chart to see the expected result.

6) Use a pacer in both a sync and an async endpoint context publishing a particular event that needs to be paced, and then let's look at an APM chart of the published event (since the APM instruments handlers, it should be easy to get a chart of how many times that event fired per second); the throughput of the events should be limited to the max concurrency of the pacer. This is similar to the flow mods you're pacing in the prior example, but the idea is to also use it from both sync and async endpoint contexts to exercise the pacer.
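
Regarding point 1, a NApp-level configuration might look roughly like the following. This is a hypothetical settings.py snippet: the ACTION_PACES name and the inject_config wiring are assumptions, not an agreed-upon API.

# napps/kytos/flow_manager/settings.py (hypothetical)
ACTION_PACES = {
    "flow_manager.send_flow_mod": {"max_concurrent": 50, "refresh_period": 2.0},
}

# In the NApp's setup(), also hypothetical:
# self.controller.pacer.inject_config(settings.ACTION_PACES)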

Ktmi commented 2 months ago

@viniarck So I've done some testing of the pacing. Just to see how performant it is, I created the following program.

#!/usr/bin/env python

import asyncio
import time

from kytos.core.pacing import Pacer

async def worker(pacer, action_name, *keys):
    await pacer.ahit(action_name, *keys)

async def task(pacer, action_name, worker_count, group_count):
    workers = [
        worker(pacer, action_name, j)
        for i in range(worker_count)
        for j in range(group_count)
    ]

    start = time.time()

    await asyncio.gather(*workers)

    end = time.time()

    elapsed = end - start
    for j in range(group_count):
        print(f'Performed  action ({action_name}, {j}) {worker_count} times in {elapsed} seconds')

async def main():
    async with asyncio.TaskGroup() as tg:
        pacer = Pacer()
        pacer.inject_config(
            {
                'test': {
                    'max_concurrent': 1_000_000,
                    'refresh_period': 1.0,
                }
            }
        )

        pacer_task = tg.create_task(pacer.serve())

        await task(pacer, 'test', 20_000, 5)

        pacer_task.cancel()

asyncio.run(main())

The program is meant to stress test how quickly actions can be processed by a pacer in an async context, when not limited by the max_concurrent.

For my test of 5 groups of 20,000 workers with a max_concurrent of 1,000,000, I got the following results:

Performed  action (test, 0) 20000 times in 28.44497847557068 seconds
Performed  action (test, 1) 20000 times in 28.44497847557068 seconds
Performed  action (test, 2) 20000 times in 28.44497847557068 seconds
Performed  action (test, 3) 20000 times in 28.44497847557068 seconds
Performed  action (test, 4) 20000 times in 28.44497847557068 seconds

This amounts to a processing rate of about 3,500 actions per second (100,000 total actions in roughly 28.4 seconds) in a theoretically unrestricted scenario.

For my next test, I did 10 groups of 10,000 workers, with a max_concurrent of 500 and a refresh_period of 1.0 seconds. This is meant to test a scenario where we exceed the maximum of the theoretically unrestricted scenario and also have the possibility of exceeding the pacing limit.

Performed  action (test, 0) 10000 times in 43.393627405166626 seconds
Performed  action (test, 1) 10000 times in 43.393627405166626 seconds
Performed  action (test, 2) 10000 times in 43.393627405166626 seconds
Performed  action (test, 3) 10000 times in 43.393627405166626 seconds
Performed  action (test, 4) 10000 times in 43.393627405166626 seconds
Performed  action (test, 5) 10000 times in 43.393627405166626 seconds
Performed  action (test, 6) 10000 times in 43.393627405166626 seconds
Performed  action (test, 7) 10000 times in 43.393627405166626 seconds
Performed  action (test, 8) 10000 times in 43.393627405166626 seconds
Performed  action (test, 9) 10000 times in 43.393627405166626 seconds

In the above scenario, I was able to get a processing rate of about 2,300 actions per second (100,000 total actions in roughly 43.4 seconds). During that test, I was also getting warning messages from the pacer that the pace limit had been reached, so it's possible that there were times when my system was turboing fast enough to reach 5,000 actions per second.

I'll run another set of tests using rate limits, and see what I get there.

Ktmi commented 2 months ago

@viniarck So I did further testing with limits. I tested using limits without implementing fairness, and using limits while implementing fairness, to home in on any bottlenecks.

Here's a gist of the new implementation: https://gist.github.com/Ktmi/a632abd0bb502e4787e702cb60cf658f

For the first test, I checked the unbounded performance of all three implementations, with 10,000 workers per group across 10 groups, a max_concurrent of 100,000, and a refresh_period of 1.0.

Running semaphore tests
Performed  action (test, 0) 10000 times in 28.382429122924805 seconds
Performed  action (test, 1) 10000 times in 28.382429122924805 seconds
Performed  action (test, 2) 10000 times in 28.382429122924805 seconds
Performed  action (test, 3) 10000 times in 28.382429122924805 seconds
Performed  action (test, 4) 10000 times in 28.382429122924805 seconds
Performed  action (test, 5) 10000 times in 28.382429122924805 seconds
Performed  action (test, 6) 10000 times in 28.382429122924805 seconds
Performed  action (test, 7) 10000 times in 28.382429122924805 seconds
Performed  action (test, 8) 10000 times in 28.382429122924805 seconds
Performed  action (test, 9) 10000 times in 28.382429122924805 seconds
Running limits tests
Performed  action (test, 0) 10000 times in 1.0355315208435059 seconds
Performed  action (test, 1) 10000 times in 1.0355315208435059 seconds
Performed  action (test, 2) 10000 times in 1.0355315208435059 seconds
Performed  action (test, 3) 10000 times in 1.0355315208435059 seconds
Performed  action (test, 4) 10000 times in 1.0355315208435059 seconds
Performed  action (test, 5) 10000 times in 1.0355315208435059 seconds
Performed  action (test, 6) 10000 times in 1.0355315208435059 seconds
Performed  action (test, 7) 10000 times in 1.0355315208435059 seconds
Performed  action (test, 8) 10000 times in 1.0355315208435059 seconds
Performed  action (test, 9) 10000 times in 1.0355315208435059 seconds
Running fair limits tests
Performed  action (test, 0) 10000 times in 24.183369159698486 seconds
Performed  action (test, 1) 10000 times in 24.183369159698486 seconds
Performed  action (test, 2) 10000 times in 24.183369159698486 seconds
Performed  action (test, 3) 10000 times in 24.183369159698486 seconds
Performed  action (test, 4) 10000 times in 24.183369159698486 seconds
Performed  action (test, 5) 10000 times in 24.183369159698486 seconds
Performed  action (test, 6) 10000 times in 24.183369159698486 seconds
Performed  action (test, 7) 10000 times in 24.183369159698486 seconds
Performed  action (test, 8) 10000 times in 24.183369159698486 seconds
Performed  action (test, 9) 10000 times in 24.183369159698486 seconds

It seems that the biggest overhead is the implementation of fairness. Limits seems extremely fast, but this test doesn't show what happens when the rate limit is reached.

For my next test, I did 10 groups of 10,000 workers, with a max_concurrent of 500 and refresh_period of 1.0 seconds.

Running semaphore tests
Performed  action (test, 0) 10000 times in 45.985026836395264 seconds
Performed  action (test, 1) 10000 times in 45.985026836395264 seconds
Performed  action (test, 2) 10000 times in 45.985026836395264 seconds
Performed  action (test, 3) 10000 times in 45.985026836395264 seconds
Performed  action (test, 4) 10000 times in 45.985026836395264 seconds
Performed  action (test, 5) 10000 times in 45.985026836395264 seconds
Performed  action (test, 6) 10000 times in 45.985026836395264 seconds
Performed  action (test, 7) 10000 times in 45.985026836395264 seconds
Performed  action (test, 8) 10000 times in 45.985026836395264 seconds
Performed  action (test, 9) 10000 times in 45.985026836395264 seconds
Running limits tests
Performed  action (test, 0) 10000 times in 19.400213956832886 seconds
Performed  action (test, 1) 10000 times in 19.400213956832886 seconds
Performed  action (test, 2) 10000 times in 19.400213956832886 seconds
Performed  action (test, 3) 10000 times in 19.400213956832886 seconds
Performed  action (test, 4) 10000 times in 19.400213956832886 seconds
Performed  action (test, 5) 10000 times in 19.400213956832886 seconds
Performed  action (test, 6) 10000 times in 19.400213956832886 seconds
Performed  action (test, 7) 10000 times in 19.400213956832886 seconds
Performed  action (test, 8) 10000 times in 19.400213956832886 seconds
Performed  action (test, 9) 10000 times in 19.400213956832886 seconds
Running fair limits tests
Performed  action (test, 0) 10000 times in 42.699138879776 seconds
Performed  action (test, 1) 10000 times in 42.699138879776 seconds
Performed  action (test, 2) 10000 times in 42.699138879776 seconds
Performed  action (test, 3) 10000 times in 42.699138879776 seconds
Performed  action (test, 4) 10000 times in 42.699138879776 seconds
Performed  action (test, 5) 10000 times in 42.699138879776 seconds
Performed  action (test, 6) 10000 times in 42.699138879776 seconds
Performed  action (test, 7) 10000 times in 42.699138879776 seconds
Performed  action (test, 8) 10000 times in 42.699138879776 seconds
Performed  action (test, 9) 10000 times in 42.699138879776 seconds

Limits continued to perform well in this test.

However, I had a suspicion about the cause of the bottlenecks: the usage of the janus queue seemed a likely culprit. So I replaced it with an asyncio queue and reran the previous test. The gist was updated to include these additional tests.

Running semaphore tests
Performed  action (test, 0) 10000 times in 45.468226194381714 seconds
Performed  action (test, 1) 10000 times in 45.468226194381714 seconds
Performed  action (test, 2) 10000 times in 45.468226194381714 seconds
Performed  action (test, 3) 10000 times in 45.468226194381714 seconds
Performed  action (test, 4) 10000 times in 45.468226194381714 seconds
Performed  action (test, 5) 10000 times in 45.468226194381714 seconds
Performed  action (test, 6) 10000 times in 45.468226194381714 seconds
Performed  action (test, 7) 10000 times in 45.468226194381714 seconds
Performed  action (test, 8) 10000 times in 45.468226194381714 seconds
Performed  action (test, 9) 10000 times in 45.468226194381714 seconds
Running limits tests
Performed  action (test, 0) 10000 times in 19.957125425338745 seconds
Performed  action (test, 1) 10000 times in 19.957125425338745 seconds
Performed  action (test, 2) 10000 times in 19.957125425338745 seconds
Performed  action (test, 3) 10000 times in 19.957125425338745 seconds
Performed  action (test, 4) 10000 times in 19.957125425338745 seconds
Performed  action (test, 5) 10000 times in 19.957125425338745 seconds
Performed  action (test, 6) 10000 times in 19.957125425338745 seconds
Performed  action (test, 7) 10000 times in 19.957125425338745 seconds
Performed  action (test, 8) 10000 times in 19.957125425338745 seconds
Performed  action (test, 9) 10000 times in 19.957125425338745 seconds
Running fair limits tests
Performed  action (test, 0) 10000 times in 43.26968741416931 seconds
Performed  action (test, 1) 10000 times in 43.26968741416931 seconds
Performed  action (test, 2) 10000 times in 43.26968741416931 seconds
Performed  action (test, 3) 10000 times in 43.26968741416931 seconds
Performed  action (test, 4) 10000 times in 43.26968741416931 seconds
Performed  action (test, 5) 10000 times in 43.26968741416931 seconds
Performed  action (test, 6) 10000 times in 43.26968741416931 seconds
Performed  action (test, 7) 10000 times in 43.26968741416931 seconds
Performed  action (test, 8) 10000 times in 43.26968741416931 seconds
Performed  action (test, 9) 10000 times in 43.26968741416931 seconds
Running semaphore no janus tests
Performed  action (test, 0) 10000 times in 23.207394123077393 seconds
Performed  action (test, 1) 10000 times in 23.207394123077393 seconds
Performed  action (test, 2) 10000 times in 23.207394123077393 seconds
Performed  action (test, 3) 10000 times in 23.207394123077393 seconds
Performed  action (test, 4) 10000 times in 23.207394123077393 seconds
Performed  action (test, 5) 10000 times in 23.207394123077393 seconds
Performed  action (test, 6) 10000 times in 23.207394123077393 seconds
Performed  action (test, 7) 10000 times in 23.207394123077393 seconds
Performed  action (test, 8) 10000 times in 23.207394123077393 seconds
Performed  action (test, 9) 10000 times in 23.207394123077393 seconds
Running fair limits no janus tests
Performed  action (test, 0) 10000 times in 21.252352237701416 seconds
Performed  action (test, 1) 10000 times in 21.252352237701416 seconds
Performed  action (test, 2) 10000 times in 21.252352237701416 seconds
Performed  action (test, 3) 10000 times in 21.252352237701416 seconds
Performed  action (test, 4) 10000 times in 21.252352237701416 seconds
Performed  action (test, 5) 10000 times in 21.252352237701416 seconds
Performed  action (test, 6) 10000 times in 21.252352237701416 seconds
Performed  action (test, 7) 10000 times in 21.252352237701416 seconds
Performed  action (test, 8) 10000 times in 21.252352237701416 seconds
Performed  action (test, 9) 10000 times in 21.252352237701416 seconds

It seems that the usage of the janus queue is the main bottleneck in the current implementation. The usage of the semaphore seems fine.

Also, here are the results for a max_concurrent of 100,000:

Running semaphore tests
Performed  action (test, 0) 10000 times in 26.386483430862427 seconds
Performed  action (test, 1) 10000 times in 26.386483430862427 seconds
Performed  action (test, 2) 10000 times in 26.386483430862427 seconds
Performed  action (test, 3) 10000 times in 26.386483430862427 seconds
Performed  action (test, 4) 10000 times in 26.386483430862427 seconds
Performed  action (test, 5) 10000 times in 26.386483430862427 seconds
Performed  action (test, 6) 10000 times in 26.386483430862427 seconds
Performed  action (test, 7) 10000 times in 26.386483430862427 seconds
Performed  action (test, 8) 10000 times in 26.386483430862427 seconds
Performed  action (test, 9) 10000 times in 26.386483430862427 seconds
Running limits tests
Performed  action (test, 0) 10000 times in 1.0367469787597656 seconds
Performed  action (test, 1) 10000 times in 1.0367469787597656 seconds
Performed  action (test, 2) 10000 times in 1.0367469787597656 seconds
Performed  action (test, 3) 10000 times in 1.0367469787597656 seconds
Performed  action (test, 4) 10000 times in 1.0367469787597656 seconds
Performed  action (test, 5) 10000 times in 1.0367469787597656 seconds
Performed  action (test, 6) 10000 times in 1.0367469787597656 seconds
Performed  action (test, 7) 10000 times in 1.0367469787597656 seconds
Performed  action (test, 8) 10000 times in 1.0367469787597656 seconds
Performed  action (test, 9) 10000 times in 1.0367469787597656 seconds
Running fair limits tests
Performed  action (test, 0) 10000 times in 23.834824562072754 seconds
Performed  action (test, 1) 10000 times in 23.834824562072754 seconds
Performed  action (test, 2) 10000 times in 23.834824562072754 seconds
Performed  action (test, 3) 10000 times in 23.834824562072754 seconds
Performed  action (test, 4) 10000 times in 23.834824562072754 seconds
Performed  action (test, 5) 10000 times in 23.834824562072754 seconds
Performed  action (test, 6) 10000 times in 23.834824562072754 seconds
Performed  action (test, 7) 10000 times in 23.834824562072754 seconds
Performed  action (test, 8) 10000 times in 23.834824562072754 seconds
Performed  action (test, 9) 10000 times in 23.834824562072754 seconds
Running semaphore no janus tests
Performed  action (test, 0) 10000 times in 7.44641637802124 seconds
Performed  action (test, 1) 10000 times in 7.44641637802124 seconds
Performed  action (test, 2) 10000 times in 7.44641637802124 seconds
Performed  action (test, 3) 10000 times in 7.44641637802124 seconds
Performed  action (test, 4) 10000 times in 7.44641637802124 seconds
Performed  action (test, 5) 10000 times in 7.44641637802124 seconds
Performed  action (test, 6) 10000 times in 7.44641637802124 seconds
Performed  action (test, 7) 10000 times in 7.44641637802124 seconds
Performed  action (test, 8) 10000 times in 7.44641637802124 seconds
Performed  action (test, 9) 10000 times in 7.44641637802124 seconds
Running fair limits no janus tests
Performed  action (test, 0) 10000 times in 7.1090991497039795 seconds
Performed  action (test, 1) 10000 times in 7.1090991497039795 seconds
Performed  action (test, 2) 10000 times in 7.1090991497039795 seconds
Performed  action (test, 3) 10000 times in 7.1090991497039795 seconds
Performed  action (test, 4) 10000 times in 7.1090991497039795 seconds
Performed  action (test, 5) 10000 times in 7.1090991497039795 seconds
Performed  action (test, 6) 10000 times in 7.1090991497039795 seconds
Performed  action (test, 7) 10000 times in 7.1090991497039795 seconds
Performed  action (test, 8) 10000 times in 7.1090991497039795 seconds
Performed  action (test, 9) 10000 times in 7.1090991497039795 seconds
viniarck commented 2 months ago

Interesting findings, @Ktmi. I'm surprised that the janus queue had a relatively significant impact. We're already paying the price with janus in the event bus, but it's certainly not desirable to compound it. janus has been maintained by a CPython core developer, the implementation looks very lightweight, and the author is very prolific in maintaining other asyncio libraries, but I wonder if there are successors to that library; something to research in the future (low priority now).

So, what's the bottom line here? Should we just go for limits, exposing the sync|async strategies that we've confirmed work well, like fixed window, and drop the idea of the queue with fairness? What's your recommendation?

From what I followed, it was a good attempt to expose both a sync and an async interface, but based on the findings here, the lowest-latency option would be to expose either a Pacer or an APacer for synchronous and asynchronous contexts respectively. Looking into the current use cases that need pacing: mef_eline and of_multi_table only need it in a synchronous context, whereas telemetry_int only needs it in an asynchronous context. Since a producer rate limit is supposed to limit ideally as close to the source as possible, I can see Pacer and APacer working well without trying to do too much; then let's make sure it has concurrency safety. This can still achieve fairness: for instance, mef_eline using pacer "a" at 100/sec, of_multi_table using pacer "b" at 100/sec, telemetry_int using pacer "c" at 100/sec, and then flow_manager also pacing at 300+/sec. Anyway, just an example, but it can be configured in a way that achieves fairness when it makes sense. Broadly speaking, though, the rest of fairness or prioritization we can solve with priority queues and future pacing on the consumer side.
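
For reference, here is a rough sketch of what a separate Pacer/APacer pair backed by limits' fixed window strategy could look like. This is not an agreed implementation; the limits imports used here (parse, MemoryStorage, FixedWindowRateLimiter and their limits.aio counterparts) are based on that library's documented API, and the 0.05s back-off is an arbitrary placeholder.

import asyncio
import time

from limits import parse
from limits.storage import MemoryStorage
from limits.strategies import FixedWindowRateLimiter
from limits.aio.storage import MemoryStorage as AsyncMemoryStorage
from limits.aio.strategies import FixedWindowRateLimiter as AsyncFixedWindowRateLimiter


class Pacer:
    """Synchronous pacer: sleep until the fixed window lets the action through."""

    def __init__(self, rate: str):
        self.item = parse(rate)  # e.g. "100/second"
        self.limiter = FixedWindowRateLimiter(MemoryStorage())

    def hit(self, action_name: str, *keys: str):
        # Pacing rather than rejecting: back off briefly and retry instead of failing.
        while not self.limiter.hit(self.item, action_name, *keys):
            time.sleep(0.05)


class APacer:
    """Asynchronous pacer: await until the fixed window lets the action through."""

    def __init__(self, rate: str):
        self.item = parse(rate)
        self.limiter = AsyncFixedWindowRateLimiter(AsyncMemoryStorage())

    async def ahit(self, action_name: str, *keys: str):
        while not await self.limiter.hit(self.item, action_name, *keys):
            await asyncio.sleep(0.05)

# Example usage (hypothetical): Pacer("100/second").hit("flow_manager.send_flow_mod", dpid)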

Ktmi commented 2 months ago

@viniarck The performance characteristics of janus are likely caused by the juggling act of trying to manage both sync and async synchronization primitives. Maybe I could improve performance by switching out put for put_nowait.

Going for limits isn't a bad idea; the only real issue I had with it is the split between async and sync, but after my experience with janus, I understand the reason why.

Fairness is important for cases where the producer being paced is supposed to follow certain priorities, or other cases where there is an expectation about the order in which events complete. In cases where there is no ordering expectation, just that the action is eventually executed, fairness is unnecessary.

I'll see if I can squeeze a little more performance out of janus, and if it nears the performance of the pure asyncio approach, then we'll keep fairness. If not, we can move on to the separate Pacer and APacer. Whether we keep fairness or not, I'll be switching to rate limits as well.