eventuate-foundation / eventuate-messaging-kafka


Support external offset storage (Elasticsearch, JDBC, etc) #34

Open dialtahi opened 4 years ago

dialtahi commented 4 years ago

For the read side we could use Elasticsearch to index the data for searching. If data in Elasticsearch is lost (for example, because a restore is required), it is difficult to resynchronize the command side and the read side. One strategy is to store the consumer offsets in the same datastore as the production data, using the KafkaConsumer's hooks to store the offsets not only in Kafka/ZooKeeper but also in that other datastore.
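
A minimal sketch of why co-locating offsets helps, assuming a hypothetical in-memory `ReadSideStore` that stands in for an Elasticsearch index. Each indexed document is written together with the next offset to consume, so restoring a snapshot rolls back both at once. `ReadSideStore`, `index`, `nextOffset`, and `snapshot` are illustrative names, not Eventuate or Elasticsearch APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the read-side datastore (e.g. an Elasticsearch index).
// Documents and consumer offsets live in the same store.
class ReadSideStore {
    private Map<String, String> documents = new HashMap<>();
    // Key: "topic/partition", value: offset of the next record to consume
    private Map<String, Long> offsets = new HashMap<>();

    // Apply the document write and the offset update as one unit
    synchronized void index(String topic, int partition, long recordOffset, String id, String doc) {
        documents.put(id, doc);
        offsets.put(topic + "/" + partition, recordOffset + 1);
    }

    // 0 means "start from the beginning" in this sketch
    synchronized long nextOffset(String topic, int partition) {
        return offsets.getOrDefault(topic + "/" + partition, 0L);
    }

    synchronized int documentCount() {
        return documents.size();
    }

    // A snapshot (backup) captures documents and offsets together
    synchronized ReadSideStore snapshot() {
        ReadSideStore copy = new ReadSideStore();
        copy.documents = new HashMap<>(documents);
        copy.offsets = new HashMap<>(offsets);
        return copy;
    }
}
```

If a backup taken after records 0 and 1 is restored, its stored next offset is 2, so a consumer that seeks there re-reads record 2 and rebuilds the missing document. With offsets kept only in Kafka, the committed offset would already be past record 2 and that document would never be re-indexed.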

With the regular subscription via KafkaConsumer.subscribe(), we can attach a ConsumerRebalanceListener to be notified when partitions of the subscribed topics are assigned or revoked, and then save the current position, or seek to the last offset stored in the datastore, for each topic/partition.

https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe-java.util.Collection-org.apache.kafka.clients.consumer.ConsumerRebalanceListener-

https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html
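
A stdlib-only sketch of the listener logic described above. To keep it runnable without kafka-clients, `PartitionSeeker` is a hypothetical stand-in for `KafkaConsumer.seek(TopicPartition, long)` and `KafkaConsumer.position(TopicPartition)`, partitions are plain `"topic-partition"` strings, and `OffsetStore` is a hypothetical external store. In real code the two methods would be the `onPartitionsAssigned`/`onPartitionsRevoked` callbacks of a `ConsumerRebalanceListener` passed to `subscribe()`.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for KafkaConsumer.seek()/position()
interface PartitionSeeker {
    void seek(String partition, long offset);
    long position(String partition);
}

// Hypothetical external offset store (Elasticsearch, JDBC, ...)
interface OffsetStore {
    Optional<Long> load(String partition);
    void save(String partition, long nextOffset);
}

class MapOffsetStore implements OffsetStore {
    private final Map<String, Long> offsets = new HashMap<>();
    public Optional<Long> load(String partition) { return Optional.ofNullable(offsets.get(partition)); }
    public void save(String partition, long nextOffset) { offsets.put(partition, nextOffset); }
}

// Mirrors the two ConsumerRebalanceListener callbacks
class ExternalOffsetRebalanceHandler {
    private final PartitionSeeker consumer;
    private final OffsetStore store;

    ExternalOffsetRebalanceHandler(PartitionSeeker consumer, OffsetStore store) {
        this.consumer = consumer;
        this.store = store;
    }

    // On assignment: resume from the externally stored offset, if any;
    // otherwise leave the consumer's default position untouched
    void onPartitionsAssigned(Collection<String> partitions) {
        for (String p : partitions) {
            store.load(p).ifPresent(offset -> consumer.seek(p, offset));
        }
    }

    // On revocation: persist the next offset to read for each partition
    void onPartitionsRevoked(Collection<String> partitions) {
        for (String p : partitions) {
            store.save(p, consumer.position(p));
        }
    }
}
```

Saving only on revocation can lose progress if the process crashes; to get the consistency this issue is after, the offset would also be written together with each batch of read-side updates.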

The Eventuate consumer should let users specify an implementation (or configuration) that selects an ExternalOffsetStorage, because the message consumer and the offset storage should write their data to the same datastore.

@Bean("customMessageConsumer")
MessageConsumer customMessageConsumer(MessageConsumerBuilder builder, RestHighLevelClient client, ObjectMapper objectMapper) {
    // MessageConsumerBuilder and ElasticsearchOffsetStorage are proposed types, not existing Eventuate APIs
    return builder.build(new ElasticsearchOffsetStorage(client, objectMapper));
}

Subscribers would then inject the custom message consumer and subscribe through it directly:

@Autowired
@Qualifier("customMessageConsumer")
private MessageConsumer customMessageConsumer;

cer commented 4 years ago

Another possibility is to keep a single global MessageConsumer @Bean (actually it's MessageConsumerKafkaImpl), but define an ElasticsearchOffsetStorage @Bean that gets injected into it and automatically overrides the default behavior.
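
A plain-Java sketch of this alternative, without Spring: the consumer accepts an optional offset storage and falls back to Kafka's own committed offsets when none is provided. In Spring this could correspond to injecting an `Optional<...>` or `ObjectProvider<...>` into the `MessageConsumerKafkaImpl` bean. `OffsetStorage`, `KafkaCommittedOffsets`, and `ConfigurableMessageConsumer` are hypothetical names, and `ElasticsearchOffsetStorage` here is only a stub.

```java
import java.util.Optional;

// Hypothetical abstraction over where consumer offsets are kept
interface OffsetStorage {
    String describe();
}

// Default behavior: rely on offsets committed to Kafka
class KafkaCommittedOffsets implements OffsetStorage {
    public String describe() { return "kafka"; }
}

// Stub of a user-defined override, e.g. backed by an Elasticsearch index
class ElasticsearchOffsetStorage implements OffsetStorage {
    public String describe() { return "elasticsearch"; }
}

class ConfigurableMessageConsumer {
    private final OffsetStorage offsetStorage;

    // If a custom OffsetStorage bean exists it wins; otherwise use the default
    ConfigurableMessageConsumer(Optional<OffsetStorage> customStorage) {
        this.offsetStorage = customStorage.orElseGet(KafkaCommittedOffsets::new);
    }

    OffsetStorage offsetStorage() { return offsetStorage; }
}
```

The advantage of this shape is that existing applications keep working unchanged, while an application that declares an offset storage bean opts in without defining its own MessageConsumer.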

cer commented 4 years ago

Here are some design diagrams.

[Diagrams: message-consumer-design, subscribing]

cer commented 4 years ago

Here are some thoughts:

EventuateKafkaConsumer is responsible for creating the KafkaConsumer and calling subscribe(). I'd propose that it do that using:

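import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

interface KafkaConsumerConfigurer {
   KafkaConsumer make(Properties consumerProperties); // create the consumer
   void subscribe(KafkaConsumer consumer, String subscriberId, List<String> topics); // e.g. with a ConsumerRebalanceListener
}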

The MessageConsumerKafkaImpl @Bean would be injected with a KafkaConsumerConfigurer @Bean and pass it down the call chain to EventuateKafkaConsumer.
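
A stdlib-only sketch of how the configurer could travel down the call chain. `ConsumerHandle` is a hypothetical placeholder for `KafkaConsumer`, and `SimpleMessageConsumerKafkaImpl`/`SimpleEventuateKafkaConsumer` are simplified stand-ins for `MessageConsumerKafkaImpl` and `EventuateKafkaConsumer`; their real constructors and signatures are not reproduced here. Using the subscriberId as `group.id` is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical placeholder for org.apache.kafka.clients.consumer.KafkaConsumer
class ConsumerHandle {
    final Properties properties;
    final List<String> subscribedTopics = new ArrayList<>();
    ConsumerHandle(Properties properties) { this.properties = properties; }
}

// The proposed extension point, using the placeholder type
interface KafkaConsumerConfigurer {
    ConsumerHandle make(Properties consumerProperties);
    void subscribe(ConsumerHandle consumer, String subscriberId, List<String> topics);
}

// Default: plain subscription, offsets managed by Kafka
class DefaultKafkaConsumerConfigurer implements KafkaConsumerConfigurer {
    public ConsumerHandle make(Properties consumerProperties) { return new ConsumerHandle(consumerProperties); }
    public void subscribe(ConsumerHandle consumer, String subscriberId, List<String> topics) {
        consumer.subscribedTopics.addAll(topics);
    }
}

// Simplified EventuateKafkaConsumer: delegates creation and subscription to the configurer
class SimpleEventuateKafkaConsumer {
    private final ConsumerHandle consumer;

    SimpleEventuateKafkaConsumer(String subscriberId, List<String> topics, Properties props,
                                 KafkaConsumerConfigurer configurer) {
        this.consumer = configurer.make(props);
        configurer.subscribe(consumer, subscriberId, topics);
    }

    ConsumerHandle consumer() { return consumer; }
}

// Simplified MessageConsumerKafkaImpl: holds the injected configurer and passes it down
class SimpleMessageConsumerKafkaImpl {
    private final KafkaConsumerConfigurer configurer;

    SimpleMessageConsumerKafkaImpl(KafkaConsumerConfigurer configurer) { this.configurer = configurer; }

    SimpleEventuateKafkaConsumer subscribe(String subscriberId, List<String> topics) {
        Properties props = new Properties();
        props.setProperty("group.id", subscriberId); // assumption: one consumer group per subscriber
        return new SimpleEventuateKafkaConsumer(subscriberId, topics, props, configurer);
    }
}
```

An Elasticsearch-backed configurer would differ only in `subscribe()`, where it would attach a rebalance listener that loads and saves offsets in the read-side datastore.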