eventuate-tram / eventuate-tram-core

Transactional messaging for microservices

The event suddenly died and could not be handled #203

Open ThienPhuc92 opened 8 months ago


Problem

I register event consumers using the structure below. I am using version 0.32.0.RELEASE of the core and messaging modules.

public DomainEventHandlers domainEventHandlers() {
    log.debug("Building Domain Event Handler for Engine Event Consumer");
    String engineEventKafkaTopic = globalSetting.get(AppSettingKey.EVENTUATE.CMD_TOPIC);
    return DomainEventHandlersBuilder
        .forAggregateType(engineEventKafkaTopic)
        .onEvent(CmdUpdatedStatusEvent.class, this::handleCmdUpdatedStatusEvent)
        .build();
}

private void handleCmdUpdatedStatusEvent(DomainEventEnvelope<CmdUpdatedStatusEvent> event) {
    log.info("Received CmdUpdatedStatusEvent: {}", event.getEvent());
    try {
        cmdEventHandler.handleEvent(event.getEvent());
    } catch (Exception exception) {
        log.debug("Error when handle cmd updated status event!", exception);
        saveErrorMessage(event, exception.getMessage());
    }
}

private <T extends DomainEvent> void saveErrorMessage(DomainEventEnvelope<T> event, String messageLog) {
    try {
        Message message = event.getMessage();

        CreateReceivedMessagesErrorCommand createReceivedMessagesErrorCommand = new CreateReceivedMessagesErrorCommand();
        createReceivedMessagesErrorCommand.setMessageId(message.getId());
        createReceivedMessagesErrorCommand.setEventType(event.getEvent().getClass().getSimpleName());
        createReceivedMessagesErrorCommand.setHeader(objectMapper.writeValueAsString(message.getHeaders()));
        createReceivedMessagesErrorCommand.setPayload(message.getPayload());
        createReceivedMessagesErrorCommand.setLog(messageLog);
        createReceivedMessagesErrorCommand.setStatus(ReceivedMessagesErrorStatus.PENDING);
        createReceivedMessagesErrorCommand.setCreationTime(Instant.now());

        saveReceivedMessagesErrorUseCase.create(createReceivedMessagesErrorCommand);
    } catch (Exception e) {
        log.error("An error occurred while trying convert and send command to create received message error", e);
    }
}

Everything was fine, and then event consumption suddenly stopped for no apparent reason. When I restart the application, it resumes consuming events, but after a few days it stops consuming again on its own.

Below is the Eventuate processing log.

Log

When Eventuate is consuming normally, the unprocessed list in this log line usually contains only one offset. When the problem occurs, all the offsets accumulate there:

To commit CI_DOP_V3_EVENT_CONSUMER io.eventuate.messaging.kafka.basic.consumer.OffsetTracker@59d137a1[state={CI_DOP_V3_EVENT-0=io.eventuate.messaging.kafka.basic.consumer.TopicPartitionOffsets@1f75b245[unprocessed=[52274]
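One way to read this log line is sketched below. It is a simplified model of per-partition offset tracking, not Eventuate's actual OffsetTracker code; the class SimpleOffsetTracker and all of its method names are hypothetical. The assumption is that an offset can only be committed once every lower tracked offset has been handled, so a single handler that never returns keeps every later offset in the unprocessed set.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical, simplified model of offset tracking for one partition.
// This is NOT Eventuate's implementation, only an illustration of the idea.
class SimpleOffsetTracker {
    // Offsets handed to a handler and not yet committed, in ascending order.
    private final SortedSet<Long> unprocessed = new TreeSet<>();
    // Offsets whose handler has returned.
    private final Set<Long> processed = new HashSet<>();

    void noteUnprocessed(long offset) { unprocessed.add(offset); }

    void noteProcessed(long offset) { processed.add(offset); }

    // Walk the tracked offsets in order and stop at the first one whose
    // handler has not finished; the last finished offset before it is committable.
    Optional<Long> offsetToCommit() {
        Long result = null;
        for (long offset : unprocessed) {
            if (!processed.contains(offset)) {
                break;
            }
            result = offset;
        }
        return Optional.ofNullable(result);
    }

    // Forget everything up to and including the committed offset.
    void noteCommitted(long offset) {
        unprocessed.removeIf(o -> o <= offset);
        processed.removeIf(o -> o <= offset);
    }

    int unprocessedCount() { return unprocessed.size(); }

    public static void main(String[] args) {
        SimpleOffsetTracker tracker = new SimpleOffsetTracker();

        // Healthy case: the handler returns, the offset is committed and dropped.
        tracker.noteUnprocessed(100L);
        tracker.noteProcessed(100L);
        tracker.offsetToCommit().ifPresent(tracker::noteCommitted);
        System.out.println(tracker.unprocessedCount()); // prints 0

        // Stuck case: the handler for 101 never returns, so nothing can be
        // committed and later offsets pile up behind it.
        tracker.noteUnprocessed(101L);
        tracker.noteUnprocessed(102L);
        tracker.noteProcessed(102L);
        System.out.println(tracker.offsetToCommit().isPresent()); // prints false
        System.out.println(tracker.unprocessedCount()); // prints 2
    }
}
```

If this model is close to what happens, a growing unprocessed list would point at a handler call (or the database transaction around it) that never completes, rather than at Kafka itself.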


I am using MySQL, and I see that the following lock is always held just before event consumption breaks down: OBJECT_NAME received_messages, LOCK_TYPE TABLE, LOCK_MODE IX.

Do you have any ideas about this issue? Thank you very much!