airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

[kubernetes] want to increase number of workers in k8s #33567

Open sivankumar86 opened 10 months ago

sivankumar86 commented 10 months ago

Platform Version

0.50.34

What step the error happened?

None

Relevant information

The job is failing with this error: "Caused by: java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "io.airbyte.workers.process.KubePortManagerSingleton.take()" is null"

Is there a way to increase the number of workers in k8s based on the number of pending jobs? Please give me a pointer on how to do this.

I have tried scaling on CPU/memory metrics, but with no luck.

Relevant log output

io.airbyte.workers.exception.WorkerException: Failed to create pod for check step
        at io.airbyte.workers.process.KubeProcessFactory.create(KubeProcessFactory.java:188) ~[io.airbyte-airbyte-commons-worker-0.50.34.jar:?]
        at io.airbyte.workers.process.AirbyteIntegrationLauncher.check(AirbyteIntegrationLauncher.java:143) ~[io.airbyte-airbyte-commons-worker-0.50.34.jar:?]
        at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:71) ~[io.airbyte-airbyte-commons-worker-0.50.34.jar:?]
        at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:44) ~[io.airbyte-airbyte-commons-worker-0.50.34.jar:?]
        at io.airbyte.workers.temporal.TemporalAttemptExecution.get(TemporalAttemptExecution.java:135) ~[io.airbyte-airbyte-workers-0.50.34.jar:?]
        at io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl.lambda$runWithJobOutput$1(CheckConnectionActivityImpl.java:133) ~[io.airbyte-airbyte-workers-0.50.34.jar:?]
        at io.airbyte.commons.temporal.HeartbeatUtils.withBackgroundHeartbeat(HeartbeatUtils.java:57) ~[io.airbyte-airbyte-commons-temporal-core-0.50.34.jar:?]
        at io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl.runWithJobOutput(CheckConnectionActivityImpl.java:118) ~[io.airbyte-airbyte-workers-0.50.34.jar:?]
        at jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104) ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:578) ~[?:?]
        at io.temporal.internal.activity.RootActivityInboundCallsInterceptor$POJOActivityInboundCallsInterceptor.executeActivity(RootActivityInboundCallsInterceptor.java:64) ~[temporal-sdk-1.17.0.jar:?]
        at io.temporal.internal.activity.RootActivityInboundCallsInterceptor.execute(RootActivityInboundCallsInterceptor.java:43) ~[temporal-sdk-1.17.0.jar:?]
        at io.temporal.internal.activity.ActivityTaskExecutors$BaseActivityTaskExecutor.execute(ActivityTaskExecutors.java:95) ~[temporal-sdk-1.17.0.jar:?]
        at io.temporal.internal.activity.ActivityTaskHandlerImpl.handle(ActivityTaskHandlerImpl.java:92) ~[temporal-sdk-1.17.0.jar:?]
        at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handleActivity(ActivityWorker.java:241) ~[temporal-sdk-1.17.0.jar:?]
        at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:206) ~[temporal-sdk-1.17.0.jar:?]
        at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:179) ~[temporal-sdk-1.17.0.jar:?]
        at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93) ~[temporal-sdk-1.17.0.jar:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.lang.Thread.run(Thread.java:1589) ~[?:?]
Caused by: java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "io.airbyte.workers.process.KubePortManagerSingleton.take()" is null
        at io.airbyte.workers.process.KubeProcessFactory.create(KubeProcessFactory.java:131) ~[io.airbyte-airbyte-commons-worker-0.50.34.jar:?]

What I am looking for is similar to this:

https://community.temporal.io/t/suggested-metrics-to-autoscale-temporal-workers-on/5870/3

marcosmarxm commented 9 months ago

Today you can only increase the number of workers through the env variable. Some other users have built external applications that autoscale based on the number of pods. The best path is to run a cron job that calculates the number of workers you'll need.
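
As a rough illustration of the cron-based calculation, here is a minimal Python sketch. It is not an Airbyte feature or API: the function names, the `jobs_per_worker` capacity figure, the replica bounds, and the `airbyte-worker` deployment and `airbyte` namespace names are all assumptions to adapt to your own install. It only computes a replica count from running plus pending jobs and builds a standard `kubectl scale` command; how you obtain the job counts is left to you.

```python
import math


def desired_worker_replicas(running_jobs, pending_jobs, jobs_per_worker,
                            min_replicas=1, max_replicas=10):
    """Return a worker replica count for the current job load.

    jobs_per_worker is a hypothetical capacity figure: how many concurrent
    jobs you are willing to let one worker pod handle.
    """
    if jobs_per_worker <= 0:
        raise ValueError("jobs_per_worker must be positive")
    total_jobs = running_jobs + pending_jobs
    needed = math.ceil(total_jobs / jobs_per_worker)
    # Clamp so the deployment never scales to zero or beyond the cluster budget.
    return max(min_replicas, min(max_replicas, needed))


def kubectl_scale_command(replicas, deployment="airbyte-worker", namespace="airbyte"):
    """Build the kubectl command as an argument list (names are assumptions)."""
    return [
        "kubectl", "scale", f"deployment/{deployment}",
        f"--replicas={replicas}", "--namespace", namespace,
    ]


if __name__ == "__main__":
    # Example values only; a cron job would fetch real counts first.
    replicas = desired_worker_replicas(running_jobs=3, pending_jobs=9, jobs_per_worker=5)
    print(" ".join(kubectl_scale_command(replicas)))
```

A cron job could run a script like this shortly before the 4-hourly and nightly batches start, pass the resulting list to `subprocess.run`, and run it again afterwards to scale back down.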

sivankumar86 commented 9 months ago

@marcosmarxm Thanks for the reply. At the moment we run jobs once every 4 hours plus nightly jobs, so it would be great if we could increase or decrease the workers based on the total number of jobs (running + pending). Is there a blog post or document you can point me to?