airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

[destination-aws-datalake] Destination process exited with non-zero exit code 137 #39128

Open · guilhermenoronha opened this issue 4 months ago

guilhermenoronha commented 4 months ago

Connector Name

destination-aws-datalake

Connector Version

0.1.7

At what step did the error happen?

During the sync

Relevant information

My connector fails with exit code 137 every time I try to extract data. One thing I noticed is that the error occurs once the extraction reaches approximately 400 MB of data from the source. Here is some metadata from the extraction when the error occurred:

11:43 AM 06/05/2024 | 398.44 MB | 2,850 records extracted | no records loaded | Job id: 214 | 17m 21s

Here is the full log:

default_workspace_job_214_attempt_2.txt

Relevant log output

}, {
  "failureOrigin" : "destination",
  "internalMessage" : "Destination process message delivery failed",
  "externalMessage" : "Something went wrong within the destination connector",
  "metadata" : {
    "attemptNumber" : 1,
    "jobId" : 214,
    "connector_command" : "write"
  },
  "stacktrace" : "io.airbyte.workers.internal.exception.DestinationException: Destination process message delivery failed\n\tat io.airbyte.workers.general.BufferedReplicationWorker.writeToDestination(BufferedReplicationWorker.java:465)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithTimeout$5(BufferedReplicationWorker.java:263)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: java.io.IOException: Stream closed\n\tat java.base/java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:447)\n\tat java.base/java.io.OutputStream.write(OutputStream.java:167)\n\tat java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:125)\n\tat java.base/java.io.BufferedOutputStream.implFlush(BufferedOutputStream.java:252)\n\tat java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:246)\n\tat java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:412)\n\tat java.base/sun.nio.cs.StreamEncoder.lockedFlush(StreamEncoder.java:214)\n\tat java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:201)\n\tat java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:262)\n\tat java.base/java.io.BufferedWriter.implFlush(BufferedWriter.java:372)\n\tat java.base/java.io.BufferedWriter.flush(BufferedWriter.java:359)\n\tat io.airbyte.workers.internal.DefaultAirbyteMessageBufferedWriter.flush(DefaultAirbyteMessageBufferedWriter.java:31)\n\tat io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInputWithNoTimeoutMonitor(DefaultAirbyteDestination.java:155)\n\tat io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInput(DefaultAirbyteDestination.java:145)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.writeToDestination(BufferedReplicationWorker.java:459)\n\t... 5 more\n",
  "timestamp" : 1717599560971
}, {
  "failureOrigin" : "source",
  "internalMessage" : "Source process read attempt failed",
  "externalMessage" : "Something went wrong within the source connector",
  "metadata" : {
    "attemptNumber" : 1,
    "jobId" : 214,
    "connector_command" : "read"
  },
  "stacktrace" : "io.airbyte.workers.internal.exception.SourceException: Source process read attempt failed\n\tat io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:389)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithHeartbeatCheck$3(BufferedReplicationWorker.java:242)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: java.lang.IllegalStateException: Source process is still alive, cannot retrieve exit value.\n\tat com.google.common.base.Preconditions.checkState(Preconditions.java:515)\n\tat io.airbyte.workers.internal.DefaultAirbyteSource.getExitValue(DefaultAirbyteSource.java:136)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:375)\n\t... 5 more\n",
  "timestamp" : 1717599577802
}, {
  "failureOrigin" : "replication",
  "internalMessage" : "java.io.IOException: Stream closed",
  "externalMessage" : "Something went wrong during replication",
  "metadata" : {
    "attemptNumber" : 1,
    "jobId" : 214
  },
  "stacktrace" : "java.lang.RuntimeException: java.io.IOException: Stream closed\n\tat io.airbyte.workers.general.BufferedReplicationWorker$CloseableWithTimeout.lambda$close$0(BufferedReplicationWorker.java:538)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithTimeout$5(BufferedReplicationWorker.java:263)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: java.io.IOException: Stream closed\n\tat java.base/java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:447)\n\tat java.base/java.io.OutputStream.write(OutputStream.java:167)\n\tat java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:125)\n\tat java.base/java.io.BufferedOutputStream.implFlush(BufferedOutputStream.java:252)\n\tat java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:246)\n\tat java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:412)\n\tat java.base/sun.nio.cs.StreamEncoder.lockedFlush(StreamEncoder.java:214)\n\tat java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:201)\n\tat java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:262)\n\tat java.base/java.io.BufferedWriter.implFlush(BufferedWriter.java:372)\n\tat java.base/java.io.BufferedWriter.flush(BufferedWriter.java:359)\n\tat io.airbyte.workers.internal.DefaultAirbyteMessageBufferedWriter.flush(DefaultAirbyteMessageBufferedWriter.java:31)\n\tat io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInputWithNoTimeoutMonitor(DefaultAirbyteDestination.java:155)\n\tat io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInput(DefaultAirbyteDestination.java:145)\n\tat io.airbyte.workers.internal.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:170)\n\tat io.airbyte.workers.general.BufferedReplicationWorker$CloseableWithTimeout.lambda$close$0(BufferedReplicationWorker.java:536)\n\t... 5 more\n",
  "timestamp" : 1717599577802
} ]
2024-06-05 15:00:37 platform > 
2024-06-05 15:00:37 platform > ----- END REPLICATION -----
2024-06-05 15:00:37 platform > 
2024-06-05 15:00:38 platform > Retry State: RetryManager(completeFailureBackoffPolicy=BackoffPolicy(minInterval=PT10S, maxInterval=PT30M, base=3), partialFailureBackoffPolicy=null, successiveCompleteFailureLimit=5, totalCompleteFailureLimit=10, successivePartialFailureLimit=1000, totalPartialFailureLimit=10, successiveCompleteFailures=2, totalCompleteFailures=2, successivePartialFailures=0, totalPartialFailures=0)
 Backoff before next attempt: 30 seconds
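(For reference, the 30-second backoff is consistent with this retry policy, assuming the interval is computed as minInterval × base^(successiveCompleteFailures − 1): 10 s × 3^(2 − 1) = 30 s, capped at maxInterval = 30 m.)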

marcosmarxm commented 4 months ago

@guilhermenoronha did you try to increase the resources used by the source/destination? Exit code 137 usually means OOM.
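(Exit code 137 is 128 + 9, i.e. the container was killed with SIGKILL; on a memory-limited container this is typically the kernel OOM killer enforcing the limit.)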

guilhermenoronha commented 3 months ago

I set up the following variables:

JOB_MAIN_CONTAINER_MEMORY_REQUEST=3Gi
JOB_MAIN_CONTAINER_MEMORY_LIMIT=6Gi
CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST=3Gi
CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT=6Gi

As far as I can tell, the changes took effect, as shown in the log below:

Preparing command: docker run --rm --init -i -w /data/514/0 --log-driver none --name source-declarative-manifest-check-514-0-wfels --network host -v airbyte_workspace:/data -v oss_local_root:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/source-declarative-manifest:0.85.0 -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e FIELD_SELECTION_WORKSPACES= -e USE_STREAM_CAPABLE_STATE=true -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE=dev -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=0 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.58.0 -e WORKER_JOB_ID=514 --memory-reservation=3Gi --memory=6Gi airbyte/source-declarative-manifest:0.85.0 check --config source_config.json

However, it didn't work. Checking the memory usage on the Airbyte machine, it stays stable between 3 GB and 5 GB (for the whole Airbyte stack). See the screenshot below:

(screenshot: memory usage of the Airbyte machine, stable between 3 GB and 5 GB)

Any ideas?

henriblancke commented 1 month ago

The datalake destination keeps records in memory and automatically flushes them to Athena every 10,000 records. It is possible that the 10k limit is too high for the type of data you are trying to sync (judging from your output: 398.44 MB for 2,850 records), and that can cause an OOM in the destination process. I think the best way forward here is to allow configuring that limit in the connector config instead of hardcoding it.
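For scale: 398.44 MB across 2,850 records is roughly 140 KB per record, so a buffer of 10,000 records would be on the order of 1.4 GB before a flush, which fits the OOM picture. Below is a minimal sketch of what a configurable flush threshold could look like; the class and parameter names (RecordBuffer, records_per_flush) are illustrative, not the actual destination-aws-datalake code:

from typing import Any, Callable

class RecordBuffer:
    """Buffers records and flushes them once a configurable limit is reached."""

    def __init__(
        self,
        flush_fn: Callable[[list[dict[str, Any]]], None],
        records_per_flush: int = 10_000,  # current hardcoded behavior as the default
    ):
        self._flush_fn = flush_fn
        self._limit = records_per_flush
        self._buffer: list[dict[str, Any]] = []

    def add(self, record: dict[str, Any]) -> None:
        # Accumulate records and flush as soon as the configured limit is hit,
        # bounding how much sits in memory at any one time.
        self._buffer.append(record)
        if len(self._buffer) >= self._limit:
            self.flush()

    def flush(self) -> None:
        # Flush whatever is buffered; also called once at end of sync.
        if self._buffer:
            self._flush_fn(self._buffer)
            self._buffer = []

Exposing records_per_flush (or a byte-based threshold) in the connector spec would let users with wide records trade some throughput for a bounded memory footprint.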