opensearch-project / data-prepper

OpenSearch Data Prepper is a component of the OpenSearch project that accepts, filters, transforms, enriches, and routes data at scale.
https://opensearch.org/docs/latest/clients/data-prepper/index/
Apache License 2.0

Pipeline DLQ #3857

Open kkondaka opened 10 months ago

kkondaka commented 10 months ago

Is your feature request related to a problem? Please describe. Provide a way to send all failed events to a global/pipeline-level DLQ. Failed events anywhere in the pipeline (sources, processors, and sinks) are sent directly to this DLQ. This will eventually replace the sink-level DLQs we have today.

Describe the solution you'd like The preferred solution (based on @dlvenable's initial thoughts and a discussion meeting):

  1. Option to define a failure pipeline in the YAML file, like:

    my-failure-pipeline:
      type: failure
      sink:
        - s3:
            bucket: "..."
            codec:
              ndjson:

Each sub-pipeline in the YAML may then have an entry pointing to this failure pipeline, as follows:

sample-pipeline:
  failure-pipeline: my-failure-pipeline
  source:
     ...
  processor:
     ...
  sink:
     ...

In addition, there may be an option to define a default failure pipeline, which is used when a sub-pipeline does not specify one:

default-failure-pipeline:
  type: failure
  sink:
    - s3:
        bucket: "..."
        codec:
          ndjson:

Finally, there is an implicit failure pipeline, which is created without any entries in the YAML file. The implicit failure pipeline sends all failed events to stdout.

This requires code changes in many places, so it is better to introduce a new API (for example, an executeWithFailures() API in processors that returns both output records and failed records). Data Prepper core code can then take the failed records and send them to the appropriate failure pipeline (the configured failure pipeline, the default failure pipeline, or the implicit failure pipeline). Similar new APIs at the source and sink level may be added. Once the API is added, code may be migrated gradually so that all sources/sinks/processors use this new API.
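
A minimal sketch of what such a processor API could look like; the names executeWithFailures and ProcessingResult are hypothetical and do not exist in data-prepper-api today:

import java.util.Collection;

// Hypothetical processor API: returns successful output records and
// failed input records separately, so that data-prepper-core can route
// the failures to the configured failure pipeline.
public interface ProcessorWithFailures<T> {
  ProcessingResult<T> executeWithFailures(Collection<T> records);
}

// Hypothetical holder for the two collections returned by executeWithFailures().
class ProcessingResult<T> {
  private final Collection<T> outputRecords;
  private final Collection<T> failedRecords;

  ProcessingResult(final Collection<T> outputRecords, final Collection<T> failedRecords) {
    this.outputRecords = outputRecords;
    this.failedRecords = failedRecords;
  }

  Collection<T> getOutputRecords() {
    return outputRecords;
  }

  Collection<T> getFailedRecords() {
    return failedRecords;
  }
}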

Having a separate pipeline for failures allows the same failure pipeline to be shared by multiple pipelines. It also makes it possible to write sub-pipelines under it, do conditional routing, and so on.

Describe alternatives you've considered (Optional) Instead of a new API in processors/sources/sinks, we could have a global singleton for DLQ events, managed by a DLQEventsManager, which each source uses in its constructor. Failed events are handed over to this DLQEventsManager, which routes them to the failure pipeline. I think this approach is also OK.
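
A rough sketch of this alternative; all names here (DLQEventsManager, addFailedEvents, drainFailedEvents) are hypothetical, and only Record and Event come from data-prepper-api:

import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

// Hypothetical process-wide manager: plugins hand failed events to it,
// and data-prepper-core drains them into the failure pipeline.
final class DLQEventsManager {
  private static final DLQEventsManager INSTANCE = new DLQEventsManager();

  private final Queue<Record<Event>> failedEvents = new ConcurrentLinkedQueue<>();

  private DLQEventsManager() {
  }

  static DLQEventsManager getInstance() {
    return INSTANCE;
  }

  // Called by sources/processors/sinks when events fail.
  void addFailedEvents(final Collection<Record<Event>> events) {
    failedEvents.addAll(events);
  }

  // Called by data-prepper-core to move accumulated failures into the failure pipeline.
  Collection<Record<Event>> drainFailedEvents() {
    final Collection<Record<Event>> drained = new ArrayList<>();
    Record<Event> record;
    while ((record = failedEvents.poll()) != null) {
      drained.add(record);
    }
    return drained;
  }
}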

jw-amazon commented 10 months ago

If I understand correctly, right now, only sink failures are sent to the DLQ. That might explain a bug I am seeing in my pipeline.

dlvenable commented 10 months ago

If I understand correctly, right now, only sink failures are sent to the DLQ. That might explain a bug I am seeing in my pipeline.

@jw-amazon , Yes, only sink failures are sent to the DLQ currently.

chenqi0805 commented 9 months ago

My proposed approach is different: we can define the DLQ as an extension that any pipeline plugin can integrate with, in data-prepper-config.yaml:

extensions:
  dlq:
    store:
      s3:
        bucket: test-bucket
        key_path_prefix: dlq/

In the pipeline definition, users no longer need to explicitly specify DLQ details within any plugin (except for enabling or disabling the DLQ). The plugin's integration with the DLQ extension will be handled in the code logic.

Alternatively, we can move DLQ out of the extensions and make it a standalone global config in data-prepper-config.yaml:

dlq:
  store:
    s3:
      bucket: test-bucket
      key_path_prefix: dlq/

dlvenable commented 9 months ago

@chenqi0805 , The proposal in this issue creates a specific pipeline for handling failed events. This can allow for processing the data before putting it in the DLQ.

I think there could be some overlap between your idea of extensions and the pipeline DLQ. For one, this proposal has a default failure pipeline.

Perhaps that can be configurable via an extension.

extensions:
  dlq:
    default-failure-pipeline:
      type: failure
      sink:
        - s3:
            bucket: "..."
            codec:
              ndjson:

dlvenable commented 1 week ago

In terms of implementation, I propose that we create a new interface in data-prepper-api: FailurePipeline.

public interface FailurePipeline {
  void sendFailedEvents(Collection<Record<Event>> events);
}

We may also want some way to include additional information on the failures.
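
One possible shape for that, as a hypothetical sketch; the overload and the FailureContext type below are not part of the proposed interface, only an illustration:

import java.util.Collection;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

public interface FailurePipeline {
  void sendFailedEvents(Collection<Record<Event>> events);

  // Hypothetical overload carrying metadata about the failure.
  void sendFailedEvents(Collection<Record<Event>> events, FailureContext context);
}

// Hypothetical holder describing where and why the events failed.
class FailureContext {
  private final String componentName;
  private final Throwable cause;

  FailureContext(final String componentName, final Throwable cause) {
    this.componentName = componentName;
    this.cause = cause;
  }

  String getComponentName() {
    return componentName;
  }

  Throwable getCause() {
    return cause;
  }
}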

Then, sinks or processors call this method to write failed events. For example:

try {
  doSomething(events);
} catch (Exception e) {
  failurePipeline.sendFailedEvents(events);
}

Within data-prepper-core, this interface can have an implementation which acts as the source for the failure pipeline.

import java.util.Collection;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;

class FailurePipelineSource implements Source<Record<Event>>, FailurePipeline {
  private static final int WRITE_TIMEOUT_MILLIS = 5_000;
  private Buffer<Record<Event>> buffer;

  @Override
  public void start(Buffer<Record<Event>> buffer) {
    this.buffer = buffer;
  }

  @Override
  public void stop() {
    // nothing to stop; this source only relays failed events into the buffer
  }
  @Override
  public void sendFailedEvents(Collection<Record<Event>> events) {
    try {
      buffer.writeAll(events, WRITE_TIMEOUT_MILLIS);
    } catch (Exception e) {
      // a failure to write into the failure pipeline itself still needs handling
    }
  }
}

When creating the pipeline in data-prepper-core, we can add an instance of FailurePipelineSource into the Pipeline class. I think pipeline authors should be freed from having to write the source: configuration for the failure pipeline themselves.
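
A rough wiring sketch under that assumption; every name here except Processor and Sink is illustrative, and the pipeline-construction step is a stand-in for data-prepper-core's actual logic:

import java.util.List;

import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.sink.Sink;

// Hypothetical wiring inside data-prepper-core: core instantiates the
// FailurePipelineSource itself and hands it to the pipeline under
// construction, so the failure pipeline's YAML needs no source: section.
class FailurePipelineBuilder {

  @SuppressWarnings("rawtypes")
  FailurePipeline build(final String name, final List<Processor> processors, final List<Sink> sinks) {
    final FailurePipelineSource failureSource = new FailurePipelineSource();

    // Stand-in for core's pipeline construction, which would invoke
    // failureSource.start(buffer) when the pipeline starts.
    buildAndStartPipeline(name, failureSource, processors, sinks);

    // Other pipelines receive this object through the FailurePipeline
    // interface and call sendFailedEvents() on it.
    return failureSource;
  }

  @SuppressWarnings("rawtypes")
  private void buildAndStartPipeline(final String name, final FailurePipelineSource source,
      final List<Processor> processors, final List<Sink> sinks) {
    // omitted: data-prepper-core's actual pipeline construction logic
  }
}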