opensearch-project / opensearch-js

Node.js Client for OpenSearch
https://opensearch.org/docs/latest/clients/javascript/
Apache License 2.0

[BUG] client.helpers.bulk causes DeserializationError on batches of records with both INSERT and DELETE operations #192

Open abagnani opened 2 years ago

abagnani commented 2 years ago

Error: I found this bug while using the client.helpers.bulk helper function. When sending a batch of records into the function I was met with the following error: DeserializationError: Unexpected token u in JSON at position 0. This error is the result of trying to use the bulk helper on an array of records with both INSERT and DELETE operations.

Code: I was able to reproduce my error in the following script using a basic mock client and a simple batch of records as input.

#!/usr/bin/env node

const { Client } = require('@opensearch-project/opensearch');
const Mock = require('@elastic/elasticsearch-mock');

const mock = new Mock();

const client = new Client({
  node: 'http://localhost:9200',
  Connection: mock.getConnection(),
});

const index = 'TEST_INDEX_NAME';

mock.add({
  method: 'POST',
  path: '/_bulk',
}, () => ({
  errors: true,
  items: [
    { index: { _index: index, _id: '1', status: 404 } },
    { delete: { _index: index, _id: '2', status: 404 } },
    { index: { _index: index, _id: '3', status: 404 } },
  ],
}));

const testRecords = [
  { id: '1', key: 'test1' },
  { id: '2', key: 'test2', operation: 'remove' },
  { id: '3', key: 'test3' },
];

async function bulkTest () {
  const result = await client.helpers.bulk({
    datasource: testRecords,
    onDocument(doc) {
      if (doc.operation === 'remove') {
        return {
          delete: { _index: index, _id: doc.id },
        };
      }
      return {
        index: { _index: index, _id: doc.id },
      };
    },
    onDrop(doc) {
      console.log(doc);
    },
  });
  return result;
}

(async function() {
    const result = await bulkTest();
    console.log(result);
})();

The output of this script (test.js) is:

$ ./test.js
{
  status: 404,
  error: undefined,
  operation: { index: { _index: 'TEST_INDEX_NAME', _id: '1' } },
  document: { id: '1', key: 'test1' },
  retried: false
}
{
  status: 404,
  error: undefined,
  operation: { id: '1', key: 'test1' },
  document: null,
  retried: false
}
/@opensearch-project/opensearch/lib/Serializer.js:65
      throw new DeserializationError(err.message, json)
      ^
DeserializationError: Unexpected token u in JSON at position 0
    at Serializer.deserialize (/@opensearch-project/opensearch/lib/Serializer.js:65:13)
    at /@opensearch-project/opensearch/lib/Helpers.js:739:34
    at onBody (/@opensearch-project/opensearch/lib/Transport.js:368:9)
    at Class.onEnd (/@opensearch-project/opensearch/lib/Transport.js:285:11)
    ...
  data: undefined
}

Problem: To make fewer but larger bulk calls and maximize efficiency, I added a new field (operation) to each 'delete' record in my Lambda handler. This lets me send all records, regardless of their operation, in a single client.helpers.bulk call (as shown above). However, client.helpers.bulk throws this error for any batch that contains both types of operations.

Stepping through the code, I traced the error to a mismatch between two arrays in Helpers.js: bulkBody and items. In this line, the helper computes an index (indexSlice) into bulkBody to find the document corresponding to the current entry of items. Because the batch mixes index and delete operations, bulkBody holds both operation objects AND document objects: each indexed record contributes two entries, while each deleted record contributes only one. So the position of a record in items no longer maps directly to the position of its document in bulkBody.
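The mismatch can be illustrated in isolation. This is a hypothetical sketch (not the actual Helpers.js internals) that builds the same newline-delimited bulk body the helper would produce for the batch above, then applies the index-times-two assumption:

```javascript
// Build a bulk body for one index, one delete, one index operation.
// Every operation adds an action line; only index ops add a document line.
const ops = [
  { action: { index: { _index: 'TEST_INDEX_NAME', _id: '1' } }, doc: { id: '1', key: 'test1' } },
  { action: { delete: { _index: 'TEST_INDEX_NAME', _id: '2' } } },
  { action: { index: { _index: 'TEST_INDEX_NAME', _id: '3' } }, doc: { id: '3', key: 'test3' } },
];

const bulkBody = [];
for (const op of ops) {
  bulkBody.push(JSON.stringify(op.action));
  if (op.doc) bulkBody.push(JSON.stringify(op.doc));
}

// The response's items[] has 3 entries, but bulkBody has 5 lines, so the
// assumption indexSlice = itemIndex * 2 breaks for every item after the delete:
const itemIndex = 2;               // third response item (the second index op)
const indexSlice = itemIndex * 2;  // 4 -> lands on the document line, not the action line
console.log(bulkBody.length);      // 5
console.log(bulkBody[indexSlice]); // '{"id":"3","key":"test3"}' (the doc, not the action)
console.log(bulkBody[indexSlice + 1]); // undefined (past the end of the array)
```

Reading bulkBody[indexSlice + 1] here yields undefined, which is exactly the value the serializer later chokes on.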

In the event of an error, this incorrect indexSlice value causes problems both when grabbing the right document to retry and when building the correct onDrop output, seen here. In my case, the value at index indexSlice + 1 of bulkBody is undefined (it is past the end of the array). That undefined value cannot be deserialized here, so the DeserializationError is thrown.
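The reported error message itself falls out of this directly: passing undefined to JSON.parse coerces it to the string 'undefined', whose first character 'u' is not valid JSON. A minimal reproduction:

```javascript
// Reading past the end of the bulk body yields undefined; deserializing it fails.
const bulkBody = ['{"index":{"_id":"1"}}', '{"id":"1","key":"test1"}'];
const outOfRange = bulkBody[5]; // undefined

try {
  // JSON.parse coerces undefined to the string 'undefined' before parsing.
  JSON.parse(outOfRange);
} catch (err) {
  // On older Node versions this is 'Unexpected token u in JSON at position 0';
  // newer versions word the message differently, but it always throws.
  console.log(err.message);
}
```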

Solution: Maybe indexSlice could be computed correctly by counting how many of the records processed so far were deletes and adjusting the offset accordingly.
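One way to make that math concrete (a sketch of the idea, not the actual fix in the library): record how many bulkBody lines each operation consumed while building the body, so the offset for response item i can be looked up exactly instead of assumed to be i * 2.

```javascript
// Hypothetical helper: compute the starting bulkBody line for each operation.
// Index/create operations occupy 2 lines (action + document); delete occupies 1.
function buildOffsets(ops) {
  const offsets = [];
  let line = 0;
  for (const op of ops) {
    offsets.push(line);
    line += op.hasDocument ? 2 : 1;
  }
  return offsets;
}

const offsets = buildOffsets([
  { hasDocument: true },  // index  -> lines 0-1
  { hasDocument: false }, // delete -> line 2
  { hasDocument: true },  // index  -> lines 3-4
]);
console.log(offsets); // [0, 2, 3]
```

With such a table, the document to retry for response item i is bulkBody[offsets[i] + 1] when the operation has a document, and there is no document line at all for a delete.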

wbeckler commented 1 year ago

Would you have a suggestion as to the optimal data structure that should be output?