snowflakedb / snowflake-kafka-connector

Snowflake Kafka Connector (Sink Connector)
Apache License 2.0

No way to set connection properties, connector fails with "Authentication token has expired. The user must authenticate again." #525

Open amkartashov opened 1 year ago

amkartashov commented 1 year ago

The Snowflake connector does not accept connection properties in the Snowflake URL: https://github.com/snowflakedb/snowflake-kafka-connector/blob/4d358495ea172244ede28c42c39fb20b559f1fad/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeURL.java#L53

We need to be able to pass CLIENT_SESSION_KEEP_ALIVE set to true. Without this setting, the connector fails after the connection has been idle for a few hours:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
        at java.base/java.util.concurrent.Executors.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException:
[SF_KAFKA_CONNECTOR] Exception: Failed to upload file with cache
[SF_KAFKA_CONNECTOR] Error Code: 2011
[SF_KAFKA_CONNECTOR] Detail: Failed to upload file to Snowflake Stage though credential caching
[SF_KAFKA_CONNECTOR] Message:
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Exception: Max retry exceeded
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Error Code: 2010
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Detail: Api retry exceeded the max retry limit
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Message:
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Exception: Failed to execute cached put
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Error Code: 5018
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Detail: Error in cached put command
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Message: Authentication token has expired.  The user must authenticate again.

alonpr commented 1 year ago

A similar issue happened to me as well. Would it make sense to set CLIENT_SESSION_KEEP_ALIVE = true for the connector user?

sfc-gh-tzhang commented 1 year ago

@amkartashov which version of connector is this? I couldn't find the message of "Authentication token has expired. The user must authenticate again." in the latest code

alonpr commented 1 year ago

@sfc-gh-tzhang For me, it happened with version 1.6.9

alonpr commented 1 year ago

@sfc-gh-tzhang which is the recommended version to use according to the documentation: https://docs.snowflake.com/en/user-guide/kafka-connector-install.html#installing-the-connector

amkartashov commented 1 year ago

> @amkartashov which version of connector is this? I couldn't find the message of "Authentication token has expired. The user must authenticate again." in the latest code

@sfc-gh-tzhang it's 1.8.0

The error message comes from the Snowflake JDBC driver, and they suggest setting a session parameter to fix this, but there is no way to do that with the Kafka connector because it accepts only a bare URL, without any parameters. See, for example, https://github.com/snowflakedb/snowflake-jdbc/issues/182
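
For reference, this is roughly what setting the session parameter looks like with plain JDBC, which is exactly what the Kafka connector gives us no hook for. It is only a sketch: it assumes the Snowflake JDBC driver accepts CLIENT_SESSION_KEEP_ALIVE as a connection property, the user name and account URL are hypothetical, and authentication properties are left out. The actual connect call is kept in a comment so the snippet runs without the driver or credentials.

```java
import java.util.Properties;

public class KeepAliveProps {
    // Builds JDBC connection properties that ask Snowflake to keep the session alive.
    public static Properties build(String user) {
        Properties props = new Properties();
        props.setProperty("user", user);
        // Assumption: the driver forwards this session parameter when it is
        // supplied as a connection property.
        props.setProperty("CLIENT_SESSION_KEEP_ALIVE", "true");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical user name, for illustration only.
        Properties props = build("kafka_connector_user");
        // With the Snowflake JDBC driver on the classpath and credentials added,
        // one would then connect along these lines (hypothetical account URL):
        // DriverManager.getConnection("jdbc:snowflake://myaccount.snowflakecomputing.com", props);
        System.out.println(props.getProperty("CLIENT_SESSION_KEEP_ALIVE"));
    }
}
```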

sfc-gh-tzhang commented 1 year ago

Thanks, it looks like the error actually comes from the Snowflake server side rather than from JDBC. For the fix, we could support reading the parameter from the config file and have this code accept it as a property.

amkartashov commented 1 year ago

@sfc-gh-tzhang

I see the following possible options:

  1. additional options in the connector configuration file, something like snowflake.connection.properties, in the same place where we have snowflake.url.name
  2. env vars
  3. JVM system properties
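
To make the three options concrete, here is a minimal sketch of how they could be combined. Everything in it is an assumption: the connector has no snowflake.connection.properties key, the SNOWFLAKE_CONNECTION_PROPERTIES environment variable and the `KEY=VALUE;KEY=VALUE` format are made up for illustration, and the lookup order (config file, then env var, then JVM system property) is just one possible choice.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class ConnectionPropsResolver {
    // Hypothetical names; none of these exist in the connector today.
    static final String CONFIG_KEY = "snowflake.connection.properties";
    static final String ENV_VAR = "SNOWFLAKE_CONNECTION_PROPERTIES";
    static final String SYS_PROP = "snowflake.connection.properties";

    // Parses "K1=V1;K2=V2" into Properties, skipping blank entries.
    public static Properties parse(String raw) {
        Properties props = new Properties();
        if (raw == null) {
            return props;
        }
        for (String pair : raw.split(";")) {
            String trimmed = pair.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int eq = trimmed.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected KEY=VALUE, got: " + trimmed);
            }
            props.setProperty(trimmed.substring(0, eq).trim(),
                              trimmed.substring(eq + 1).trim());
        }
        return props;
    }

    // Lookup order: connector config, then env var, then JVM system property.
    public static Properties resolve(Map<String, String> connectorConfig,
                                     Map<String, String> env,
                                     Properties sysProps) {
        String raw = connectorConfig.get(CONFIG_KEY);
        if (raw == null) {
            raw = env.get(ENV_VAR);
        }
        if (raw == null) {
            raw = sysProps.getProperty(SYS_PROP);
        }
        return parse(raw);
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        // Hypothetical account URL, next to the proposed new key.
        config.put("snowflake.url.name", "myaccount.snowflakecomputing.com");
        config.put(CONFIG_KEY, "CLIENT_SESSION_KEEP_ALIVE=true");
        Properties extra = resolve(config, System.getenv(), System.getProperties());
        System.out.println(extra);
    }
}
```

The resulting Properties could then be merged into whatever the connector passes to the JDBC driver. Option 1 seems the most natural, since it keeps everything in one connector config.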

amkartashov commented 1 year ago

The current workaround from Snowflake support is to set this parameter on the Snowflake side: ALTER USER [user_name] SET CLIENT_SESSION_KEEP_ALIVE = true

sfc-gh-gjachimko commented 1 week ago

@sfc-gh-tzhang could we close this issue now?