opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

[BUG] Bulk upsert does not behave like a single Upsert, with an ingestion pipeline #10864

Open acidul opened 11 months ago

acidul commented 11 months ago

**Describe the bug**
A single upsert works as expected with an ingest pipeline, but the same operation sent as a bulk upsert does not give the same result.

**To Reproduce**
Steps to reproduce the behavior:

  1. Create an ingest pipeline that computes the duration between two dates:
PUT _ingest/pipeline/pipeline-duration
{
  "description": "This pipeline complete data with begin, end and duration",
  "processors": [
    {
      "script": {
        "description": "Old duration",
        "lang": "painless",
        "source": "ctx.old_duration = 'Old duration was : '+ctx.event_duration;"
      }
    },
    {
      "script": {
        "description": "Begin computation",
        "lang": "painless",
        "params": { "_source": "" }, 
        "source": """
          if (ctx.event_begin == null || ((long) ctx.event_min) < ((long) ctx.event_begin)) { 
            ctx.event_begin = (long)ctx.event_min; 
          } else { 
            ctx.event_min = ctx.event_begin 
          } 
        """
      }
    },
    {
      "script": {
        "description": "End computation",
        "lang": "painless",
        "params": { "_source": "" }, 
        "source": """
          if (ctx.event_end == null || ((long) ctx.event_max) > ((long) ctx.event_end)) { 
            ctx.event_end = (long)ctx.event_max; 
          } else { 
            ctx.event_max = ctx.event_end 
          } 
        """
      }
    },
    {
      "script": {
        "description": "Duration computation",
        "lang": "painless",
        "source": "ctx.event_duration = (long)(ctx['event_max']-ctx['event_min']);"
      }
    }
  ]
}
  2. Create an index that uses this pipeline as its final pipeline:
PUT index-duration
{
  "aliases": {},
  "mappings": {
    "properties": {
      "event_min": {
        "type": "float"
      },
      "event_max": {
        "type": "float"
      },
      "event_begin": {
        "type": "date",
        "format": "epoch_millis"
      },
      "event_end": {
        "type": "date",
        "format": "epoch_millis"
      },
      "event_duration": {
        "type": "long"
      },
      "event_name": {
        "type": "keyword"
      },
      "old_duration": {
        "type": "keyword"
      }
    }
  },
  "settings": {
    "index": {
      "final_pipeline": "pipeline-duration"
    }
  }
}
  3. Execute the following two update operations. First:

    POST index-duration/_update/doc_duration
    {
      "doc": {
        "event_min": 1,
        "event_max": 2,
        "event_name": "occurrence_1"
      },
      "doc_as_upsert": true
    }

Second:

    POST index-duration/_update/doc_duration
    {
      "doc": {
        "event_min": 3,
        "event_max": 5,
        "event_name": "occurrence_2"
      },
      "doc_as_upsert": true
    }

Then check the document:

    GET index-duration/_doc/doc_duration

It gives:

    "_source": { "event_end": 5, "old_duration": "Old duration was : 1", "event_duration": 4, "event_min": 1, "event_name": "occurrence_2", "event_max": 5, "event_begin": 1 }

4. Now execute the same update operations, but embedded in a bulk request. First:

    POST _bulk
    { "update": { "_index": "index-duration", "_id": "doc_duration_issue" } }
    { "doc": { "event_min": 1, "event_max": 2, "event_name": "occurrence_1" }, "doc_as_upsert": true }

Second:

    POST _bulk
    { "update": { "_index": "index-duration", "_id": "doc_duration_issue" } }
    { "doc": { "event_min": 3, "event_max": 5, "event_name": "occurrence_2" }, "doc_as_upsert": true }

Then check the document:

    GET index-duration/_doc/doc_duration_issue

It gives:

    "_source": { "event_end": 5, "old_duration": "Old duration was : null", "event_duration": 2, "event_min": 3, "event_name": "occurrence_2", "event_max": 5, "event_begin": 3 }

**Expected behavior**
A bulk upsert and a single upsert should behave the same way when there is an ingest pipeline.
We expect to get the same values as for doc_duration:

    "_source": { "event_end": 5, "old_duration": "Old duration was : 1", "event_duration": 4, "event_min": 1, "event_name": "occurrence_2", "event_max": 5, "event_begin": 1 }



**Plugins**
odfe-node1 opensearch-alerting                  2.11.0.0
odfe-node1 opensearch-anomaly-detection         2.11.0.0
odfe-node1 opensearch-asynchronous-search       2.11.0.0
odfe-node1 opensearch-cross-cluster-replication 2.11.0.0
odfe-node1 opensearch-custom-codecs             2.11.0.0
odfe-node1 opensearch-geospatial                2.11.0.0
odfe-node1 opensearch-index-management          2.11.0.0
odfe-node1 opensearch-job-scheduler             2.11.0.0
odfe-node1 opensearch-knn                       2.11.0.0
odfe-node1 opensearch-ml                        2.11.0.0
odfe-node1 opensearch-neural-search             2.11.0.0
odfe-node1 opensearch-notifications             2.11.0.0
odfe-node1 opensearch-notifications-core        2.11.0.0
odfe-node1 opensearch-observability             2.11.0.0
odfe-node1 opensearch-performance-analyzer      2.11.0.0
odfe-node1 opensearch-reports-scheduler         2.11.0.0
odfe-node1 opensearch-security                  2.11.0.0
odfe-node1 opensearch-security-analytics        2.11.0.0
odfe-node1 opensearch-sql                       2.11.0.0

**Host/Environment (please complete the following information):**
 - docker images : opensearchproject/opensearch:2.11.0

**Additional context**
This issue is not exactly the same as the one posted in #2607, which is why I prefer to open a new one.

msfroh commented 11 months ago

Interesting... I suspect that the root cause may be the same as #2607, but this is another good test case.

Hopefully we can address both issues with one fix.

gaobinlong commented 10 months ago

This looks a bit complicated. For a single update, the update API transforms the UpdateRequest into a new IndexRequest if the document exists, and the document in that IndexRequest has already been merged with the existing document, see: https://github.com/opensearch-project/OpenSearch/blob/8673fa937db405b8d614f8d4a02c0aa52587c037/server/src/main/java/org/opensearch/action/update/UpdateHelper.java#L106 . The new IndexRequest is then sent to TransportBulkAction, which executes the pipeline.

However, the bulk API doesn't do that transformation before executing the pipeline, so the behaviors differ. We may not be able to transform the UpdateRequest in TransportBulkAction, because the method updateHelper.prepare() can only be called at the shard level.
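To make the ordering difference concrete, here is a minimal Python sketch, not OpenSearch code. It assumes a rough model of the four script processors of pipeline-duration, a shallow dict merge in place of the real document merge, and two hypothetical helpers, `single_update` and `bulk_update`, that only differ in whether the merge happens before or after the pipeline runs. Under those assumptions it reproduces both `_source` results reported in the issue.

```python
from copy import deepcopy


def fmt(value):
    # Painless renders a null operand as "null" in string concatenation.
    return "null" if value is None else str(value)


def run_pipeline(ctx):
    """Rough Python model of the four script processors (an approximation)."""
    ctx = deepcopy(ctx)
    ctx["old_duration"] = "Old duration was : " + fmt(ctx.get("event_duration"))
    if ctx.get("event_begin") is None or ctx["event_min"] < ctx["event_begin"]:
        ctx["event_begin"] = ctx["event_min"]
    else:
        ctx["event_min"] = ctx["event_begin"]
    if ctx.get("event_end") is None or ctx["event_max"] > ctx["event_end"]:
        ctx["event_end"] = ctx["event_max"]
    else:
        ctx["event_max"] = ctx["event_end"]
    ctx["event_duration"] = ctx["event_max"] - ctx["event_min"]
    return ctx


def single_update(stored, partial):
    # Hypothetical helper: merge with the stored doc first, then run the pipeline.
    return run_pipeline({**(stored or {}), **partial})


def bulk_update(stored, partial):
    # Hypothetical helper: the pipeline only sees the partial doc; the merge comes later.
    return {**(stored or {}), **run_pipeline(partial)}


first = {"event_min": 1, "event_max": 2, "event_name": "occurrence_1"}
second = {"event_min": 3, "event_max": 5, "event_name": "occurrence_2"}

single_result = single_update(single_update(None, first), second)
bulk_result = bulk_update(bulk_update(None, first), second)

print(single_result["event_duration"], single_result["old_duration"])
print(bulk_result["event_duration"], bulk_result["old_duration"])
```

In this model the bulk path computes event_begin, event_end and event_duration from the second partial doc alone, which is why the stored values from the first request are overwritten instead of being taken into account.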

Another finding is that, in the bulk API, the pipeline also behaves differently for upsert and doc_as_upsert, because of this line: https://github.com/opensearch-project/OpenSearch/blob/8673fa937db405b8d614f8d4a02c0aa52587c037/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java#L224.

When the document with ID 1 exists and doc_as_upsert is true, the pipeline is executed on the partial doc {"x":3, "y":5}:

curl -X POST "localhost:9200/_bulk?pretty" -H 'Content-Type: application/json' -d'
{ "update": { "_index": "test1", "_id": "1" } }
{ "doc" : {"x":3, "y":5}, "doc_as_upsert":true}
'

But when upsert is set, the pipeline is executed on the upsert doc {"x":1}. This has no effect, because that doc is no longer used once the document exists:

curl -X POST "localhost:9200/_bulk?pretty" -H 'Content-Type: application/json' -d'
{ "update": { "_index": "test1", "_id": "1" } }
{ "doc" : {"x":3, "y":5}, "upsert":{"x":1}}
'
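
The two cases above can be summarized with a small Python sketch. The function `doc_seen_by_pipeline` is a hypothetical helper that only mirrors the selection described in this comment; it is not the actual TransportBulkAction code, and the case where neither option is set is not covered here, so the sketch simply returns None for it.

```python
def doc_seen_by_pipeline(update_body):
    """Return the document a bulk update hands to the ingest pipeline,
    following the behavior described in this comment (illustrative only)."""
    if update_body.get("doc_as_upsert"):
        # doc_as_upsert: the pipeline runs on the partial doc.
        return update_body.get("doc")
    # Otherwise the pipeline runs on the upsert doc, if one is given.
    return update_body.get("upsert")


with_doc_as_upsert = doc_seen_by_pipeline({"doc": {"x": 3, "y": 5}, "doc_as_upsert": True})
with_upsert = doc_seen_by_pipeline({"doc": {"x": 3, "y": 5}, "upsert": {"x": 1}})
print(with_doc_as_upsert)
print(with_upsert)
```

In neither case does the pipeline see the stored document merged with the partial doc, which matches the behavior reported in the issue.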

peternied commented 7 months ago

[Triage - attendees 1 2 3 4] @acidul Thanks for filing this bug