airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com
Other
16.11k stars 4.12k forks source link

[worker] NullPointerException during connection check #36757

Open maver1ck opened 7 months ago

maver1ck commented 7 months ago

Helm Chart Version

0.53.1

What step the error happened?

During the Sync

Revelant information

I'm trying to check MySQL destination. I'm getting NullPointerException in worker logs. nation-mysql-check-bb960379-ba10-44d8-80c4-853fb4c62c70-0-piniu is finishing properly

Relevant log output

2024-04-02 18:03:45 platform > nation-mysql-check-bb960379-ba10-44d8-80c4-853fb4c62c70-0-piniu stdoutLocalPort = 9026
2024-04-02 18:03:45 platform > nation-mysql-check-bb960379-ba10-44d8-80c4-853fb4c62c70-0-piniu stderrLocalPort = 9027
2024-04-02 18:03:45 platform >
2024-04-02 18:03:45 platform > ----- START CHECK -----
2024-04-02 18:03:45 platform >
2024-04-02 18:03:45 platform > Creating stderr socket server...
2024-04-02 18:03:45 platform > Creating pod nation-mysql-check-bb960379-ba10-44d8-80c4-853fb4c62c70-0-piniu...
2024-04-02 18:03:45 platform > Creating stdout socket server...
2024-04-02 18:03:46 platform > Waiting for init container to be ready before copying files...
2024-04-02 18:03:47 platform > Init container ready..
2024-04-02 18:03:47 platform > Copying files...
2024-04-02 18:03:47 platform > Uploading file: source_config.json
2024-04-02 18:03:47 platform > kubectl cp /tmp/083aa8c7-0554-4a30-81bf-2fbedeb58105/source_config.json airbyte/nation-mysql-check-bb960379-ba10-44d8-80c4-853fb4c62c70-0-piniu:/config/source_config.json -c init --retries=3
2024-04-02 18:03:47 platform > Waiting for kubectl cp to complete
2024-04-02 18:03:48 platform > kubectl cp complete, closing process
2024-04-02 18:03:48 platform > Uploading file: FINISHED_UPLOADING
2024-04-02 18:03:48 platform > kubectl cp /tmp/cd3f743c-de61-407a-8f1f-54de09a23e7d/FINISHED_UPLOADING airbyte/nation-mysql-check-bb960379-ba10-44d8-80c4-853fb4c62c70-0-piniu:/config/FINISHED_UPLOADING -c init --retries=3
2024-04-02 18:03:48 platform > Waiting for kubectl cp to complete
2024-04-02 18:03:48 platform > kubectl cp complete, closing process
2024-04-02 18:03:48 platform > Waiting until pod is ready...
2024-04-02 18:03:51 platform > Reading pod IP...
2024-04-02 18:03:51 platform > Pod IP: 10.67.0.123
2024-04-02 18:03:51 platform > Using null stdin output stream...
2024-04-02 18:03:51 platform > Unable to gobble line(s) from input stream provided by generic:  input stream is null.
2024-04-02 18:03:51 platform > Unexpected error while checking connection:
java.lang.NullPointerException: null
    at java.base/java.io.Reader.<init>(Reader.java:168) ~[?:?]
    at java.base/java.io.InputStreamReader.<init>(InputStreamReader.java:123) ~[?:?]
    at io.airbyte.commons.io.IOs.newBufferedReader(IOs.java:74) ~[io.airbyte-airbyte-commons-0.53.1.jar:?]
    at io.airbyte.workers.WorkerUtils.getMessagesByType(WorkerUtils.java:205) ~[io.airbyte-airbyte-commons-worker-0.53.1.jar:?]
    at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:81) ~[io.airbyte-airbyte-commons-worker-0.53.1.jar:?]
    at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:44) ~[io.airbyte-airbyte-commons-worker-0.53.1.jar:?]
    at io.airbyte.workers.temporal.TemporalAttemptExecution.get(TemporalAttemptExecution.java:142) ~[io.airbyte-airbyte-workers-0.53.1.jar:?]
    at io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl.lambda$runWithJobOutput$1(CheckConnectionActivityImpl.java:226) ~[io.airbyte-airbyte-workers-0.53.1.jar:?]
    at io.airbyte.commons.temporal.HeartbeatUtils.withBackgroundHeartbeat(HeartbeatUtils.java:57) ~[io.airbyte-airbyte-commons-temporal-core-0.53.1.jar:?]
    at io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl.runWithJobOutput(CheckConnectionActivityImpl.java:211) ~[io.airbyte-airbyte-workers-0.53.1.jar:?]
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[?:?]
    at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[?:?]
    at io.temporal.internal.activity.RootActivityInboundCallsInterceptor$POJOActivityInboundCallsInterceptor.executeActivity(RootActivityInboundCallsInterceptor.java:64) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.activity.RootActivityInboundCallsInterceptor.execute(RootActivityInboundCallsInterceptor.java:43) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.activity.ActivityTaskExecutors$BaseActivityTaskExecutor.execute(ActivityTaskExecutors.java:107) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.activity.ActivityTaskHandlerImpl.handle(ActivityTaskHandlerImpl.java:124) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handleActivity(ActivityWorker.java:278) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:243) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:216) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:105) ~[temporal-sdk-1.22.3.jar:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2024-04-02 18:03:51 INFO i.a.c.t.HeartbeatUtils(withBackgroundHeartbeat):64 - Stopping temporal heartbeating...
2024-04-02 18:03:51 platform >
2024-04-02 18:03:51 platform > ----- END CHECK -----
2024-04-02 18:03:51 platform >
2024-04-02 18:03:51 INFO i.a.c.t.HeartbeatUtils(withBackgroundHeartbeat):73 - Temporal heartbeating stopped.
2024-04-02 18:03:51 WARN i.t.i.a.ActivityTaskExecutors$BaseActivityTaskExecutor(execute):126 - Activity failure. ActivityId=c8eb51e0-089c-3170-ab2b-a228f19559d0, activityType=RunWithJobOutput, attempt=1
java.lang.RuntimeException: io.temporal.serviceclient.CheckedExceptionWrapper: io.airbyte.workers.exception.WorkerException: Unexpected error while getting checking connection.
    at io.airbyte.commons.temporal.HeartbeatUtils.withBackgroundHeartbeat(HeartbeatUtils.java:62) ~[io.airbyte-airbyte-commons-temporal-core-0.53.1.jar:?]
    at io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl.runWithJobOutput(CheckConnectionActivityImpl.java:211) ~[io.airbyte-airbyte-workers-0.53.1.jar:?]
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[?:?]
    at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[?:?]
    at io.temporal.internal.activity.RootActivityInboundCallsInterceptor$POJOActivityInboundCallsInterceptor.executeActivity(RootActivityInboundCallsInterceptor.java:64) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.activity.RootActivityInboundCallsInterceptor.execute(RootActivityInboundCallsInterceptor.java:43) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.activity.ActivityTaskExecutors$BaseActivityTaskExecutor.execute(ActivityTaskExecutors.java:107) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.activity.ActivityTaskHandlerImpl.handle(ActivityTaskHandlerImpl.java:124) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handleActivity(ActivityWorker.java:278) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:243) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:216) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:105) ~[temporal-sdk-1.22.3.jar:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: io.temporal.serviceclient.CheckedExceptionWrapper: io.airbyte.workers.exception.WorkerException: Unexpected error while getting checking connection.
    at io.temporal.serviceclient.CheckedExceptionWrapper.wrap(CheckedExceptionWrapper.java:57) ~[temporal-serviceclient-1.22.3.jar:?]
    at io.temporal.internal.sync.WorkflowInternal.wrap(WorkflowInternal.java:539) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.activity.Activity.wrap(Activity.java:52) ~[temporal-sdk-1.22.3.jar:?]
    at io.airbyte.workers.temporal.TemporalAttemptExecution.get(TemporalAttemptExecution.java:147) ~[io.airbyte-airbyte-workers-0.53.1.jar:?]
    at io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl.lambda$runWithJobOutput$1(CheckConnectionActivityImpl.java:226) ~[io.airbyte-airbyte-workers-0.53.1.jar:?]
    at io.airbyte.commons.temporal.HeartbeatUtils.withBackgroundHeartbeat(HeartbeatUtils.java:57) ~[io.airbyte-airbyte-commons-temporal-core-0.53.1.jar:?]
    ... 14 more
Caused by: io.airbyte.workers.exception.WorkerException: Unexpected error while getting checking connection.
    at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:135) ~[io.airbyte-airbyte-commons-worker-0.53.1.jar:?]
    at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:44) ~[io.airbyte-airbyte-commons-worker-0.53.1.jar:?]
    at io.airbyte.workers.temporal.TemporalAttemptExecution.get(TemporalAttemptExecution.java:142) ~[io.airbyte-airbyte-workers-0.53.1.jar:?]
    at io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl.lambda$runWithJobOutput$1(CheckConnectionActivityImpl.java:226) ~[io.airbyte-airbyte-workers-0.53.1.jar:?]
    at io.airbyte.commons.temporal.HeartbeatUtils.withBackgroundHeartbeat(HeartbeatUtils.java:57) ~[io.airbyte-airbyte-commons-temporal-core-0.53.1.jar:?]
    ... 14 more
