apache / camel-kafka-connector

Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors
https://camel.apache.org
Apache License 2.0

Azure EventHubs offset checkpoint store #1662

Open jakubmalek opened 3 months ago

jakubmalek commented 3 months ago

The camel-azure-eventhubs-source-kafka-connector currently requires Azure Blob Storage to be configured, because it is used to store the checkpoints.

I think there should be an option to store checkpoints in the Kafka Connect offset topic. Source connector state is usually stored there by including the source offset in each SourceRecord, with the source partition used as the lookup key. The checkpoints can then be loaded on task start using the OffsetStorageReader, without the need for external storage.
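To illustrate the round trip described above, here is a minimal sketch in plain Java. It does not use the Kafka Connect API: `OffsetLookup` is a stand-in for the single-partition lookup of `OffsetStorageReader`, `InMemoryOffsetTopic` is a stand-in for the offset topic, and `EventHubOffsets` with its key names (`eventHub`, `consumerGroup`, `partitionId`, `sequenceNumber`) is hypothetical. Returning null for a missing offset is an assumption of this model.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Stand-in for the single-partition lookup of Kafka Connect's OffsetStorageReader. */
interface OffsetLookup {
    /** Returns the last committed offset for the partition, or null if none exists. */
    Map<String, Object> offset(Map<String, String> sourcePartition);
}

/** In-memory stand-in for the Connect offset topic, keyed by source partition. */
final class InMemoryOffsetTopic implements OffsetLookup {
    private final Map<Map<String, String>, Map<String, Object>> committed = new HashMap<>();

    /** Models the framework persisting the offset carried by a delivered record. */
    void commit(Map<String, String> sourcePartition, Map<String, Object> sourceOffset) {
        committed.put(Map.copyOf(sourcePartition), Map.copyOf(sourceOffset));
    }

    @Override
    public Map<String, Object> offset(Map<String, String> sourcePartition) {
        return committed.get(sourcePartition);
    }
}

/** Hypothetical helper mapping Event Hubs checkpoints onto partition and offset maps. */
final class EventHubOffsets {
    private EventHubOffsets() { }

    /** One Connect source partition per Event Hubs partition (key names are assumptions). */
    static Map<String, String> sourcePartition(String eventHub, String consumerGroup, String partitionId) {
        return Map.of("eventHub", eventHub, "consumerGroup", consumerGroup, "partitionId", partitionId);
    }

    /** The source offset that would travel with each record. */
    static Map<String, Object> sourceOffset(long sequenceNumber) {
        return Map.<String, Object>of("sequenceNumber", sequenceNumber);
    }

    /** What a task could do on start: recover the checkpoint without external storage. */
    static Optional<Long> lastSequenceNumber(OffsetLookup reader, Map<String, String> sourcePartition) {
        Map<String, Object> stored = reader.offset(sourcePartition);
        if (stored == null) {
            return Optional.empty();
        }
        Object value = stored.get("sequenceNumber");
        if (value instanceof Number) {
            return Optional.of(((Number) value).longValue());
        }
        return Optional.empty();
    }
}
```

In a real task the lookup would go through the reader exposed by the task context, and the commit step would be performed by the Connect framework rather than by connector code.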

As far as I understand, the related Camel component allows setting a custom CheckpointStore bean instance to be used by the Azure SDK client. So it should be possible to provide a CheckpointStore implementation based on the Kafka Connect API and use it instead of the BlobCheckpointStore.

I was thinking of doing so with a PR but I'm not sure how to deal with the Kamelet, which currently requires that all parameters related to the Blob Storage are set.

In general I see two possible options:

Option A. Make the Azure Blob parameters in the Kamelet optional

In this option, the CamelAzureeventhubssourceSourceTask can be adapted to initialize the CheckpointStore from the offsets if the Blob storage is not configured. The CamelSourceTask could be extended with an option to customize the created Endpoint, so that the CheckpointStore initialized by the task on start can be set via the EventHubsEndpoint, e.g. eventHubsEndpoint.getConfiguration().setCheckpointStore(checkpointStore)

Option B. Add a new Kamelet variant for the Azure EventHubs source without parameters for Blob storage

This option works similarly to Option A, but as a dedicated connector with a dedicated Kamelet descriptor.

What do you think about the proposed solutions?

jakubmalek commented 3 months ago

To get a rough idea of what I have in mind for Option A, you can check this commit. It's still a work in progress, but it shows how it could work.

oscerd commented 3 months ago

If you override the source task, it will be overwritten once we regenerate the connector from the released Kamelets catalog. My suggestion would be to mark the parameters as optional rather than required, but this would require defining a bean with the checkpoint store; otherwise the workflow will fail. Creating another Kamelet is a bit overkill for this. Thanks

jakubmalek commented 3 months ago

The problem is that the CheckpointStore requires an OffsetStorageReader, which can be obtained only during task startup. As I understand it, I can define a bean with something like this: #class:org.apache.camel.kafkaconnector.azureeventhubssource.checkpoint.EventHubOffsetCheckpointStore But I'm not sure how I can initialize it in CamelSourceTask with the SourceTaskContext during task startup.

Another problem is how to apply the sourcePartition and sourceOffset to the created SourceRecord, which is needed to store the checkpoints in the topic.

jakubmalek commented 3 months ago

OK I think I know how this can be implemented now.

  1. The checkpoint store configuration could be provided as the string #class:org.apache.camel.kafkaconnector.azureeventhubssource.checkpoint.EventHubOffsetCheckpointStore so that it's initialized by Camel
  2. The EventHubOffsetCheckpointStore would implement the CamelContextAware interface, so it can access the CamelContext
  3. The CamelSourceTask would bind a bean providing access to the SourceTaskContext, plus an option to configure a provider of the source partition and source offset from the Exchange
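The late-binding idea in the three steps above can be sketched roughly as follows, in plain Java with stand-in types rather than the Camel, Kafka Connect, or Azure SDK APIs. Everything here is an assumption for illustration: `BeanRegistry` stands in for the registry reachable through the CamelContext, `TaskOffsets` for the offset lookup the task would expose from its SourceTaskContext, a header map for the Exchange, and the bean name and key names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Stand-in for the registry lookup that a CamelContextAware bean could perform. */
final class BeanRegistry {
    private final Map<String, Object> beans = new HashMap<>();

    void bind(String name, Object bean) {
        beans.put(name, bean);
    }

    <T> T lookup(String name, Class<T> type) {
        return type.cast(beans.get(name)); // null when nothing is bound under that name
    }
}

/** Stand-in for the offset lookup the task would expose from its SourceTaskContext. */
interface TaskOffsets {
    Map<String, Object> offset(Map<String, String> sourcePartition);
}

/** Hypothetical provider deriving the source partition and offset from message headers. */
final class HeaderPositionProvider {
    Map<String, String> sourcePartition(Map<String, Object> headers) {
        return Map.of("partitionId", String.valueOf(headers.get("partitionId")));
    }

    Map<String, Object> sourceOffset(Map<String, Object> headers) {
        return Map.<String, Object>of("sequenceNumber", headers.get("sequenceNumber"));
    }
}

/** Sketch of a store that is created first and resolves the task helper lazily. */
final class OffsetCheckpointStoreSketch {
    static final String TASK_OFFSETS_BEAN = "taskOffsets"; // hypothetical bean name

    private BeanRegistry registry;

    /** Mirrors context injection: called after the bean is instantiated by class name. */
    void setRegistry(BeanRegistry registry) {
        this.registry = registry;
    }

    /** Resolves the helper on each call, so it may be bound after this bean exists. */
    Optional<Long> loadCheckpoint(String partitionId) {
        if (registry == null) {
            return Optional.empty();
        }
        TaskOffsets offsets = registry.lookup(TASK_OFFSETS_BEAN, TaskOffsets.class);
        if (offsets == null) {
            return Optional.empty();
        }
        Map<String, Object> stored = offsets.offset(Map.of("partitionId", partitionId));
        if (stored == null) {
            return Optional.empty();
        }
        Object value = stored.get("sequenceNumber");
        if (value instanceof Number) {
            return Optional.of(((Number) value).longValue());
        }
        return Optional.empty();
    }
}
```

Resolving the helper lazily is one way to cope with the ordering problem raised earlier in the thread: the store can be instantiated from configuration before the task has bound anything derived from its context.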

I still need some time to put it all together, but I think I have a rough idea of how it can be set up without modifying the generated code.