elastic / elasticsearch

Free and Open Source, Distributed, RESTful Search Engine
https://www.elastic.co/products/elasticsearch

Add delete_flag property for latest transform #87583

Open tmitanitky opened 2 years ago

tmitanitky commented 2 years ago

Description

The latest transform is a great way to extract the latest documents from a source "history" index. Occasionally, we may want to use a delete_flag on the history index to perform logical deletes while keeping the deletion history for a while.

Since the current transform has no way to handle logical deletion (in fact, it can be approximated even now by a trick with retention_policy), logically deleted documents are also indexed into the destination index, and every query has to filter on delete_flag: false.
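For example, assuming the destination index is named test_latest_transform_dest and its documents carry a boolean delete_flag field (both names are illustrative), every search against the destination would need a filter such as:

GET test_latest_transform_dest/_search
{
  "query": {
    "bool": {
      "filter": {
        "term": { "delete_flag": false }
      }
    }
  }
}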

When the source.query parameter is used, the condition is applied BEFORE the transform runs, so even if the latest document has delete_flag: true, an older document with delete_flag: false will be indexed into the destination index.
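To illustrate (a sketch with hypothetical index and field names), filtering deleted documents out of the source only hides them from the transform; it does not remove the previously indexed version from the destination:

PUT _transform/filtered_latest
{
  "source": {
    "index": "history",
    "query": {
      "term": { "delete_flag": false }
    }
  },
  "latest": {
    "sort": "LastUpdatedTimestamp",
    "unique_key": "id"
  },
  "dest": {
    "index": "history_latest"
  }
}

If some id is later marked delete_flag: true, the transform simply no longer sees any document for that id, so the stale delete_flag: false version stays in history_latest.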

If a delete_flag feature were implemented, a history index with logical deletes could be handled cleanly.

As for the API, there might be some options.

Here is an example of the trick:

PUT _ingest/pipeline/last_updated_timestamp_pipeline
{
  "processors" : [
    {
      "set" : {
        "field" : "LastUpdatedTimestamp",
        "value" : "{{_ingest.timestamp}}"
      }
    }
  ]
}

PUT test_latest_transform
{
  "mappings": {
    "properties": {
      "delete_flag_datetime":{
        "type": "date"
      },
      "id": {
        "type": "keyword"
      }
    }
  },
  "settings": {
    "index":{
      "default_pipeline": "last_updated_timestamp_pipeline"
    }
  }
}

POST test_latest_transform/_doc
{
  "id": "a",
  "delete_flag_datetime":"1970-01-01"
}
POST test_latest_transform/_doc
{
  "id": "b",
  "delete_flag_datetime":"2200-01-01"
}

GET test_latest_transform/_search
# there are 2 documents

PUT _transform/test_retention_policy
{
  "dest": {
    "index": "test_latest_transform_dest"
  }, 
  "latest":{
    "sort": "LastUpdatedTimestamp",
    "unique_key": "id"
  },
  "source":{
    "index": "test_latest_transform"
  },
  "retention_policy":{
    "time":{
      "field": "delete_flag_datetime",
      "max_age": "10000d"
    }
  },
  "frequency": "3s",
  "sync": {
    "time":{
      "delay": "1s",
      "field": "LastUpdatedTimestamp"
    }
  }
}

POST _transform/test_retention_policy/_start

GET _transform/test_retention_policy/_stats
# make sure checkpoint completed

GET test_latest_transform_dest/_search?filter_path=hits.hits._source.id,hits.hits._source.delete_flag_datetime

# response
{
  "hits" : {
    "hits" : [
      {
        "_source" : {
          "id" : "b",
          "delete_flag_datetime" : "2200-01-01"
        }
      }
    ]
  }
}

# index new documents with delete_flag reverted
POST test_latest_transform/_doc
{
  "id": "a",
  "delete_flag_datetime":"2200-01-01"
}
POST test_latest_transform/_doc
{
  "id": "b",
  "delete_flag_datetime":"1970-01-01"
}

GET test_latest_transform/_search
# there are 4 documents: 2 each for id=a and id=b.

GET _transform/test_retention_policy/_stats
# make sure new checkpoint completed

GET test_latest_transform_dest/_search?filter_path=hits.hits._source.id,hits.hits._source.delete_flag_datetime
# response: id=b is deleted and id=a is newly indexed.
{
  "hits" : {
    "hits" : [
      {
        "_source" : {
          "id" : "a",
          "delete_flag_datetime" : "2200-01-01"
        }
      }
    ]
  }
}

If we could use a {"delete_flag": true/false} field instead, it would be more intuitive and less error-prone, and it could also be used in combination with the original retention_policy use case.
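As one purely hypothetical shape for the proposed option (not an existing API), the latest configuration could point at the flag field, reusing the index names from the example above:

PUT _transform/test_delete_flag
{
  "source": {
    "index": "test_latest_transform"
  },
  "dest": {
    "index": "test_latest_transform_dest"
  },
  "latest": {
    "sort": "LastUpdatedTimestamp",
    "unique_key": "id",
    "delete_flag": "delete_flag"
  }
}

Under this sketch, when the latest document for a unique_key has delete_flag: true, the transform would delete that key from the destination instead of indexing it.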

Thanks.

elasticmachine commented 2 years ago

Pinging @elastic/ml-core (Team:ML)

hendrikmuhs commented 2 years ago

I think what makes this tricky is the deletion of existing documents; that's why filtering won't work. Filtering would let the old document stay in the index.

That's why I think it requires a Delete By Query, which is what retention_policy uses internally. However, it is hardcoded to the time use case (that's why it is retention_policy.time). If we expand retention_policy to take a term query (retention_policy.term), this use case could be implemented.
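A sketch of what that expansion could look like (retention_policy.term does not exist today; field and value names are illustrative, reusing the example above). Destination documents whose delete_flag matches the configured term would be removed by the same Delete By Query mechanism that retention_policy.time uses:

PUT _transform/test_retention_policy_term
{
  "source": {
    "index": "test_latest_transform"
  },
  "dest": {
    "index": "test_latest_transform_dest"
  },
  "latest": {
    "sort": "LastUpdatedTimestamp",
    "unique_key": "id"
  },
  "retention_policy": {
    "term": {
      "field": "delete_flag",
      "value": true
    }
  }
}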