nscuro opened this issue 1 year ago
The problem is also documented here: https://medium.com/@andy.bryant/kafka-streams-work-allocation-4f31c24753cc
Using SmallRye Reactive Messaging (SRM) would lessen this pain, but introduce other problems. With SRM, consumer threads can be scaled independently. Major downsides are that we'd lose the convenience of Kafka-backed state stores (stateful processing / "checkpointing" would require Redis or database access), and that the batching functionality provided by SmallRye is sub-optimal, as it returns too few records most of the time.
Wondering how much this actually matters, considering that a "completed" analysis requires results from all scanners anyway. It is questionable whether OSS Index being significantly faster than Snyk makes a noticeable difference in the end-to-end use case: if multiple analyzers are enabled, the slowest of them will always be the bottleneck, no matter how fast the others are.
The effect will also be less noticeable once caching kicks in.
SRM behaves in very unintuitive ways when `@Blocking` is used:

- `@Blocking` is required for synchronous operations, e.g. database interactions and REST calls
- `@Blocking` causes messages to be processed on a single Vert.x worker thread
- `@Blocking(ordered = false)` relies on the `throttled` commit strategy, which can cause processed record offsets to not be committed at all (when the application is shut down), or offsets to be committed even for records that have not been successfully processed
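For reference, the commit strategy is configured per channel in `application.properties`. A minimal sketch, assuming a SmallRye Kafka channel named `component-analysis` (the channel name is hypothetical; the topic is the one discussed in this issue):

```properties
# SmallRye Reactive Messaging Kafka channel (channel name "component-analysis" is hypothetical)
mp.messaging.incoming.component-analysis.connector=smallrye-kafka
mp.messaging.incoming.component-analysis.topic=dtrack.vuln-analysis.component
# "throttled" tracks received records and only commits an offset once all
# preceding records were acknowledged -- the source of the commit pitfalls above
mp.messaging.incoming.component-analysis.commit-strategy=throttled
```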
As per Kafka Streams' threading model, a topology is broken down into stream tasks. The number of tasks created depends on how many sub-topologies there are, and on how many partitions those sub-topologies consume from. One or more tasks are assigned to a single stream thread. The number of stream threads in an application instance is defined by `kafka-streams.num.stream.threads`. If `num.stream.threads` is lower than the number of stream tasks generated for the topology, some threads will work on multiple tasks. In Kafka Streams, there is no way to influence how tasks are assigned. This can lead to situations where one thread is assigned tasks from both a fast sub-topology (e.g. the one consuming from `dtrack.vuln-analysis.component`) and a slow one. This means that the upstream task (capable of processing multiple hundreds of records per second) will be significantly delayed, which also has a negative impact on other tasks depending on its results.
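To illustrate the effect, here is a plain-Java sketch (not Kafka Streams code) of tasks being spread over too few threads. The round-robin assignment, task names, and partition counts are assumptions for the example; the real assignor is not user-controllable, which is exactly the problem:

```java
import java.util.ArrayList;
import java.util.List;

public class TaskAssignmentSketch {

    // Round-robin assignment of tasks to threads -- a simplification;
    // Kafka Streams' actual assignment cannot be influenced by the user.
    static List<List<String>> assign(List<String> tasks, int numStreamThreads) {
        List<List<String>> threads = new ArrayList<>();
        for (int t = 0; t < numStreamThreads; t++) threads.add(new ArrayList<>());
        for (int i = 0; i < tasks.size(); i++) {
            threads.get(i % numStreamThreads).add(tasks.get(i));
        }
        return threads;
    }

    public static void main(String[] args) {
        // Two hypothetical sub-topologies, each consuming a 3-partition topic
        // -> one stream task per partition, 6 tasks total.
        List<String> tasks = new ArrayList<>();
        for (int p = 0; p < 3; p++) tasks.add("fast-task-" + p); // e.g. dtrack.vuln-analysis.component
        for (int p = 0; p < 3; p++) tasks.add("slow-task-" + p); // e.g. a Snyk analyzer sub-topology

        List<List<String>> threads = assign(tasks, 4); // fewer threads than tasks
        for (int t = 0; t < threads.size(); t++) {
            System.out.println("thread-" + t + " -> " + threads.get(t));
        }
        // thread-0 ends up with both a fast and a slow task: one slow record
        // being processed blocks the fast task queued on the same thread.
    }
}
```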
In the screenshot below, consumers of the `dtrack.vuln-analysis.component` and `dtrack.vuln-analysis.component.purl` topics have a significant lag, despite the respective sub-topologies being capable of processing hundreds of records per second. This should not be the case. Instead, consumers of those topics should be able to keep up with new records in almost realtime.

For the vulnerability analyzer, a slow processor like Snyk can additionally lead to sub-optimal batching in other processors. Because fewer records arrive in a given time window, OSS Index will not be able to use its maximum batch size of 128 efficiently:
This is not an issue when there is one stream thread per stream task. For example, when both OSS Index and Snyk are enabled, but the internal analyzer is disabled, the application consumes from 18 partitions, leading to 18 stream tasks. Spawning one application instance with `num.stream.threads=18`, or three instances with `num.stream.threads=6`, leads to optimal processing conditions.

Effectively, scaling up is unproblematic, but scaling down always comes with the danger of significantly slowing down the entire system.
Possible solutions:

- Spawn multiple `KafkaStreams` instances instead of just one (Quarkus only supports one `KafkaStreams` instance per application; we'd need more code to make this work, and we'd have to give up the dev mode integration)