eventuate-tram / eventuate-tram-core

Transactional messaging for microservices

Reactive Tram Consumer #144

Open dartartem opened 3 years ago

dartartem commented 3 years ago

Goals & Background:

The goal of reactive processing is to handle data when it is ready, rather than blocking a thread while waiting for I/O operations to complete.

There are two ways to make the Eventuate Tram consumer reactive.

1) Replace the Kafka consumer with its reactive version: https://projectreactor.io/docs/kafka/release/reference/

Advantages (according to docs):

Disadvantages:

Thoughts:

We already have back-pressure handling, and a functional style is not important for the internal details of the Eventuate Kafka consumer (the eventuate-messaging-kafka project).
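
For concreteness, option 1 might look roughly like this. This is a sketch only, assuming the reactor-kafka dependency; the bootstrap address, channel name, and handleMessage() are placeholders, not Eventuate code:

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

// Sketch of option 1 (assumes reactor-kafka; names are placeholders).
public class ReactorKafkaSketch {
  public static void main(String[] args) {
    ReceiverOptions<String, byte[]> options =
        ReceiverOptions.<String, byte[]>create(Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
            ConsumerConfig.GROUP_ID_CONFIG, "subscriberId",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class))
        .subscription(Collections.singleton("channel"));

    Flux<ReceiverRecord<String, byte[]>> records =
        KafkaReceiver.create(options).receive();

    records
        // concatMap preserves ordering and applies back-pressure: the next
        // record is requested only when the previous Mono has completed
        .concatMap(r -> handleMessage(r)
            .doOnSuccess(v -> r.receiverOffset().acknowledge()))
        .subscribe();
  }

  private static Mono<Void> handleMessage(ReceiverRecord<String, byte[]> record) {
    return Mono.empty(); // placeholder for real message handling
  }
}
```

Note that acknowledgement (and hence offset committing) is driven entirely by Mono completion here, which is the same property option 2 tries to achieve.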

2) Adapt the existing API for reactive processing. What we have now:

MessageConsumerKafkaImpl uses a plain java.util.function.Consumer for message handling:

public KafkaSubscription subscribe(String subscriberId, Set<String> channels, KafkaMessageHandler handler) {

link

public interface KafkaMessageHandler extends Consumer<KafkaMessage> {
}

link

Internally, the handler is wrapped by an EventuateKafkaConsumerMessageHandler:

public interface EventuateKafkaConsumerMessageHandler
        extends BiFunction<ConsumerRecord<String, byte[]>, BiConsumer<Void, Throwable>, MessageConsumerBacklog> {
}
EventuateKafkaConsumerMessageHandler kcHandler = (record, callback) ->
        swimlaneBasedDispatcher.dispatch(new RawKafkaMessage(record.value()),
                record.partition(),
                message -> handle(message, callback, handler));

  public void handle(RawKafkaMessage message, BiConsumer<Void, Throwable> callback, KafkaMessageHandler kafkaMessageHandler) {
    try {
      if (eventuateKafkaMultiMessageConverter.isMultiMessage(message.getPayload())) {
        eventuateKafkaMultiMessageConverter
                .convertBytesToMessages(message.getPayload())
                .getMessages()
                .stream()
                .map(EventuateKafkaMultiMessage::getValue)
                .map(KafkaMessage::new)
                .forEach(kafkaMessageHandler);
      } else {
        kafkaMessageHandler.accept(new KafkaMessage(EventuateBinaryMessageEncoding.bytesToString(message.getPayload())));
      }
      callback.accept(null, null);
    } catch (Throwable e) {
      callback.accept(null, e);
      throw e;
    }
  }
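
If the per-message handler returned a future instead of blocking, the multi-message loop in handle() above could be composed asynchronously. A minimal JDK-only sketch (names are illustrative, not the real Eventuate API):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical async variant of handle() for the multi-message case.
// Instead of a blocking forEach, per-message futures are chained
// sequentially, and the existing (result, throwable) callback is
// completed only when the whole batch has finished.
class AsyncBatchHandler {
  static void handle(List<String> messages,
                     Function<String, CompletableFuture<Void>> handler,
                     BiConsumer<Void, Throwable> callback) {
    CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
    for (String message : messages) {
      // preserve per-partition ordering: start each message only after
      // the previous one has completed
      chain = chain.thenCompose(v -> handler.apply(message));
    }
    // (null, null) on success, (null, t) on failure -- the same contract
    // the existing BiConsumer<Void, Throwable> callback expects
    chain.whenComplete(callback);
  }
}
```

This keeps the ordering guarantee of the current forEach while freeing the calling thread between messages.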

The handler receives a callback used to notify the message processor that the message has been handled:

  public void process(ConsumerRecord<String, byte[]> record) {
    throwFailureException();
    offsetTracker.noteUnprocessed(new TopicPartition(record.topic(), record.partition()), record.offset());
    MessageConsumerBacklog consumerBacklog = handler.apply(record, (result, t) -> {
      if (t != null) {
        logger.error("Got exception: ", t);
        failed.set(new KafkaMessageProcessorFailedException(t));
      } else {
        logger.debug("Adding processed record to queue {} {}", subscriberId, record.offset());
        processedRecords.add(record);
      }
    });
    if (consumerBacklog != null)
      consumerBacklogs.add(consumerBacklog);
  }

It is then necessary to commit the offsets of the processed records:

  public Map<TopicPartition, OffsetAndMetadata> offsetsToCommit() {
    int count = 0;
    while (true) {
      ConsumerRecord<String, byte[]> record = processedRecords.poll();
      if (record == null)
        break;
      count++;
      offsetTracker.noteProcessed(new TopicPartition(record.topic(), record.partition()), record.offset());
    }
    logger.trace("removed {} {} processed records from queue", subscriberId, count);
    return offsetTracker.offsetsToCommit();
  }

which are committed by the consumer:

  private void maybeCommitOffsets(KafkaMessageConsumer consumer, KafkaMessageProcessor processor) {
    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = processor.offsetsToCommit();
    if (!offsetsToCommit.isEmpty()) {
      consumerCallbacks.ifPresent(ConsumerCallbacks::onTryCommitCallback);
      logger.debug("Committing offsets {} {}", subscriberId, offsetsToCommit);
      consumer.commitOffsets(offsetsToCommit);
      logger.debug("Committed offsets {}", subscriberId);
      processor.noteOffsetsCommitted(offsetsToCommit);
      consumerCallbacks.ifPresent(ConsumerCallbacks::onCommitedCallback);
    }
  }

This is necessary because the consumer uses a polling loop that does not wait for record handling to finish.
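
The commit rule the offset tracker implements can be shown with a small standalone sketch (single partition only, illustrative names; not the real OffsetTracker): records may finish in any order, but only the highest offset below which everything has been processed is safe to commit.

```java
import java.util.OptionalLong;
import java.util.TreeSet;

// Simplified, single-partition sketch of offset tracking.
class PartitionOffsetTracker {
  private final TreeSet<Long> unprocessed = new TreeSet<>();
  private final TreeSet<Long> processed = new TreeSet<>();

  void noteUnprocessed(long offset) { unprocessed.add(offset); }

  void noteProcessed(long offset) {
    unprocessed.remove(offset);
    processed.add(offset);
  }

  // Highest processed offset that is below every still-unprocessed offset.
  OptionalLong offsetToCommit() {
    if (processed.isEmpty()) return OptionalLong.empty();
    if (unprocessed.isEmpty()) return OptionalLong.of(processed.last());
    Long candidate = processed.lower(unprocessed.first());
    return candidate == null ? OptionalLong.empty() : OptionalLong.of(candidate);
  }
}
```

For example, if offsets 0 and 2 have been processed but 1 is still in flight, only offset 0 may be committed; committing 2 would silently skip 1 after a restart.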

So, to integrate a reactive Tram consumer, it is only necessary to replace KafkaMessageHandler with something that can be processed in the background and that signals when processing has finished (so that offsets can be committed).

This is relatively simple and ensures reactive processing.

cer commented 3 years ago

Here is a suggestion:

A Java CompletableFuture is something that completes with either a value or a Throwable.

Therefore:
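
That observation can be sketched as follows (illustrative only, not the exact proposal): whenComplete on a CompletableFuture hands over exactly the (value, Throwable) pair, which lines up with the BiConsumer<Void, Throwable> callback that KafkaMessageProcessor.process() already passes to the handler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Sketch: bridging a future-returning handler to the existing
// (result, throwable) callback.
public class FutureCallbackBridge {
  public static void bridge(CompletableFuture<Void> handlerResult,
                            BiConsumer<Void, Throwable> callback) {
    // whenComplete invokes the callback with (null, null) on success
    // and (null, t) on failure -- the contract process() relies on
    handlerResult.whenComplete(callback);
  }
}
```

So a future-based handler would plug into the existing offset-commit machinery without changing KafkaMessageProcessor itself.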

dartartem commented 3 years ago

@cer yes, I planned to propose something similar; I just accidentally created a partially filled issue. I have updated it.

cer commented 3 years ago

Also, I think it's important to define the goal here:

public interface ReactiveMessageConsumer {
  MessageSubscription subscribe(String subscriberId, Set<String> channels, ReactiveMessageHandler handler);
  String getId();
  void close();
}

public interface ReactiveMessageHandler extends Function<Message, Mono<Void>> {
}
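
A handler against this proposed interface might look like this (illustrative sketch assuming reactor-core and the proposed ReactiveMessageHandler type; processPayload() is a placeholder). The key property is that nothing executes until the framework subscribes to the returned Mono, which is also how it learns that processing has finished and the offset can be committed:

```java
import reactor.core.publisher.Mono;

// Illustrative sketch, assuming reactor-core and the proposed
// ReactiveMessageHandler type; processPayload() is a placeholder.
public class ExampleHandler {
  ReactiveMessageHandler handler = message ->
      // nothing runs until the framework subscribes to the returned Mono
      Mono.fromRunnable(() -> processPayload(message.getPayload()));

  private void processPayload(String payload) {
    // application logic goes here
  }
}
```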

Reactive event handling:

  public DomainEventDispatcher(String eventDispatcherId, ReactiveDomainEventHandlers domainEventHandlers, MessageConsumer messageConsumer, DomainEventNameMapping domainEventNameMapping) {
    ...
  }

  public ReactiveDomainEventHandlers domainEventHandlers() {
    return ReactiveDomainEventHandlersBuilder
            .forAggregateType("io.eventuate.examples.tram.ordersandcustomers.customers.domain.Customer")
            .onEvent(CustomerCreditReservedEvent.class, this::handleCustomerCreditReservedEvent)
            ...
            .build();
  }

  private Mono<Void> handleCustomerCreditReservedEvent(DomainEventEnvelope<CustomerCreditReservedEvent> domainEventEnvelope) {
    ...
  }