elastic / elasticsearch

Free and Open, Distributed, RESTful Search Engine
https://www.elastic.co/products/elasticsearch

ingest _simulate endpoint for reroute processor does not handle data_stream.type correctly #97289

Open bvader opened 1 year ago

bvader commented 1 year ago

Elasticsearch Version

8.8.2

Installed Plugins

none

Java Version

bundled

OS Version

mac / all

Problem Description

Ingest pipeline _simulate does not handle the reroute processor correctly. When a test / simulate is run in the Kibana Ingest Pipeline Builder and Tester, the results are wrong and cause confusion. Note that routing works correctly when actually indexing into a data stream, so the problem is limited to the _simulate endpoint.

Steps to Reproduce

A simple test: data_stream.type seems to be ignored, and reroute seems to be taking the first token of the index name as the type.

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "field": "data_stream.dataset",
          "value": "elasticsearch.server",
          "ignore_failure": false
        }
      },
      {
        "reroute": {
          "ignore_failure": false
        }
      }
    ]
  }, 
  "docs": [
    {
      "_id": "-NYBC4kBqzI6pGDGasKv",
      "_index": ".ds-logs-docker.container_logs-sccoe-2023.06.29-000013",
      "_source": {
        "message": "{\"@timestamp\":\"2023-06-30T06:33:03.795Z\", \"log.level\": \"INFO\", \"message\":\"[.ds-logs-endpoint.events.process-sccoe-2023.06.29-000022/LCINFinVQ2OIt329g2Q-Wg] deleting index\", \"ecs.version\": \"1.2.0\",\"service.name\":\"ES_ECS\",\"event.dataset\":\"elasticsearch.server\",\"process.thread.name\":\"elasticsearch[elastic][masterService#updateTask][T#107]\",\"log.logger\":\"org.elasticsearch.cluster.metadata.MetadataDeleteIndexService\",\"trace.id\":\"a5beabd55bce7f0361141b569a7721a7\",\"elasticsearch.cluster.uuid\":\"uXtSsZ7KShykEYcdwZGqnQ\",\"elasticsearch.node.id\":\"3QW8_ZQRRAOb8tDuwzGX4w\",\"elasticsearch.node.name\":\"elastic\",\"elasticsearch.cluster.name\":\"SCCoE\"}\n",
        "input": {
          "type": "filestream"
        },
        "@timestamp": "2023-06-30T06:33:03.798Z",
        "ecs": {
          "version": "8.0.0"
        },
        "stream": "stdout",
        "data_stream": {
          "namespace": "sccoe",
          "type": "logs",
          "dataset": "docker.container_logs"
        }
      }
    }
  ]
}

# Result
{
  "docs": [
    {
      "doc": {
        "_index": ".ds-elasticsearch.server-sccoe",
        "_id": "-NYBC4kBqzI6pGDGasKv",
        "_version": "-3",
        "_source": {
          "input": {
            "type": "filestream"
          },
          "@timestamp": "2023-06-30T06:33:03.798Z",
          "ecs": {
            "version": "8.0.0"
          },
          "message": """{"@timestamp":"2023-06-30T06:33:03.795Z", "log.level": "INFO", "message":"[.ds-logs-endpoint.events.process-sccoe-2023.06.29-000022/LCINFinVQ2OIt329g2Q-Wg] deleting index", "ecs.version": "1.2.0","service.name":"ES_ECS","event.dataset":"elasticsearch.server","process.thread.name":"elasticsearch[elastic][masterService#updateTask][T#107]","log.logger":"org.elasticsearch.cluster.metadata.MetadataDeleteIndexService","trace.id":"a5beabd55bce7f0361141b569a7721a7","elasticsearch.cluster.uuid":"uXtSsZ7KShykEYcdwZGqnQ","elasticsearch.node.id":"3QW8_ZQRRAOb8tDuwzGX4w","elasticsearch.node.name":"elastic","elasticsearch.cluster.name":"SCCoE"}
""",
          "stream": "stdout",
          "data_stream": {
            "namespace": "sccoe",
            "type": ".ds",  <!--- NOTE 
            "dataset": "elasticsearch.server"
          }
        },
        "_ingest": {
          "timestamp": "2023-06-30T16:57:10.548215516Z"
        }
      }
    }
  ]
}

Expected: data_stream.type should be logs, not .ds
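The `.ds` in the result matches the first `-`-separated token of the backing index name passed as `_index`. The sketch below only illustrates that hypothesis: `split_data_stream_name` is a hypothetical helper written for this example, not the Elasticsearch reroute implementation, and it assumes a naive split on `-`.

```python
def split_data_stream_name(index_name: str) -> dict:
    """Naively split an index name into type/dataset/namespace on '-'.

    Hypothetical illustration of the suspected behaviour only;
    this is not how Elasticsearch is known to implement reroute.
    """
    parts = index_name.split("-", 2)
    if len(parts) < 3:
        raise ValueError(f"expected <type>-<dataset>-<namespace>, got {index_name!r}")
    type_, dataset, namespace = parts
    return {"type": type_, "dataset": dataset, "namespace": namespace}


# Backing index name, as used in the _simulate docs in this issue.
backing_index = ".ds-logs-docker.container_logs-sccoe-2023.06.29-000013"
# Data stream name, as used when actually indexing.
data_stream = "logs-docker.container_logs-sccoe"

print(split_data_stream_name(backing_index)["type"])  # .ds
print(split_data_stream_name(data_stream)["type"])    # logs
```

With the data stream name the first token is `logs`, which would be consistent with real indexing routing correctly while _simulate, fed a backing index name, reports `.ds`.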

Next, I explicitly set data_stream.type in the pipeline as well ... it is still ignored.
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "field": "data_stream.dataset",
          "value": "elasticsearch.server",
          "ignore_failure": false
        }
      },
      {
        "set": {
          "field": "data_stream.type",
          "value": "logs", 
          "ignore_failure": false
        }
      },
      {
        "reroute": {
          "ignore_failure": false
        }
      }
      ]
  }, 
  "docs": [
    {
      "_id": "-NYBC4kBqzI6pGDGasKv",
      "_index": ".ds-logs-docker.container_logs-sccoe-2023.06.29-000013",
      "_source": {
        "message": "{\"@timestamp\":\"2023-06-30T06:33:03.795Z\", \"log.level\": \"INFO\", \"message\":\"[.ds-logs-endpoint.events.process-sccoe-2023.06.29-000022/LCINFinVQ2OIt329g2Q-Wg] deleting index\", \"ecs.version\": \"1.2.0\",\"service.name\":\"ES_ECS\",\"event.dataset\":\"elasticsearch.server\",\"process.thread.name\":\"elasticsearch[elastic][masterService#updateTask][T#107]\",\"log.logger\":\"org.elasticsearch.cluster.metadata.MetadataDeleteIndexService\",\"trace.id\":\"a5beabd55bce7f0361141b569a7721a7\",\"elasticsearch.cluster.uuid\":\"uXtSsZ7KShykEYcdwZGqnQ\",\"elasticsearch.node.id\":\"3QW8_ZQRRAOb8tDuwzGX4w\",\"elasticsearch.node.name\":\"elastic\",\"elasticsearch.cluster.name\":\"SCCoE\"}\n",
        "input": {
          "type": "filestream"
        },
        "@timestamp": "2023-06-30T06:33:03.798Z",
        "ecs": {
          "version": "8.0.0"
        },
        "stream": "stdout",
        "data_stream": {
          "namespace": "sccoe",
          "type": "logs",
          "dataset": "docker.container_logs"
        }
      }
    }
    ]
}

# Result: data_stream.type is still ignored
{
  "docs": [
    {
      "doc": {
        "_index": ".ds-elasticsearch.server-sccoe",
        "_id": "-NYBC4kBqzI6pGDGasKv",
        "_version": "-3",
        "_source": {
          "input": {
            "type": "filestream"
          },
          "@timestamp": "2023-06-30T06:33:03.798Z",
          "ecs": {
            "version": "8.0.0"
          },
          "message": """{"@timestamp":"2023-06-30T06:33:03.795Z", "log.level": "INFO", "message":"[.ds-logs-endpoint.events.process-sccoe-2023.06.29-000022/LCINFinVQ2OIt329g2Q-Wg] deleting index", "ecs.version": "1.2.0","service.name":"ES_ECS","event.dataset":"elasticsearch.server","process.thread.name":"elasticsearch[elastic][masterService#updateTask][T#107]","log.logger":"org.elasticsearch.cluster.metadata.MetadataDeleteIndexService","trace.id":"a5beabd55bce7f0361141b569a7721a7","elasticsearch.cluster.uuid":"uXtSsZ7KShykEYcdwZGqnQ","elasticsearch.node.id":"3QW8_ZQRRAOb8tDuwzGX4w","elasticsearch.node.name":"elastic","elasticsearch.cluster.name":"SCCoE"}
""",
          "stream": "stdout",
          "data_stream": {
            "namespace": "sccoe",
            "type": ".ds", <!--- NOTE
            "dataset": "elasticsearch.server"
          }
        },
        "_ingest": {
          "timestamp": "2023-06-30T16:59:27.824588637Z"
        }
      }
    }
  ]
}

Note that this does work when reroute is used to actually index a document:

DELETE _data_stream/logs-elasticsearch.server-sccoe

PUT _ingest/pipeline/reroute-test
{
  "processors": [
    {
        "set": {
          "field": "data_stream.dataset",
          "value": "elasticsearch.server",
          "ignore_failure": false
        }
      },
    {
      "reroute": {
        "ignore_failure": false
      }
    }
  ]
}

# Test normal direct message 
POST logs-elasticsearch.server-sccoe/_doc
{
  "message": "message 1 - Direct ",
  "input": {
    "type": "filestream"
  },
  "@timestamp": "2023-06-30T06:33:03.798Z",
  "ecs": {
    "version": "8.0.0"
  },
  "stream": "stdout",
  "data_stream": {
    "namespace": "sccoe",
    "type": "logs",
    "dataset": "elasticsearch.server"
  }
}

# Post rerouted doc
POST logs-docker.container_logs-sccoe/_doc?pipeline=reroute-test
{
  "message": "message re-route1",
  "input": {
    "type": "filestream"
  },
  "@timestamp": "2023-06-30T06:33:03.798Z",
  "ecs": {
    "version": "8.0.0"
  },
  "stream": "stdout",
  "data_stream": {
    "namespace": "sccoe",
    "type": "logs",
    "dataset": "docker.container_logs"
  }
}

# Note this result looks good
{
  "_index": ".ds-logs-elasticsearch.server-sccoe-2023.06.30-000001",
  "_id": "86haDYkBI8KwcWx0ojtu",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 1,
  "_primary_term": 1
}

# Then Run Search and They Are There!!
GET logs-elasticsearch.server-sccoe/_search

# Result both are there

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 2,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": ".ds-logs-elasticsearch.server-sccoe-2023.06.30-000001",
        "_id": "8qhaDYkBI8KwcWx0lTvG",
        "_score": 1,
        "_source": {
          "message": "message 1 - Direct ",
          "input": {
            "type": "filestream"
          },
          "@timestamp": "2023-06-30T06:33:03.798Z",
          "ecs": {
            "version": "8.0.0"
          },
          "stream": "stdout",
          "data_stream": {
            "namespace": "sccoe",
            "type": "logs",
            "dataset": "elasticsearch.server"
          }
        }
      },
      {
        "_index": ".ds-logs-elasticsearch.server-sccoe-2023.06.30-000001",
        "_id": "86haDYkBI8KwcWx0ojtu",
        "_score": 1,
        "_source": {
          "input": {
            "type": "filestream"
          },
          "@timestamp": "2023-06-30T06:33:03.798Z",
          "ecs": {
            "version": "8.0.0"
          },
          "message": "message re-route1",
          "stream": "stdout",
          "data_stream": {
            "namespace": "sccoe",
            "type": "logs",
            "dataset": "elasticsearch.server"
          }
        }
      }
    ]
  }
}
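The successful run above posts to the data stream name (`logs-docker.container_logs-sccoe`), whereas the _simulate docs carry a backing index name in `_index`. One way to check whether that difference matters would be to convert the backing index name back to its data stream name before simulating. The helper below is a minimal sketch: `data_stream_name` is a hypothetical name, and it assumes the dated `.ds-<data-stream>-<yyyy.MM.dd>-<generation>` naming seen in this issue. Whether _simulate then returns the expected `_index` is not confirmed anywhere in this thread.

```python
import re

# Assumed backing index pattern: .ds-<data-stream>-<yyyy.MM.dd>-<6-digit generation>
_BACKING_INDEX = re.compile(r"^\.ds-(?P<stream>.+)-\d{4}\.\d{2}\.\d{2}-\d{6}$")


def data_stream_name(index_name: str) -> str:
    """Return the data stream name for a backing index name.

    Names that do not match the assumed pattern are returned unchanged.
    """
    match = _BACKING_INDEX.match(index_name)
    return match.group("stream") if match else index_name


print(data_stream_name(".ds-logs-docker.container_logs-sccoe-2023.06.29-000013"))
# logs-docker.container_logs-sccoe
```

The returned value could then be used as `_index` in the _simulate docs in place of the `.ds-...` name.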

Logs (if relevant)

No response

elasticsearchmachine commented 1 year ago

Pinging @elastic/es-data-management (Team:Data Management)

mbudge commented 1 month ago

Hit this issue yesterday.

There’s no option to set data_stream.type in the Kibana ingest reroute processor UI.

I was trying to reroute the Elastic Cloud audit logs to different data streams with different ILM policies, but couldn’t get the documents to reroute. The default audit log data stream doesn’t have a data_stream.type.

This means type is optional.