Closed nscuro closed 1 year ago
After doing more research, and experimenting a bit, I don't think retry topics are a good fit for us.
I found this blog post from Uber to be the easiest to understand how such an architecture would look like.
A few reasons for why retry topics are not an option:
topic-retry-1000, topic-retry-2000, topic-retry-4000
for retries with 1000, 2000, and 4000 millisecond delays)Thread.sleep
or similar throttling performed in a thread, the overall throughput would suffer. Additionally, if the thread would be sleeping for too long, no heartbeat would be sent to the broker, ultimately causing a rebalance. Not good.As an alternative, I think we can use state stores for this purpose. State stores are backed by changelog topics and can be replicated to other Kafka Streams instances. To-be-retried messages would be enriched with retry metadata (retryAt
, retriesRemaining
, etc.) and written to the store. Punctuators iterate over all stored values in a fixed interval (e.g. every second), and determine if they are eligible for retry. When eligible, push the record to the current batch.
Here's a bare-bones implementation of retries, combined with batching, for OSS Index:
package org.acme.analyzer;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.type.TypeReference;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
import io.quarkus.kafka.client.serialization.ObjectMapperSerde;
import io.quarkus.kafka.client.serialization.ObjectMapperSerializer;
import org.acme.model.Component;
import org.acme.model.VulnerabilityResult;
import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import static org.apache.kafka.streams.state.Stores.keyValueStoreBuilder;
import static org.apache.kafka.streams.state.Stores.persistentKeyValueStore;
@ApplicationScoped
public class OssIndexProcessorSupplier implements ProcessorSupplier<String, Component, String, VulnerabilityResult> {
private static final String BATCH_STORE_NAME = "ossindex-batch";
private static final String RETRY_STORE_NAME = "ossindex-retry";
private static final StoreBuilder<KeyValueStore<String, List<Component>>> BATCH_STORE_BUILDER;
private static final StoreBuilder<KeyValueStore<UUID, Retryable>> RETRY_STORE_BUILDER;
static {
BATCH_STORE_BUILDER = keyValueStoreBuilder(persistentKeyValueStore(BATCH_STORE_NAME),
Serdes.String(), Serdes.serdeFrom(new ObjectMapperSerializer<>(),
new ObjectMapperDeserializer<>(new TypeReference<>() {
})));
RETRY_STORE_BUILDER = keyValueStoreBuilder(persistentKeyValueStore(RETRY_STORE_NAME),
Serdes.UUID(), new ObjectMapperSerde<>(Retryable.class));
}
private final OssIndexAnalyzer ossIndexAnalyzer;
public OssIndexProcessorSupplier(final OssIndexAnalyzer ossIndexAnalyzer) {
this.ossIndexAnalyzer = ossIndexAnalyzer;
}
@Override
public Processor<String, Component, String, VulnerabilityResult> get() {
return new OssIndexProcessor(ossIndexAnalyzer);
}
@Override
public Set<StoreBuilder<?>> stores() {
return Set.of(BATCH_STORE_BUILDER, RETRY_STORE_BUILDER);
}
private static class OssIndexProcessor extends ContextualProcessor<String, Component, String, VulnerabilityResult> {
private static final Logger LOGGER = LoggerFactory.getLogger(OssIndexProcessor.class);
private final Analyzer analyzer;
private KeyValueStore<String, List<Component>> batchStore;
private KeyValueStore<UUID, Retryable> retryStore;
private Cancellable batchPunctuator;
private Cancellable retryPunctuator;
private OssIndexProcessor(final Analyzer analyzer) {
this.analyzer = analyzer;
}
@Override
public void init(final ProcessorContext<String, VulnerabilityResult> context) {
super.init(context);
batchStore = context().getStateStore(BATCH_STORE_NAME);
retryStore = context().getStateStore(RETRY_STORE_NAME);
batchPunctuator = context().schedule(Duration.ofSeconds(5), PunctuationType.WALL_CLOCK_TIME, this::punctuateBatch);
retryPunctuator = context().schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, this::punctuateRetry);
}
@Override
public void process(final Record<String, Component> record) {
final Optional<Integer> partitionId = context().recordMetadata().map(RecordMetadata::partition);
if (partitionId.isEmpty()) {
LOGGER.error("Unable to get partition ID for record; Skipping");
return;
}
if (isScheduledForRetry(record.value().getUuid())) {
LOGGER.warn("Component {} is already scheduled for retry; Skipping", record.value());
return;
}
addToBatch(partitionId.get(), record.value());
}
@Override
public void close() {
batchPunctuator.cancel();
retryPunctuator.cancel();
}
private void addToBatch(final int partitionId, final Component component) {
List<Component> batch = batchStore.get(Integer.toString(partitionId));
if (batch == null) {
batch = List.of(component);
} else {
batch.add(component);
}
if (batch.size() >= 128) {
analyze(batch);
batchStore.put(Integer.toString(partitionId), null);
} else {
batchStore.put(Integer.toString(partitionId), batch);
}
}
private void analyze(final List<Component> components) {
final List<VulnerabilityResult> results;
try {
results = analyzer.analyze(components);
} catch (RuntimeException e) {
LOGGER.warn("Analysis of {} components failed; Scheduling for retry", components.size(), e);
for (final Component component : components) {
retryStore.put(component.getUuid(), new Retryable(component, context().currentSystemTimeMs() + Duration.ofSeconds(90).toMillis()));
}
return;
}
for (final VulnerabilityResult result : results) {
context().forward(new Record<>(result.getComponent().getPurl().getCoordinates(), result, context().currentSystemTimeMs()));
}
}
private void punctuateBatch(final long timestamp) {
LOGGER.debug("Batch punctuator triggered at {}", timestamp);
try (final KeyValueIterator<String, List<Component>> valueIterator = batchStore.all()) {
while (valueIterator.hasNext()) {
final KeyValue<String, List<Component>> kv = valueIterator.next();
if (kv.value != null && !kv.value.isEmpty()) {
LOGGER.info("Forwarding batch (key: {}, components: {})", kv.key, kv.value.size());
analyze(kv.value);
batchStore.put(kv.key, null);
}
}
}
}
private void punctuateRetry(final long timestamp) {
LOGGER.debug("Retry punctuator triggered at {}", timestamp);
try (final KeyValueIterator<UUID, Retryable> valueIterator = retryStore.all()) {
while (valueIterator.hasNext()) {
final KeyValue<UUID, Retryable> kv = valueIterator.next();
if (kv.value.nextRetry <= timestamp) {
LOGGER.info("Retrying component {}", kv.value.component);
addToBatch(BuiltInPartitioner.partitionForKey(kv.value.component().getPurl().getCoordinates().getBytes(), 3), kv.value.component());
retryStore.delete(kv.key);
}
}
}
}
private boolean isScheduledForRetry(final UUID uuid) {
return retryStore.get(uuid) != null;
}
}
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public record Retryable(Component component, long nextRetry) {
}
}
In this example, there's a fixed retry delay of 90 seconds, and no maximum attempt enforcement.
State stores are bound to stream threads, so we can be sure that "main" messages and retry messages will be processed by the same thread.
IBM does something similar in kafka-retry. They too use state stores to store the retry information, however when a message is due for retry, they re-publish it back to the original topic. Which would be an option for us too, I think.
Note that this would require a dedicated topic for each analyzer.
As a majority of what the applications in this repo are doing consists of reaching out to external services, they are of course very vulnerable to instabilities in those external services affecting overall system performance and reliability.
There are various reasons for external requests going bad:
At the moment, whenever the streams applications encounter an error, the stream thread is shut down, and will not be available for further task processing (that's the default behavior in Kafka Streams). Not good for obvious reasons.
Simply ignoring such errors is also not a good option.
According to some initial research, the idiomatic way of implementing retries in Kafka Streams is to have dedicated retry topics, and eventually a "dead letter topic" for when the maximum retries have exceeded. See https://www.confluent.io/blog/error-handling-patterns-in-kafka/
Having retries implemented in stream processors is most likely sub-optimal, as it will reduce message throughput of the consumer.