micronaut-projects / micronaut-kafka

Integration between Micronaut and Apache Kafka
Apache License 2.0

Support KafkaListener to fail on records and stop consuming #892

Open frisi opened 11 months ago

frisi commented 11 months ago

Feature description

Current behaviour

If the listener fails while processing a record at offset N, it resumes at offset N+1.

I guess this is due to the decision made in https://github.com/micronaut-projects/micronaut-kafka/issues/372 to choose availability over consistency.

Expected behaviour

There should be a way to configure KafkaListener to stop consuming records for a topic-partition if a record failed.

If configured to RETRY_ON_ERROR it should re-try retryCount times. If it still fails, log an error and stop processing.

IIUC, as of micronaut-kafka 5.1.2 there is no way to tell the listener to stop consuming other records if it failed (or to retry indefinitely if errorStrategy RETRY_ON_ERROR is configured). Or is there?
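To make the requested semantics concrete, here is a minimal sketch in plain Java. It does not use micronaut-kafka or the Kafka client at all: the record list, the `consume` helper, and the class name are hypothetical, and list indices stand in for offsets. It only illustrates "retry up to retryCount times, then log and stop without advancing past the failed record".

```java
import java.util.List;
import java.util.function.Consumer;

public class StopOnErrorSketch {

    /**
     * Processes records in order. A failing record is retried up to
     * retryCount additional times. Returns the offset (list index) to
     * resume from: records.size() if everything succeeded, otherwise the
     * index of the record that kept failing.
     */
    public static <T> int consume(List<T> records, Consumer<T> handler, int retryCount) {
        for (int offset = 0; offset < records.size(); offset++) {
            int retries = 0;
            while (true) {
                try {
                    handler.accept(records.get(offset));
                    break; // success: move on to the next offset
                } catch (RuntimeException e) {
                    if (retries++ >= retryCount) {
                        // Retries exhausted: log and stop instead of skipping to offset + 1.
                        System.err.println("Giving up at offset " + offset + ": " + e.getMessage());
                        return offset;
                    }
                }
            }
        }
        return records.size();
    }

    public static void main(String[] args) {
        List<String> records = List.of("a", "b", "poison", "d");
        int next = consume(records, r -> {
            if (r.equals("poison")) {
                throw new IllegalStateException("cannot process " + r);
            }
        }, 2);
        System.out.println("next offset to consume: " + next); // prints 2, record "d" is never reached
    }
}
```

With the behaviour described under "Current behaviour", the equivalent loop would log the failure and continue with "d" instead of returning.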

cristianbriscaru commented 11 months ago

I am having the same issue: if there is an exception when consuming a record, it's just logged as an error and the consumer moves on to the next record. Have you managed to find a fix for this?

frisi commented 10 months ago

As a workaround you could use KafkaConsumer directly, @cristianbriscaru.

Something like this should work (Kotlin project):

consumer.subscribe(topicPattern)

consumer.use {
    while (true) {
        // poll for new records
        val records = try {
            it.poll(Duration.ofMillis(1000L))
        } catch (e: WakeupException) {
            // wakeup() was called from another thread: log and leave the loop
            logger.info { "Preemptive shutdown: ${e.message}" }
            break
        } catch (e: KafkaException) {
            logger.error(e) { "Kafka error: ${e.message}" }
            throw e
        }
        // process records
        try {
            records.forEach { r ->
                val key = r.key()
                val version = r.timestamp()
                val body = r.value()

                // process record here
            }

            // commit offsets manually after finishing processing
            it.commitSync()
            if (records.count() > 0) {
                logger.info { "finished batch of ${records.count()} records - offsets committed" }
            }
        } catch (e: Exception) {
            // processing failed: nothing is committed for this batch, stop consuming
            logger.error(e) { "Processing failed, stopping consumer: ${e.message}" }
            throw e
        }
    }
}

graemerocher commented 10 months ago

@cristianbriscaru not sure I understand. The ErrorStrategy is for this https://micronaut-projects.github.io/micronaut-kafka/latest/guide/index.html#kafkaErrors

@frisi if you don't define a retry count it will retry indefinitely, what behaviour are you after?

cristianbriscaru commented 10 months ago

Hi @graemerocher, I am using ErrorStrategy, but when the Kafka listener is reactive the message is not retried on error:

@Slf4j
@RequiredArgsConstructor
@KafkaListener(
        value = "${kafka.consumers.ledger-company.group.id}",
        offsetReset = OffsetReset.EARLIEST,
        offsetStrategy = OffsetStrategy.SYNC,
        errorStrategy = @ErrorStrategy(
                value = ErrorStrategyValue.RETRY_ON_ERROR,
                retryDelay = "100ms",
                retryCount = 999999999
        )
)
public class CompanyListener {

    private final LedgerService ledgerService;

    @Topic("${kafka-topics.company}")
    public Mono<Void> receive(ConsumerRecord<String, DomainEventValue<CompanyProto>> message) {
        var eventType = message.value().getEventType();

        if (eventType.equals(CompanyEventTypes.CREATED)) {
            return ledgerService.create(LedgerExternalId.from(message.value().getEntity().get().getId()))
                    .then();
        }

        return Mono.empty();
    }
}

In this case the listener logs an "Error processing record ..." once and moves on without retrying. If I .block() the reactive stream, then the message is retried according to the ErrorStrategy.

What is the proper way to have a reactive Kafka listener without using .block() and still have a working error strategy?
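One possible reading of the difference, sketched in plain Java: a failure inside a deferred result is not thrown from the listener method, so a wrapper that only reacts to thrown exceptions sees a normal return, while blocking rethrows the failure to the caller. This is an analogy only. It uses `CompletableFuture` rather than `Mono`, `callWithRetry` is a hypothetical stand-in, and none of it is taken from the micronaut-kafka implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class AsyncRetrySketch {

    /**
     * Hypothetical retry wrapper that only reacts to exceptions thrown by
     * the listener call itself. Returns the number of invocations made.
     */
    public static int callWithRetry(Supplier<?> listener, int retryCount) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.get();
                return attempts; // nothing was thrown: treated as success
            } catch (RuntimeException e) {
                if (attempts > retryCount) {
                    return attempts; // retries exhausted
                }
            }
        }
    }

    /** Returns an already-failed future; nothing is thrown to the caller. */
    public static CompletableFuture<Void> failingAsync() {
        return CompletableFuture.failedFuture(new IllegalStateException("boom"));
    }

    public static void main(String[] args) {
        // Deferred: the failure stays inside the future, so there is one call and no retry.
        System.out.println(callWithRetry(() -> failingAsync(), 3)); // prints 1
        // Blocking: join() rethrows the failure, so the wrapper retries.
        System.out.println(callWithRetry(() -> failingAsync().join(), 3)); // prints 4
    }
}
```

If this reading applies, any retry for a non-blocking listener would have to be expressed inside the reactive pipeline or handled by the framework when it subscribes to the returned publisher. Whether micronaut-kafka does the latter for the version in question is not established in this thread.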