snowflakedb / snowflake-kafka-connector

Snowflake Kafka Connector (Sink Connector)
Apache License 2.0

Connector versions newer than 2.2.0 cannot ingest data into Snowflake DB hosted on GCP (GCS stage) #988

Open · AnatolyPopov opened this issue 2 weeks ago

AnatolyPopov commented 2 weeks ago

After we upgraded the Kafka connector that ingests data into our Snowflake DB hosted in GCP, it started failing with an exception. The stack trace is below:

```
Nov 05 08:36:16 kafkaconnect-36385a43-1 kafka-connector[92435]: [2024-11-05 08:36:16,720] ERROR [test_snowflake|task-0] [SF_KAFKA_CONNECTOR] Put With Cache(uploadWithoutConnection) failed after multiple retries for stageName:SNOWFLAKE_KAFKA_CONNECTOR_test_snowflake_STAGE_test, stageType:GCS, fullFilePath:test_snowflake/test/0/0_9454_1730795755708.json.gz (com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1:94)
Nov 05 08:36:16 kafkaconnect-36385a43-1 kafka-connector[92435]: [2024-11-05 08:36:16,721] ERROR [test_snowflake|task-0] WorkerSinkTask{id=test_snowflake-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: [SF_KAFKA_CONNECTOR] Exception: Failed to upload file with cache
Error Code: 2011
Detail: Failed to upload file to Snowflake Stage though credential caching
Message: [SF_KAFKA_CONNECTOR] Exception: Max retry exceeded
Error Code: 2010
Detail: Api retry exceeded the max retry limit
Message: [SF_KAFKA_CONNECTOR] Exception: Failed to execute cached put
Error Code: 5018
Detail: Error in cached put command
Message: Connection refused
com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:381)
com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:339)
com.snowflake.kafka.connector.internal.SnowflakeInternalStage.putWithCache(SnowflakeInternalStage.java:202)
com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.lambda$putWithCache$1(SnowflakeConnectionServiceV1.java:855)
com.snowflake.kafka.connector.internal.InternalUtils.backoffAndRetry(InternalUtils.java:373)
com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.putWithCache(SnowflakeConnectionServiceV1.java:851)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.flush(SnowflakeSinkServiceV1.java:844)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.insert(SnowflakeSinkServiceV1.java:699)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$300(SnowflakeSinkServiceV1.java:373)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:177)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:153)
com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:304)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
java.base/java.lang.Thread.run(Thread.java:840)
com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:381)
com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:353)
com.snowflake.kafka.connector.internal.InternalUtils.backoffAndRetry(InternalUtils.java:383)
com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.putWithCache(SnowflakeConnectionServiceV1.java:851)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.flush(SnowflakeSinkServiceV1.java:844)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.insert(SnowflakeSinkServiceV1.java:699)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$300(SnowflakeSinkServiceV1.java:373)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:177)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:153)
com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:304)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
java.base/java.lang.Thread.run(Thread.java:840) (org.apache.kafka.connect.runtime.WorkerSinkTask:630)
com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException: [SF_KAFKA_CONNECTOR] Exception: Failed to upload file with cache
Error Code: 2011
Detail: Failed to upload file to Snowflake Stage though credential caching
Message: [SF_KAFKA_CONNECTOR] Exception: Max retry exceeded
Error Code: 2010
Detail: Api retry exceeded the max retry limit
Message: [SF_KAFKA_CONNECTOR] Exception: Failed to execute cached put
Error Code: 5018
Detail: Error in cached put command
Message: Connection refused
com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:381)
com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:339)
com.snowflake.kafka.connector.internal.SnowflakeInternalStage.putWithCache(SnowflakeInternalStage.java:202)
com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.lambda$putWithCache$1(SnowflakeConnectionServiceV1.java:855)
com.snowflake.kafka.connector.internal.InternalUtils.backoffAndRetry(InternalUtils.java:373)
com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.putWithCache(SnowflakeConnectionServiceV1.java:851)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.flush(SnowflakeSinkServiceV1.java:844)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.insert(SnowflakeSinkServiceV1.java:699)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$300(SnowflakeSinkServiceV1.java:373)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:177)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:153)
com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:304)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
java.base/java.lang.Thread.run(Thread.java:840)
com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:381)
com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:353)
com.snowflake.kafka.connector.internal.InternalUtils.backoffAndRetry(InternalUtils.java:383)
com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.putWithCache(SnowflakeConnectionServiceV1.java:851)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.flush(SnowflakeSinkServiceV1.java:844)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.insert(SnowflakeSinkServiceV1.java:699)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$300(SnowflakeSinkServiceV1.java:373)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:177)
com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:153)
com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:304)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
java.base/java.lang.Thread.run(Thread.java:840)
        at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:381)
        at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:353)
        at com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.putWithCache(SnowflakeConnectionServiceV1.java:865)
        at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.flush(SnowflakeSinkServiceV1.java:844)
        at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.insert(SnowflakeSinkServiceV1.java:699)
        at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$300(SnowflakeSinkServiceV1.java:373)
        at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:177)
        at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:153)
        at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:304)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
```
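The connector nests its errors: the outer `Error Code: 2011` (upload with credential caching) wraps `2010` (retry limit exceeded), which wraps `5018` (cached put failed), whose innermost message is a bare `Connection refused`. Here is a minimal sketch of a hypothetical helper (`parse_error_chain` and `ErrorLayer` are illustrative names, not part of the connector) that splits such a message into layers so the root cause is easy to spot. It assumes every layer follows the `Error Code:` / `Detail:` / `Message:` layout seen in this log and that the nested text ends at the first Java stack frame.

```python
import re
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ErrorLayer:
    message: str
    code: Optional[int] = None
    detail: Optional[str] = None


# Matches a Java stack frame such as "pkg.Class.method(Class.java:42)",
# with or without a leading "at ".
_FRAME = re.compile(r"^(at\s+)?[\w$./]+\([\w$]+\.java:\d+\)")


def parse_error_chain(text: str) -> List[ErrorLayer]:
    """Split a nested [SF_KAFKA_CONNECTOR] message into layers, outermost first."""
    layers: List[ErrorLayer] = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line:
            continue
        if _FRAME.match(line):
            break  # the nested message ends where the first stack frame begins
        if line.startswith("Error Code:") and layers:
            layers[-1].code = int(line.split(":", 1)[1])
        elif line.startswith("Detail:") and layers:
            layers[-1].detail = line.split(":", 1)[1].strip()
        elif line.startswith("Message:"):
            # Each "Message:" line opens the next, more deeply nested layer.
            layers.append(ErrorLayer(message=line.split(":", 1)[1].strip()))
        elif not layers:
            # Header line of the outermost error.
            layers.append(ErrorLayer(message=line))
    return layers


# Excerpt copied from the log in this issue.
SAMPLE = """\
[SF_KAFKA_CONNECTOR] Exception: Failed to upload file with cache
Error Code: 2011
Detail: Failed to upload file to Snowflake Stage though credential caching
Message: [SF_KAFKA_CONNECTOR] Exception: Max retry exceeded
Error Code: 2010
Detail: Api retry exceeded the max retry limit
Message: [SF_KAFKA_CONNECTOR] Exception: Failed to execute cached put
Error Code: 5018
Detail: Error in cached put command
Message: Connection refused
com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:381)
"""

if __name__ == "__main__":
    for layer in parse_error_chain(SAMPLE):
        print(layer.code, layer.message)
```

Run against `SAMPLE`, it prints the codes 2011, 2010 and 5018 with their messages, followed by `None Connection refused`: the innermost layer carries no connector error code, which suggests it is the underlying network exception rather than a connector-level failure.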

After some debugging, I suspect the issue is related to the JDBC driver upgrade and the switch to downscoped tokens for GCS.
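Whatever the trigger turns out to be, the innermost error is a plain `Connection refused`, which at the socket level means a TCP connection was actively rejected (no listener on the target port, or a firewall or proxy answering with a reset). A minimal check, offered as a sketch rather than a diagnosis, is to probe the relevant endpoints from the Kafka Connect worker host. The hostnames are assumptions: `storage.googleapis.com` is the public GCS endpoint, and `myaccount.snowflakecomputing.com` is a hypothetical placeholder for your own account URL. Note that this probe connects directly and ignores any JVM proxy settings the connector may be using.

```python
import socket
from typing import Optional


def tcp_probe(host: str, port: int = 443, timeout: float = 5.0) -> Optional[str]:
    """Try a plain TCP connect; return None on success, else a short reason."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return None
    except ConnectionRefusedError:
        # ECONNREFUSED: the target (or something in between) rejected the connection.
        return "connection refused"
    except socket.timeout:
        return "timed out"
    except OSError as exc:  # DNS failures, unreachable networks, etc.
        return f"{type(exc).__name__}: {exc}"


if __name__ == "__main__":
    # storage.googleapis.com is the public GCS endpoint; the Snowflake host
    # below is a hypothetical placeholder for your own account URL.
    for host in ("storage.googleapis.com", "myaccount.snowflakecomputing.com"):
        print(host, tcp_probe(host) or "ok")
```

If the direct probe succeeds while the connector keeps failing, that points away from basic network access and toward how the newer client opens its connection to GCS (for example, proxy handling). This would be consistent with the suspected driver change but would not confirm it.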

Could you please help us find a way to make it work?

sfc-gh-mbobowski commented 2 weeks ago

Hello @AnatolyPopov! I've never seen a problem like that before. Please contact Snowflake Support so they can gather more information about your setup and perhaps suggest a solution based on other customers' experience. Tracking the issue through support will be easier for both you and us.

AnatolyPopov commented 2 weeks ago

Hi @sfc-gh-mbobowski, I've already opened a support case for this and the investigation is in progress. Thanks!