Closed: mir-am closed this issue 2 years ago
@mir-am I found something of interest... the longer the FASTEN (legacy) server plugins run, the lower their consumption speed gets. I can observe this while monitoring transactions/second on a DB: it starts high, then goes down over time. If I delete and recreate a container (e.g. the quality metadata plugin), speed goes back up to its initial level.
There must be something in the server logic that grows and grows and leads to more iterating. Shall we investigate?
Also tagging @proksch here.
Here you see the behavior in the pgAdmin dashboard. On the left you see a rate of 50 transactions/sec, slowly going down. Then I stopped the plugin container (quality analyzer metadata), deleted it, and restarted it. The transaction rate shoots up, then slowly starts going down again. If I just restart the container (no delete), the transaction rate remains low.
As this potentially impacts all plugins, also those running on Monster, it warrants an urgent look!
Edit: in this container the local storage dir is inside the container (no volume mount), hence deleting the container improves things temporarily.
To rule out a confounder: while I am monitoring the plugin, the database is stable in the sense that no more callables are being added at the same time. Only the quality metadata plugin is doing its thing.
Also of interest to @cg122
@MagielBruntink, Thanks for the further investigation. Indeed, we need to find out what exactly causes the slowdown.
The FASTEN server logic is simple and doesn't do intensive operations. ATM, I speculate there are two possible performance issues: 1) there might be a memory leak in the plugin that causes the JVM heap to grow quickly and hence slows down transactions; 2) Kafka operations are asynchronous, though there are some configurations that might need tweaking, like buffers, etc.
With --local_storage removed I observe better behavior:
Quite sure that's the problem... --local-storage creates a file for each message processed, then loops over all files on every message processed. The quality metadata plugin processes a lot of messages, hence this issue appears.
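If the local storage really does list the whole directory on every lookup, each message costs O(n) in the number of files ever stored, which would explain the gradual slowdown. A minimal sketch of that pattern (hypothetical names and layout, not the actual FastenKafkaPlugin code):

```java
import java.io.IOException;
import java.nio.file.*;

// Hypothetical sketch of a file-based local storage that scans the
// whole directory on every lookup -- the suspected O(n) behavior.
public class LocalStorageSketch {
    private final Path dir;

    public LocalStorageSketch(Path dir) throws IOException {
        this.dir = Files.createDirectories(dir);
    }

    // Store one file per processed message hash.
    public void store(String messageHash) throws IOException {
        Files.createFile(dir.resolve(messageHash));
    }

    // Naive membership check: iterates over *all* stored files,
    // so its cost grows with every message ever processed.
    public boolean exists(String messageHash) throws IOException {
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path f : files) {
                if (f.getFileName().toString().equals(messageHash)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("local-storage");
        LocalStorageSketch storage = new LocalStorageSketch(tmp);
        for (int i = 0; i < 1000; i++) {
            storage.store("hash-" + i);
        }
        // Every exists() call now walks all 1000 files.
        System.out.println(storage.exists("hash-999")); // true
    }
}
```

With thousands of stored hashes per partition, this linear scan per message would match the observed decay in transactions/second.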
Nice catch! --local-storage can slow down a fast plugin like quality metadata, as it performs a lot of I/O operations.
@mir-am I think local storage slows down any plugin that has processed a large number of messages, probably also Opal and Metadata Db extension on Monster. Because it iterates over all those locally stored hash-files every time it processes a message. It may be keeping the whole pipeline back at the moment.
For OPAL and metadata DB, we do need to have the --local-storage option enabled to avoid an infinite crash loop in the case of OOM, etc. To speed up local storage, we could use an in-memory DB instead of the filesystem.
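One way to avoid the per-message directory scan is an in-memory index of stored hashes, rebuilt from disk at startup so crash recovery is preserved. A hypothetical sketch (not the FASTEN implementation; the on-disk files would still be written for durability, the index is just a fast cache):

```java
import java.util.*;

// Hypothetical sketch: a per-partition in-memory index of message
// hashes makes membership checks O(1) instead of a directory scan.
public class InMemoryIndex {
    private final Map<Integer, Set<String>> byPartition = new HashMap<>();

    public void store(String messageHash, int partition) {
        byPartition.computeIfAbsent(partition, p -> new HashSet<>()).add(messageHash);
    }

    public boolean exists(String messageHash, int partition) {
        return byPartition.getOrDefault(partition, Collections.emptySet())
                          .contains(messageHash);
    }

    public void clear(int partition) {
        byPartition.remove(partition);
    }

    public static void main(String[] args) {
        InMemoryIndex index = new InMemoryIndex();
        index.store("abc123", 0);
        System.out.println(index.exists("abc123", 0)); // true
        index.clear(0);
        System.out.println(index.exists("abc123", 0)); // false
    }
}
```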
Will the reverse situation not require a heartbeat then, i.e., will there be no consumer eviction on the priority topic if there are only messages on the normal topic?
Good point. To keep the priority connection alive, we should also call sendHeartBeat for the priority topic. I will address this.
Given that you have found the culprit for the slowdown of the quality plug-in, the sendHeartBeat method doesn't seem to have overhead.
I don’t understand why we don’t let Kafka handle this - it seems to me that is one of its major capabilities. If we consume a message, should we not commit prior to processing it further? Then if the processing crashes the plugin, on its next run it will not consume the same message again.
Generally speaking, there are two processing guarantees in Kafka, namely at-least-once and at-most-once. We currently use the at-least-once strategy, meaning that Kafka guarantees that every message will be processed, produced to the .out topic, and saved into the DB (if needed). However, this strategy does not work in the case of fatal errors like OOM. Therefore, we introduced the --local-storage option.
Also, the at-least-once strategy is beneficial when using a Kubernetes deployment. At times we may stop a K8s deployment, or pods may get evicted. In these kinds of situations, we won't lose a message for processing.
That being said, you can read more about Kafka's processing guarantees here.
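The difference between the two guarantees comes down to when the offset is committed relative to processing. A toy simulation of that distinction (no real Kafka client involved; a crash mid-processing is emulated and the consumer is restarted from the last committed offset):

```java
import java.util.*;

// Toy simulation of Kafka's two processing guarantees.
// at-most-once:  commit BEFORE processing -> a crash loses the message.
// at-least-once: commit AFTER processing  -> a crash replays the message.
public class GuaranteeDemo {

    // Returns the messages that actually got processed, given that the
    // consumer crashes while processing the message at crashIndex and
    // is then restarted once from the last committed offset.
    public static List<String> run(List<String> messages, int crashIndex,
                                   boolean commitFirst) {
        List<String> processed = new ArrayList<>();
        int committedOffset = 0;
        boolean crashed = false;
        for (int attempt = 0; attempt < 2; attempt++) { // original run + restart
            for (int i = committedOffset; i < messages.size(); i++) {
                if (commitFirst) committedOffset = i + 1;   // at-most-once
                if (!crashed && i == crashIndex) { crashed = true; break; }
                processed.add(messages.get(i));
                if (!commitFirst) committedOffset = i + 1;  // at-least-once
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> msgs = Arrays.asList("m0", "m1", "m2");
        // Crash while processing m1:
        System.out.println(run(msgs, 1, true));  // at-most-once:  [m0, m2] (m1 lost)
        System.out.println(run(msgs, 1, false)); // at-least-once: [m0, m1, m2] (m1 replayed)
    }
}
```

This is why committing before processing, as suggested above, would silently drop the message that caused the crash, while committing after processing replays it, which is what the local storage guards against turning into an infinite crash loop.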
Thanks @mir-am for this discussion. I understand our strategy better now and think it’s the right approach if using bare bones Kafka, like us :-)
What I'm observing, however, is that the local storage files are not deleted after successful processing. This leads to massive numbers of files in the local storage directories and the slowdown. Were there changes to the deletion code recently?
@mir-am Found the problem. The local storage is only cleared after successful processing on the normal lane, never after processing on the priority lane. So priority messages are left around in the local storage dir until a normal lane message is successfully processed, causing potential slowdown if there are a lot of them :-) This is consistent with the experiments I was doing, which used the priority lane exclusively.
The problematic code is here: https://github.com/fasten-project/fasten/blob/949374ac105fb1939c87f010352061c916f319a4/server/src/main/java/eu/fasten/server/plugins/kafka/FastenKafkaPlugin.java#L281
There is also a TODO in the priority lane handling code that suggests that indeed a list of processed messages should be kept :-) That list can then be used to call the local storage clearing code.
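The shape of the fix being suggested could look roughly like this (illustrative names only, not the actual FastenKafkaPlugin API): remember the hashes of successfully processed priority records, then delete exactly those files from local storage.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;

// Hypothetical sketch: track which priority records were processed
// successfully, then clear exactly those entries from local storage.
public class PriorityLaneCleanup {
    private final Path storageDir;
    private final List<String> processedHashes = new ArrayList<>();

    public PriorityLaneCleanup(Path storageDir) throws IOException {
        this.storageDir = Files.createDirectories(storageDir);
    }

    // Called when a priority record has been fully processed.
    public void markProcessed(String recordHash) {
        processedHashes.add(recordHash);
    }

    // Delete the stored files for all processed priority records.
    public void clearProcessed() throws IOException {
        for (String hash : processedHashes) {
            Files.deleteIfExists(storageDir.resolve(hash));
        }
        processedHashes.clear();
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("prio-storage");
        PriorityLaneCleanup cleanup = new PriorityLaneCleanup(tmp);
        Files.createFile(tmp.resolve("h1"));
        cleanup.markProcessed("h1");
        cleanup.clearProcessed();
        System.out.println(Files.exists(tmp.resolve("h1"))); // false
    }
}
```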
Isn't it strange, actually, that the local storage is not specific to a topic?
Nice finding! I will look into the above-described --local-storage issue tomorrow.
@MagielBruntink I have addressed the local storage issue for the priority records in 8d6f9cd. It should fix the slowdown issue when lots of priority records are processed.
@mir-am great! One thing that's bugging me though: should the local storage not be specific to topic? Right now there is a local storage directory structure for plugin and partition, but not topic. So we can have the normal lane processing clearing locally stored messages from priority lane and vice versa. Or is that no problem?
I may not have understood your question! Do you mean that we could have separate local storage for the normal and priority lanes? ATM, as you said, the local storage is created based on the partition no. and a record's hash, which I think should be fine. It already prevents an infinite crash loop in the case of OOM.
That being said, we can add the lane info to the beginning of a record's hash, like:
localStorage.store(lane.toString() + record.value(), record.partition());
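To illustrate why the prefix helps: hashing the lane name together with the payload gives the same record different local-storage keys on different lanes, so the lanes cannot collide. (SHA-1 is used here purely for illustration; the actual hash function used by the FASTEN server may differ.)

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustration: prefixing the lane name changes the hash under which a
// record is stored, so the same payload consumed on different lanes
// maps to different local-storage entries.
public class LaneKeyDemo {
    public static String hash(String s) throws NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-1");
        StringBuilder hex = new StringBuilder();
        for (byte b : md.digest(s.getBytes(StandardCharsets.UTF_8))) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        String payload = "{\"product\":\"foo\"}";
        String normalKey = hash("NORMAL" + payload);
        String priorityKey = hash("PRIORITY" + payload);
        System.out.println(normalKey.equals(priorityKey)); // false
    }
}
```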
Well, a plugin can consume from any number of topics. The local storage is not specific to topic at all, only partition. So you would potentially be deleting messages from other topics than the one you are processing. Again, I'm not 100% sure this will be a problem since you hash the message as well, but it feels flaky. If it's not a big deal, I think we should change the local storage structure to:
plugin/instance id/topic/partition
where /topic/ is the new part; the rest is already there.
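The proposed layout is simple to build with java.nio path composition; a sketch with illustrative names (plugin, instance id, and topic values here are made up for the example):

```java
import java.nio.file.*;

// Sketch of the proposed local-storage layout, adding the topic level:
//   plugin/instance id/topic/partition
public class StoragePathDemo {
    public static Path storagePath(Path root, String plugin, String instanceId,
                                   String topic, int partition) {
        return root.resolve(plugin)
                   .resolve(instanceId)
                   .resolve(topic)
                   .resolve(String.valueOf(partition));
    }

    public static void main(String[] args) {
        Path p = storagePath(Paths.get("/var/lib/fasten/local-storage"),
                "QualityAnalyzer", "0", "fasten.RepoCloner.out", 3);
        System.out.println(p);
        // /var/lib/fasten/local-storage/QualityAnalyzer/0/fasten.RepoCloner.out/3
    }
}
```

With the topic segment in place, clearing after normal-lane processing can no longer touch priority-lane entries, and vice versa.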
I have implemented your suggestion, i.e., changing the local storage structure to plugin/instance id/topic/partition, in https://github.com/fasten-project/fasten/pull/449/commits/19bf54479fa440cba5ba2a74e91deb4a2da8102c. Also, it has been tested with the DC.
Nice! Shall we proceed and release the FASTEN pipeline? Then I can also deploy the optimized version of the quality analyzer plugin on Monster.
Yes. After the merge of this PR, I can create a new release of fasten.pipeline.
@MagielBruntink Could you please review this PR and merge it if it's okay?
Description
This PR addresses the performance issue in #448 by making the following changes to the FASTEN server:
- The sendHeartBeat method is now called when processing both priority and normal records, to keep both consumers alive.
- The local storage structure is changed to plugin/instance id/topic/partition.
Motivation and context
The FASTEN server becomes slower over time as it processes more and more priority records, because the local storage for priority records was not cleaned up; hence the FASTEN server had to read tons of hash files from the filesystem.
Testing
Tested with the DC.