elastic / elasticsearch

Free and Open Source, Distributed, RESTful Search Engine
https://www.elastic.co/products/elasticsearch

Conditional Processor Execution #56800

Open cataclysdom opened 4 years ago

cataclysdom commented 4 years ago

A processor dedicated to conditional execution is essential to managing large pipelines. Ideally, conditional routing would live in a single processor built exclusively for that purpose.

For many users migrating to Ingest nodes from Logstash, the inability to efficiently nest conditional logic is unnerving. For instance, most Logstash users are comfortable doing something like this:

if "bar" in [foo] {
  grok { .. }
  geoip { .. }
  date { .. }
}
else {
  mutate { .. }
  date { .. }
}

However, Ingest nodes don't have any processor that directly supports this logic flow. The closest fit is the pipeline processor, which routes a document to another pipeline when a condition is met. An example, pulled from Conditional Execution in Pipelines » Conditionals with the Pipeline Processor, looks like this:

{
  "description": "A pipeline of pipelines for log files",
  "version": 1,
  "processors": [
    {
      "pipeline": {
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}

While this might satisfy some surface-level requirements, it offers no strict else branch and encourages a proliferation of pipeline definitions (i.e. dependency hell).
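To illustrate the missing else: today the alternate branch can only be emulated by repeating the negated condition on each processor in that branch. A rough sketch (field name and processors are illustrative, following the `{ .. }` shorthand used above):

```
{
  "processors" : [
    {
      "grok" : { "if" : "ctx.foo == 'bar'", .. }
    },
    {
      "geoip" : { "if" : "ctx.foo == 'bar'", .. }
    },
    {
      "set" : { "if" : "ctx.foo != 'bar'", .. }
    }
  ]
}
```

Every processor in both branches has to restate the condition, and any change to it must be applied in multiple places.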

An additional on_success option could pair well with the existing on_failure option under a conditional processor. on_failure already accepts an array of processors, which enables the nested logic proposed here. A rudimentary example:

{
  "processors" : [
    {
      "condition" : {
        "field" : "foo",
        "equals" : "bar",
        "has" : "bar",
        "exists" : true,
        "on_success" : [
          {
            "grok" : { .. }
          },
          {
            "geoip" : { .. }
          },
          {
            "date" : { .. }
          }
        ],
        "on_failure" : [
          {
            "set" : { .. }
          },
          {
            "date" : { .. }
          }
        ]
      }
    }
  ]
}

In that example, equals, has, and exists give you a few different options of equality checking to conditionally route your records.
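For reference, each of those checks can already be written as a Painless `if` expression on an individual processor; the proposal would lift them into declarative options. A sketch, assuming `has` means substring containment (the field name `foo` is illustrative):

```
// equals : "bar"
"if" : "ctx.foo == 'bar'"

// has : "bar"  (assumed to mean containment)
"if" : "ctx.foo != null && ctx.foo.contains('bar')"

// exists : true
"if" : "ctx.containsKey('foo')"
```

The declarative form would spare users from writing null-safe script expressions by hand for these common cases.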

elasticmachine commented 4 years ago

Pinging @elastic/es-core-features (:Core/Features/Ingest)

shawnz commented 4 years ago

It would be very useful to have an "on_success" in other processors too, for cleaning up fields that have already been processed. For example:

    {
      "json": {
        "field": "message",
        "target_field": "json",
        "if": "ctx.message != null",
        "ignore_failure": true,
        "on_success": [
          {
            "remove": {
              "field": "message"
            }
          }
        ]
      }
    }
shawnz commented 4 years ago

Another possible solution might be extending the "pipeline" processor to support some kind of "anonymous pipelines", for example:

{
  "processors" : [
    {
      "pipeline" : {
        "processors" : [
          {
            "fail": {
              "if": "ctx.foo != ctx.bar",
              "message": "It's not a bar-type foo"
            }
          },
          {
            "grok" : { .. }
          },
          {
            "geoip" : { .. }
          },
          {
            "date" : { .. }
          }
        ],
        "on_failure" : [
          {
            "set" : { .. }
          },
          {
            "date" : { .. }
          }
        ]
      }
    }
  ]
}