opensearch-project / data-prepper

OpenSearch Data Prepper is a component of the OpenSearch project that accepts, filters, transforms, enriches, and routes data at scale.
https://opensearch.org/docs/latest/clients/data-prepper/index/
Apache License 2.0

Support consumer offset metadata from Kafka source records #5164

Open jcrean opened 3 weeks ago

jcrean commented 3 weeks ago

Currently, the consumer code in KafkaCustomConsumer reads the topic, partition, and timestamp from the source ConsumerRecord and adds them as attributes in the event metadata. It would be helpful to also have access to the record's offset.

This would allow us to call getMetadata("kafka_offset") in our pipelines; we use the offset for internal tracking and auditing. Looking at the code, this seems relatively easy to add.

dlvenable commented 2 weeks ago

@jcrean, are you looking to get the offset within the partition for each event?

Would you be interested in contributing this change? You can see where we do this for other metadata.

https://github.com/opensearch-project/data-prepper/blob/47cead1a5a111ddbd959f80984f03e49034621e6/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java#L458-L463
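
For illustration only, here is a minimal, self-contained sketch of the kind of change being discussed. It is not the actual KafkaCustomConsumer code: `SourceRecord` is a hypothetical stand-in for Kafka's ConsumerRecord, a plain map stands in for the event metadata, and the `kafka_topic`, `kafka_partition`, and `kafka_timestamp` key names are assumptions modeled on the `kafka_offset` key requested above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class OffsetMetadataSketch {

    // Hypothetical stand-in for Kafka's ConsumerRecord, reduced to the
    // fields mentioned in this issue.
    static final class SourceRecord {
        final String topic;
        final int partition;
        final long offset;
        final long timestamp;

        SourceRecord(String topic, int partition, long offset, long timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Copies record fields into a metadata attribute map. The first three
    // keys are assumed names; "kafka_offset" is the key proposed in the issue.
    static Map<String, Object> toAttributes(SourceRecord record) {
        Map<String, Object> attributes = new LinkedHashMap<>();
        attributes.put("kafka_topic", record.topic);
        attributes.put("kafka_partition", record.partition);
        attributes.put("kafka_timestamp", record.timestamp);
        // The proposed addition: the record's offset within its partition.
        attributes.put("kafka_offset", record.offset);
        return attributes;
    }

    public static void main(String[] args) {
        SourceRecord record = new SourceRecord("orders", 3, 42L, 1700000000000L);
        System.out.println(toAttributes(record).get("kafka_offset")); // prints 42
    }
}
```

In the real consumer, the offset would presumably be set alongside the existing attributes at the location linked above, so that getMetadata("kafka_offset") resolves in a pipeline.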

jcrean commented 4 days ago

@dlvenable Yes, exactly. Each ConsumerRecord contains the offset within the partition, which we often use internally for tracking and investigations. I'd be happy to take a stab at a PR. It seems like a fairly straightforward change to make.