aws-amplify / amplify-category-api

The AWS Amplify CLI is a toolchain for simplifying serverless web and mobile development. This plugin provides functionality for the API category, allowing for the creation and management of GraphQL and REST based backends for your amplify project.
https://docs.amplify.aws/
Apache License 2.0
89 stars 78 forks source link

OpenSearchStreamingLambda Unable to Delete Versioned Records #1873

Open dtJon opened 1 year ago

dtJon commented 1 year ago

How did you install the Amplify CLI?

yarn

If applicable, what version of Node.js are you using?

v18.0.0

Amplify CLI Version

12.4.0

What operating system are you using?

Mac

Did you make any manual changes to the cloud resources managed by Amplify? Please describe the changes made.

I enabled "TTL" on a DynamoDB table which was created as a GraphQL model.

Describe the bug

With @searchable specified on graphql model when the TTL process deletes a record I get a version error from the OpenSearchStreamingLambd lambda function whic is responsible for syncing to ElasticSearch.

If I was directly deleting the records using the GraphQL API myself I would increment the version and it would work fine.

Expected behavior

When a DynamoDB record is deleted for any reason I would expect it to be removed from the index.

Reproduction steps

  1. Create a new GraphQL model with the @searchable directive to cause it to sync to an ElasticSearch index.

    type Strike @model @searchable @auth(rules: [{ allow: public }]) {
    id: ID!
    timestamp: Int!
    latitude: Float!
    longitude: Float!
    dateTimeISO: String!
    pulseType: String
    _ttl: Int
    }
  2. Turn on TTL on the table in the DynamoDB console by going to the table, Update Settings, Time to Live (TTL) and specify the _ttl field.

  3. Create records with the TTL value set.

  4. After the TTL is surpassed note that the records are not deleted from the search index.

  5. Set the environment variable "DEBUG" to "1" on the Lambda function OpenSearchStreamingLambda.

  6. Note that the "delete" calls to ElasticSearch are failing with the following error:

    {
            "delete": {
                "_index": "strike",
                "_type": "_doc",
                "_id": "1695160177:26.772:-81.9166",
                "status": 409,
                "error": {
                    "type": "version_conflict_engine_exception",
                    "reason": "[1695160177:26.772:-81.9166]: version conflict, current version [1] is higher or equal to the one provided [1]",
                    "index_uuid": "dg27vTqPSZCXcjNZZRx_zw",
                    "shard": "4",
                    "index": "strike"
                }
            }
        },

Project Identifier

1daad88495bb6da2e81c50953d63dd59

Log output

``` # Put your logs below this line [ERROR] 2023-09-19T22:26:15.475Z d9c5d76d-1318-49bc-a79e-13631f3c2144 List of items with errors: [ { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162267:26.0143:-80.7475", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162267:26.0143:-80.7475]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "0", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162279:26.1014:-80.8857", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162279:26.1014:-80.8857]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "4", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162277:39.6121:-105.5387", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162277:39.6121:-105.5387]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "4", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162279:26.0634:-80.9293", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162279:26.0634:-80.9293]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "0", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162266:25.9231:-80.9561", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162266:25.9231:-80.9561]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "0", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162266:26.7787:-82.4085", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162266:26.7787:-82.4085]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "2", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162265:26.0052:-80.9707", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162265:26.0052:-80.9707]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "1", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162276:26.0057:-80.9281", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162276:26.0057:-80.9281]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "2", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162276:26.8173:-82.4225", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162276:26.8173:-82.4225]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "3", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162275:26.8562:-82.3871", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162275:26.8562:-82.3871]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "0", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162275:26.8467:-82.416", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162275:26.8467:-82.416]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "1", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162274:26.8718:-82.3754", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162274:26.8718:-82.3754]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "0", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162285:40.4665:-107.6591", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162285:40.4665:-107.6591]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "3", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162284:38.2422:-105.8139", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162284:38.2422:-105.8139]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "4", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162283:25.9149:-80.9431", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162283:25.9149:-80.9431]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "1", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162282:25.9903:-80.8288", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162282:25.9903:-80.8288]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "1", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162281:25.9768:-81.017", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162281:25.9768:-81.017]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "0", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162281:25.8627:-81.081", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162281:25.8627:-81.081]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "2", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162281:25.9805:-80.912", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162281:25.9805:-80.912]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "1", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162280:25.9642:-80.9185", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162280:25.9642:-80.9185]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "2", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162280:25.9695:-80.9237", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162280:25.9695:-80.9237]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "1", "index": "strike" } } } , { "index": { "_index": "strike", "_type": "_doc", "_id": "1695162280:26.0546:-80.9512", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[1695162280:26.0546:-80.9512]: version conflict, current version [1] is higher or equal to the one provided [1]", "index_uuid": "dg27vTqPSZCXcjNZZRx_zw", "shard": "1", "index": "strike" } } } ] ```

Additional information

As mentioned earlier, if you explicitly delete it yourself using the GQL API it works fine. However, it is my feeling that the TTL feature is so useful, and common, that it should work with the search index as well.

In my case I modified the standard code to increment the version number, as follows, but it would be nice if I didn't have to do this.

elif is_ddb_delete:
            action = {'delete': {'_index': doc_opensearch_index_name,
                                '_type': doc_type, '_id': doc_id}}
            if OPENSEARCH_USE_EXTERNAL_VERSIONING and '_version' in doc_fields:
                action['delete'].update([
                    ('version_type', 'external'),
                    ('version', doc_fields['_version']+1) # This increment is my change
                ])
            # Action line with 'delete' directive
            opensearch_actions.append(json.dumps(action))

Before submitting, please confirm:

ykethan commented 1 year ago

Hey, 👋 thanks for raising this! I'm going to transfer this over to our API repository for better assistance 🙂.