elastic / elasticsearch

Free and Open, Distributed, RESTful Search Engine
https://www.elastic.co/products/elasticsearch
Other
68.75k stars 24.43k forks source link

Specify multiple ingest pipelines for a data stream #61185

Open ruflin opened 3 years ago

ruflin commented 3 years ago

With the new indexing strategy, data sent from the Elastic Agent to Elasticsearch does not specify the ingest pipeline on the request, but each data stream already contains the ingest pipeline as setting. Currently we use index.default_pipeline but are thinking about also using index.final_pipeline for some final processing of the events. In some cases multiple pipeline are attached together with the pipeline processor. There are two use cases here:

The use cases around connecting multiple pipelines together I want to dive deeper here. In Ingest Manager we have at least three potential use cases for attaching multiple pipelines together:

Instead of ingest manager trying to modify the ingest pipelines to add multiple ingest pipelines together it would be nice if Elasticsearch would support an array of ingest pipelines and execute them in the order defined. Something like:

index.default_pipelines
- "pipeline1"
- "pipeline2"

The above would allow us to add a pipeline3 without having to modify the pipeline itself but only the settings. The same logic would apply to the final_pipelines so multiple teams could add their own final pipeline without conflicting with each other.

The above might also help with https://github.com/elastic/elasticsearch/issues/57968. The discussion here is what happens if the target data stream changes. With the above, I would expect that the pipelines are just added to the list and all of them are executed.

elasticmachine commented 3 years ago

Pinging @elastic/es-core-features (:Core/Features/Ingest)

mbudge commented 3 years ago

Good idea but also for normal indexes too.

The pipelines are too restrictive at the moment.

Beats is hogging the default pipeline forcing us to stick with logstash...

cutler-scott-newrelic commented 2 years ago

We would love to see this as well, as different teams may own different parts of the data ingest pipeline depending on the context.

felixbarny commented 2 years ago

The above would allow us to add a pipeline3 without having to modify the pipeline itself but only the settings.

Why is that desirable? IMHO, this increases complexity as there are two ways to change the pipelines then: the index settings and the ingest pipeline itself. In the stack management section of Kibana, there’s already good support for inspecting and editing pipelines. Having only one default pipeline per index seems less complex and easier to reason about.

Also, when adding a default pipeline to a list of default pipelines, you’ll have to remember that not only the index settings of all active indices have to be changed but also the index templates.

To me, it seems we have all the building block that we need to specify multiple ingest pipelines: the pipeline processor, processor tags to identify a processor within a pipeline, and optimistic concurrency control.

All problems in computer science can be solved by another level of indirection - Butler Lampson

Currently, Fleet integrations add their main processing pipeline directly to the index.default_pipeline setting. Instead of that, it could create a pipeline with a name that’s stable across versions of the integration and which users are free to customize. By default, it has one pipeline processor that delegates to the main processing pipeline of the integration.

When the integration gets updated because of a new version, Fleet modifies the existing pipeline by changing the pipeline processor that has a specific tag which identifies it as the pipeline processor that’s managed by Fleet. When doing so, Fleet should use concurrency control mechanisms to make sure the pipeline hasn’t been modified in the meantime.

When making changes to the default pipeline of an index, neither users nor Fleet have to worry about updating the index.default_pipeline setting of existing indices and the index template. That’s because the indices still invoke the same pipeline. Just the content of the pipeline changes.

Click to expand example ``` # User installs foo integration and fleet creates a default ingest pipeline for the index template logs-foo-* DELETE _ingest/pipeline/logs-foo PUT _ingest/pipeline/logs-foo { "version": 1, "processors": [ { "pipeline": { "tag": "logs-foo", "name": "logs-foo-1.0.0" } } ] } # User adds custom pipeline PUT _ingest/pipeline/logs-foo?if_version=1 { "processors": [ { "pipeline": { "tag": "logs-foo", "name": "logs-foo-1.0.0" } }, { "pipeline": { "name": "custom" } } ] } # Fleet updates version integration PUT _ingest/pipeline/logs-foo?if_version=2 { "processors": [ { "pipeline": { "tag": "logs-foo", "name": "logs-foo-1.1.0" } }, { "pipeline": { "name": "custom" } } ] } ```

To simplify the get-edit-put pattern when modifying pipelines, we could think of a convenience API in Elasticsearch: It would let you put/upsert or delete a processor with a given tag in an existing pipeline. The API would also take care of re-trying in case of concurrent modifications. A small caveat is that processor tags aren’t strictly unique.

cc @joshdover

ruflin commented 2 years ago