Caused by: java.lang.NullPointerException
    at java.base/java.io.Reader.<init>(Reader.java:168) ~[?:?]
    at java.base/java.io.InputStreamReader.<init>(InputStreamReader.java:123) ~[?:?]
    at io.airbyte.commons.io.IOs.newBufferedReader(IOs.java:74) ~[io.airbyte-airbyte-commons-0.53.1.jar:?]
    at io.airbyte.workers.WorkerUtils.getMessagesByType(WorkerUtils.java:205) ~[io.airbyte-airbyte-commons-worker-0.53.1.jar:?]
    at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:81) ~[io.airbyte-airbyte-commons-worker-0.53.1.jar:?]
    at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:44) ~[io.airbyte-airbyte-commons-worker-0.53.1.jar:?]
    at io.airbyte.workers.temporal.TemporalAttemptExecution.get(TemporalAttemptExecution.java:142) ~[io.airbyte-airbyte-workers-0.53.1.jar:?]
    at io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl.lambda$runWithJobOutput$1(CheckConnectionActivityImpl.java:226) ~[io.airbyte-airbyte-workers-0.53.1.jar:?]
    at io.airbyte.commons.temporal.HeartbeatUtils.withBackgroundHeartbeat(HeartbeatUtils.java:57) ~[io.airbyte-airbyte-commons-temporal-core-0.53.1.jar:?]
    ... 14 more
