opensearch-project / data-prepper

Data Prepper is a component of the OpenSearch project that accepts, filters, transforms, enriches, and routes data at scale.
https://opensearch.org/docs/latest/clients/data-prepper/index/
Apache License 2.0

Use Kafka as a buffer #3322

Closed JonahCalvo closed 10 months ago

JonahCalvo commented 1 year ago

Use-case

Currently, the only buffer available in Data Prepper is the bounded_blocking buffer, which stores events in memory. This can lead to data loss if a pipeline crashes or the buffer overflows. A disk-based buffer is required to prevent this data loss.
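For reference, a pipeline that uses the in-memory buffer described above might declare it roughly like this. This is a sketch: the pipeline name is hypothetical, the `http` source and `stdout` sink are placeholders, and the `buffer_size`/`batch_size` values are illustrative, not recommendations.

```yaml
log-pipeline:              # hypothetical pipeline name
  source:
    http:                  # placeholder source
  buffer:
    bounded_blocking:      # in-memory buffer; buffered events are lost if the process dies
      buffer_size: 12800   # max number of events held in memory (illustrative value)
      batch_size: 200      # max events a worker reads per batch (illustrative value)
  sink:
    - stdout:              # placeholder sink
```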

This proposal is to implement a Kafka buffer. Kafka offers robust buffering capabilities by persistently storing data on disk across multiple nodes, ensuring high availability and fault tolerance.

Basic Configuration

The buffer will:

Detailed Process

Encryption

The Kafka buffer will offer optional encryption via KMS:

Metrics

The Kafka buffer will incorporate the standard buffer metrics, as well as the metrics reported by Kafka Source/Sink:

dlvenable commented 1 year ago

@JonahCalvo, this is an exciting feature. Thanks for putting this proposal together.

What is the workers property? The Data Prepper pipeline worker reads from the buffer, and it can run multiple threads, which are defined in the pipeline. So it seems that each request to the buffer should happen on one of those threads, not on a separate "worker" thread.

dlvenable commented 11 months ago

@JonahCalvo, we can also simplify the buffer name. Use kafka instead of kafka_buffer.

dlvenable commented 10 months ago

This feature was completed across quite a few PRs.

canob commented 9 months ago

Hi everyone,

Can somebody share a working sample configuration for the Kafka buffer so we can test it? I tried the example in the first post, but it doesn't work. :-(

From my tests and reading the logs:

  • The correct name of the buffer plugin is "kafka", not "kafka_buffer".
  • The "acknowledgments" option does not exist.
  • The "topic" option does not exist; the correct option is "topics", which apparently must be defined as an array, but it does not work as "topics": ["topicname"].

Thanks for your help!

Answering my own question, the correct way to configure the Kafka buffer is:

  buffer:
    kafka:
      bootstrap_servers:
        - redpanda-0:9092
      topics:
        - name: dns-ip-pipeline-buffer
          group_id: data-prepper-group

Regards!
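To show where that buffer section sits, here is a sketch of a complete pipeline definition around it. The pipeline name, the `http` source, the `stdout` sink, and the `workers` value are placeholders chosen for illustration; only the `buffer` section comes from the configuration above. The pipeline-level `workers` setting controls how many worker threads read from the buffer.

```yaml
dns-ip-pipeline:               # hypothetical pipeline name
  workers: 2                   # pipeline worker threads that read batches from the buffer
  source:
    http:                      # placeholder source
  buffer:
    kafka:
      bootstrap_servers:
        - redpanda-0:9092
      topics:
        - name: dns-ip-pipeline-buffer
          group_id: data-prepper-group
  sink:
    - stdout:                  # placeholder sink
```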