opensearch-project / data-prepper

Data Prepper is a component of the OpenSearch project that accepts, filters, transforms, enriches, and routes data at scale.
https://opensearch.org/docs/latest/clients/data-prepper/index/
Apache License 2.0

[BUG] dynamodb source hangs if initial OCUs are not enough in AWS ingestion pipeline #3914

Open jonparker opened 9 months ago

jonparker commented 9 months ago

Describe the bug

AWS OpenSearch Ingestion pipeline with DynamoDB:

  1. DynamoDB export
  2. DynamoDB stream
  3. OpenSearch sink
  4. DDB table larger than 1.7 GB with 2.1M records
  5. Issue: no records are sent to the sink, and stopping the pipeline and increasing the minimum and maximum OCUs does not fix the issue.

The pipeline's circuit breaker repeatedly flips between open and closed.

2024-01-04T01:29:28.016 [pool-3-thread-1] INFO  org.opensearch.dataprepper.breaker.HeapCircuitBreaker - Circuit breaker tripped and open. 6442470568 used memory bytes > 6442450944 configured

2024-01-04T01:29:30.342 [pool-3-thread-1] INFO  org.opensearch.dataprepper.breaker.HeapCircuitBreaker - Circuit breaker closed. 6423596448 used memory bytes <= 6442450944 configured

After stopping the pipeline and re-configuring it with higher minimum and maximum OCUs (6-10 Ingestion-OCUs), the circuit breaker no longer flip-flops, but the pipeline gets stuck trying to process the shards and continuously reports 0 shards found or no shards acquired.

2024-01-04T05:58:20.517 [pool-13-thread-1] INFO  org.opensearch.dataprepper.plugins.source.dynamodb.leader.ShardManager - Listing shards (DescribeStream call) took 25 milliseconds with 0 shards found

2024-01-04T05:58:41.941 [pool-13-thread-4] INFO  org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler - No new shards acquired after 250 attempts. This means that all shards are currently being consumed, or that the export is still in progress. New shards will not be consumed until the export is fully processed.
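(For reference: the threshold in the circuit breaker log appears to correspond to the heap circuit breaker that self-managed Data Prepper exposes in data-prepper-config.yaml; on a managed OpenSearch Ingestion pipeline the heap is presumably sized from the OCUs, so the OCU count is the only real lever. A rough sketch of the self-managed setting, with purely illustrative values:)

circuit_breakers:
  heap:
    usage: 6gb            # trip the breaker when used heap exceeds this value
    reset: 2s             # minimum time to stay tripped before checking again
    check_interval: 500ms # how often heap usage is sampled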

To Reproduce

Steps to reproduce the behavior:

  1. Create a DDB table with 1.7 GB of data and 2.1M records.
  2. Create a pipeline with a minimum of 1 OCU and a maximum of 4 OCUs:
version: "2"
dynamodb-pipeline:
  source:
    dynamodb:
      tables:
        - table_arn: 'arn:aws:dynamodb:us-east-2:123456789:table/MyTable'
          stream:
            start_position: "LATEST"
          export:
            s3_bucket: "opensearch-ingestion"
            s3_region: "us-east-2"
            s3_prefix: "export/"
      aws:
        sts_role_arn: "arn:aws:iam::123456789:role/opensearch-ingestion-pipeline"
        region: "us-east-2"
  sink:
    - opensearch:
        hosts:
          [
            "https://123456789.us-east-2.aoss.amazonaws.com"
          ]
        index: "mytableindex"
        index_type: "custom"
        document_id: "${getMetadata(\"primary_key\")}"
        action: "${getMetadata(\"opensearch_action\")}"
        document_version: "${getMetadata(\"document_version\")}"
        document_version_type: "external"
        aws:
          sts_role_arn: "arn:aws:iam::123456789:role/opensearch-ingestion-pipeline"
          region: "us-east-2"
          serverless: true
          network_policy_name: "easy-my-data"

Expected behavior

The pipeline recovers and processes all records.


graytaylor0 commented 9 months ago

So in both cases, there are still no records being sent to the sink? If the export has not fully completed (with all of the export data making it to the sink), then the pipeline will never begin processing the latest shards from streams, as indicated by the INFO log saying that no new shards have been acquired. It is difficult to know why the sink would not be receiving any records without seeing the full logs and metrics.

jonparker commented 9 months ago

@graytaylor0 Actually after another check I realised there is a difference between the two pipelines.

The one that failed had a processor:

processor:
    - drop_events:
        drop_when: "/source == 'Source1' or contains(/created, '2021-') or contains(/created, '2020-')"

I removed the processor from the second pipeline as I wasn't sure if it was the cause of the issue.

However, for the broken pipeline I can see it logging INFO org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileScheduler - All Exports are done, streaming can continue..., but then no documents are sent to the sink and it just repeatedly logs INFO org.opensearch.dataprepper.plugins.source.dynamodb.leader.ShardManager - Listing shards (DescribeStream call) took 27 milliseconds with 0 shards found.

There is currently no data being written to the DDB table, so it makes sense that there would be nothing in the DDB stream, but the pipeline should still send all the records from the S3 export to the sink.

These are the only log messages from the org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink logger:

[screenshot: OpenSearchSink log output from the broken pipeline]

The second pipeline that I created with higher min and max OCUs was successful, and I can see the org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink logger logged some warnings that the first pipeline didn't:

[screenshot: OpenSearchSink warning messages from the successful pipeline]

graytaylor0 commented 9 months ago

It looks like in your first pipeline, the drop_events processor was dropping all of the export data, keeping it from going into OpenSearch.
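If you want to see what that condition matches without losing the data, one option (just a rough sketch; the route name and index name below are placeholders) is conditional routing, which sends matching events to a separate sink instead of discarding them:

route:
  # Same expression you had in drop_when; the route name is arbitrary.
  - would-be-dropped: "/source == 'Source1' or contains(/created, '2021-') or contains(/created, '2020-')"
sink:
  - opensearch:
      # Placeholder index for inspecting how many records match the condition.
      index: "mytableindex-dropped-check"
      routes:
        - would-be-dropped
      # ... same hosts/aws settings as your main sink ...

As far as I know, a sink without a routes entry still receives all events, so the main index would keep getting everything.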

On the second pipeline, it looks like there are document errors on the sink. Is this happening for all documents or only some of them? It is recommended to configure a DLQ to send failed documents to.

Also, the error messages from OpenSearch are cut off, but it looks like there is something in your data that OpenSearch does not like in the created field.
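For the DLQ, it would look roughly like this on the opensearch sink (the bucket name and prefix are placeholders, and the pipeline role needs write access to that bucket):

sink:
  - opensearch:
      # ... existing index/document settings ...
      dlq:
        s3:
          bucket: "my-dlq-bucket"                   # placeholder bucket name
          key_path_prefix: "dynamodb-pipeline/dlq/" # placeholder prefix
          region: "us-east-2"
          sts_role_arn: "arn:aws:iam::123456789:role/opensearch-ingestion-pipeline"

That way the failed documents and the full error responses end up in S3 instead of only showing up as truncated log lines.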

jonparker commented 9 months ago

Ok, I guess the processor might have dropped all of the events, but I would have expected some informational logs to indicate that all the records had at least been processed. I guess the processor doesn't know how many records there are, or even whether they came from an export or a stream, but it would be great if it could log some high-level metrics on what it has processed, for visibility.

The errors with the other pipeline are fine as they are only for a few records that have dates with invalid UTC values. I can see in the pipeline metrics that the second pipeline successfully sent most of the records to the sink, and I can also see them in the OpenSearch collection.