opensearch-project / data-prepper

Data Prepper is a component of the OpenSearch project that accepts, filters, transforms, enriches, and routes data at scale.
https://opensearch.org/docs/latest/clients/data-prepper/index/
Apache License 2.0
238 stars 176 forks source link

Enhanced Kafka source logging through the use of MDC and thread naming #4663

Closed dlvenable closed 6 days ago

dlvenable commented 1 week ago

Description

This PR continues the work done in #4131 by adding similar support for the Kafka source.

This adds logging MDC to the kafka source to disambiguate it against the kafka buffer Entry points into the Sink interface set the MDC value. Also, the threads which the kafka source directly creates will have MDC and also have a useful thread name.

This MDC is now available for both Data Prepper loggers and Apache Kafka loggers.

Results

First, I updated the logging pattern to include the MDC key:

appender.console.layout.pattern = %d{ISO8601} [%t] [%X{kafkaPluginType}] %-5p %40C - %m%n

Then I ran. The logs look like:

2024-06-25T17:44:34,212 [logs-kafka-source-1] [source] INFO  org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-data-prepper-1, groupId=data-prepper] Subscribed to topic(s): logs
2024-06-25T17:44:34,215 [logs-pipeline-sink-worker-2-thread-1] [source] INFO  org.apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 7.6.0-ccs
2024-06-25T17:44:34,215 [logs-pipeline-sink-worker-2-thread-1] [source] INFO  org.apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka commitId: 1991cb733c81d679
2024-06-25T17:44:34,215 [logs-pipeline-sink-worker-2-thread-1] [source] INFO  org.apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka startTimeMs: 1719355474215
2024-06-25T17:44:34,215 [logs-kafka-source-2] [source] INFO  org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-data-prepper-2, groupId=data-prepper] Subscribed to topic(s): logs
2024-06-25T17:44:34,216 [logs-pipeline-sink-worker-2-thread-1] [source] INFO  org.opensearch.dataprepper.plugins.kafka.source.KafkaSource - Started Kafka source for topic logs
2024-06-25T17:44:34,216 [logs-pipeline-sink-worker-2-thread-1] [] INFO  org.opensearch.dataprepper.pipeline.Pipeline - Pipeline [logs-pipeline] - Submitting request to initiate the pipeline processing
2024-06-25T17:44:34,349 [logs-kafka-source-2] [source] INFO         org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-data-prepper-2, groupId=data-prepper] Cluster ID: _yZHgyDeT6ynMobR262CaA
2024-06-25T17:44:34,349 [logs-kafka-source-1] [source] INFO         org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-data-prepper-1, groupId=data-prepper] Cluster ID: _yZHgyDeT6ynMobR262CaA
2024-06-25T17:44:34,349 [logs-kafka-source-1] [source] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler - [Consumer clientId=consumer-data-prepper-1, groupId=data-prepper] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-06-25T17:44:34,349 [logs-kafka-source-2] [source] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler - [Consumer clientId=consumer-data-prepper-2, groupId=data-prepper] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-06-25T17:44:34,350 [logs-kafka-source-1] [source] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-data-prepper-1, groupId=data-prepper] (Re-)joining group
2024-06-25T17:44:34,350 [logs-kafka-source-2] [source] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-data-prepper-2, groupId=data-prepper] (Re-)joining group
2024-06-25T17:44:34,362 [logs-kafka-source-2] [source] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-data-prepper-2, groupId=data-prepper] Request joining group due to: need to re-join with the given member-id: consumer-data-prepper-2-ae860ae1-e687-4e2c-9bad-c5b56ab911a2

Issues Resolved

Resolves #4126

Check List

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check here.