opensearch-project / data-prepper

Data Prepper is a component of the OpenSearch project that accepts, filters, transforms, enriches, and routes data at scale.
https://opensearch.org/docs/latest/clients/data-prepper/index/
Apache License 2.0

Enable pipeline to discard data older than XX #4667

Open marfago opened 3 months ago

marfago commented 3 months ago

Is your feature request related to a problem? Please describe. I have a data pipeline built as a combination of an AOSS pipeline and an AOSS collection. This pipeline serves as a real-time monitor for logs. We recently had an outage, so the source did not move logs for a few days. When we finally unblocked the pipeline and restarted ingestion, the logs from all of those days were moved at once, and the AOSS pipeline started ingesting from oldest to newest. This behavior does not work for us: because we want a real-time monitor, we prioritize fresher data over older data.

Describe the solution you'd like I propose introducing a new behavior where the pipeline can discard queued data older than XX (days, hours, minutes). This way, users can choose to prioritize fresher data over older data without letting the queue grow indefinitely. In my case I could set this flag to 1h and ingest only fresh data (at least for some time), forgetting about the past.

For example:

    max_retention: 1h
    max_retention: 1d
    max_retention: 1w
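A minimal sketch of how such an option might read in a pipeline configuration, assuming a hypothetical max_retention setting (the option name, its placement under the source, and the abbreviated s3 settings are illustrative only, not existing Data Prepper configuration):

    log-monitor-pipeline:
      source:
        s3:
          # ...existing s3 source settings (queue, codec, etc.) omitted...
          # Hypothetical option: silently discard queued data older than one hour
          max_retention: 1h
      sink:
        - opensearch:
            # ...existing OpenSearch/AOSS sink settings omitted...
            index: logs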

Describe alternatives you've considered (Optional) I don't have any.

Additional context Related to https://github.com/opensearch-project/data-prepper/issues/4666

dlvenable commented 3 months ago

We do have the drop_events processor, which can drop events that meet a certain condition.

I think what we are missing is a Data Prepper expression and/or function for comparing time.

Something like this could work:

drop_when: /my_timestamp < now() - 3d

Where 3d is our standard Data Prepper duration concept.

However, we do not have a now() function, nor the ability to perform comparisons against times. But both could be added.
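For illustration, a drop_events processor configured with such an expression might look like the sketch below; note that the now() function and the timestamp comparison do not exist yet, so this is only how the proposal could read once both are added:

    log-monitor-pipeline:
      processor:
        - drop_events:
            # Hypothetical expression: now() and time comparisons are not yet supported
            drop_when: "/my_timestamp < now() - 3d"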

dlvenable commented 3 months ago

This could be done a little more easily by adding just a now() method for now.

drop_when: /my_timestamp < now() - 3 * 24 * 60 * 60 * 1000

@marfago , Are you interested in working on adding the now() method?

marfago commented 3 months ago

@dlvenable thank you for your comment. For the solution you propose, how is my_timestamp retrieved?

dlvenable commented 3 months ago

@marfago , Do your events have an existing timestamp field that you could use? The my_timestamp would need to be a value from your source events.

Are you using Amazon S3 as a source? If you also need a timestamp, we could include the value of the S3 object header Last-Modified as metadata on the events. Then you'd be able to use that to approximate the time of the event. This could be useful in general as well.
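If the S3 source did expose Last-Modified as event metadata, one way to use it could be through the expression getMetadata() function, as sketched below. The metadata key name (s3/last_modified) is hypothetical, since this metadata is not currently set by the source, and the comparison still depends on the now() function discussed above:

    log-monitor-pipeline:
      processor:
        - drop_events:
            # Hypothetical: the "s3/last_modified" metadata key and now() do not exist yet
            drop_when: 'getMetadata("s3/last_modified") < now() - 1h'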