elastic / elasticsearch

Free and Open Source, Distributed, RESTful Search Engine
https://www.elastic.co/products/elasticsearch

[Transform] Speed up continuous transform by updating aggregation results in an incremental way #71097

Open przemekwitek opened 3 years ago

przemekwitek commented 3 years ago

Currently, continuous transform uses a number of optimizations to avoid excessive work when the changes between checkpoints are small. Most notably, the transform identifies terms that changed since the last checkpoint. However, once it has identified the changed terms, it re-computes the aggregations on those terms without any time filtering, i.e. from the beginning of time. We should consider implementing incremental updates for the cases where the new aggregation result can be computed from the previous result plus the result for the current checkpoint's time range alone.
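For illustration, the change detection step boils down to a bounded search that collects the terms seen since the last checkpoint rather than a full scan. A rough sketch of what such a request might look like, using one of the group-by fields from the example below (the source index name, the timestamp field used for synchronization, and the epoch-millisecond checkpoint bounds are hypothetical placeholders, not taken from the actual implementation):

POST src-index/_search
{
  "size": 0,
  "query": {
    "range": { "timestamp": { "gte": 1617200000000, "lt": 1617300000000 } }
  },
  "aggs": {
    "changed_a": { "terms": { "field": "a" } }
  }
}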

Example: Suppose the user configured a continuous pivot transform with a number of group-by fields:

                "group_by": {
                    "a": { "terms": { "field": "a" } },
                    "b": { "terms": { "field": "b" } },
                    "c": { "terms": { "field": "c" } }
                },

and a number of simple aggregations:

                "aggregations": {
                    "min_date": { "min": { "field": "d" } },
                    "max_date": { "max": { "field": "d" } },
                    "value_count": { "value_count": { "field": "d" } }
                }
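For context, a minimal sketch of a complete continuous transform configuration that these fragments could be part of (the transform id, index names, and sync field are hypothetical):

PUT _transform/example-transform
{
  "source": { "index": "src-index" },
  "dest": { "index": "dest-index" },
  "sync": { "time": { "field": "timestamp" } },
  "pivot": {
    "group_by": {
      "a": { "terms": { "field": "a" } },
      "b": { "terms": { "field": "b" } },
      "c": { "terms": { "field": "c" } }
    },
    "aggregations": {
      "min_date": { "min": { "field": "d" } },
      "max_date": { "max": { "field": "d" } },
      "value_count": { "value_count": { "field": "d" } }
    }
  }
}

The "sync" block is what makes the transform continuous: it tells the transform which timestamp field to use for checkpointing.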

Suppose that we are at checkpoint n-1 and the destination index already contains a document:

{
    "a": "some-a",
    "b": "some-b",
    "c": "some-c",
    "min_date": 123456789,
    "max_date": 987654321,
    "value_count": 1000
}

Now, when we move to checkpoint n, the transform should only query the time range from the previous checkpoint to the current one (i.e. not from the beginning of time).
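A rough sketch of such a bounded query for the changed term "some-a" (the source index name, the timestamp field used for synchronization, and the epoch-millisecond checkpoint bounds are hypothetical placeholders; the actual transform groups buckets with a composite aggregation rather than issuing one query per term):

POST src-index/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        { "term": { "a": "some-a" } },
        { "range": { "timestamp": { "gte": 1617200000000, "lt": 1617300000000 } } }
      ]
    }
  },
  "aggs": {
    "min_date": { "min": { "field": "d" } },
    "max_date": { "max": { "field": "d" } },
    "value_count": { "value_count": { "field": "d" } }
  }
}

Let's say the query returned: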

{
    "a": "some-a",
    "b": "some-b",
    "c": "some-c",
    "min_date": 222222222,
    "max_date": 999999999,
    "value_count": 2000
}

which is our incremental update for checkpoint n. When we update the destination index, instead of overwriting the values in the existing document, which would yield:

{
    "a": "some-a",
    "b": "some-b",
    "c": "some-c",
    "min_date": 222222222,
    "max_date": 999999999,
    "value_count": 2000
}

we should merge the existing document with the incremental update:

{
    "a": "some-a",
    "b": "some-b",
    "c": "some-c",
    "min_date": 123456789,  # minimum was taken from the existing document
    "max_date": 999999999,  # maximum was updated
    "value_count": 3000  # value count was summed up
}

because:

new.min_date = min(old.min_date, update.min_date)
new.max_date = max(old.max_date, update.max_date)
new.value_count = old.value_count + update.value_count
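One way this merge could be applied to the destination index is a scripted upsert through the update API. Below is a minimal sketch, assuming a hypothetical index name and document id, with the merge logic hard-coded for this example's aggregations (a real implementation would derive the script from the configured aggregation types):

POST dest-index/_update/some-doc-id
{
  "script": {
    "lang": "painless",
    "source": "ctx._source.min_date = Math.min(ctx._source.min_date, params.min_date); ctx._source.max_date = Math.max(ctx._source.max_date, params.max_date); ctx._source.value_count += params.value_count;",
    "params": {
      "min_date": 222222222,
      "max_date": 999999999,
      "value_count": 2000
    }
  },
  "upsert": {
    "a": "some-a",
    "b": "some-b",
    "c": "some-c",
    "min_date": 222222222,
    "max_date": 999999999,
    "value_count": 2000
  }
}

The "upsert" branch covers groups that appear for the first time in checkpoint n, for which there is no existing document to merge with.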

Of course, not all aggregations are additive in nature (think, e.g., percentiles), so the performance gains from this optimization will be limited to the aggregations that can be merged this way. The costs and benefits need to be weighed, and some benchmarking would probably help in making an informed decision here.

elasticmachine commented 3 years ago

Pinging @elastic/ml-core (Team:ML)