One part that is important from a Fleet perspective is that we have clear separation on what parts users can edit and what parts not. Ideally we would have read only pipelines in Elasticsearch that a user could not even modify.

I think both the settings and the processors approach proposed by @felixbarny are not ideal as there is a part that is touched by Fleet and the users which can lead to accidents and unexpected changes.

To simplify the get-edit-put pattern when modifying pipelines, we could think of a convenience API in Elasticsearch: It would let you put/upsert or delete a processor with a given tag in an existing pipeline. The API would also take care of re-trying in case of concurrent modifications. A small caveat is that processor tags aren’t strictly unique.

Such an API seems to be key to me no matter if we select the processors or the settings approach.

My personal preference is still on supporting multiple pipelines. It would allow package developers to split up very long pipelines into multiple chunks instead of having one very long pipeline. Also it allows to refer other pipelines. This is all possible with the pipelines processors too but it means custom logic is required in Fleet during installation time to put together the right "root" pipeline.

One thing we need to be careful about is upgrades. The data stream naming scheme and Fleet have a strict order in which upgrades must happen. There is a version number on the pipeline because new pipeline and mappings must be applied at the same time. So at first the pipeline is added, then the template is updated and then the new mapping and setting applied to the index. This is all not atomic so there is a chance, that a rollover could happen in the middle which means if there is no version on the pipeline, new mappings are not there yet and ingestion stops.

joshdover commented 2 years ago

@felixbarny I agree with a lot of what you're mentioning here and this is quite close to one of the solutions we're exploring in our design document (I sent you a link via DM).

One part that is important from a Fleet perspective is that we have clear separation on what parts users can edit and what parts not. Ideally we would have read only pipelines in Elasticsearch that a user could not even modify.

Agreed. If the "root" pipeline is managed / read-only, we could provide a UI for attaching a @custom pipeline (or even provide an empty one by default) which would be added to the root pipeline to run after the package's main pipeline(s).

My personal preference is still on supporting multiple pipelines. It would allow package developers to split up very long pipelines into multiple chunks instead of having one very long pipeline.

If the long pipeline problem is significant enough, we could add support to Fleet for installing multiple pipelines that are chained together. In general, I find that having multiple ways to do this may be confusing, especially when it comes to composable template overrides, etc.

Currently, Fleet integrations add their main processing pipeline directly to the index.default_pipeline setting. Instead of that, it could create a pipeline with a name that’s stable across versions of the integration and which users are free to customize.

One thing we need to be careful about is upgrades. The data stream naming scheme and Fleet have a strict order in which upgrades must happen. There is a version number on the pipeline because new pipeline and mappings must be applied at the same time.

I think @felixbarny's proposal here could still support our upgrade requirements if the user-defined custom pipeline has a stable name, but the "root" integration pipeline is versioned.

felixbarny commented 2 years ago

Agreed. If the "root" pipeline is managed / read-only, we could provide a UI for attaching a @custom pipeline (or even provide an empty one by default) which would be added to the root pipeline to run after the package's main pipeline(s).

Sounds good and we're already doing that for component templates, right?

My personal preference is still on supporting multiple pipelines. It would allow package developers to split up very long pipelines into multiple chunks instead of having one very long pipeline

If the long pipeline problem is significant enough, we could add support to Fleet for installing multiple pipelines that are chained together.

Already today, package developers can create multiple pipelines that are invoked from a main pipeline via a pipeline processor. See the panos integration for an example.

I think @felixbarny's proposal here could still support our upgrade requirements if the user-defined custom pipeline has a stable name, but the "root" integration pipeline is versioned.

++

joshdover commented 2 years ago

Sounds good and we're already doing that for component templates, right?

Yes, we do something similar for component templates today.

Already today, package developers can create multiple pipelines that are invoked from a main pipeline via a pipeline processor. See the panos integration for an example.

Yeah I noticed this too after looking at a few more integrations. Think we already have a solution to this problem.

ruflin commented 2 years ago

Already today, package developers can create multiple pipelines that are invoked from a main pipeline via a pipeline processor.

This is the case, we had this from day 1 as Filebeat modules also supported it. I see it a bit different from what is proposed here as in the current implementation, the developer must use the correct templates and use the pipeline processor. What I was thinking of is that a developer could put in a list of pipelines in the manifest and Fleet would do the pipeline processor magic. This only applies for cases where all pipelines need to be executed or the manifest could also support conditions. Often the pipelines can be split up into multiple phases like (just made up): basic enrichment with fields, taking message apart, renaming fields.

