zmoog / public-notes

Demo how to route documents using reroute processor and routing rules #68

Closed zmoog closed 8 months ago

zmoog commented 8 months ago

I want to prepare a simple, practical example that shows how to use the reroute processor and routing rules to send documents to a different data stream based on data in the document.

zmoog commented 8 months ago

Prerequisites

Bootstrap a test cluster using elastic-package and install the Azure Logs integration

Bootstrap a test cluster

cd packages/azure

# This uses the current dev version; you don't need this exact one, the latest release works too.
elastic-package build && elastic-package stack up -d -v --version 8.13.0-SNAPSHOT

Install the Azure Logs integration

You can use placeholder values for all the integration settings, since we are not going to assign this policy to an agent.

[Screenshot: installing the Azure Logs integration]

zmoog commented 8 months ago

Add a reroute processor using a custom pipeline

We can give the reroute processor a try by adding a custom ingest pipeline.

[Screenshot: adding a custom ingest pipeline]

For this test, we'll use a trivial routing condition that routes documents to the prod namespace if ctx.env == "prod":

[Screenshot: reroute processor with the routing condition]
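
The same routing condition can probably be set up from the Dev Tools console instead of the UI. This is a minimal sketch, not the exact pipeline from the screenshot: it assumes the custom pipeline is named logs-azure.eventhub@custom (one of the @custom pipelines the managed pipeline calls) and that the processor only sets the namespace.

```console
# Assumed name: the data stream level custom pipeline for azure.eventhub
PUT _ingest/pipeline/logs-azure.eventhub@custom
{
  "processors": [
    {
      "reroute": {
        "namespace": "prod",
        "if": "ctx.env == 'prod'"
      }
    }
  ]
}
```

With only namespace set, the reroute processor keeps the type and dataset from the original index name, so a matching document should move from logs-azure.eventhub-default to logs-azure.eventhub-prod.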

zmoog commented 8 months ago

Test using the pipeline simulator

# request
POST _ingest/pipeline/logs-azure.eventhub-1.9.0/_simulate
{
  "docs": [
    {
      "_index": "logs-azure.eventhub-default",
      "_source": {
        "env": "dev"
      }
    }
  ]
}

# response
{
  "docs": [
    {
      "doc": {
        "_index": "logs-azure.eventhub-default",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "env": "dev",
          "ecs": {
            "version": "8.0.0"
          },
          "event": {
            "kind": "event"
          }
        },
        "_ingest": {
          "timestamp": "2024-01-09T21:05:03.290911341Z"
        }
      }
    }
  ]
}
# request
POST _ingest/pipeline/logs-azure.eventhub-1.9.0/_simulate
{
  "docs": [
    {
      "_index": "logs-azure.eventhub-default",
      "_source": {
        "env": "prod"
      }
    }
  ]
}

# response
{
  "docs": [
    {
      "doc": {
        "_index": "logs-azure.eventhub-prod",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "env": "prod",
          "ecs": {
            "version": "8.0.0"
          },
          "event": {
            "kind": "event"
          },
          "data_stream": {
            "namespace": "prod",
            "type": "logs",
            "dataset": "azure.eventhub"
          }
        },
        "_ingest": {
          "timestamp": "2024-01-09T21:05:09.711010386Z"
        }
      }
    }
  ]
}
zmoog commented 8 months ago

Test posting an actual document

# request
POST logs-azure.eventhub-default/_doc
{
  "@timestamp": "2024-01-09T21:09:09.000Z",
  "env": "prod"
}

# response
{
  "_index": ".ds-logs-azure.eventhub-prod-2024.01.09-000001",
  "_id": "UfsO8IwBVoLvb9xL8uzc",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 0,
  "_primary_term": 1
}

Query the target data stream to check that the document is there:

# request
GET logs-azure.eventhub-prod/_search

# response
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": ".ds-logs-azure.eventhub-prod-2024.01.09-000001",
        "_id": "UfsO8IwBVoLvb9xL8uzc",
        "_score": 1,
        "_source": {
          "@timestamp": "2024-01-09T21:09:09.000Z",
          "env": "prod",
          "ecs": {
            "version": "8.0.0"
          },
          "event": {
            "agent_id_status": "missing",
            "ingested": "2024-01-09T21:09:20Z",
            "kind": "event"
          },
          "data_stream": {
            "namespace": "prod",
            "type": "logs",
            "dataset": "azure.eventhub"
          }
        }
      }
    ]
  }
}
zmoog commented 8 months ago

Add a routing_rules.yml file

Add a new routing_rules.yml file at packages/azure/data_stream/eventhub with the following content:

# Route azure logs events to the correct dataset and namespace
# based on document annotations.
- source_dataset: azure.eventhub
  rules:
    - target_dataset:
        - "{{annotations.dataset}}"
        - "{{data_stream.dataset}}"
      namespace:
        - "{{annotations.namespace}}"
        - "{{data_stream.namespace}}"
      if: "ctx.annotations != null"

Bump the integration version:

diff --git a/packages/azure/changelog.yml b/packages/azure/changelog.yml
index b7e611cb9..8ea80a91a 100644
--- a/packages/azure/changelog.yml
+++ b/packages/azure/changelog.yml
@@ -1,3 +1,8 @@
+- version: "1.9.0"
+  changes:
+    - description: Add caller_ip_address field in pipeline for Azure sign-in logs.
+      type: enhancement
+      link: https://github.com/elastic/integrations/pull/8813
 - version: "1.8.3"
   changes:
     - description: Add caller_ip_address field in pipeline for Azure sign-in logs.
diff --git a/packages/azure/data_stream/eventhub/manifest.yml b/packages/azure/data_stream/eventhub/manifest.yml
index 2dc49caa7..ac6ca1fa1 100644
--- a/packages/azure/data_stream/eventhub/manifest.yml
+++ b/packages/azure/data_stream/eventhub/manifest.yml
@@ -1,4 +1,5 @@
 type: logs
+dataset: azure.eventhub
 title: Azure Event Hub Input
 streams:
   - input: "azure-eventhub"
diff --git a/packages/azure/manifest.yml b/packages/azure/manifest.yml
index 21257e415..3ea0bfb5a 100644
--- a/packages/azure/manifest.yml
+++ b/packages/azure/manifest.yml
@@ -1,6 +1,6 @@
 name: azure
 title: Azure Logs
-version: 1.8.3
+version: 1.9.0
 description: This Elastic integration collects logs from Azure
 type: integration
 icons:

Finally, install the new integration version:

elastic-package build && elastic-package stack up -d -v --services package-registry
zmoog commented 8 months ago

Test routing rules by posting an actual document

POST logs-azure.eventhub-default/_doc
{
  "@timestamp": "2024-01-09T21:09:09.000Z",
  "annotations": {
    "namespace": "whatever"
  }
}

Search for the document:

GET logs-azure.eventhub-whatever/_search

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": ".ds-logs-azure.eventhub-whatever-2024.01.09-000001",
        "_id": "s_RJ8IwBJ1mZT2UtkZNf",
        "_score": 1,
        "_source": {
          "annotations": {
            "namespace": "whatever"
          },
          "@timestamp": "2024-01-09T21:09:09.000Z",
          "ecs": {
            "version": "8.0.0"
          },
          "event": {
            "agent_id_status": "missing",
            "ingested": "2024-01-09T22:13:22Z",
            "kind": "event"
          },
          "data_stream": {
            "namespace": "whatever",
            "type": "logs",
            "dataset": "azure.eventhub"
          }
        }
      }
    ]
  }
}
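
The rule also accepts annotations.dataset, which the test above does not exercise. A sketch of how one might check it with the pipeline simulator, without indexing anything: azure.custom is a made-up dataset name used only for illustration, and no response is shown because I have not run this request.

```console
POST _ingest/pipeline/logs-azure.eventhub-1.9.0/_simulate
{
  "docs": [
    {
      "_index": "logs-azure.eventhub-default",
      "_source": {
        "annotations": {
          "dataset": "azure.custom",
          "namespace": "whatever"
        }
      }
    }
  ]
}
```

If the rule behaves for the dataset as it does for the namespace, the _index in the response should reflect both values.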
zmoog commented 8 months ago

Inspect the ingest pipeline

Let's see the ingest pipeline source:

GET _ingest/pipeline/logs-azure.eventhub-1.9.0

As you can see, Fleet added a reroute processor after the custom pipelines, so users can run their own custom processors before the rerouting happens:

{
  "logs-azure.eventhub-1.9.0": {
    "description": "Pipeline for parsing azure activity logs.",
    "processors": [
      {
        "set": {
          "field": "ecs.version",
          "value": "8.0.0"
        }
      },
      {
        "rename": {
          "field": "azure",
          "target_field": "azure-eventhub",
          "ignore_missing": true
        }
      },
      {
        "set": {
          "field": "event.kind",
          "value": "event"
        }
      },
      {
        "pipeline": {
          "if": "ctx?.tags != null && ctx.tags.contains('parse_message')",
          "name": "logs-azure.eventhub-1.9.0-parsed-message"
        }
      },
      {
        "pipeline": {
          "name": "global@custom",
          "ignore_missing_pipeline": true
        }
      },
      {
        "pipeline": {
          "name": "logs@custom",
          "ignore_missing_pipeline": true
        }
      },
      {
        "pipeline": {
          "name": "logs-azure@custom",
          "ignore_missing_pipeline": true
        }
      },
      {
        "pipeline": {
          "name": "logs-azure.eventhub@custom",
          "ignore_missing_pipeline": true
        }
      },
      {
        "reroute": {
          "tag": "azure.eventhub",
          "dataset": [
            "{{annotations.dataset}}",
            "{{data_stream.dataset}}"
          ],
          "namespace": [
            "{{annotations.namespace}}",
            "{{data_stream.namespace}}"
          ],
          "if": "ctx.annotations != null"
        }
      }
    ],
    "on_failure": [
      {
        "set": {
          "field": "error.message",
          "value": "{{ _ingest.on_failure_message }}"
        }
      }
    ],
    "_meta": {
      "managed_by": "fleet",
      "managed": true,
      "package": {
        "name": "azure"
      }
    }
  }
}
kcreddy commented 8 months ago

@zmoog Just curious: what happens when the routing rule fails? For example, if the if condition is never checked and annotations is indeed null, where does the document end up?

# Route azure logs events to the correct dataset and namespace
# based on document annotations.
- source_dataset: azure.eventhub
  rules:
    - target_dataset:
        - "{{annotations.dataset}}"
        - "{{data_stream.dataset}}"
      namespace:
        - "{{annotations.namespace}}"
        - "{{data_stream.namespace}}"
      if: "ctx.annotations != null"
zmoog commented 8 months ago

The if condition is required by the routing rules spec, so it's always there.

However, a rule can check the wrong condition and feed the processor a null value. In this case, the reroute processor falls back to the dataset or namespace of the original index name.

Continuing with the previous examples, here's what happens:

#
# routing_rules.yml
#

# Route azure logs events to the correct dataset and namespace
# based on document annotations.
- source_dataset: azure.eventhub
  rules:
    - target_dataset:
        - "{{annotations.dataset}}"
        - "{{data_stream.dataset}}"
      namespace:
        - "{{annotations.namespace}}"
        - "{{data_stream.namespace}}"
      if: "ctx.annotations != null"

#
# request
#

POST logs-azure.eventhub-default/_doc
{
  "@timestamp": "2024-01-09T09:41:00.000Z",
  "annotations": {
    "namespacez": "hey" // the `namespace` is misspelled as `namespacez`
  }
}

#
# response
#

{
  "_index": ".ds-logs-azure.eventhub-default-2024.01.11-000001",  // falls back to the `default` namespace
  "_id": "BPWN-IwBJ1mZT2UtjHyP",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 3,
  "_primary_term": 1
}
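
The same fallback can be reproduced with the pipeline simulator, without indexing a document. This is a sketch that reuses the misspelled field from the request above; no response is included because I have not run it, but the _index in the result should stay in the default namespace, in line with the indexing response.

```console
POST _ingest/pipeline/logs-azure.eventhub-1.9.0/_simulate
{
  "docs": [
    {
      "_index": "logs-azure.eventhub-default",
      "_source": {
        "annotations": {
          "namespacez": "hey"
        }
      }
    }
  ]
}
```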