elastic / elasticsearch

Free and Open, Distributed, RESTful Search Engine
https://www.elastic.co/products/elasticsearch

Continuous transform with date histogram generates duplicates after index rollover #61587

Open olt4r opened 4 years ago

olt4r commented 4 years ago

Elasticsearch version (bin/elasticsearch --version): 7.9.0

JVM version (java -version): bundled 14.0.1

Description of the problem including expected versus actual behavior:

In an ongoing project, we are using continuous transforms with date_histogram in pivot for metrics aggregation from raw data over time. We also use ILM policies for target index rollover and old metrics data removal.

We noticed that after a target index rollover, the transform generates duplicate records for the same timestamp and the same values of the other pivot 'dimensions'. This behaviour is incorrect; no such duplicates should be generated.
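
For context, a minimal sketch of the kind of transform involved (names and parameters are illustrative and roughly match the reproduction scenario below, not our actual configuration; the real setup is attached as setup_artifacts.txt). Here agg_data stands for the write alias of the ILM-managed destination:

# illustrative sketch only
PUT _transform/agg_data_transform
{
  "source": { "index": ["raw_data-*"] },
  "dest": { "index": "agg_data" },
  "frequency": "1m",
  "sync": { "time": { "field": "@timestamp", "delay": "3m" } },
  "pivot": {
    "group_by": {
      "@timestamp": { "date_histogram": { "field": "@timestamp", "fixed_interval": "1m" } },
      "x": { "terms": { "field": "x" } },
      "y": { "terms": { "field": "y" } }
    },
    "aggregations": {
      "count": { "value_count": { "field": "x" } }
    }
  }
}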

Analysis of the logs and source code shows that:

  1. The transform implementation lets date_histogram produce buckets for the interval that intersects the right bound of the time range processed by the current checkpoint. This causes the transform to produce incomplete aggregates for this interval, as source documents are filtered by the checkpoint's time range. Such 'incomplete' records are then inserted into the target index.

  2. The transform implementation also rounds the left bound of the processed time range down to the nearest date_histogram interval when computing aggregates. This causes the 'incomplete' records produced by one checkpoint to be overwritten by 'complete' ones generated by the next checkpoint, leading to multiple upserts per checkpoint.

  3. When an index rollover occurs between two checkpoints, these upserts become inserts into the newly created index (via the write alias), leading to data duplication.
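
To make point 3 concrete, here is a minimal sketch of the destination side (index and alias names are illustrative). The transform writes through a write alias and uses deterministic document ids derived from the group_by values, so re-computing a bucket normally overwrites the previous document; once ILM rollover moves the write alias to a new backing index, the same document id no longer reaches the old index and is simply created again:

# bootstrap index behind the write alias the transform writes to (illustrative)
PUT agg_data-000001
{
  "aliases": {
    "agg_data": { "is_write_index": true }
  }
}

After rollover to agg_data-000002, an 'upsert' of an existing bucket document lands in the new index while the 'incomplete' version remains in agg_data-000001, which is exactly the duplication described above.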

Steps to reproduce:

  1. Set up the Elasticsearch artifacts as described in setup_artifacts.txt

  2. Extract the provided data_generator.zip and run the data generation script (requires Python 3 and elasticsearch-py): python3 data_generator.py

  3. Create the index patterns raw_data-* and agg_data-* in Kibana for easy visualization. Switch to the Discover view and select the agg_data-* index pattern.

  4. Wait for an index rollover and the subsequent transform execution / checkpoint. Notice the significant increase in the number of documents in one 1m interval near the rollover timestamp (it will be shifted into the past due to the transform frequency and delay). See the attached screenshot 01_count_anomaly.

  5. Explore the 1m interval with the increased document count. Notice that nearly all {@timestamp, x, y} combinations occur twice in this interval, with different _index field values. Notice that the document created in the newer index always has a count greater than or equal to that of its duplicate in the older index, due to the upserts mentioned in the problem description. See the attached screenshot 02_record_duplicates.

  6. For more in-depth analysis, enable trace logging for org.elasticsearch.xpack.transform (see the settings sketch after this list). Capture and examine the transform execution logs around an index rollover. Notice how the generated queries first produce 'incomplete' records and then attempt to overwrite them with complete data during the next execution. Cross-examine the logs with the source code.
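
For step 6, the trace logger can be enabled with a dynamic cluster setting (a minimal sketch using the standard logger setting mechanism):

PUT _cluster/settings
{
  "persistent": {
    "logger.org.elasticsearch.xpack.transform": "trace"
  }
}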

Proposed solution:

My approach would be to eliminate 'incomplete' records from the whole process. This should solve the duplication issue and also prevent the upserts that currently occur on every checkpoint. One way to achieve this is to ensure that the right bound of the time range processed within a checkpoint is rounded down to the nearest date_histogram interval.

As an experiment, I have implemented a custom CheckpointProvider that produces checkpoints aligned to a given interval. I have modified TransformService so that this IntervalBasedCheckpointProvider is instantiated whenever a date_histogram is present in the pivot definition, using the histogram's interval. The current simplified implementation supports only fixed intervals.

Initial results are encouraging: no duplicates are generated when running the reproduction scenario. It also seems to work correctly for our project's metrics aggregation transforms. However, adding a new CheckpointProvider feels somewhat excessive / not entirely correct. Maybe introducing fine-grained modifications in the query building logic of TransformIndexer and related classes would be a better idea?

Since my experience with Elasticsearch source code is limited, any feedback / suggestions would be greatly appreciated.

elasticmachine commented 4 years ago

Pinging @elastic/ml-core (:ml/Transform)

hendrikmuhs commented 4 years ago

@olt4r Thanks for the report and the excellent repro steps.

Your observation is correct: if the transform re-writes a data point but the write index changes in between, it might create duplicates.

I have tested your case and I think you can prevent this behavior if you add a query to the source:

  "source" : {
        "index" : [
          "raw_data*"
        ],
        "query" : {
          "range": {
            "@timestamp": {
              "lt": "now-3m/1m"
            }
          }
        }
      },

This is what you proposed but using the config instead of implementing it in code.

(For other readers: it takes the current time, subtracts the delay (3m) and rounds down to the fixed_interval (1m) of the date histogram aggregation. For example, if the query runs at 12:07:42, now-3m/1m evaluates to 12:04:00. In other words, this expression must be kept in sync with delay and fixed_interval.)

I have thought about this issue in another context: because re-writes are expensive for Lucene, it would be better for performance to avoid them. However, this only makes sense if the group_by is a date_histogram and you group over short periods. We could add an intermediate_results (true/false) setting to the config. I am not sure we can auto-guess this mode, as it depends on the use case.

Regarding your implementation proposal, I think a new CheckpointProvider sounds a bit too complicated. Off the top of my head, I would change the query. But first we need to discuss the API; afterwards I can look into the implementation. I will follow up with the team on that.

Last but not least, I think this is more of an enhancement request than a bug report, so I will re-label it.

Again, thanks! I hope to be able to follow up on it soon.

olt4r commented 4 years ago

@hendrikmuhs Thanks for your fast reply and proposed solution.

I had already tested this approach before reporting the issue. From what I see in the logs, using date math as described would break the change detection query in a subtle way, resulting in missing records.

Consider the following filter sections of change detection queries, generated by two consecutive transform executions:

"query":{
      "bool":{
         "filter":[
            {
               "range":{
                  "@timestamp":{
                     "from":null,
                     "to":"now-15m/m",         Wed Aug 19 23:51:00.000 CEST 2020
                     "include_lower":true,
                     "include_upper":false,
                     "boost":1.0
                  }
               }
            },
            {
               "range":{
                  "@timestamp":{
                     "from":1597873303851,     Wed Aug 19 23:41:43.851 CEST 2020
                     "to":1597873903855,       Wed Aug 19 23:51:43.855 CEST 2020
                     "include_lower":true,
                     "include_upper":false,
                     "format":"epoch_millis",
                     "boost":1.0
                  }
               }
            }
         ],
         "adjust_pure_negative":true,
         "boost":1.0
      }
   }
"query":{
      "bool":{
         "filter":[
            {
               "range":{
                  "@timestamp":{
                     "from":null,
                     "to":"now-15m/m",         Thu Aug 20 00:01:00.000 CEST 2020
                     "include_lower":true,
                     "include_upper":false,
                     "boost":1.0
                  }
               }
            },
            {
               "range":{
                  "@timestamp":{
                     "from":1597873903855,     Wed Aug 19 23:51:43.855 CEST 2020
                     "to":1597874503855,       Thu Aug 20 00:01:43.855 CEST 2020
                     "include_lower":true,
                     "include_upper":false,
                     "format":"epoch_millis",
                     "boost":1.0
                  }
               }
            }
         ],
         "adjust_pure_negative":true,
         "boost":1.0
      }
   }

NOTE: These samples were copied directly from the Elasticsearch logs of our test environment, not produced by the provided reproduction scenario, so the transform parameters are different (15m delay, 10m frequency, 1m date histogram), but the principle is the same. It was simply faster to provide an already-analyzed example. To generate the corresponding logs for the reproduction scenario with date math queries, enable traces for org.elasticsearch.xpack.transform.

I have translated all timestamp values into human-readable form. For relative time specifications, I have provided values based on log timestamps.

Notice that the change detection queries cover the following time ranges:

  Wed Aug 19 23:41:43.851 CEST 2020 - Wed Aug 19 23:51:00.000 CEST 2020
  Wed Aug 19 23:51:43.855 CEST 2020 - Thu Aug 20 00:01:00.000 CEST 2020

However, no changes are detected for:

  Wed Aug 19 23:51:00.000 CEST 2020 - Wed Aug 19 23:51:43.855 CEST 2020

The transforms implementation uses the change detection query results to explicitly enumerate terms for the subsequent composite query used for data aggregation. So, in terms of the provided reproduction scenario, any {x, y} values falling into this uncovered time range (and at the same time not present in any of the covered ranges) are effectively excluded from the transform results, leading to some records potentially missing after each transform execution.
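
To illustrate the effect (a simplified sketch, not copied from the actual logs, with made-up term values): the follow-up aggregation search is restricted to the terms returned by change detection, roughly along these lines, so any {x, y} combination that only occurs inside the uncovered gap never appears in those terms lists and is dropped from the results:

"query": {
  "bool": {
    "filter": [
      { "range": { "@timestamp": { "gte": 1597873303851, "lt": 1597873903855, "format": "epoch_millis" } } },
      { "terms": { "x": ["x_changed_1", "x_changed_2"] } },
      { "terms": { "y": ["y_changed_1"] } }
    ]
  }
}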

Is my understanding correct? If so, can you think of any other solution / workaround possible with current implementation?

Regarding a future solution, I can see how generating intermediate results is desirable in some use cases, especially for longer intervals. Taking this into consideration, I agree that auto-guessing is not an option and that explicit configuration is the correct way to go. I also agree that fine-grained query changes would be preferable to a whole new CheckpointProvider.

Thanks for feedback!

hendrikmuhs commented 4 years ago

@olt4r You are right, this breaks change collection for terms; what I proposed might be OK if the transform only uses a date_histogram.

But I think the query must use the same delay as the sync setting (in the repro steps you used 3m, while your example uses 15m), otherwise this won't work, even if you only have a date_histogram group_by.

There is probably another race condition: the timestamp at which the checkpoint is created and the timestamp of now in the date math search query are off by some amount of time. I think if a bucket boundary falls between those two and a rollover happens, it will create duplicates again.

Long story short, this is quite complicated and should be properly solved in code.

hendrikmuhs commented 4 years ago

Throwing in a workaround: write time-based indexes.

Instead of using ILM to roll over the transform destination index, you could use a Date Index Name Ingest Processor to change the output index based on the time bucket. This always routes upserts to the right index and therefore works around the original duplication problem.

Example config:

PUT _ingest/pipeline/round-1h
{
  "description": "create hourly indexes",
  "processors" : [
    {
      "date_index_name" : {
        "field" : "@timestamp",
        "index_name_prefix" : "agg_data-",
        "date_rounding" : "h",
        "index_name_format": "yyyy-MM-dd-HH",
        "date_formats": ["UNIX_MS"]
      }
    }
  ]
}

In this example I create hourly indexes (20m does not seem to be possible) and configure the name format to be compatible with the transform. You can use this pipeline as part of the transform destination, as shown below.
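
For reference, attaching the pipeline is a one-liner in the transform's destination config (sketch; the date_index_name processor then overrides the target index per document):

"dest": {
  "index": "agg_data",
  "pipeline": "round-1h"
}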

The problem with this solution is the automatic deletion of old indexes: because ILM is no longer in use, you need some custom code to delete old indexes, for example along the lines of the sketch below.
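
A minimal sketch of such cleanup, assuming the hourly naming scheme from the pipeline above and that wildcard deletes are permitted on the cluster; a scheduled job could issue this once a day of data falls out of retention:

# delete all hourly indexes of a day that is past retention (illustrative)
DELETE agg_data-2020-08-19-*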

olt4r commented 4 years ago

@hendrikmuhs The query in my example uses the same delay as the transform sync; see the long NOTE paragraph right below the provided logs. As I mentioned, these were copied directly from our test environment, and the transform there uses different parameters (15m delay, 10m frequency, 1m date histogram).

I decided to use an already-analyzed example in order to provide feedback on your proposed solution as soon as possible. For the reproduction scenario, the correct date math to use would of course be now-3m/1m. Sorry for the confusion.

You are right about the other race condition. It seems to me that all these problems stem from a subtle incompatibility in time handling between the transforms checkpointing mechanism and other Elasticsearch areas (current system time vs calendar-aligned intervals).

Thank you very much for the proposed workaround :+1: At first glance it should eliminate the duplicates; however, I will need some time to look into it and confirm whether it can be applied in our project.

hendrikmuhs commented 3 years ago

I created https://github.com/elastic/elasticsearch/issues/62746 as a spin-off from this discussion.

ddolcimascolo commented 1 year ago

Hi all,

We're hit by this too. My use case is slightly different: the date field used in the date_histogram is not the same field that is used as the sync.time.field setting, so I'm not benefiting from checkpoint alignment. Using time-based indices does not seem like a proper workaround to me: I use ILM to roll over on both primary shard size and index age, and I can't just throw everything into, let's say, monthly indices; they would grow too big.

For your reference: https://discuss.elastic.co/t/transform-not-aligning-checkpoints-with-date-histogram/324063

Is there any plan to avoid duplicates in this case?

Thanks, David