opensearch-project / data-prepper

Data Prepper is a component of the OpenSearch project that accepts, filters, transforms, enriches, and routes data at scale.
https://opensearch.org/docs/latest/clients/data-prepper/index/
Apache License 2.0

[RFC] Conditional Routing #1007

Closed. dlvenable closed this issue 2 years ago.

dlvenable commented 2 years ago

Background

Data Prepper pipelines currently do not support conditionals or routing. Thus all events in Data Prepper must flow through all sinks and processors in a pipeline. Many users require the ability to route events to different sinks and processors depending on the specific event.

The following diagram outlines a common scenario: Users need to route data to different sinks depending on some property of the event.

[Diagram: ConditionalRouting-SinkExampleByType]

Proposal

This RFC introduces a concept of a router to Data Prepper. Pipeline authors can define named routes in the router. Data Prepper will apply routes to individual Events before sending them to Sinks.

This GitHub issue focuses on routing events to sinks. See the #522 RFC for a proposal for routing through a processor chain.

The following diagram outlines where the router will sit and what it will do.

[Diagram: ConditionalRouting-Simple]

Design

Data Prepper will introduce a new router component to the pipeline. It sits at the same level of the YAML as the prepper and sink. The router runs after the processor chain and before the sinks; Data Prepper will evaluate these routes immediately before passing the Events into the sinks.

log-pipeline:
  source:
    http:
  processor:
  router:
    - application-logs: '/log_type == "application"'
    - http-logs: '/log_type == "apache"'
  sink:
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: application_logs
        routes: [application-logs]
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: http_logs
        routes: [http-logs]
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: all_logs

Any sink with the routes property will only accept Events which match at least one of the routes. In the example above, application-logs is a named route. Data Prepper will only route events with the application-logs route to the first opensearch sink.

By default, Data Prepper will route all Events to a sink which does not define a route. Thus, in the example above, all Events will go into the third opensearch sink.

Alternatives

See the comments below for alternatives.

graytaylor0 commented 2 years ago

I really like this! It is clever to set the routes in the EventMetadata. I do think users will want the remove_routes functionality, but there is no reason to add it immediately as it can be added easily in the future. From a user perspective, only typing a route for each sink/processor rather than a full conditional (like we had with the when conditional) is much cleaner, and has the added benefit of only evaluating conditionals once in the router processor.
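
For illustration, a remove_routes option could mirror set_routes. The following sketch is hypothetical; the option name and shape are assumptions, not part of the current proposal:

processor:
  - router:
      set_routes:
        - application-logs: '/log_type == "application"'
  - router:
      remove_routes:          # hypothetical option; not in the current proposal
        - application-logs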

The thing I would like some more clarity on is what really changes from the router's perspective for sinks vs. processors. Even though sinks don't have ordering, it seems like the implementation would be the same.

dapowers87 commented 2 years ago

In the first alternative, should the second router listing say apache instead of http-logs?

dlvenable commented 2 years ago

@dapowers87 , That was a typo, and I updated it. Thanks for catching and noting it!

dapowers87 commented 2 years ago

I really like the idea of the first alternative. Having a router be a base component sounds like it could open itself up to a lot of neat ideas down the road.

dapowers87 commented 2 years ago

What if we made router a base component and had pipelines flow in and out of them? Something like this:

pipeline-a:
  source:
    http:
  to-router: router-a

pipeline-b:
  processor:
    grok: ...
    add_entry: ...
  to-router: router-b

pipeline-c:
  processor:
    grok: ...
    add_entry: ...
  sink:
    opensearch: ...

pipeline-d:
  processor:
    grok: ...
    add_entry: ...
  to-router: router-c

pipeline-sink-a:
  sink:
    opensearch: ...
    stdout:

pipeline-sink-b:
  sink:
    opensearch: ...

router:
  - name: router-a
    set_routes:
      - pipeline-b: "some_conditional"
      - pipeline-c: "secondary_conditional"
  - name: router-b
    set_routes:
      - pipeline-c: "third_conditional"
      - pipeline-d: "fourth_conditional"
  - name: router-c
    set_routes:
      - pipeline-sink-a: "fifth_conditional"
      - pipeline-sink-b: "sixth_conditional"

cmanning09 commented 2 years ago

This behavior will support other use-cases in the future. One example is to have a common, shared pipeline. Currently, Data Prepper does not support multiple sources in a pipeline. However, if that support is added, users could define a common middle pipeline which is shared across different pipelines.

Can this already be achieved like so:

application-log-pipeline:
  source:
    http:
  prepper:
     ...
  sink:
    - pipeline:
        name: "log-pipeline"

apache-log-pipeline:
  source:
    http:
  prepper:
     ...
  sink:
    - pipeline:
         name: "log-pipeline"

log-pipeline:
  source:
    http:
  prepper:
     ...
  sink:
    - opensearch:

An alternative I would like to propose is making a router a type of sink. It reduces the routing and/or conditional checks in processors & core. It does not require building a concept of routes into Data Prepper Core and only requires us to develop a single plugin. It does create more pipelines, which may be a downside depending on how you look at it. Here is an example:

log-pipeline:
  source:
    http:
  prepper:
      ... generic log processing
  sink:
    - router:
      - '/type == "application"':
        - opensearch:
            hosts: [ "https://opensearch:9200" ]
            index: application_logs
      - '/type == "apache"':
        - pipeline:
             name: "apache-log-pipeline"

apache-log-pipeline:
  source:
    pipeline: log-pipeline
  prepper:
     ... unique apache log processing
  sink:
    - opensearch:
         hosts: [ "https://opensearch:9200" ]
         index: apache_logs

cmanning09 commented 2 years ago

has the added benefit of only evaluating conditionals once in the router processor.

@graytaylor0 , the original conditional would be evaluated once but every downstream processor / sink / core will need to evaluate the route.

graytaylor0 commented 2 years ago

has the added benefit of only evaluating conditionals once in the router processor.

@graytaylor0 , the original conditional would be evaluated once but every downstream processor / sink / core will need to evaluate the route.

Yes, but evaluating whether a String is in a Set is less computationally intensive than evaluating the conditional itself.
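
As a minimal Java sketch of that difference (the names here are illustrative, not actual Data Prepper internals):

import java.util.Set;

class RouteCheckSketch {
    // With pre-evaluation, each sink only needs a hash lookup against the
    // routes already stored on the event's metadata.
    static boolean matchesRoute(Set<String> eventRoutes, String sinkRoute) {
        return eventRoutes.contains(sinkRoute);
    }
    // Without named routes, each sink would instead have to re-parse and
    // re-evaluate an expression such as '/log_type == "application"'
    // against the event's fields.
}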

graytaylor0 commented 2 years ago

Can this already be achieved like so:


application-log-pipeline:
  source:
    http:
  prepper:
     ...
  sink:
    - pipeline:
        name: "log-pipeline"

apache-log-pipeline:
  source:
    http:
  prepper:
     ...
  sink:
    - pipeline:
         name: "log-pipeline"

log-pipeline:
  source:
    http:
  prepper:
     ...
  sink:
    - opensearch:

This would not work as written, since a pipeline source must name the specific upstream pipeline. It would need to look like this:

application-log-pipeline:
  source:
    http:
  prepper:
     ...
  sink:
    - pipeline:
        name: "log-pipeline"

apache-log-pipeline:
  source:
    pipeline: 
       name: "application-log-pipeline"
  prepper:
     ...
  sink:
    - pipeline:
         name: "log-pipeline"

dlvenable commented 2 years ago

This behavior will support other use-cases in the future. One example is to have a common, shared pipeline. Currently, Data Prepper does not support multiple sources in a pipeline. However, if that support is added, users could define a common middle pipeline which is shared across different pipelines.

Can this already be achieved like so:

@cmanning09 ,

There are a few reasons we cannot do this now.

  1. As @graytaylor0 pointed out, pipeline authors must specify the pipeline source in their pipeline. They cannot support "any pipeline" as a source.
  2. Pipelines cannot have multiple sources. As I noted in the issue description, this is a separate issue.
  3. Some users want to have pipelines where events flow through a shared pipeline and then out to different final destination sinks. For example, application logs go through the shared pipeline and then out to an application-logs OpenSearch cluster, while HTTP logs go through the same shared pipeline and then out to an HTTP-logs OpenSearch cluster.

It is for the third reason that I proposed a router processor. These routers would apply routes to Events early in a pipeline and then allow them to route differently later.
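
A sketch of that shared-pipeline use-case under this proposal (pipeline and index names are illustrative): the router processor assigns routes in the entry pipeline, and because routes stay on the Event, they still apply at the sinks of the shared pipeline.

entry-pipeline:
  source:
    http:
  prepper:
    - router:
        set_routes:
          - application-logs: '/log_type == "application"'
          - http-logs: '/log_type == "apache"'
  sink:
    - pipeline:
        name: "shared-pipeline"

shared-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  prepper:
     ... shared processing
  sink:
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: application_logs
        route: application-logs
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: http_logs
        route: http-logs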

An alternative I would like to propose is making a router a type of sink. It reduces the routing and/or conditional checks in processors & core. It does not require building a concept of routes into data prepper core and only requires us to develop a single plugin. It does create more pipelines which may be a downside depending on how you look at it. Here is an example:

This approach is somewhat similar to some of the alternatives I described (though it moves from Core to a plugin). If Data Prepper had a router sink, how would Data Prepper handle the shared pipeline use-case that I just described?

dlvenable commented 2 years ago

The discussions indicate some additional interest in a solution similar to the first alternative provided. I want to clarify the similarities and differences between the proposal and the first alternative. Then I'll elaborate on the specifics of the first alternative. Finally, I'll lay out some pros and cons for each.

Similarities

Both approaches use a router prior to sending data to sinks. The diagram below shows where events are routed.

[Diagram: ConditionalRouting-Simple]

Differences

First, I'd like to make a distinction between two phases of routing. Routing Evaluation is where Data Prepper evaluates the route conditions against each Event to determine which routes apply. Routing Application is where Data Prepper uses those routes to deliver each Event to the matching sinks.

The primary proposal and the alternatives in this RFC take two different approaches to when Routing Evaluation is performed.

  1. Pre-evaluation - Perform the Routing Evaluation anytime prior to the Routing Application phase. This can be performed in the same pipeline or in previous pipelines.
  2. Just-in-time evaluation - Perform the Routing Evaluation immediately before the Routing Application.

Pre-Evaluation

The current proposal to include a router processor performs pre-evaluation of routes. The routes are evaluated and then added to Events in the processor. However, Events are not actually routed until heading into the Sinks.

[Diagram: ConditionalRouting-PreEvaluation]

Just-in-Time Evaluation

The first alternative is to support just-in-time evaluation of routes. In this approach, the router will apply both Routing Evaluation and Routing Application.

The following diagram shows how this works conceptually.

[Diagram: ConditionalRouting-JitEvaluation]

Details on First Alternative

The YAML syntax for the first alternative is repeated below.

log-pipeline:
  source:
    http:
  prepper:
  router:
    - application-logs: '/log_type == "application"'
    - http-logs: '/log_type == "apache"'
  sink:
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: application_logs
        route: application-logs
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: http_logs
        route: http-logs
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: all_logs

While the router and sink appear in different YAML sections, they would operate close enough together that Route Evaluation and Route Application occur simultaneously.
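
Conceptually, the just-in-time router would iterate over a batch roughly as follows. This Java sketch is illustrative only; none of these interfaces are defined by the RFC:

import java.util.*;
import java.util.function.BiPredicate;

class JitRouterSketch {
    // Evaluate each named route once per event, then hand each sink only the
    // events whose matched routes overlap the sink's configured routes.
    static Map<String, List<Object>> route(Map<String, String> namedRoutes,
                                           Map<String, Set<String>> sinkRoutes,
                                           List<Object> events,
                                           BiPredicate<String, Object> evaluate) {
        Map<String, List<Object>> eventsPerSink = new HashMap<>();
        for (Object event : events) {
            Set<String> matched = new HashSet<>();
            namedRoutes.forEach((name, expression) -> {
                if (evaluate.test(expression, event)) matched.add(name);
            });
            sinkRoutes.forEach((sink, accepted) -> {
                // A sink with no routes configured accepts every event.
                if (accepted.isEmpty() || !Collections.disjoint(accepted, matched)) {
                    eventsPerSink.computeIfAbsent(sink, s -> new ArrayList<>()).add(event);
                }
            });
        }
        return eventsPerSink;
    }
}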

Multiple Routes

This approach can also support the routes property (plural). The following example shows how it would work.

log-pipeline:
  source:
    http:
  prepper:
  router:
    - 4xx_requests: '/status_code >= 400 and /status_code < 500'
    - 5xx_requests: '/status_code >= 500'
  sink:
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: non_success_logs
        routes:
          - 4xx_requests
          - 5xx_requests
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: error_logs
        routes:
          - 5xx_requests

Achieving Similar Behavior to Pre-Evaluation

Pipeline authors can achieve a result similar to pre-evaluation using the new mutate processors, once they support conditional configuration. Pipeline authors will be able to conditionally set user-defined fields on Events in a mutate processor. In the router definition, they can then configure the routing based on the values of those user-defined fields.

The following shows a conceptual example. Note that the add_entries processor does not currently support conditionals.

log-pipeline:
  source:
    http:
  prepper:
    - add_entries:
        entries:
          - key: "log_type"
            value: application
        when: '/file_name == "application.log"'
    - add_entries:
        entries:
          - key: "log_type"
            value: apache
        when: '/file_name == "http.log"'
  router:
    - application-logs: '/log_type == "application"'
    - apache: '/log_type == "apache"'
  sink:
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: application_logs
        route: application-logs
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: http_logs
        route: apache

The only significant downside is that the field is now part of the Event, and the Sink will send it to the final destination system. This can still be worked around by routing to a second pipeline which drops the field from the Event.
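
A sketch of that workaround follows. The pipeline names are illustrative, the delete_entries syntax is the one used later in this RFC (still subject to #508), and whether route applies to the pipeline sink exactly as shown is an assumption:

log-pipeline:
  ...
  sink:
    - pipeline:
        name: "cleanup-pipeline"
        route: application-logs   # assumes route applies to pipeline sinks too

cleanup-pipeline:
  source:
    pipeline:
      name: "log-pipeline"
  prepper:
    - delete_entries:
        with_keys:
          - 'log_type'
  sink:
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: application_logs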

Pros & Cons of the Approaches

This section considers some advantages and disadvantages to each approach.

Performance

I expect the performance of either routing approach to be similar, with some subtle differences depending on the pipelines. For both approaches, evaluation and routing need to take place: both iterate over events, evaluate the events against expressions, and then place those events into new collections. This yields a time complexity of O(n), where n is the number of events. Space complexity will also be O(n), since new pointers to the Events are needed.

Both approaches do use the concept of "named routes". This may yield some performance gains because expressions do not need to be reevaluated.

As a counterexample, see the following syntax. In this type of router, the /log_type == "application" expression would need to be evaluated twice for each event.

  sink:
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: application_and_http
        routes:
          - '/log_type == "application"'
          - '/log_type == "http"'
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: application_only
        routes:
          - '/log_type == "application"'

Both the proposal and the alternative avoid this duplicate evaluation. However, it is unlikely to have a significant impact on the overall performance of the pipeline.

Ease-of-Use

Because these two approaches apply routes in different places, the approach taken will have an impact on how easy and intuitive Data Prepper is for pipeline authors.

I expect that placing conditionals closer to the routing - as just-in-time evaluation does - will be easier for pipeline authors to grasp.

The ability to add routes in one pipeline and have them continue to apply in others may be useful for authors of complex pipelines. But it could also confuse new pipeline authors. The just-in-time evaluation yields simpler usage here as well.

Visualization

One long-term possibility for Data Prepper is to support pipeline visualizations, in particular the ability to build pipelines graphically.

The alternative approach (just-in-time evaluation) may be better suited for this because the conditions sit directly on the lines where the routing occurs.

This diagram shows how the pre-evaluation might be visualized. As you can see, the routes are defined as properties on the router processors.

[Diagram: ConditionalRouting-PreEvaluationVisualization]

In the diagram for just-in-time evaluation, the conditions can be visualized directly on the arrows.

[Diagram: ConditionalRouting-JitVisualization]

Flexibility

The pre-evaluation route may offer more flexibility. In particular, it can nicely accommodate routes on processors. Any processor could be executed only for events of a certain route. This could allow for a weaving of Events in and out of processors. This may be powerful, but could also be hard to visualize.

dlvenable commented 2 years ago

@cmanning09 , I want to be sure to address the idea of a "router-as-a-sink" which you proposed. In this comment, I want to compare it along the same lines as I compared pre-evaluation and just-in-time evaluation in my previous comment.

Performance

I do expect that the performance will be similar. The time and space complexity should both remain O(n).

The main difference may lie in the threading model. Currently, each Sink runs in parallel on some shared threads. This router sink would also run in its own Future and would thus have only one thread from the perspective of Data Prepper Core.

Data Prepper does have some open issues around allowing Processors to make better use of the threading model, so this could be improved in the future. For now, though, the router sink would either run sequentially or need to create its own threads.
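
To make the current threading model concrete, here is a rough Java sketch of each sink running as its own task. This is illustrative only, not the actual Data Prepper Core code:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class SinkThreadingSketch {
    // Each sink publishes a batch as its own task on a shared pool, so sinks
    // run in parallel with one another. A router-as-a-sink would occupy just
    // one of these tasks, so any fan-out inside it would run sequentially
    // unless the plugin creates threads of its own.
    static void publishToSinks(ExecutorService pool, List<Runnable> sinkTasks)
            throws InterruptedException, ExecutionException {
        List<Future<?>> futures = new ArrayList<>();
        for (Runnable sinkTask : sinkTasks) {
            futures.add(pool.submit(sinkTask));
        }
        for (Future<?> future : futures) {
            future.get(); // wait for every sink to finish the batch
        }
    }
}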

Ease-of-Use

Having the router sink does appear rather intuitive to use. However, it does result in more nesting. Avoiding this nesting is becoming harder with conditionals, but it would be nice to avoid it as much as possible.

Pipeline Visualization

I tried to create a similar visualization for your proposal. I'm pasting it below. If you believe there is a better visualization, please share.

[Diagram: Visualization-Router-as-Sink]

I do think the visualizations for the other approaches are more intuitive.

Flexibility

I expect that this approach is just as flexible as the just-in-time router evaluation. I do not see any loss of flexibility there. As with the just-in-time router evaluation, this solution would have no bearing on processor routing.

dlvenable commented 2 years ago

Original RFC Proposal

The following comment is here to track the original RFC proposal.

Data Prepper will add a new Event metadata field for routes. Once a route is added, it remains part of the Event as long as that Event exists within Data Prepper. Data Prepper will also include a new processor named router. This processor adds routes to Events when processed. Additionally, sinks can be configured to only accept Events with a matching route.

The routing conditionals will use the condition syntax as determined by #1005.

Configuration

The following is an example of the proposed configuration. It is the YAML configuration for the pipeline shown in the diagram above.

log-pipeline:
  source:
    http:
  prepper:
    - router:
        set_routes:
          - application-logs: '/log_type == "application"'
          - http-logs: '/log_type == "apache"'
  sink:
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: application_logs
        route: application-logs
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: http_logs
        route: http-logs
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: all_logs

Additionally, pipeline authors can use the routes property (plural) to use the same sink for multiple routes. Data Prepper will route to a sink when any route from the routes property is on the Event.

sink:
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: error_logs
        routes:
          - 4xx_requests
          - 5xx_requests

Data Prepper will provide both the route and routes options. The route option is a simpler syntax to cover many common scenarios. Users can define either route or routes, but not both; if a user defines both, Data Prepper will error and exit.
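
A small Java sketch of that validation rule (the method and exception choices are illustrative):

import java.util.Set;

class RouteConfigSketch {
    // A sink may define `route` or `routes`; defining both is a startup error.
    static Set<String> resolveRoutes(String route, Set<String> routes) {
        if (route != null && routes != null) {
            throw new IllegalArgumentException("Define either route or routes, not both.");
        }
        if (route != null) {
            return Set.of(route);
        }
        return routes != null ? routes : Set.of();
    }
}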

Route Evaluation

The router processor will evaluate route expressions and add the route metadata when the processor runs.

This allows pipeline authors to apply routes using fields which they wish to remove or alter later. The following example shows how the file_name field can be used to determine a route. That field is then removed, but the route still applies. Please note that the exact operation for removing fields is still being worked on as part of #508.

log-pipeline:
  source:
    http:
  prepper:
    - router:
        set_routes:
          - application-logs: '/file_name == "application.log"'
          - http-logs: '/file_name == "http.log"'
    - delete_entries:
        with_keys:
          - 'file_name'
  sink:
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: application_logs
        route: application-logs
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: http_logs
        route: http-logs

Implementation

The route property on the Sinks will be a Data Prepper Core feature. While the route property sits at the same YAML level as the plugin's own settings, it will be managed entirely by Data Prepper Core. Plugin authors will not be able to use the route property, override it, or disable it.

This is different from the original when concept proposed in #522.

I propose adding the route here because this makes the most sense for a pipeline author. These configurations are how pipeline authors conceive of their sinks. Thus, making the route a property on the sink should be straightforward for pipeline authors. The downside to this approach is that it may be slightly more confusing to sink plugin authors. They may expect that they have to handle the route. I believe this is a reasonable tradeoff for improving the pipeline author experience.
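
One way to picture the Core-managed behavior described above: Core could strip the routing keys out of the sink's settings before handing them to the plugin. This is only a sketch of the idea, not the planned implementation:

import java.util.Map;

class CoreManagedRouteSketch {
    // Core reads and removes `route`/`routes` before the sink plugin is
    // configured, so the plugin never sees or handles these properties.
    static Object extractRoutes(Map<String, Object> sinkSettings) {
        Object routes = sinkSettings.remove("routes");
        return routes != null ? routes : sinkSettings.remove("route");
    }
}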

Model Changes

The EventMetadata model will have a new set for routes. This set will be modifiable.

Set<String> getRoutes();
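
A router processor could then tag Events by mutating that set, along these lines (the Event and EventMetadata stand-ins below are minimal stubs for illustration):

import java.util.Set;

class RoutesMetadataSketch {
    // Minimal stand-ins for the real Event/EventMetadata types.
    interface EventMetadata { Set<String> getRoutes(); }
    interface Event { EventMetadata getMetadata(); }

    // The proposal states the returned set is modifiable, so adding a route
    // is a direct mutation of the event's metadata.
    static void applyRoute(Event event, String routeName) {
        event.getMetadata().getRoutes().add(routeName);
    }
}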

dlvenable commented 2 years ago

The maintainers are working on implementing this in #1337. Since we have finalized the approach, I'm closing this RFC. We will continue to track the implementation in #1337 and that will indicate when the feature itself is complete.

dlvenable commented 2 years ago

I'm re-opening this RFC for discussion based on a comment in a related PR - https://github.com/opensearch-project/data-prepper/pull/1681#pullrequestreview-1082667003.

The current YAML design proposed in this RFC is pasted again below.

log-pipeline:
  source:
    http:
  processor:
  router:
    - application-logs: '/log_type == "application"'
    - http-logs: '/log_type == "apache"'
  sink:
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: application_logs
        routes: [application-logs]
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: http_logs
        routes: [http-logs]
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: all_logs

The router is the mechanism for assigning routes. Each Sink can then apply one or more routes that it supports; if no routes are specified, that Sink supports all routes. Pipeline authors define these routes on the concrete sink that they create. This proposal keeps the functionality in Data Prepper Core, and Sink plugin developers do not have the ability to control the routing for their sinks.

Some of the reasoning for this structure:

It has a few downsides worth noting:

Overall, I think the compact and clear syntax will help more users. But I'd like to get other thoughts.

Some New Alternatives

Additionally, we can consider breaking the syntax with a few possible forms:

Alternative 1: Combine router and sinks. There would no longer be a sinks section.

router:
  - '/log_type == "application"':
      - opensearch:
          host: ...
      - opensearch:
          host: ...
  - '/log_type == "apache"':
      - opensearch:
          host: ...

Alternative 2: Invert how we generate plugins. This could be combined with #1025 to move id and route up a level.

router:
  - application: '/log_type == "application"'
  - http: '/log_type == "apache"'
sinks:
  - opensearch-application:
      type: opensearch
      routes: ["application"]
      configuration:
        host: http://...
        username: admin
        password: admin
  - opensearch-apache:
      type: opensearch
      routes: ["http"]
      configuration:
        host: http://...
        username: admin
        password: admin
  - opensearch-all:
      type: opensearch
      configuration:
        host: http://...
        username: admin
        password: admin

Alternative 3: Again using #1025, if plugin ids are required, we can use them in the router.

  router:
    - application-logs:
        condition: '/log_type == "application"'
        sinks: [opensearch-application]
    - http-logs:
        condition: '/log_type == "apache"'
        sinks: [opensearch-apache]
  sink:
    - opensearch:
        id: opensearch-application
        hosts: [ "https://opensearch:9200" ]
        index: application_logs
    - opensearch:
        id: opensearch-apache
        hosts: [ "https://opensearch:9200" ]
        index: http_logs
    - opensearch:
        id: opensearch-all
        hosts: [ "https://opensearch:9200" ]
        index: all_logs

cmanning09 commented 2 years ago

We are defining routes and their destinations separately. Can this be (mis-)used as a dropping mechanism? In the example below, Events with the http-logs route would not go to any sink. This may be an implementation detail, but it is something I wanted to call out.

log-pipeline:
  source:
    http:
  processor:
  router:
    - application-logs: '/log_type == "application"'
    - http-logs: '/log_type == "apache"'
  sink:
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: application_logs
        routes: [application-logs]

dlvenable commented 2 years ago

The gains to the pipeline author experience from configuring sinks according to the current RFC outweigh the possible issues arising from confusion for plugin developers. So I'll continue to keep the routes property in-line with other sink properties. This is expected if we think of these properties as properties of the sink itself, and not so much plugin-specific properties.

One change I'd like to suggest from the current RFC is to rename the pipeline-level router to route. The reason I suggest this:

The following example puts this all together:

log-pipeline:
  source:
    http:
  processor:
  route:
    - application-logs: '/log_type == "application"'
    - http-logs: '/log_type == "apache"'
  sink:
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: application_logs
        routes: [application-logs]
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: http_logs
        routes: [http-logs]
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        index: all_logs

cmanning09 commented 2 years ago

Thanks @dlvenable. I agree with your logic for prioritizing the pipeline author experience. I like the suggestion to rename router to route.

dlvenable commented 2 years ago

I'm closing this issue since this is the design we are working toward now as part of #1337.