kytos-ng / kytos

Kytos SDN Platform. Kytos is designed to be easy to install, use, develop and share Network Apps (NApps).
https://kytos-ng.github.io/
MIT License

`MainThread` can get blocked when enqueuing events if the queue capacity is reached #423

Closed viniarck closed 5 months ago

viniarck commented 8 months ago

In summary:

Potential solutions (while ensuring events don't get lost during startup):

  1. As a best practice, avoid publishing high frequency events in setup().
  2. Potentially introduce a new buffer, core|app_meta or something along these lines, and move a few core events such as notify_loaded() to it. This assumes that MainThread wouldn't get blocked by NApps.
  3. Offload NApps setup() initialization so it doesn't run on MainThread but on a worker thread, while still synchronously waiting for it, so the asyncio event loop also doesn't get blocked when the core consumers get started.
  4. Decouple each buffer's qsize so it can be configured independently. As mentioned in today's weekly meeting, the first iteration used a queue length the same size as the thread pool; by default each queue size could be 2 times larger, which should be reasonable without adding too much delay.
  5. Add a new task/thread monitor to check qsizes periodically and, when they get close to 80% of capacity, start logging warning messages.
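
As a rough illustration of item 5, here is a minimal sketch of a queue monitor. It uses the standard library `queue.Queue` as a stand-in for the Kytos buffers; the names `usage_ratio`, `check_queues` and `monitor`, and the 1 second default interval, are hypothetical and not part of Kytos.

```python
import logging
import queue
import threading

log = logging.getLogger("qmonitor")


def usage_ratio(q: queue.Queue) -> float:
    """Return how full a bounded queue is, from 0.0 to 1.0."""
    if q.maxsize <= 0:  # queue.Queue treats maxsize <= 0 as unbounded
        return 0.0
    return q.qsize() / q.maxsize


def check_queues(queues: dict, threshold: float = 0.8) -> list:
    """Warn about each queue at or above the threshold; return their names."""
    hot = []
    for name, q in queues.items():
        ratio = usage_ratio(q)
        if ratio >= threshold:
            log.warning("buffer %s is %.0f%% full (%d/%d)",
                        name, ratio * 100, q.qsize(), q.maxsize)
            hot.append(name)
    return hot


def monitor(queues: dict, stop: threading.Event, interval: float = 1.0) -> None:
    """Check the queues every `interval` seconds until `stop` is set."""
    while not stop.wait(interval):
        check_queues(queues)
```

The monitor could run as a daemon thread, for example `threading.Thread(target=monitor, args=(queues, stop), daemon=True).start()`, and be stopped with `stop.set()`.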

How to reproduce the issue

2023-10-23 17:56:41,141 - INFO [kytos.core.db] [db.py:152:db_conn_wait] (MainThread) Starting DB connection
2023-10-23 17:56:41,142 - INFO [kytos.core.db] [db.py:137:_mongo_conn_wait] (MainThread) Trying to run 'hello' command on MongoDB...
2023-10-23 17:56:41,149 - INFO [kytos.core.db] [db.py:139:_mongo_conn_wait] (MainThread) Ran 'hello' command on MongoDB successfully. It's ready!
2023-10-23 17:56:41,160 - INFO [kytos.core.api_server] [api_server.py:429:_start_endpoint] (MainThread) Started /api/kytos/core/auth/login/ - GET
2023-10-23 17:56:41,160 - INFO [kytos.core.api_server] [api_server.py:429:_start_endpoint] (MainThread) Started /api/kytos/core/auth/users/ - GET
2023-10-23 17:56:41,160 - INFO [kytos.core.api_server] [api_server.py:429:_start_endpoint] (MainThread) Started /api/kytos/core/auth/users/{username} - GET
2023-10-23 17:56:41,160 - INFO [kytos.core.api_server] [api_server.py:429:_start_endpoint] (MainThread) Started /api/kytos/core/auth/users/ - POST
2023-10-23 17:56:41,160 - INFO [kytos.core.api_server] [api_server.py:429:_start_endpoint] (MainThread) Started /api/kytos/core/auth/users/{username} - DELETE
2023-10-23 17:56:41,161 - INFO [kytos.core.api_server] [api_server.py:429:_start_endpoint] (MainThread) Started /api/kytos/core/auth/users/{username} - PATCH
2023-10-23 17:56:41,161 - INFO [kytos.core.controller] [controller.py:282:create_pidfile] (MainThread) /home/viniarck/repos/kytos/.direnv/python-3.9/var/run/kytos
2023-10-23 17:56:41,162 - INFO [kytos.core.controller] [controller.py:340:start_controller] (MainThread) Starting Kytos - Kytos Controller
2023-10-23 17:56:41,164 - INFO [kytos.core.controller] [controller.py:349:start_controller] (MainThread) Starting TCP server: <kytos.core.atcp_server.KytosServer object at 0x7fee640b67c0>
2023-10-23 17:56:41,164 - INFO [kytos.core.atcp_server] [atcp_server.py:75:serve_forever] (MainThread) Kytos listening at 0.0.0.0:6653
2023-10-23 17:56:41,164 - INFO [kytos.core.controller] [controller.py:362:start_controller] (MainThread) Loading Kytos NApps...
2023-10-23 17:56:41,167 - INFO [kytos.core.napps.napp_dir_listener] [napp_dir_listener.py:40:start] (MainThread) NAppDirListener Started...
2023-10-23 17:56:41,173 - INFO [kytos.core.controller] [controller.py:861:load_napps] (MainThread) Loading NApp kytos/of_core
2023-10-23 17:56:41,254 - INFO [kytos.core.napps.base] [base.py:248:run] (of_core) Running NApp: <Main(of_core, started 140657947563712)>
2023-10-23 17:56:41,255 - INFO [kytos.core.controller] [controller.py:861:load_napps] (MainThread) Loading NApp kytos/flow_manager
2023-10-23 17:56:41,317 - INFO [kytos.core.napps.base] [base.py:248:run] (flow_manager) Running NApp: <Main(flow_manager, started 140657930778304)>
2023-10-23 17:56:41,319 - INFO [kytos.core.api_server] [api_server.py:429:_start_endpoint] (MainThread) Started /api/kytos/flow_manager/v2/flows/{dpid} - GET
2023-10-23 17:56:41,319 - INFO [kytos.core.api_server] [api_server.py:429:_start_endpoint] (MainThread) Started /api/kytos/flow_manager/v2/flows - GET
2023-10-23 17:56:41,319 - INFO [kytos.core.api_server] [api_server.py:429:_start_endpoint] (MainThread) Started /api/kytos/flow_manager/v2/stored_flows - GET
2023-10-23 17:56:41,319 - INFO [kytos.core.api_server] [api_server.py:429:_start_endpoint] (MainThread) Started /api/kytos/flow_manager/v2/flows/{dpid} - POST
2023-10-23 17:56:41,319 - INFO [kytos.core.api_server] [api_server.py:429:_start_endpoint] (MainThread) Started /api/kytos/flow_manager/v2/flows - POST
2023-10-23 17:56:41,319 - INFO [kytos.core.api_server] [api_server.py:429:_start_endpoint] (MainThread) Started /api/kytos/flow_manager/v2/flows/{dpid} - DELETE
2023-10-23 17:56:41,319 - INFO [kytos.core.api_server] [api_server.py:429:_start_endpoint] (MainThread) Started /api/kytos/flow_manager/v2/flows - DELETE
2023-10-23 17:56:41,319 - INFO [kytos.core.api_server] [api_server.py:429:_start_endpoint] (MainThread) Started /api/kytos/flow_manager/v2/delete/{dpid} - POST
2023-10-23 17:56:41,319 - INFO [kytos.core.api_server] [api_server.py:429:_start_endpoint] (MainThread) Started /api/kytos/flow_manager/v2/delete - POST
2023-10-23 17:56:41,320 - INFO [kytos.core.controller] [controller.py:861:load_napps] (MainThread) Loading NApp kytos/of_lldp
2023-10-23 17:56:41,425 - INFO [kytos.napps.kytos/of_lldp] [main.py:58:thread_app_qsize] (Thread-55) buffers.app qsize 2
2023-10-23 17:56:41,425 - INFO [kytos.napps.kytos/of_lldp] [main.py:66:fill_app_queue] (MainThread) Filled event i 0
2023-10-23 17:56:41,426 - INFO [kytos.napps.kytos/of_lldp] [main.py:66:fill_app_queue] (MainThread) Filled event i 50
2023-10-23 17:56:41,427 - INFO [kytos.napps.kytos/of_lldp] [main.py:66:fill_app_queue] (MainThread) Filled event i 100
2023-10-23 17:56:41,427 - INFO [kytos.napps.kytos/of_lldp] [main.py:66:fill_app_queue] (MainThread) Filled event i 150
2023-10-23 17:56:41,428 - INFO [kytos.napps.kytos/of_lldp] [main.py:66:fill_app_queue] (MainThread) Filled event i 200
2023-10-23 17:56:41,428 - INFO [kytos.napps.kytos/of_lldp] [main.py:66:fill_app_queue] (MainThread) Filled event i 250
2023-10-23 17:56:41,429 - INFO [kytos.napps.kytos/of_lldp] [main.py:66:fill_app_queue] (MainThread) Filled event i 300
2023-10-23 17:56:41,429 - INFO [kytos.napps.kytos/of_lldp] [main.py:66:fill_app_queue] (MainThread) Filled event i 350
2023-10-23 17:56:41,430 - INFO [kytos.napps.kytos/of_lldp] [main.py:66:fill_app_queue] (MainThread) Filled event i 400
2023-10-23 17:56:41,430 - INFO [kytos.napps.kytos/of_lldp] [main.py:66:fill_app_queue] (MainThread) Filled event i 450
2023-10-23 17:56:41,431 - INFO [kytos.napps.kytos/of_lldp] [main.py:66:fill_app_queue] (MainThread) Filled event i 500
2023-10-23 17:56:43,427 - INFO [kytos.napps.kytos/of_lldp] [main.py:58:thread_app_qsize] (Thread-55) buffers.app qsize 512
2023-10-23 17:56:44,429 - INFO [kytos.napps.kytos/of_lldp] [main.py:58:thread_app_qsize] (Thread-55) buffers.app qsize 512
2023-10-23 17:56:45,431 - INFO [kytos.napps.kytos/of_lldp] [main.py:58:thread_app_qsize] (Thread-55) buffers.app qsize 512
2023-10-23 17:56:46,433 - INFO [kytos.napps.kytos/of_lldp] [main.py:58:thread_app_qsize] (Thread-55) buffers.app qsize 512
diff --git a/main.py b/main.py
index 810e74b..1c4ea42 100644
--- a/main.py
+++ b/main.py
@@ -1,5 +1,7 @@
 """NApp responsible to discover new switches and hosts."""
+import time
 import struct
+import threading

 import httpx
 import tenacity
@@ -47,6 +49,21 @@ class Main(KytosNApp):
         Link.register_status_func(f"{self.napp_id}_liveness",
                                   LivenessManager.link_status_hook_liveness)
         self.table_group = {"base": 0}
+        t = threading.Thread(target=self.thread_app_qsize, daemon=True)
+        t.start()
+        self.fill_app_queue(512)
+
+    def thread_app_qsize(self, freq=1):
+        while True:
+            log.info(f"buffers.app qsize {self.controller.buffers.app.qsize()}")
+            time.sleep(freq)
+
+    def fill_app_queue(self, n=512) -> None:
+        """Fill buffers.app with n placeholder events to reach its capacity."""
+        for i in range(n):
+            self.controller.buffers.app.put(KytosEvent(name="some", content={}))
+            if i % 50 == 0:
+                log.info(f"Filled event i {i}")

cc'ing @italovalcy: the hypothesis we discussed was confirmed, it sounds exactly like what you've described. Let me know if you think it reflects what you've seen in prod. With APM, we should hopefully also get some extra insights into which NApp might end up over-publishing events during startup, or whether anything else is blocking the MainThread with a lock.

italovalcy commented 8 months ago

@viniarck thanks for looking into this! One question please: why do we have limits on the buffers' qsize? Do you remember why we decided to change the maxsize from 0 to the default thread pool size?

Regarding the question on which NApp created those events, it was Topology, while loading the topology from DB. At least one event is published for each interface created. I'm not sure if removing that process from setup would be a good idea, because loading the topology is something critical and needed. What do you think?

viniarck commented 8 months ago

why do we have limits on the buffers' qsize? Do you remember why we decided to change the maxsize from 0 to the default thread pool size?

Yes. It boils down to having a responsive queuing system while not consuming too much memory (https://github.com/kytos-ng/kytos/pull/149; prior to it, it could consume as much memory as it wanted), even though we pretty much have queue -> fanout pubsub. In the worst case, when publishers don't or won't always rate limit on the publishing side, an unbounded queue can lead to either a) starvation or b) too much delay when consumers can't keep up with the rate at which producers are sending events. Under normal circumstances on Kytos-ng, after all NApps have loaded, we shouldn't have too many queued messages for a long period of time. The starvation part we've solved with PriorityQueue, but as you know we're only using it for southbound OF messages.
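
To make the trade-off concrete, here is a minimal standard library sketch (not Kytos code) of what a bounded queue does to a publisher when no consumer is draining it: the producer stalls once `maxsize` is reached and only resumes when a consumer frees slots. The `produce` helper and the sizes are hypothetical.

```python
import queue
import threading


def produce(q: queue.Queue, n: int, progress: list) -> None:
    """Put n items on q, recording each item once it has been enqueued."""
    for i in range(n):
        q.put(i)  # blocks here whenever q is full
        progress.append(i)


q = queue.Queue(maxsize=4)
progress = []
producer = threading.Thread(target=produce, args=(q, 10, progress), daemon=True)
producer.start()

# With no consumer running, the producer stalls after filling the 4 slots.
producer.join(timeout=0.5)
stuck_at = len(progress)
still_blocked = producer.is_alive()

# Draining the queue frees slots and lets the producer finish.
consumed = [q.get(timeout=1) for _ in range(10)]
producer.join(timeout=1)
print(stuck_at, still_blocked, consumed)
```

If the thread calling `put()` is the MainThread, as in the reproduction above, that stall is what blocks the rest of the startup.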

An unbounded queue will also consume as much memory as possible, and a timeout won't work with it (put never blocks). A bounded queue at least makes it possible to leverage a timeout in certain circumstances, which we haven't implemented yet, but will be addressed now. So, for instance, any NApp publishing events during setup() should call put(timeout=X), and if it can't enqueue the event then we know that somewhere we have a misbehaving NApp, or a thread pool and/or queue size that needs to be adjusted. Couple that with the fact that if a put(timeout=X) failure isn't handled during setup() it'll be treated as an unhandled exception, which in turn will exit kytosd based on the latest changes, and that clearly signals to network operators that they should check whether the thread pool or just the queue size needs to be increased.
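
A minimal sketch of that idea, using the standard library `queue.Queue` in place of a Kytos buffer (the exact exception a Kytos buffer raises is not shown here and may differ). `publish_during_setup` is a hypothetical helper; with `queue.Queue`, a `put()` that can't find a free slot within the timeout raises `queue.Full`.

```python
import queue


def publish_during_setup(buffer: queue.Queue, events, timeout: float = 0.1) -> int:
    """Enqueue events; queue.Full propagates if no slot frees up in time."""
    count = 0
    for event in events:
        buffer.put(event, timeout=timeout)  # raises queue.Full on timeout
        count += 1
    return count


buffer = queue.Queue(maxsize=3)  # nobody is consuming from it
try:
    publish_during_setup(buffer, range(5))
except queue.Full:
    # In the scenario described above this would be left unhandled,
    # so the process exits and operators notice the problem right away.
    outcome = "setup failed: buffer full"
else:
    outcome = "setup ok"
print(outcome, buffer.qsize())
```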

I'm not sure if removing that process from setup would be a good idea, because loading the topology is something critical and needed. What do you think?

Definitely, the idea is to keep setup() with a similar behavior, while still being able to handle the case where it gets stuck trying to enqueue messages. Initially, I suggested running it in a Future backed with a ThreadPool, but still synchronously, to avoid conflicts with MainThread and the event loop that shares MainThread, but that on its own still needs to be coupled with a timeout when waiting for it. There's an opportunity to make it even simpler with just a timeout on app.put(timeout=X), as mentioned in the prior paragraph. I've also confirmed that just a timeout on the future without queue.put(timeout=x) isn't sufficient either.
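
The last point can be illustrated with a small standard library sketch (hypothetical function names, `queue.Queue` standing in for the app buffer): a timeout on `Future.result()` only stops the waiter, while the worker thread stays stuck inside `put()`; a timeout on `put()` itself releases the worker.

```python
import concurrent.futures
import queue

full = queue.Queue(maxsize=1)
full.put("occupied")  # the queue is now at capacity


def setup_blocking() -> str:
    full.put("event")  # no timeout: blocks until a slot frees up
    return "done"


def setup_with_put_timeout() -> str:
    full.put("event", timeout=0.2)  # raises queue.Full after 0.2 secs
    return "done"


pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)

# Case 1: timeout only on the future. The waiter gives up, the worker doesn't.
fut = pool.submit(setup_blocking)
waiter_gave_up = False
try:
    fut.result(timeout=0.2)
except concurrent.futures.TimeoutError:
    waiter_gave_up = True
worker_still_stuck = fut.running()

# Case 2: timeout on put(). The worker itself is released with queue.Full.
fut2 = pool.submit(setup_with_put_timeout)
worker_released = False
try:
    fut2.result(timeout=1)
except queue.Full:
    worker_released = True

# Free a slot so the first worker can finish and the pool can shut down.
full.get()
first_result = fut.result(timeout=1)
pool.shutdown()
print(waiter_gave_up, worker_still_stuck, worker_released, first_result)
```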

Let me know if that answers your questions @italovalcy, or if you have any other concerns or suggestions.

Side note: speaking of RAM usage, we also still need to further profile the core and NApps. Some time ago we mapped an issue, https://github.com/kytos-ng/kytos/issues/404, where the kytos container was using a large amount of memory. In the future, let's try to prioritize that and also, if possible, check out some RAM usage charts for the container in prod.

viniarck commented 8 months ago

Let me know if that answers your questions @italovalcy, or if you have any other concerns or suggestions.

Although, with a running task monitoring queue sizes based on how large the thread pools are, unbounded queues can theoretically work well too, and then, when needed, it's a matter of adjusting the thread pool based on the current event workload (on the next kytosd restart). Back then, RAM usage could increase linearly because it was twofold: a) unbounded queues and b) unbounded threads. An unbounded queue with a thread pool can still be decent too (although a bounded queue with a thread pool is more conservative). I'll try it out first and run some tests to confirm whether an unbounded queue with a thread pool works well; if so, there are even fewer variables to tweak, which simplifies things for network operators too. Arguably, unbounded queues also work better with PriorityQueue. Either way, a misbehaving high frequency publisher will require priorities to be set (in the future), but for the app queue we still have plenty of room with just the thread pool size alone, assuming a base case like the one AmLight is using in prod.

@italovalcy, we can also assess whether it might be worth exposing queue/thread pool size usage data on a core endpoint. Although, I believe log.warning and the current Elasticsearch setup already give great visibility, and as long as the task monitoring is reliable enough it should be OK. But an endpoint could get polled from Zabbix. It's something to continue discussing, let me know what you think and whether it's a stat that you might need.

viniarck commented 8 months ago

I've managed to stress test locally, publishing 1500 events per sec to the app buffer queue. I explored the following cases to confirm in practice how the two variables, buffer queue size and thread pool size, performed in terms of CPU and RAM usage, overall kytosd responsiveness, and any other side effects:

a) bounded app queue size 512, app thread pool size 512:

[image: bounded_1500_50_2]

b) unbounded app queue, app thread pool size 512:

[image: unbounded_1500_50_2]

c) bounded app queue size 2048, app thread pool size 2048:

[image: bounded_1500_50_2_t2048]

d) unbounded app queue, app thread pool size 2048:

[image: unbounded_1500_50_2_t2048]

Analysis

a) and b) both have the same app thread pool executor size, but with app queue size 512 and inf respectively. Comparing the charts, the unbounded queue leads to a higher rate of memory usage over the first 20 secs, almost twice as much. Also, at peak resource usage, the unbounded queue had the side effect of impacting the event loop on MainThread, temporarily losing connectivity with switches, whereas the bounded queue kept consuming less memory and didn't lose connections. Bounded queues contributed to more stability since they're a more conservative approach: publishers eventually get blocked while the thread pools are doing their job. Also, the tps reached in the thread pool was close to 500, which is very close to the expected rate.

[image: 20231025_164601]

c) and d): same as the prior paragraph, except CPU usage was more spread out and the tps was higher, close to 1666 (a bit less than the total threads available, probably due to high context switching overhead).

[image: 20231025_165242]

Conclusion

Using an unbounded queue when you aren't always using PriorityQueue (with all messages prioritized), and when publishers aren't rate limited before putting to the queue, can lead to more instability, as observed in practice, including side effects such as impacting the MainThread and interfering with the asyncio event loop that's handling critical IO operations.

A bounded queue with a large enough size behaves essentially like an unbounded one, so as long as the value is large enough you also have room to accommodate more events while still not drastically increasing RAM usage. So, a bounded queue with a decent and configurable queue size will provide the most flexibility while maximizing overall stability on the Kytos-ng platform out of the gate.

Proposed solution

  1. Decouple each buffer's queue size so it can be configured independently, by default each queue size being X times larger than the thread pool size (probably 2 or 4), and expose this in kytos.conf.
  2. Add a new task/thread monitor to check buffer queue sizes; a buffer queue that stays close to its capacity after a 1 minute interval or so should be logged as a warning.
  3. Any NApp that publishes events during setup() will need to call buffers.<queue>.put(timeout=x) with a timeout and let the exception propagate so kytosd exits accordingly. That way you know out of the gate that you have NApps publishing too many events initially, and that, even later on during runtime, you need a larger queue size and/or a larger thread pool size. After NApps are loaded, calling with a timeout isn't required unless you can't afford to have your publisher blocked until a new slot in the queue is available.
  4. Introduce a new buffer, meta, for higher level core events such as {self.username}/{self.name}.loaded (including future ones).
  5. On Kytos core, refactor the rest of the networking related events to use the conn buffer.
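
Item 1 could look roughly like the sketch below. It is only an illustration under assumptions: `build_buffers`, its parameters and the override mechanism are hypothetical, the actual kytos.conf option names are not defined here, and `queue.Queue` stands in for the Kytos buffers.

```python
import queue
from typing import Optional


def build_buffers(pool_sizes: dict, multiplier: int = 2,
                  overrides: Optional[dict] = None) -> dict:
    """Create one bounded queue per buffer name.

    Each queue defaults to `multiplier` times its thread pool size,
    unless an explicit per-buffer size is given in `overrides`.
    """
    overrides = overrides or {}
    buffers = {}
    for name, pool_size in pool_sizes.items():
        maxsize = overrides.get(name, pool_size * multiplier)
        buffers[name] = queue.Queue(maxsize=maxsize)
    return buffers


# Hypothetical values: the "app" queue follows the default multiplier,
# while the "conn" queue is sized independently.
buffers = build_buffers({"app": 512, "conn": 512}, multiplier=2,
                        overrides={"conn": 4096})
print({name: q.maxsize for name, q in buffers.items()})
```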

Future (still needs to refine):

Let me know if you guys have any questions or suggestions.

viniarck commented 5 months ago

The queue(s) task monitor will be delivered on https://github.com/kytos-ng/kytos/issues/439