elastic / integrations


Event routing: open questions and prototyping #3185

Closed: felixbarny closed this issue 5 months ago

felixbarny commented 2 years ago

We're making progress on event routing via the data_stream_router processor:

I think now is the time to discuss remaining open questions on how integrations will actually use event routing rules and to start prototyping.

Open questions:

To answer these questions and to validate the overall approach, we should start building out a prototype. We can do that even though the data_stream_router processor hasn't been merged yet. Just trying to define a concrete routing pipeline with a couple of example integrations should help us think through the remaining open questions and validate the approach.

We can also start measuring the performance impact of the routing pipeline conditions and what the expected log ingestion rate is. To do that, we should define Rally tracks that simulate a typical event routing scenario.

cc @ruflin

ruflin commented 2 years ago

Should there be one pipeline per input or a single global pipeline

I think of this differently. There is one pipeline per data stream. It is quite possible that two inputs are configured to ship data to the same data stream, in which case the same routing would apply.

k8s routing

I would expect it to be based on the container name in most scenarios and then some sub-routing based on a grok pattern. An example here is nginx with error and access logs. It is possible that one goes to stderr and the other to stdout, so this metadata would have to be in the event itself. Agreed, routing conditions should be as efficient as possible, but there will be cases where grok will be needed.

felixbarny commented 2 years ago

there will be cases where grok will be needed.

As long as the routing conditions for the nginx integration don't slow down the conditions for mysql, for example, that's fine I think.

It is quite possible that two inputs are configured to ship data to the same data stream, in which case the same routing would apply. It is possible that one goes to stderr and the other to stdout, so this metadata would have to be in the event itself.

What would a pipeline for logs-router-default look like that gets data both from an nginx on k8s and from a traditionally deployed nginx? Would there be two different instances of the data_stream_router with different input-specific conditions?

- data_stream_router:
    if: ctx.input?.type == 'filestream' && ctx.log?.file?.path?.contains('nginx')
    dataset: nginx
- data_stream_router:
    if: ctx.input?.type == 'container' && ctx.kubernetes?.container?.name?.contains('nginx')
    dataset: nginx
ruflin commented 2 years ago

I don't think there should be a global "router" data stream, but instead one for each purpose. So we would have logs-router.k8s-default and logs-router.docker-default etc. and each would do its magic. Otherwise the rules will start to impact each other and the chance of false positives increases.

The nginx example above would look a bit different assuming this is all in k8s:

- data_stream_router:
    if: ctx.input?.type == 'container' && ctx.kubernetes?.container?.name?.contains('nginx') && ctx.log?.type == 'stdout'
    dataset: nginx.access
- data_stream_router:
    if: ctx.input?.type == 'container' && ctx.kubernetes?.container?.name?.contains('nginx') && ctx.log?.type == 'stderr'
    dataset: nginx.error

I just made up log.type; the info might be in a different field, but the outcome should be the same.

I now better understand what you mean by "pipeline per input". I don't think of k8s as an input, as in the end it is basically reading files from disk. It is more about the metadata that is attached to it. In some scenarios it is an input, like syslog; in others it is an environment the data comes from, like k8s / docker. There should be one routing pipeline for each environment data can come from.

If we have a global generic router, it would route data based on the metadata and not the content of the message itself:

- data_stream_router:
    if: ctx.input?.type == 'container' && ctx.kubernetes?.container?.name != null
    dataset: router.k8s
- data_stream_router:
    if: ctx.input?.type == 'syslog'
    dataset: router.syslog
- data_stream_router:
    if: ctx.input?.type == 'logfile'
    dataset: router.filepath
felixbarny commented 2 years ago

It is more about the metadata that is attached to it.

That makes sense. It's not about the input type but about what kind of metadata is available. For example, when using a Kafka input, some events may have k8s metadata while others may need to be routed by looking at their log.file.path.

That speaks for a global routing pipeline that doesn't check input.type but checks for the existence of certain types of metadata.
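
For illustration, a first-stage router along those lines could look roughly like this (a sketch only; the router.* dataset names are placeholders and the metadata checks are examples, not a final list):

- data_stream_router:
    if: ctx.kubernetes?.container?.name != null
    dataset: router.k8s
- data_stream_router:
    if: ctx.log?.syslog != null
    dataset: router.syslog
- data_stream_router:
    if: ctx.log?.file?.path != null
    dataset: router.filepath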

Still, each integration will have to provide different routing conditions based on available metadata.

I just made up log.type; the info might be in a different field, but the outcome should be the same.

I'm not sure if you're able to tell from k8s logs whether something was logged to stdout or stderr. Do you know if that's possible? At least we don't capture a similar field currently.

ruflin commented 2 years ago

Still, each integration will have to provide different routing conditions based on available metadata.

Agree. And the condition can be very different for each metadata type. I can foresee that some integrations will only provide a routing rule for k8s at first but not syslog, for example.

I'm not sure if you're able to tell from k8s logs whether something was logged to stdout or stderr. Do you know if that's possible? At least we don't capture a similar field currently.

If we don't have it, it means nginx will just be forwarded to logs-nginx-default, and a routing rule there, likely with grok/dissect, will make the differentiation.
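
As a rough sketch of what that could look like inside the logs-nginx-default pipeline (the grok probe and the [error] check are illustrative, not the actual integration logic):

- grok:
    # cheap probe: access log lines start with the client IP
    field: message
    patterns:
      - '^%{IP:_tmp.client_ip} '
    ignore_failure: true
- data_stream_router:
    # _tmp.client_ip would be cleaned up further down the pipeline
    if: ctx._tmp?.client_ip != null
    dataset: nginx.access
- data_stream_router:
    if: ctx.message != null && ctx.message.contains('[error]')
    dataset: nginx.error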

ravikesarwani commented 2 years ago

cc: @aspacca @kaiyan-sheng We should review the changes and see how this can work with the elastic-serverless-forwarder and Firehose integrations. Both integrations currently put the onus on the customer to tell us which data stream the data needs to be sent to, and that creates a big friction point in the onboarding journey.

felixbarny commented 2 years ago

Here's a diagram of how the multi-stage event routing may look:

(screenshot: multi-stage event routing diagram)
joshdover commented 2 years ago

@felixbarny Thank you for putting this diagram together.

This generally matches the mental model I was envisioning as well: multiple levels of routing pipelines that fan out into finer levels of granularity in order to reduce the impact of routing conditionals between integrations/data sources.

Still, each integration will have to provide different routing conditions based on available metadata.

Agree. And the condition can be very different for each metadata type. I can foresee that some integrations will only provide a routing rule for k8s at first but not syslog, for example.

Integrations are likely going to need to provide some default routing rules, but I also think we're likely to need a way for users to customize these rules without editing ingest pipelines directly (as the pipelines should be fully managed by the system). A few obvious examples I can think of that would require this:

Another aspect we need to consider is inspectability of routing rules. While the final routing decision will be obvious from the resulting destination index, we need to provide the ability to easily test these routing rules in our managed pipelines. This would be especially helpful in any UI we build for customizing these routing rules. As the simulate pipeline API works today, this is not possible, because the simulate API does not run the pipeline of the new _index, even if that index has a default_pipeline configured. A quick example:

PUT _ingest/pipeline/route1
{
  "processors": [
    {
      "set": {
        "field": "_index",
        "value": "route2"
      }
    }
  ]
}

PUT _ingest/pipeline/route2
{
  "processors": [
    {
      "set": {
        "field": "foo",
        "value": "routed!"
      }
    }
  ]
}

PUT route1
{
  "settings": {
    "index.default_pipeline": "route1"
  }
}

PUT route2
{
  "settings": {
    "index.default_pipeline": "route2"
  }
}

POST /_ingest/pipeline/route1/_simulate
{
  "docs": [
    {
      "_index": "route1",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}

Results only reflect the simulated pipeline and not the subsequent index’s pipeline:


{
  "docs" : [
    {
      "doc" : {
        "_index" : "route2",
        "_id" : "id",
        "_source" : {
          "foo" : "bar"
        },
        "_ingest" : {
          "timestamp" : "2022-05-16T11:50:50.181749248Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "route2",
        "_id" : "id",
        "_source" : {
          "foo" : "rab"
        },
        "_ingest" : {
          "timestamp" : "2022-05-16T11:50:50.181766322Z"
        }
      }
    }
  ]
}

We could likely emulate this behavior from the Kibana side and call the next pipeline’s simulate until the _index no longer changes. Not sure if there are any inaccuracies with this approach.
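
For example, taking the first document from the response above (now pointing at route2), the manual second hop would be another simulate call against that index's default pipeline (after looking it up), repeated until _index stops changing:

POST /_ingest/pipeline/route2/_simulate
{
  "docs": [
    {
      "_index": "route2",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    }
  ]
}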

felixbarny commented 2 years ago

As the simulate pipeline API works today, this is not possible, because the simulate API does not run the pipeline of the new _index, even if that index has a default_pipeline configured.

Maybe we could introduce something like a simulate mode for the index/bulk API that returns the final document instead of actually indexing it.

aspacca commented 2 years ago

We should review the changes and see how this can work with the elastic-serverless-forwarder and Firehose integrations. Both integrations currently put the onus on the customer to tell us which data stream the data needs to be sent to, and that creates a big friction point in the onboarding journey.

From the point of view of the elastic-serverless-forwarder, a multi-stage event routing will be required, as shown by the diagram from @felixbarny.

The case is similar to the Kafka one.

If data is coming from Kafka, the metadata available on the events is not implicit, therefore, it needs to be sent to the general logs-router-default.

I just wonder what the impact on performance can be for the general logs-router-default router where we need routing rules for every integration. In the case of the elastic-serverless-forwarder, specifically, the if conditions would probably require a lot of parsing and guessing through Painless scripting. Would it be better to have a logs-router.elastic-serverless-forwarder with a subset of the supported integrations for autodiscovery specific to the elastic-serverless-forwarder? Does the total number of data_stream_router processors in the pipeline impact the overall performance?

@felixbarny ?

felixbarny commented 2 years ago

I just wonder what the impact on performance can be for the general logs-router-default router where we need routing rules for every integration.

The idea is that the general logs-router-default router just determines what kind of metadata is available and then routes to a more specific pipeline. For example, if it detects that the logs contain k8s metadata, the documents will be sent to the logs-router.k8s-default. This pipeline will then try to find out whether a document can be parsed by an integration. For example, based on the container name.
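
As a sketch, the second-stage logs-router.k8s-default pipeline would then mostly be a list of integration-provided rules along these lines (names illustrative):

- data_stream_router:
    if: ctx.kubernetes?.container?.name?.contains('nginx')
    dataset: nginx
- data_stream_router:
    if: ctx.kubernetes?.container?.name?.contains('mysql')
    dataset: mysql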

In the case of the elastic-serverless-forwarder, specifically, the if conditions would probably require a lot of parsing and guessing through Painless scripting.

Could you provide an example of a raw Nginx log and how we'd find out that it's indeed an Nginx log? Is there any metadata available that we can use to quickly identify, with a relatively high confidence, that the log is indeed an Nginx log and should be parsed by the Nginx integration?

If the answer is that we need to see whether a grok expression matches on every document, without being able to exclude most of the documents with checks that are cheaper than grok, it feels like that would be prohibitively expensive.

Does the total number of data_stream_router processors in the pipeline impact the overall performance?

It mostly depends on how expensive the checks within the if conditions are. If it's simple string comparisons, such as equals, starts with, ends with, or contains, I'm not too worried about a lot of routers in the pipeline. But if every document has to go through an if condition that applies a grok expression, that's a different story.

felixbarny commented 2 years ago

I had a discussion with @ruflin about what the default data stream should be if no rule matches. We haven't reached a final conclusion yet, but I think it's an interesting question to surface to a wider group.

Options, from coarse-grained to fine-grained:

  1. One global default index, such as logs-generic-default
  2. One default per router, such as logs-syslog-default, logs-k8s-default
  3. One data stream based on an input-specific field that segments different sources of the logs. For example logs-${kubernetes.container.name}-default for the k8s router or logs-${app_name}-default for syslog. The hypothesis is that most multiplexed log streams have a field that groups logs from the same service or source.

I think we should go with option 3. It fits the ECS definition of data_stream.dataset very well:

The field can contain anything that makes sense to signify the source of the data.

A similar quote from the data stream blog

dataset: Describes the data ingested and its structure

Data from the same container or application has a structure that is often distinct from that of other services. Frequently, the services are owned by different teams and have different retention requirements.

Also, segmenting the data in this way makes it much easier to add structure to the data after ingestion via runtime fields. Adding structure via runtime fields is much harder if the logs of multiple sources are mixed together in a single default index, such as logs-generic-default or logs-k8s-default.
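
Assuming the data_stream_router processor ends up supporting templated values (an assumption, not a given at this point), option 3 could be expressed as a final fallback rule in the k8s router, ignoring for the moment that container names would need to be sanitized into valid dataset names:

- data_stream_router:
    # hypothetical fallback after all integration-specific rules
    if: ctx.kubernetes?.container?.name != null
    dataset: '{{kubernetes.container.name}}'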

aspacca commented 2 years ago

hi @felixbarny

Please let me give some context about the topic from the elastic-serverless-forwarder point of view, as a potential consumer of the event routing pipeline.

Currently we support auto-discovering AWS logs integrations: the customer does not need to provide any information (such as a data stream to ingest to, or any other specific hints about the integration the logs belong to) and for 80% of the AWS integrations we are able to send to the correct data stream in the default namespace. This is, however, supported only in the case of an S3 logs source, where we rely on the path of the S3 object key for the autodiscovery. We also have Kinesis data streams, CloudWatch Logs and SQS queues as ingestion sources, where the only option for discovering the integration is inspecting the payload.

My initial thought was to rely instead on the event routing pipeline, especially if there will be a generic/AWS default one.

I just wonder what the impact on performance can be for the general logs-router-default router where we need routing rules for every integration.

The idea is that the general logs-router-default router just determines what kind of metadata is available and then routes to a more specific pipeline. For example, if it detects that the logs contain k8s metadata, the documents will be sent to the logs-router.k8s-default. This pipeline will then try to find out whether a document can be parsed by an integration. For example, based on the container name.

What about when metadata is not available, as in the Kafka/elastic-serverless-forwarder case?

In the case of the elastic-serverless-forwarder, specifically, the if conditions would probably require a lot of parsing and guessing through Painless scripting.

Could you provide an example of a raw Nginx log and how we'd find out that it's indeed an Nginx log? Is there any metadata available that we can use to quickly identify, with a relatively high confidence, that the log is indeed an Nginx log and should be parsed by the Nginx integration?

I will give an example for an AWS integration (native AWS integrations are the main support case for the elastic-serverless-forwarder): here is the cloudfront_logs grok expression (from: https://github.com/elastic/integrations/blob/main/packages/aws/data_stream/cloudfront_logs/elasticsearch/ingest_pipeline/default.yml#L24-L31)

  - grok:
      field: event.original
      patterns:
        - '%{TIMESTAMP:_tmp.time}\s%{EDGE_LOCATION:aws.cloudfront.edge_location}\s%{INT:http.response.bytes:long}\s%{IP:source.address}\s%{WORD:http.request.method}\s%{HOSTNAME:aws.cloudfront.domain}\s%{UNIXPATH:url.path}\s%{INT:http.response.status_code:long}\s(-|%{DATA:http.request.referrer})\s%{DATA:_tmp.user_agent}\s(-|%{DATA:url.query})\s(-|%{DATA:aws.cloudfront.cookies})\s%{WORD:aws.cloudfront.edge_result_type}\s%{DATA:http.request.id}\s%{HOSTNAME:destination.address}\s%{WORD:network.protocol}\s%{INT:http.request.bytes:long}\s%{NUMBER:_tmp.duration:float}\s(-|%{IP:network.forwarded_ip})\s(-|%{TLS:tls.version_protocol}v%{NUMBER:tls.version})\s(-|%{DATA:tls.cipher})\s%{WORD:aws.cloudfront.edge_response_result_type}\s%{DATA:_tmp.protocol}\s(-|%{WORD:aws.cloudfront.fle_status})\s(-|%{DATA:aws.cloudfront.fle_encrypted_fields})\s(-|%{POSINT:source.port:long})\s(-|%{NUMBER:aws.cloudfront.time_to_first_byte:float})\s(-|%{WORD:aws.cloudfront.edge_detailed_result_type})\s%{DATA:aws.cloudfront.content_type}\s(-|%{INT:http.response.body.bytes:long})\s(-|%{DATA:aws.cloudfront.range_start})\s(-|%{DATA:aws.cloudfront.range_end})'
      pattern_definitions:
        TIMESTAMP: '%{YEAR}-%{MONTHNUM}-%{MONTHDAY}%{SPACE}%{HOUR}:%{MINUTE}:%{SECOND}'
        TLS: '(TLS|SSL)'
        EDGE_LOCATION: '[A-Z]{3}\d+(-[A-Z]+\d+)?'

If the answer is that we need to see whether a grok expression matches on every document, without being able to exclude most of the documents with checks that are cheaper than grok, it feels like that would be prohibitively expensive.

80% of the AWS integrations will rely on a grok expression match. The other 20% are JSON based and we can just check a minimal set of non-colliding fields in the JSON.

Does the total number of data_stream_router processors in the pipeline impact the overall performance?

It mostly depends on how expensive the checks within the if conditions are. If it's simple string comparisons, such as equals, starts with, ends with, or contains, I'm not too worried about a lot of routers in the pipeline. But if every document has to go through an if condition that applies a grok expression, that's a different story.

In the specific case of the elastic-serverless-forwarder, we have to understand which side of the process the performance impacts more. The forwarder is a Lambda with limited resources and (most importantly) an execution timeout that cannot exceed 15 minutes. There is a continuation mechanism in case an ingestion "batch" cannot be fulfilled within the 15 minutes, so no events should be lost, but the longer they are "continued", the bigger the ingestion delay will be. At the same time the Lambda has parallel executions, up to 1000 "instances" of the same Lambda at a time (the limit is per AWS account and region: users running their own Lambdas won't be able to reserve the limit exclusively for the elastic-serverless-forwarder).

From my stress tests, the bottleneck was on the cluster side: if the cluster is not scaled to support the ingestion rate of the Lambda, bulk requests will start to fail or time out (which is worse from the Lambda point of view, because it will time out as well while waiting for the bulk request response). On the contrary, the lower the compute performance of the Lambda (fractions of vCPUs are assigned incrementally in relation to the total amount of RAM reserved for the Lambda, from 128MB to 10GB/6 vCPUs), the longer it takes to process the logs source and the greater the risk it will time out before even making any ingestion. The default deployment of the Lambda comes with 512MB of reserved RAM and a 15 minute timeout; users could anyway deploy it in their own custom way with lower values: we explicitly don't support this scenario and the Lambda is optimised for the default one.

For sure, once we have to identify metadata by parsing the message payload in order to provide it to the event router so that it doesn't need a grok expression, then we already have the knowledge we'd need from the event routing :)

felixbarny commented 2 years ago

We also have Kinesis data streams, CloudWatch Logs and SQS queues as ingestion sources, where the only option for discovering the integration is inspecting the payload. What about when metadata is not available, as in the Kafka/elastic-serverless-forwarder case?

Do you have examples of what the full events would look like? Maybe a gist or a link to the full payload that would be sent to Elasticsearch? Is there really no metadata in the event that would signal where the log is coming from? Is it just a raw message with no context?

When events are coming from Kafka, they might still have rich metadata about the source associated with the event. For example, when sending k8s logs to Kafka first and then forwarding to Elasticsearch. In this case, the logs from Kafka would have the container name, for example. The job of the Kafka router would then be to find out which metadata is available and then to route accordingly.

80% of the AWS integrations will rely on a grok expression match.

Not saying this is impossible, but it feels like this would be a massive overhead and only the very last resort. Is there any way to pre-group the logs into different data streams even if we can't associate an integration with them at ingest time?

Then we could apply structure to these events after-the-fact by adding runtime fields to these data streams and/or adjust the routing to auto-discover the logs based on the source going forward.
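
To illustrate the runtime-field part, a grok-based runtime field could be mapped onto such a pre-grouped data stream after the fact, roughly like this (the data stream name, target field, and pattern are made up for the example; the script reads the raw message from _source):

PUT logs-awsraw.sqs-default/_mapping
{
  "runtime": {
    "source.address": {
      "type": "ip",
      "script": {
        "source": "String ip = grok('%{IP:ip}').extract(params._source.message)?.ip; if (ip != null) emit(ip);"
      }
    }
  }
}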

Is the serverless forwarder doing this type of routing client-side (meaning within the Lambda function) currently?

For sure, once we have to identify metadata by parsing the message payload in order to provide it to the event router so that it doesn't need a grok expression, then we already have the knowledge we'd need from the event routing :)

The main purpose is to move routing decisions from the client to the server so that they're more easily and dynamically adjustable and so that the clients can be more lightweight. It still relies on the routing decision being rather cheap. While it's not impossible to route solely on grok expressions, this should be the very last resort and it's not what it is primarily designed for. I'd argue that any system that's designed around routing via grok primarily is bound to have a very high CPU impact on ingest, and the impact gets larger the more integrations (and thus grok-based routing conditions) are enabled.

aspacca commented 2 years ago

Hi @felixbarny, sorry for the late feedback.

Do you have examples of what the full events would look like? Maybe a gist or a link to the full payload that would be sent to Elasticsearch? Is there really no metadata in the event that would signal where the log is coming from? Is it just a raw message with no context?

The problem is with the source events: see the three different events we can receive in the elastic-serverless-forwarder https://gist.github.com/aspacca/e7ed3e8b720081e6c23651af147211f0 (with dummy message content).

As you can see, in the metadata from SQS, Kinesis and CloudWatch Logs we don't have anything we can match to an integration. The metadata can indeed reside in the messages themselves: not sure if this is the case you were referring to with the Kafka/k8s example. But in the case of native AWS logs integrations, only 20% of them are in JSON format, where we could easily access and discover the metadata.

Is there any way to pre-group the logs into different data streams even if we can't associate an integration with them at ingest time?

We could have a few different strategies for that: for example, grouping by event source type (SQS/CloudWatch Logs/Kinesis).

Then we could apply structure to these events after-the-fact by adding runtime fields to these data streams and/or adjust the routing to auto-discover the logs based on the source going forward.

Could you give me an example of how this would work? From what I can see, any pre-grouping strategy not based on metadata will end up with the same problem of needing to rely on grok expressions.

Is the serverless forwarder doing this type of routing client-side (meaning within the Lambda function) currently?

The serverless forwarder currently does a different type of routing client side: in the single case of an S3 notification to SQS as the event source, we have the path of the S3 object as metadata of the event. This path is deterministic for 80% of the native AWS integrations, and we leverage this for the automatic routing. This logic could easily be applied to a routing pipeline, since we forward the metadata: it relies on simple string matching (https://github.com/elastic/elastic-serverless-forwarder/blob/main/handlers/aws/utils.py#L136-L151).
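
If that path keeps being forwarded with the event, the same lookup could also be expressed as routing rules on the cluster side; a minimal sketch, assuming the S3 object key ends up in a field like aws.s3.object.key (the field name, path tokens and dataset names are illustrative):

- data_stream_router:
    if: ctx.aws?.s3?.object?.key != null && ctx.aws.s3.object.key.contains('CloudTrail')
    dataset: aws.cloudtrail
- data_stream_router:
    if: ctx.aws?.s3?.object?.key != null && ctx.aws.s3.object.key.contains('elasticloadbalancing')
    dataset: aws.elb_logs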

ruflin commented 2 years ago

The serverless forwarder currently does a different type of routing client side: in the single case of an S3 notification to SQS as the event source, we have the path of the S3 object as metadata of the event. This path is deterministic for 80% of the native AWS integrations, and we leverage this for the automatic routing. This logic could easily be applied to a routing pipeline, since we forward the metadata: it relies on simple string matching (https://github.com/elastic/elastic-serverless-forwarder/blob/main/handlers/aws/utils.py#L136-L151).

This is the routing that I think would fit really well into the routing concept, as it is simply based on a single string and a quick lookup.

aspacca commented 2 years ago

This is the routing that I think would fit really well into the routing concept, as it is simply based on a single string and a quick lookup.

I agree, considering the performance implications that emerged in the thread.

From the specific point of view of the serverless forwarder, the CPU-intensive parsing (grok expressions etc.) should be kept on the edge. I just wonder if it makes sense to have two different routing places, one in the client and one in the cluster, identifying the two different scenarios where each should be applied, or whether it is less complex and easier to maintain a single one located in the client (given the specific situation of compute-expensive routing logic).

ruflin commented 2 years ago

I think ideally most of the routing should happen centrally, as it allows updating routing without having to deploy config changes to the edge or update the binary. If there is some routing that never changes, it could be hardcoded on the edge, but I would rather use that as a fallback option instead of making it part of the design. The key for me is that we get the routing concept into Elasticsearch, as on the edge we can already partially do it.

zez3 commented 1 year ago

The key for me is that we get the routing concept into Elasticsearch, as on the edge we can already partially do it.

I was using input tags in the past; that was the way in Graylog. https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-filestream.html#_tags_5

My example is mostly related to syslog. We used to have a generic syslog setup, actually 3 inputs (TCP, UDP, TCP+TLS), where every new prod box (black or gray-ish) was able to just dump its logs/events. Some boxes are not able to export to anything other than UDP port 514. Afterwards there was regex involved, centrally. Not fun and surely not performant.

Now, my workaround is to use different IP addresses on the LB to differentiate the inputs (e.g. 1.1.1.1:514 for one box type and 1.1.1.2:514 for another), but that is not IP friendly, as you need bigger pools.

I was thinking of using https://www.elastic.co/guide/en/beats/filebeat/current/add-tags.html with some regex or simple string match, but sometimes it's hard because we might need to parse/dissect deep into the message on the edge before we can reliably tag it.

Back then (Graylog time) I was routing based on the remote IP (log.source.address in the Elastic world), but that mostly works for on-prem and only for transparent-capable LBs/proxies. We might have a collector in between.

I think some form of edge tagging is still needed unless we want to regex a lot.

Just my 2 cents

botelastic[bot] commented 11 months ago

Hi! We just realized that we haven't looked into this issue in a while. We're sorry! We're labeling this issue as Stale to make it hit our filters and make sure we get back to it as soon as possible. In the meantime, it'd be extremely helpful if you could take a look at it as well and confirm its relevance. A simple comment with a nice emoji will be enough :+1:. Thank you for your contribution!