ConduitIO / conduit

Conduit streams data between data stores. Kafka Connect replacement. No JVM required.
https://conduit.io
Apache License 2.0
363 stars 42 forks source link

Error processor #1598

Closed lovromazgon closed 1 month ago

lovromazgon commented 1 month ago

Description

Implements an error processor, that acts similar to the filter processor, only that it negatively acknowledges all records passed to it. This allows the user to stop the pipeline (or send records to the DLQ) if a record matches a certain condition.

I realized we need this when we updated the webhook processor to not fail on HTTP statuses less than 500. If a user wants to still configure the pipeline to fail if the status code is for example 404, they can configure an error processor now, e.g.:

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      ...
    processors:
      - id: request
        plugin: webhook.http
        settings:
          response.status: '.Metadata["http_status"]'
      - id: fail-on-404
        plugin: error
        condition: `{{ eq .Metadata.http_status "404" }}`

[!IMPORTANT] ~Depends on https://github.com/ConduitIO/conduit-processor-sdk/pull/46, validate action will fail until we merge that PR and update the processor-sdk dependency.~ Updated.

Quick checks:

lovromazgon commented 1 month ago

@raulb a blog post would be nice, feel free to take the initiative on that one, I'm kinda swamped right now.