But all the above I think can be disconnected from this conversation. Because as soon as Fleet manages a base pipeline that is fully managed by Fleet, this base pipeline could be adjusted in each iteration without it being a breaking change to the users.

zez3 commented 2 years ago

If it's decided then please push it to dev. We need this sooner than later 🙏

mbudge commented 2 years ago

We are planning a migration from Beats to Elastic Agent/Fleet, and a major part of this is migrating our Logstash code to ingest pipelines. Our code runs after raw events have been converted to ECS format.

Should we use custom component templates to override the Final pipeline, and use a pipeline processor to add Fleet's Final pipeline to our custom Final pipeline?

We've invested 2-3 years development effort into our Logstash code so we need some guidance, as we don't want to migrate then find out there's some new top secret feature coming out the following month for custom ingest pipelines.

joshdover commented 2 years ago

@zez3 I appreciate your enthusiasm :) We want to be sure we have a solution that fits well for all use cases we intend to support so we can avoid breaking changes in the future. We're getting closer, but still have a few more options to evaluate (such as runtime fields).

Should we use custom component templates to override the Final pipeline, and use a pipeline processor to add Fleet's Final pipeline to our custom Final pipeline?

@mbudge I don't believe overriding the final pipeline is possible with the @custom component templates today because the .fleet_component_template-1 is higher precedence. Best bet for a workaround today is to override the default pipeline with a pipeline that calls the package-defined pipeline in a pipeline processor. However, I don't you can do this in a way that will survive package upgrades because the package-defined pipeline is versioned. Likely what will happen when the package is upgraded is that either the upgrade will fail because the pipeline is still in use, or worse the package upgrade will succeed but ingest will stop because the old pipeline version doesn't exist anymore.

akshay-saraswat commented 2 years ago

We are planning a migration from Beats to Elastic Agent/Fleet, and a major part of this is migrating our Logstash code to ingest pipelines. Our code runs after raw events have been converted to ECS format.

We've invested 2-3 years development effort into our Logstash code so we need some guidance, as we don't want to migrate then find out there's some new top secret feature coming out the following month for custom ingest pipelines.

@mbudge we would love to connect with you and understand what use-cases you are solving with logstash and what type of processing is happening there. As Josh mentioned in a comment above, we are evaluating options to provide you with those capabilities within Agent/Fleet and we want to make sure that we handle your requirements in whatever approach we proceed with. Are you on Elastic Community Slack?

zez3 commented 2 years ago

Since Josh mentioned runtime fields: What we miss from the ElasticSearch ingest pipelines is a DNS & rDNS processor. Available on Beats processors, Logstash and requested by many in ES as a new feature. From my point of view it would make sense to have the resolved name of an IP or host to ip option, at a minimum at runtime.

At the moment I have to do the dns resolution on the beat level. Meaning dissect a lot and eventually do the dns. Sometimes I don't need to do it for all of my events, just for some(on demand @runtime). This brings me to another point that might also be relevant to the main discussion. https://github.com/elastic/integrations/issues/2305 What if the main parsing will happen on the Agent and in ES just some additional enrichment?

PurpleV0id commented 2 years ago

I was in the same situation as @mbudge going from logstash to elastic agent, I used logstash to help drop events that i needed to filter, usually by regex, or to help normalise, ie lowercase host.name, add geo data etc.

I migrated to elastic agent and created custom pipelines that i then added to the top of .fleet_final_pipeline-1 and used conditions to help filter. This pipeline fortunately is not updated very often but it is far from ideal

joshdover commented 2 years ago

FYI we are planning to solve this problem for integration data streams by adding support for a new @custom ingest pipeline that will execute after the integration's default pipeline. See more details in https://github.com/elastic/kibana/issues/133740

PurpleV0id commented 2 years ago

Will there be any option to add this to the final pipeline as well? There are pipelines that I use that run across many integrations, and some that run across all

joshdover commented 2 years ago

Will there be any option to add this to the final pipeline as well? There are pipelines that I use that run across many integrations, and some that run across all

No plans currently, but would be curious to know why it needs to run in the final pipeline. That is generally reserved for any processors that need to run even if there are failures.

The proposal in https://github.com/elastic/kibana/issues/133740 will still allow for sharing pipelines across integrations, by using a pipeline processor in each data stream's custom pipeline that references the common one. I'll admit it requires a bit of manual work though.

I think if we wanted to extend the proposal in https://github.com/elastic/kibana/issues/133740 we could add a few custom pipelines that get called by every data stream, which would allow for different levels of granularity. This would allow for something like: