chrysn / aiocoap

The Python CoAP library

Flow control of TCP socket handling a large number of requests #297

Open ChrisCuts opened 1 year ago

ChrisCuts commented 1 year ago

Hello there,

We use a CoAP TCP server in our embedded Linux application for an internal task handler and event queue. During stress tests we found that, when some additional tasks run asynchronously to the request processing, the transport pumps in messages faster than the application can process them. The context ends up creating a large number of rendering tasks, and memory grows until it is exhausted.

It might be an edge case for an IoT device, but is there any way to slow down the transport from the application side, or any other mechanism to limit task creation?

With kind regards Chris

chrysn commented 1 year ago

That's a good point, and the behavior you're observing is consistent with the behaviors I'd analytically expect from aiocoap.

Before we rush to fixes, let's consider the solution space. It mostly boils down to making pressure propagate back to the clients for them to deal with it. We'd have options:

  1. We could not propagate backpressure, but leave messages in the network buffer. Requests exceeding some number of worker tasks would only consume buffer space, and not be turned into (much larger) parsed messages and workers. This would mitigate the problem a bit, but (especially under load test conditions) not solve it.

    (This, by the way, gives me extra appreciation for asyncio -- if you hit the limits of how many workers one can handle, I don't want to think about what that'd mean if we spawned threads...)

  2. We could leverage TCP; given you mentioned that protocol, it may be what you have in mind. We'd use Python's stream APIs to stop the new-bytes events until fewer workers are running. There are two problems with it, though:

    2a. It doesn't work the same way for UDP (it would for WebSockets, though).

    2b. It really only moves the problem to the client side, at least when aiocoap is involved (which may be worth its own issue). An aiocoap TCP client will happily keep writing to the TCP socket, ignoring the very backpressure signals that aiocoap as a server doesn't send either. If the client sends requests without any control (i.e. without awaiting the completion of at least some of them), it will rack up application requests (as it does now), and in addition its TCP write buffer will grow indefinitely.

  3. We could leverage CoAP error codes to turn down requests intermittently. 5.03 (Service Unavailable) and 4.29 (Too Many Requests) come to mind, each with an appropriate Max-Age.

    Effectively, the context would keep track of its total of worker tasks, and if that exceeds some number, start rejecting requests.

    (Which of them to pick will depend on a more thorough reading of RFC8516, and on whether one client is sending many requests, or many clients are sending requests).

    I think this one is preferable because it works for all transports. It still has two downsides:

    3a. Unlike the TCP backpressure, it has no "OK, now hit me again" mechanism. The Max-Age needs to be timed suitably. I'm pretty sure there's either a simple published algorithm or one we can come up with on the spot (anything fast and O(1) that estimates the rate of rejected requests).

    3b. In aiocoap there is no automated mechanism on the client side yet to retry failed requests if the error is transient. That means that once a server starts doing any of this, currently the application needs retry logic.

    I'd guess that this is not too much of a problem in your load testing case.
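
A minimal sketch of option 3, under stated assumptions: `AdmissionController` and its parameters are hypothetical and not part of aiocoap. It counts running workers, rejects requests above a limit, and derives the Max-Age for the rejection from an exponentially decayed count of recent rejections. That estimator is just one possible O(1) choice, not a published algorithm.

```python
import math


class AdmissionController:
    """Hypothetical helper (not part of aiocoap) that limits worker tasks."""

    def __init__(self, max_workers=50, half_life=5.0, base_max_age=1, max_max_age=60):
        self.max_workers = max_workers
        self.half_life = half_life        # seconds until the rejection score halves
        self.base_max_age = base_max_age  # smallest Max-Age handed out, in seconds
        self.max_max_age = max_max_age    # upper bound for Max-Age, in seconds
        self.running = 0                  # workers currently admitted
        self._score = 0.0                 # decayed count of recent rejections
        self._last = None                 # time of the last score update

    def _decay(self, now):
        # O(1) update: shrink the score according to the time that has passed.
        if self._last is not None:
            self._score *= 0.5 ** ((now - self._last) / self.half_life)
        self._last = now

    def try_admit(self, now):
        """Return None if admitted, else a Max-Age in seconds for the rejection."""
        self._decay(now)
        if self.running < self.max_workers:
            self.running += 1
            return None
        self._score += 1.0
        # More recent rejections lead to a longer back-off, up to the cap.
        return min(self.max_max_age,
                   self.base_max_age + int(math.log2(1 + self._score)))

    def done(self):
        """Call when an admitted worker has finished."""
        self.running -= 1


ctl = AdmissionController(max_workers=2)
first = ctl.try_admit(0.0)      # admitted
second = ctl.try_admit(0.0)     # admitted
rejected = ctl.try_admit(0.0)   # over the limit, returns a Max-Age
```

The returned value would go into the Max-Age option of the 5.03 or 4.29 response; callers pass in a monotonic timestamp such as `time.monotonic()`.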

Would the last version work for your case? I guess that if you're doing load testing, you also have an actual application in there. Whether the server guided back-off works well will depend on what you're sending: If you're sending individual events and expect each one to be eventually delivered, you might still run into limitations at some point (if aiocoap performance is a bottleneck, let's talk, but your original post sounds to me like whatever the resource handler does is the slow part). If it's all just clients that eventually reach a happy state (like, they're all fetching an update), things should be fine. If the clients are sending events that can be lumped together, it may actually be beneficial if the application does the back-off, for if an initial request to send one event fails, by the time the back-off period expires, it might have several events to send in a single request (which would be handled more efficiently later), as long as there is still a mechanism for graceful degradation.

ChrisCuts commented 1 year ago

Mmm,

I also thought about handling this on the CoAP level, but the client side sends out the requests asynchronously. So 40k requests are sent out before it could react to the server's missing ACK.
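
For comparison, a client that is not fully open-loop could bound its number of requests in flight. The following is a minimal sketch using only asyncio; `send_all` and `send_request` are hypothetical names, and `fake_request` merely stands in for a real CoAP request.

```python
import asyncio


async def send_all(payloads, send_request, max_in_flight=100):
    """Send every payload, keeping at most max_in_flight requests pending."""
    limiter = asyncio.Semaphore(max_in_flight)

    async def send_one(payload):
        # Wait here until one of the max_in_flight slots is free.
        async with limiter:
            return await send_request(payload)

    # gather() returns the results in the order of the payloads.
    return await asyncio.gather(*(send_one(p) for p in payloads))


async def _demo():
    in_flight = 0
    peak = 0

    async def fake_request(payload):
        # Stand-in for a real request; records the peak concurrency.
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.001)
        in_flight -= 1
        return payload

    results = await send_all(range(1000), fake_request, max_in_flight=10)
    return results, peak


results, peak = asyncio.run(_demo())
```

This still creates one client-side task per payload; only the number of requests actually on the wire is bounded. A fixed pool of worker tasks reading from a queue would avoid that as well.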

Currently, I have a hotfix that creates the render tasks in another thread and blocks the transport callback with a semaphore until we are ready to process new requests. I tried to block the main task of the loop, but didn't know how to achieve this. Certainly, this fix does not work for UDP transports, since it breaks the more complex CON handling and completely confuses the message handler because it uses the wrong loop. And I have a really bad feeling about it in terms of sustainability.

I am surprised that there is no handling for this in the asyncio transports, since other projects should have the same problem. It seems data_received is called whenever there is free processing time, and there is no pause_reading for a bi-directional transport.

chrysn commented 1 year ago

OK, an open-loop CoAP client is brutal, but I guess that's what you're going for in a load test (effectively simulating DoSing clients).

How come there are missed ACKs in play, I thought you're going CoAP-over-TCP?

The pause_reading would be approach number 2 in the above list (now numbered; sorry, GitHub completely breaks formatting when dealing with mail messages) -- exert backpressure on the TCP side. This keeps the load off the system under attack, but the corresponding mechanism on the UDP side is just dropping the packets, and there we're off the ideal path.

We'll have to somehow limit how many worker tasks are under way, however that'll be handled in the transports. Both making the underlying transport block (if it can) and responding with errors will achieve this, ... let's see what desired behaviors are.

To determine them: What's the limiting factor in your case? (Not that we shouldn't address all of them, but let's focus on what is needed.) Is it just the resources of the handlers, or would you not get the error responses out as fast as the requests come in? (That'd indicate an asymmetric uplink.)


On the implementation side, the point where I'd start addressing this is TokenManager, which should not push things arbitrarily into incoming_requests but reject things once that's over a configured size. A CoAP level rejection would be easy because it can just do that; a transport level rejection is trickier because a) it needs new API between the transports and the TokenManager, and b) it'd need to catch incoming_requests shrinking and unthrottle connections one by one.
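
The transport-level variant could be sketched roughly as follows. This is not aiocoap's API: `RequestGate` is a hypothetical piece of bookkeeping between the transports and the TokenManager, `pending` stands in for the size of incoming_requests, and connections are only assumed to offer `pause_reading()` and `resume_reading()` like asyncio transports do.

```python
from collections import deque


class RequestGate:
    """Hypothetical bookkeeping between transports and a token manager."""

    def __init__(self, limit):
        self.limit = limit
        self.pending = 0            # stands in for len(incoming_requests)
        self._paused = deque()      # throttled connections, oldest first

    def on_request(self, connection):
        """Count a new request; throttle its connection once the limit is hit."""
        self.pending += 1
        if self.pending >= self.limit and connection not in self._paused:
            connection.pause_reading()
            self._paused.append(connection)

    def on_request_done(self):
        """Count a finished request; unthrottle one connection if there is room."""
        self.pending -= 1
        if self._paused and self.pending < self.limit:
            self._paused.popleft().resume_reading()


class FakeConnection:
    """Test double exposing the two transport methods used above."""

    def __init__(self):
        self.paused = False

    def pause_reading(self):
        self.paused = True

    def resume_reading(self):
        self.paused = False


gate = RequestGate(limit=2)
a, b = FakeConnection(), FakeConnection()
gate.on_request(a)          # below the limit, nothing is throttled
gate.on_request(b)          # limit reached, b is throttled
```

Resuming only one connection per finished request is a deliberate choice in this sketch: it avoids a burst from all throttled peers at once, at the cost of slower recovery.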

ChrisCuts commented 1 year ago

> How come there are missed ACKs in play, I thought you're going CoAP-over-TCP?

<< I meant the response, not an ACK; sorry for the confusion. The point is that an error response (like 4.29) would not be evaluated on the client side.

> The pause_reading would be approach number 2 in the above list (now numbered; sorry, GitHub completely breaks formatting when dealing with mail messages) -- exert backpressure on the TCP side. This keeps the load off the system under attack, but the corresponding mechanism on the UDP side is just dropping the packets, and there we're off the ideal path.

<< I think there is no pause_reading on those transports. From the application side it would be easier to throttle the complete protocol; on a system level, however, it could be nice to slow down only individual transports or sockets, so that only the issuer is blocked, with something like DoS attacks in mind.

<< Dropping UDP packets should not be a problem in my opinion, since the CoAP layer would catch this and trigger a retransmission.

> We'll have to somehow limit how many worker tasks are under way, however that'll be handled in the transports. Both making the underlying transport block (if it can) and responding with errors will achieve this, ... let's see what desired behaviors are.

<< I read that the size of the receive buffer could be set to zero, since TCP advertises this to the client so that it sends no more packets. Normally, the asyncio transports should handle this automatically. Furthermore, the TCP response carrying this information could be sent out too late, since the response needs to be processed first. Maybe an empty packet could be sent out, but I am not that deep into TCP.

> To determine them: What's the limiting factor in your case? (Not that we shouldn't address all, but let's focus on what is needed). Is it just the resources of the handlers, or wouldn't get the error responses out as fast as the requests come in? (That'd indicate an asymmetric uplink).

<< Our problem is the increasing memory due to async tasks. The more messages we send, the faster it grows. In a real-world scenario we've got 6 MB per second. Our memory is exhausted after 2 hours of stress.

> On the implementation side, the point where I'd start addressing this is TokenManager, which should not push things arbitrarily into incoming_requests but reject things once that's over a configured size. A CoAP level rejection would be easy because it can just do that; a transport level rejection is trickier because a) it needs new API between the transports and the TokenManager, and b) it'd need to catch incoming_requests shrinking and unthrottle connections one by one.

<< In my approach, it worked well to pause the thread in which the transport runs. Is there no way in an event loop to wait for its tasks?

ChrisCuts commented 1 year ago

I've looked it up in the aiohttp implementation. They are using pause_reading for the flow control. It is implemented here:

https://github.com/aio-libs/aiohttp/blob/master/aiohttp/base_protocol.py#L40

They set up a queue for the data flow, which regulates the stream: https://github.com/aio-libs/aiohttp/blob/master/aiohttp/streams.py#L624

I tested it with a minimal example and it seems to work out well:

import asyncio

async def process_request(transport, data):

    await asyncio.sleep(2)
    print('process')
    transport.write(data)

class EchoServerProtocol(asyncio.Protocol):

    def __init__(self, loop):

        self.loop = loop
        self._reading_paused = False

    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def pause_reading(self) -> None:
        if not self._reading_paused and self.transport is not None:
            try:
                self.transport.pause_reading()
            except (AttributeError, NotImplementedError, RuntimeError):
                pass
            self._reading_paused = True

    def resume_reading(self) -> None:
        if self._reading_paused and self.transport is not None:
            try:
                self.transport.resume_reading()
            except (AttributeError, NotImplementedError, RuntimeError):
                pass
            self._reading_paused = False

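    def data_received(self, data):

        task = self.loop.create_task(process_request(self.transport, data))
        # Resume reading once a request has finished; resume_reading() does
        # nothing unless reading was paused before.
        task.add_done_callback(lambda _task: self.resume_reading())

        # Pause reading whenever the task count reaches a multiple of 100.
        if len(asyncio.all_tasks()) % 100 == 0:
            self.pause_reading()
            print(len(asyncio.all_tasks()), 'Tasks', flush=True)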

async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        lambda: EchoServerProtocol(loop),
        '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()

asyncio.run(main())
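
A variant of this idea that tracks its own pending tasks and resumes reading once the backlog drops below a limit might look like the following sketch. `ThrottledEchoProtocol`, its `limit` and `delay` parameters, and the `_RecordingTransport` test double are assumptions made for illustration; only `pause_reading()`, `resume_reading()` and `write()` are real asyncio transport methods.

```python
import asyncio


class ThrottledEchoProtocol(asyncio.Protocol):
    """Echo protocol that pauses reading at `limit` pending requests."""

    def __init__(self, limit=100, delay=2.0):
        self.limit = limit
        self.delay = delay          # simulated processing time in seconds
        self.pending = set()        # tasks that have not finished yet
        self.paused = False
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    async def _process(self, data):
        await asyncio.sleep(self.delay)   # simulated slow handler
        self.transport.write(data)

    def data_received(self, data):
        task = asyncio.get_running_loop().create_task(self._process(data))
        self.pending.add(task)
        task.add_done_callback(self._done)
        if len(self.pending) >= self.limit and not self.paused:
            self.transport.pause_reading()
            self.paused = True

    def _done(self, task):
        self.pending.discard(task)
        if self.paused and len(self.pending) < self.limit:
            self.transport.resume_reading()
            self.paused = False


class _RecordingTransport:
    """Test double that records the calls a real transport would receive."""

    def __init__(self):
        self.events = []
        self.written = []

    def pause_reading(self):
        self.events.append("pause")

    def resume_reading(self):
        self.events.append("resume")

    def write(self, data):
        self.written.append(data)


async def _demo():
    protocol = ThrottledEchoProtocol(limit=2, delay=0.01)
    transport = _RecordingTransport()
    protocol.connection_made(transport)
    protocol.data_received(b"a")
    protocol.data_received(b"b")    # reaches the limit, reading is paused
    await asyncio.sleep(0.1)        # let both handlers finish
    return transport


transport = asyncio.run(_demo())
```

With a real server, the protocol would be passed to `loop.create_server(lambda: ThrottledEchoProtocol(), '127.0.0.1', 8888)` as in the example above.
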
ChrisCuts commented 1 year ago

Hi,

I modified the following Context member functions to implement TCP flow control. I don't know whether this is also applicable to UDP, or whether it makes sense for UDP at all, since UDP is a poor fit when you have to manage high data rates and a fully loaded server. With this change the server runs a lot more smoothly.

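import functools

from aiocoap import Context
# Import path assumed from aiocoap's TCP transport module.
from aiocoap.transports.tcp import TcpConnection


class ControlledCoapContext(Context):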
    '''Context with task limitation.'''

    _DEFAULT_ASYNC_TASK_LIMIT = 50

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._throttled_transports = set()
        self._async_task_limit = self._DEFAULT_ASYNC_TASK_LIMIT

    def _resume_processing(self, task, observation_request):
        '''Called after request processing.

        - Remove task from running renderings
        - Check if transport could resume
        '''
        self._running_renderings.remove(task)

        if self._throttled_transports:
            if len(self._running_renderings) < self._async_task_limit:
                for transport in self._throttled_transports:
                    transport.resume_reading()
                self._throttled_transports.clear()

        if observation_request:
            self._async_task_limit -= 1

    def render_to_plumbing_request(self, plumbing_request):
        '''
        Replace render_to_plumbing_request to limit tasks.
        '''
        task = self.loop.create_task(
            self._render_to_plumbing_request(plumbing_request))

        self._running_renderings.add(task)

        observation_request = plumbing_request.request.opt.observe == 0
        if observation_request:
            # Consider observers in task limiter
            self._async_task_limit += 1

        if len(self._running_renderings) > self._async_task_limit:
            if isinstance(plumbing_request.request.remote, TcpConnection):
                # pylint: disable=protected-access
                transport = plumbing_request.request.remote._transport
                transport.pause_reading()
                self._throttled_transports.add(transport)

        remove_task = functools.partial(self._resume_processing, task, observation_request)

        task.add_done_callback(lambda result, cb=remove_task: cb())

We're still on 0.4.1 at the moment and plan to migrate to 0.4.5 soon. In 0.4.5 this belongs in run_driving_pipe(*).