unioslo / zabbix-auto-config


Unbounded source collector queues can lead to memory starvation #60

Closed pederhan closed 1 year ago

pederhan commented 1 year ago

If ZAC fails to process all items in the source collector host queue between collection cycles, the application can enter a situation where the unprocessed queue grows with each collection cycle. This leads to high memory usage, decreased performance, and faster queue growth until the application runs out of memory and shuts down.

This situation typically arises when host collections happen more frequently than the source handler can process them, and it is hard for users to debug because the logs give no clear indication that the queue is growing. Each cycle adds only one item to the queue (which is actually a dict containing a list of potentially thousands of hosts), so the growth is not obviously problematic in the logs.

https://github.com/unioslo/zabbix-auto-config/blob/36155319b3a72e0919a53b7902621374a432df61/zabbix_auto_config/processing.py#L128-L133

https://github.com/unioslo/zabbix-auto-config/blob/36155319b3a72e0919a53b7902621374a432df61/zabbix_auto_config/processing.py#L170

A simplistic solution would be to allow a configurable queue size for each source collector. However, this isn't as practical as it seems: since each collector adds only one item per cycle to the queue, determining an appropriate queue size is challenging, especially for users unfamiliar with the internal workings of the queue.

Read on to find out why this solution is not feasible without a rewrite of the queue system, or skip ahead to the next section for the actual proposed solution.

## Configurable queue size

A seemingly logical but impractical solution would be to establish a maximum size for each source collector queue that exceeds the anticipated number of collected hosts. For instance, if we expect to collect 8000 hosts, the max size would logically be set to >8000.

### Problems with this approach

The current queue expects a dict in the form of `{"source": "", "hosts": [host1, host2, ..., hostn]}`, so we can't use the built-in `maxsize` parameter of `multiprocessing.Queue`: each collection cycle adds only 1 item to the queue, not 1 item per host. This more or less breaks the idea completely, since we can't easily check the number of hosts in the queue.
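To make the mismatch concrete, here is a minimal standalone sketch (the source name and host values are made up): even a generous `maxsize` only counts queue items, and an entire collection cycle is a single item.

```python
import multiprocessing

# Illustration only: maxsize limits the number of queue *items*,
# not the number of hosts contained in each item.
q = multiprocessing.Queue(maxsize=8000)

# An entire collection cycle of 8000 hosts is one item to the queue.
collected = {"source": "example-source", "hosts": [f"host-{i}" for i in range(8000)]}
q.put(collected)

print(q.full())               # False -- 7999 more cycles could still pile up
print(len(q.get()["hosts"]))  # 8000 hosts hidden behind that single item
```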

## Limiting the queue with maxsize=1

The simplest solution to this problem seems to be to disallow new items from entering the queue while it still holds the dict from a previous collection cycle.

This can be achieved by setting `maxsize=1` on the queue, which prevents the source collector from adding a new item until the previous one has been processed by the source handler process. By using `Queue.put_nowait(obj)` instead of a blocking `put()`, we simply skip the current collection cycle if the queue is already populated with hosts.

```python
class SourceCollectorProcess(BaseProcess):
    # ...
    def collect(self) -> None:
        # ...
        source_hosts = {
            "source": self.name,
            "hosts": valid_hosts,
        }
        try:
            self.source_hosts_queue.put_nowait(source_hosts)
        except queue.Full:
            logging.warning(
                "Queue is full, skipping hosts from source %s", self.name
            )
        else:
            logging.info(
                "Done collecting %d hosts from source, '%s', in %.2f seconds. Next update: %s",
                len(valid_hosts),
                self.name,
                time.time() - start_time,
                self.next_update.isoformat(timespec="seconds"),
            )
```

Alternatively, we could check `Queue.empty()` before starting collection, ensuring the previous batch has been dealt with. This approach is more resource-efficient, as it avoids collecting hosts that would likely be discarded anyway. However, it might delay host updates, particularly if the source handler would have managed to empty the queue by the time the collection finishes.

```python
class SourceCollectorProcess(BaseProcess):
    # ...
    def work(self):
        # ...
        if not self.source_hosts_queue.empty():
            logging.warning(
                "Queue is not empty, skipping collection of hosts from source %s",
                self.name,
            )
            return

        logging.info("Collection starting")

        try:
            self.collect()
        except Exception as e:
            # ...
        # ...
```

## Future work

The queue system should probably be rewritten to use a single shared queue where we put each host individually, instead of putting a dict containing a list of hosts, e.g. something like this:

```python
from typing import NamedTuple

class CollectedHost(NamedTuple):
    source: str
    host: Host  # the application's existing host model
```

This would allow us to handle larger numbers of hosts (especially if we turn the collector functions into generators instead of returning a list of hosts), and would also allow us to use the `maxsize` parameter of `multiprocessing.Queue` to put a meaningful limit on the queue size.
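A rough sketch of what the producer side could look like under that model; the helper function and its wiring are made up for illustration, and `Host` is assumed to be the application's existing host model:

```python
from typing import Iterable, NamedTuple

from zabbix_auto_config.models import Host  # assumed location of the host model


class CollectedHost(NamedTuple):
    source: str
    host: Host


def collect_into(shared_queue, source: str, hosts: Iterable[Host]) -> None:
    # With individual hosts as queue items, maxsize becomes a meaningful bound:
    # put() blocks once the shared queue holds maxsize hosts, applying
    # backpressure to the collector instead of letting memory grow unbounded.
    for host in hosts:
        shared_queue.put(CollectedHost(source=source, host=host))
```

The source handler would then `get()` individual `CollectedHost` items from the same shared queue and group them by source as needed.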

paalbra commented 1 year ago

This sounds like #3

I'd consider making the producers remove old data from the queue (or whatever object/storage is used between processes). The newly produced data should be more current than whatever is already on the queue.
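One way to sketch that against the current per-collector queue, assuming each collector is the only producer for its queue (hypothetical helper, not existing code):

```python
import queue


def replace_queued_hosts(source_hosts_queue, source_hosts: dict) -> None:
    # "Newest data wins": discard whatever is still on the queue before
    # putting the freshly collected hosts, so stale cycles never pile up.
    # Assumes the calling collector is the queue's only producer.
    try:
        source_hosts_queue.get_nowait()
    except queue.Empty:
        pass
    source_hosts_queue.put_nowait(source_hosts)
```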

pederhan commented 1 year ago

It's a good compromise, but I would still want to implement maxsize=1 just to ensure nothing can cause unbounded growth.

paalbra commented 1 year ago

Yup. I've previously wondered whether a better object than Queue might exist for process communication, since a maxsize of 1 is more reasonable. I never had the time to look into it, though.
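One possibility, purely as an illustration and not something evaluated here: a manager-backed dict used as a single-slot "mailbox" per source, where the producer always overwrites and the consumer takes (and removes) the latest snapshot.

```python
import multiprocessing

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    latest_hosts = manager.dict()  # shared between producer and consumer processes

    # Producer side: always overwrite, never accumulate.
    latest_hosts["mysource"] = {"source": "mysource", "hosts": ["host1", "host2"]}

    # Consumer side: take the newest snapshot, if there is one.
    collected = latest_hosts.pop("mysource", None)
```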