2024-04-02 18:03:52 platform > Setting stdout...
2024-04-02 18:03:54 INFO i.a.w.p.ExitCodeWatcher(persistExitCode):117 - Received exit code 0 for pod nation-mysql-check-bb960379-ba10-44d8-80c4-853fb4c62c70-0-piniu
A
maver1ck commented 7 months ago

I'm pretty sure this is a race condition between Stdout and stderr listeners and HeartbeatUtils trying to read from not initialized streams.

Adding Thread.sleep(5000) in this line solved my issue. But there should be something less hacky. https://github.com/airbytehq/airbyte-platform/blob/7c5419626b4c285a4f0a842076e52b6324bfffe8/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java#L704

marcosmarxm commented 6 months ago

Are you still experiencing the issue @maver1ck ?

maver1ck commented 6 months ago

@marcosmarxm I have own clone of Airbyte with Thread.sleep(20000) added. This works for me. Have anything changed in the codebase to retry with new Airbyte version ?

index 89cb90dbd6..2d20bde27c 100644
--- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java
+++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java
@@ -688,6 +688,9 @@ public class KubePodProcess implements KubePod {
         LOGGER.info("Using null stdin output stream...");
         this.stdin = NullOutputStream.NULL_OUTPUT_STREAM;
       }
+      LOGGER.info("Sleeping 20sec");
+      Thread.sleep(20000);
+      LOGGER.info("Sleep ends");
     } catch (final Exception e) {
       // We need to make sure the ports are offered back
       cleanup();
ogirardot commented 5 months ago

reproduced using helm chart version airbyte-0.148.1, app version 0.62.4 is there any workaround ?

maver1ck commented 5 months ago

@ogirardot Compile your own version of Airbyte with change from above included. But this is not a full solution.