Closed: Kaua3045 closed this issue 9 months ago
At the moment, the retry handling looks like this. We still need to add a way to "version" events, so that an old event is not processed after a newer one has already been handled:
@KafkaListener(
concurrency = "${kafka.consumers.products.concurrency}",
containerFactory = "kafkaListenerFactory",
topics = "${kafka.consumers.products.topics}",
groupId = "${kafka.consumers.products.group-id}",
id = "${kafka.consumers.products.id}",
properties = {
"auto.offset.reset=${kafka.consumers.products.auto-offset-reset}"
}
)
@RetryableTopic(
// exponential backoff: 2s initial delay, multiplied by 2 on each attempt
backoff = @Backoff(delay = 2000, multiplier = 2),
attempts = "${kafka.consumers.products.max-attempts}",
autoCreateTopics = "${kafka.consumers.products.auto-create-topics}",
dltTopicSuffix = "-retry-dlt",
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE
)
public void onMessage(
@Payload final String payload,
final Acknowledgment ack,
final ConsumerRecordMetadata metadata
) {
LOG.atLevel(Level.INFO).log("Message received from Kafka [topic:{}] [partition:{}] [offset:{}]: {}",
metadata.topic(), metadata.partition(), metadata.offset(), payload);
final var aOutBoxEvent = Json.readValue(payload, PRODUCT_MESSAGE).payload().after();
switch (aOutBoxEvent.getEventType()) {
case EventsTypes.PRODUCT_CREATED -> {
final var aProductCreated = Json.readValue(aOutBoxEvent.getData(), ProductCreatedEvent.class);
final var aProductId = aProductCreated.id();
this.productGateway.findById(aProductId)
.ifPresentOrElse(aProduct -> {
this.saveProductUseCase.execute(aProduct);
ack.acknowledge();
LOG.info(EVENT_RECEIVED_MESSAGE, "created", aProductId);
}, () -> LOG.debug(NOT_FOUND_MESSAGE, aProductId));
}
case EventsTypes.PRODUCT_UPDATED -> {
final var aProductUpdated = Json.readValue(aOutBoxEvent.getData(), ProductUpdatedEvent.class);
final var aProductId = aProductUpdated.id();
this.productGateway.findById(aProductId)
.ifPresentOrElse(aProduct -> {
this.saveProductUseCase.execute(aProduct);
ack.acknowledge();
LOG.info(EVENT_RECEIVED_MESSAGE, "updated", aProductId);
}, () -> LOG.debug(NOT_FOUND_MESSAGE, aProductId));
}
case EventsTypes.PRODUCT_DELETED -> {
final var aProductDeleted = Json.readValue(aOutBoxEvent.getData(), ProductDeletedEvent.class);
final var aProductId = aProductDeleted.id();
this.removeProductUseCase.execute(aProductId);
ack.acknowledge();
LOG.info(EVENT_RECEIVED_MESSAGE, "deleted", aProductId);
}
default -> LOG.warn("Event type not supported: {}", aOutBoxEvent.getEventType());
}
}
@DltHandler
public void onDltMessage(
@Payload String payload,
final Acknowledgment acknowledgment,
final ConsumerRecordMetadata metadata
) {
LOG.atLevel(Level.WARN).log("Message received from Kafka at DLT [topic:{}] [partition:{}] [offset:{}]: {}",
metadata.topic(), metadata.partition(), metadata.offset(), payload);
final var aOutBoxEvent = Json.readValue(payload, PRODUCT_MESSAGE).payload().after();
final var aInstantNow = InstantUtils.now();
final var aDuration = Duration.between(aOutBoxEvent.getOccurredOn(), aInstantNow);
final var aDurationInDays = aDuration.toDays();
if (aDurationInDays > 7) {
this.kafkaTemplate.send(PRODUCT_DLT_INVALID, payload);
acknowledgment.acknowledge();
LOG.error("Event reprocessed time exceeded 7 days, sent to DLT: {} and now: {} and duration: {}, payload: {}",
aOutBoxEvent.getOccurredOn(), aInstantNow, aDuration, payload);
return;
}
this.kafkaTemplate.send(PRODUCT_TOPIC, payload);
acknowledgment.acknowledge();
LOG.warn("Event not exceeded 7 days, sent to product topic: {}, occurred on: {} and now: {} and duration: {}, payload: {}",
PRODUCT_TOPIC, aOutBoxEvent.getOccurredOn(), aInstantNow, aDuration, payload);
}
This is the current class:
@KafkaListener(
concurrency = "${kafka.consumers.products.concurrency}",
containerFactory = "kafkaListenerFactory",
topics = "${kafka.consumers.products.topics}",
groupId = "${kafka.consumers.products.group-id}",
id = "${kafka.consumers.products.id}",
properties = {
"auto.offset.reset=${kafka.consumers.products.auto-offset-reset}"
}
)
@RetryableTopic(
// exponential backoff: 2s initial delay, multiplied by 2 on each attempt
backoff = @Backoff(delay = 2000, multiplier = 2),
attempts = "${kafka.consumers.products.max-attempts}",
autoCreateTopics = "${kafka.consumers.products.auto-create-topics}",
dltTopicSuffix = "-retry-dlt",
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE
)
public void onMessage(
@Payload final String payload,
final Acknowledgment ack,
final ConsumerRecordMetadata metadata
) {
LOG.atLevel(Level.INFO).log("Message received from Kafka [topic:{}] [partition:{}] [offset:{}]: {}",
metadata.topic(), metadata.partition(), metadata.offset(), payload);
final var aOutBoxEvent = Json.readValue(payload, PRODUCT_MESSAGE).payload().after();
switch (aOutBoxEvent.getEventType()) {
case EventsTypes.PRODUCT_CREATED -> {
final var aProductCreated = Json.readValue(aOutBoxEvent.getData(), ProductCreatedEvent.class);
final var aProductId = aProductCreated.id();
processCreatedAndUpdatedEvent(
aProductId,
ack,
"created",
aProductCreated
);
}
case EventsTypes.PRODUCT_UPDATED -> {
final var aProductUpdated = Json.readValue(aOutBoxEvent.getData(), ProductUpdatedEvent.class);
final var aProductId = aProductUpdated.id();
processCreatedAndUpdatedEvent(
aProductId,
ack,
"updated",
aProductUpdated
);
}
case EventsTypes.PRODUCT_DELETED -> {
final var aProductDeleted = Json.readValue(aOutBoxEvent.getData(), ProductDeletedEvent.class);
final var aProductId = aProductDeleted.id();
this.removeProductUseCase.execute(aProductId);
ack.acknowledge();
this.eventValidationService.invalidate(aProductDeleted.aggregateName(), aProductId);
LOG.info(EVENT_RECEIVED_MESSAGE, "deleted", aProductId);
}
default -> LOG.warn("Event type not supported: {}", aOutBoxEvent.getEventType());
}
}
@DltHandler
public void onDltMessage(
@Payload String payload,
final Acknowledgment acknowledgment,
final ConsumerRecordMetadata metadata
) {
LOG.atLevel(Level.WARN).log("Message received from Kafka at DLT [topic:{}] [partition:{}] [offset:{}]: {}",
metadata.topic(), metadata.partition(), metadata.offset(), payload);
final var aOutBoxEvent = Json.readValue(payload, PRODUCT_MESSAGE).payload().after();
final var aInstantNow = InstantUtils.now();
final var aDuration = Duration.between(aOutBoxEvent.getOccurredOn(), aInstantNow);
final var aDurationInDays = aDuration.toDays();
if (aDurationInDays > 15) {
this.kafkaTemplate.send(PRODUCT_DLT_INVALID, payload);
acknowledgment.acknowledge();
LOG.error("Event reprocessed time exceeded 15 days, sent to DLT: {} and now: {} and duration: {}, payload: {}",
aOutBoxEvent.getOccurredOn(), aInstantNow, aDuration, payload);
return;
}
this.kafkaTemplate.send(PRODUCT_TOPIC, payload);
acknowledgment.acknowledge();
LOG.warn("Event not exceeded 15 days, sent to product topic: {}, occurred on: {} and now: {} and duration: {}, payload: {}",
PRODUCT_TOPIC, aOutBoxEvent.getOccurredOn(), aInstantNow, aDuration, payload);
}
private <T extends DomainEvent> void processCreatedAndUpdatedEvent(
final String aId,
final Acknowledgment ack,
final String type,
final T aDomainEvent
) {
if (this.eventValidationService.isInvalid(aDomainEvent, aId)) {
ack.acknowledge();
LOG.warn("Product event is old: {}", aId);
return;
}
this.productGateway.findById(aId)
.ifPresentOrElse(aProduct -> {
this.saveProductUseCase.execute(aProduct);
ack.acknowledge();
LOG.info(EVENT_RECEIVED_MESSAGE, type, aId);
}, () -> LOG.debug(NOT_FOUND_MESSAGE, aId));
}
This part here ends up never being executed, because today we validate the event date: if the event stored in Redis is newer than the one we received, the message is marked as processed. Perhaps the right approach would be to send it to the DLT and let it expire there, as sketched after the snippet below.
if (aDurationInDays > 15) {
this.kafkaTemplate.send(PRODUCT_DLT_INVALID, payload);
acknowledgment.acknowledge();
LOG.error("Event reprocessed time exceeded 15 days, sent to DLT: {} and now: {} and duration: {}, payload: {}",
aOutBoxEvent.getOccurredOn(), aInstantNow, aDuration, payload);
return;
}
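A minimal sketch of the "let it expire in the DLT" idea, assuming the application creates the invalid-events topic itself. The bean, topic name, and 15-day retention below are illustrative, not the project's actual config (in the project the name would come from PRODUCT_DLT_INVALID):

import java.time.Duration;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class DltTopicConfig {

    // Hypothetical bean: creates the invalid-events DLT with a bounded retention.
    @Bean
    public NewTopic productDltInvalidTopic() {
        return TopicBuilder.name("product-dlt-invalid")
                .partitions(1)
                .replicas(1)
                // Kafka deletes records older than retention.ms, so stale events
                // expire in the DLT without any consumer-side date check.
                .config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Duration.ofDays(15).toMillis()))
                .build();
    }
}

With this in place, the consumer only decides "stale or not"; the broker itself takes care of discarding events that sit in the DLT past the retention window.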
This became the final version for now:
@KafkaListener(
concurrency = "${kafka.consumers.products.concurrency}",
containerFactory = "kafkaListenerFactory",
topics = "${kafka.consumers.products.topics}",
groupId = "${kafka.consumers.products.group-id}",
id = "${kafka.consumers.products.id}",
properties = {
"auto.offset.reset=${kafka.consumers.products.auto-offset-reset}"
}
)
@RetryableTopic(
// exponential backoff: 2s initial delay, multiplied by 2 on each attempt
backoff = @Backoff(delay = 2000, multiplier = 2),
attempts = "${kafka.consumers.products.max-attempts}",
autoCreateTopics = "${kafka.consumers.products.auto-create-topics}",
dltTopicSuffix = "-retry-dlt",
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE
)
public void onMessage(
@Payload final String payload,
final Acknowledgment ack,
final ConsumerRecordMetadata metadata
) {
LOG.atLevel(Level.INFO).log("Message received from Kafka [topic:{}] [partition:{}] [offset:{}]: {}",
metadata.topic(), metadata.partition(), metadata.offset(), payload);
final var aOutBoxEvent = Json.readValue(payload, PRODUCT_MESSAGE).payload().after();
switch (aOutBoxEvent.getEventType()) {
case EventsTypes.PRODUCT_CREATED -> {
final var aProductCreated = Json.readValue(aOutBoxEvent.getData(), ProductCreatedEvent.class);
final var aProductId = aProductCreated.id();
processCreatedAndUpdatedEvent(
aProductId,
ack,
"created",
aProductCreated,
payload
);
}
case EventsTypes.PRODUCT_UPDATED -> {
final var aProductUpdated = Json.readValue(aOutBoxEvent.getData(), ProductUpdatedEvent.class);
final var aProductId = aProductUpdated.id();
processCreatedAndUpdatedEvent(
aProductId,
ack,
"updated",
aProductUpdated,
payload
);
}
case EventsTypes.PRODUCT_DELETED -> {
final var aProductDeleted = Json.readValue(aOutBoxEvent.getData(), ProductDeletedEvent.class);
final var aProductId = aProductDeleted.id();
this.removeProductUseCase.execute(aProductId);
ack.acknowledge();
this.eventValidationService.invalidate(aProductDeleted.aggregateName(), aProductId);
LOG.info(EVENT_RECEIVED_MESSAGE, "deleted", aProductId);
}
default -> LOG.warn("Event type not supported: {}", aOutBoxEvent.getEventType());
}
}
@DltHandler
public void onDltMessage(
@Payload String payload,
final Acknowledgment acknowledgment,
final ConsumerRecordMetadata metadata
) {
LOG.atLevel(Level.WARN).log("Message received from Kafka at DLT [topic:{}] [partition:{}] [offset:{}]: {}",
metadata.topic(), metadata.partition(), metadata.offset(), payload);
final var aOutBoxEvent = Json.readValue(payload, PRODUCT_MESSAGE).payload().after();
final var aInstantNow = InstantUtils.now();
this.kafkaTemplate.send(PRODUCT_TOPIC, payload);
acknowledgment.acknowledge();
LOG.warn("Event sent to product topic for retry: {}, occurred on: {} and now: {}, payload: {}",
PRODUCT_TOPIC, aOutBoxEvent.getOccurredOn(), aInstantNow, payload);
}
private <T extends DomainEvent> void processCreatedAndUpdatedEvent(
final String aId,
final Acknowledgment ack,
final String type,
final T aDomainEvent,
final String aPayload
) {
if (this.eventValidationService.isInvalid(aDomainEvent, aId)) {
this.kafkaTemplate.send(PRODUCT_DLT_INVALID, aPayload);
ack.acknowledge();
LOG.error("Product event is old, sent to DLT: {} and now: {}, payload: {}",
aDomainEvent.occurredOn(), InstantUtils.now(), aPayload);
return;
}
this.productGateway.findById(aId)
.ifPresentOrElse(aProduct -> {
this.saveProductUseCase.execute(aProduct);
ack.acknowledge();
LOG.info(EVENT_RECEIVED_MESSAGE, type, aId);
}, () -> LOG.debug(NOT_FOUND_MESSAGE, aId));
}
In the future, the right approach is to add a version field to the Entity class, of type Instant, together with a getVersion() method and an upgradeVersion() method. The upgrade method takes the current version and replaces it with a new value, the current time. This version must be saved as a column on each aggregate root and must only be modified through that method. A sketch follows below.
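A minimal sketch of that idea, following the names in the description above; the constructor shape is an assumption, and InstantUtils is the project's own clock helper already used in the listener:

import java.time.Instant;

public abstract class Entity {

    // Persisted as a column on every aggregate root.
    private Instant version;

    protected Entity(final Instant version) {
        this.version = version;
    }

    public Instant getVersion() {
        return this.version;
    }

    // The only place allowed to modify the version: replaces the
    // current value with the current time.
    public void upgradeVersion() {
        this.version = InstantUtils.now();
    }
}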
Well, today the Kafka listener does not handle reprocessing of messages that failed. The right approach is to add a @RetryableTopic that sends failures to an error queue, and on that error queue run two checks before deciding whether to send the message back to the main queue or to the DLT. The first is whether a certain amount of time has passed, 7 days for example. For the second we still need to find another way to identify stale events, such as an event version or the product's updated_at; the right call is to analyze the options and pick the best one. We store that value in Redis and check it before sending the message back to the main queue. This "version" check must be done in both consumers, the error-queue consumer and the main-queue consumer (a sketch of the Redis side follows below). As I find ways to solve this, I will add them here.
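A minimal sketch of the Redis side of that check, using the isInvalid/invalidate method names the listener above already calls. The Spring Data Redis wiring, the key format, and the assumption that the project's DomainEvent exposes occurredOn() and aggregateName() are mine:

import java.time.Instant;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class EventValidationService {

    private final StringRedisTemplate redisTemplate;

    public EventValidationService(final StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // Stale check: the event is invalid if Redis already holds a newer
    // occurredOn for the same aggregate; otherwise record this event's
    // instant and accept it.
    public <T extends DomainEvent> boolean isInvalid(final T aDomainEvent, final String aId) {
        final var aKey = aDomainEvent.aggregateName() + ":" + aId;
        final var aStored = this.redisTemplate.opsForValue().get(aKey);
        if (aStored != null && Instant.parse(aStored).isAfter(aDomainEvent.occurredOn())) {
            return true;
        }
        this.redisTemplate.opsForValue().set(aKey, aDomainEvent.occurredOn().toString());
        return false;
    }

    // Called after a delete so any late create/update event for this id
    // is rejected as stale.
    public void invalidate(final String aAggregateName, final String aId) {
        this.redisTemplate.opsForValue().set(aAggregateName + ":" + aId, InstantUtils.now().toString());
    }
}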