airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

[destination-snowflake] Versions 3.6.0+ fail to sync with SQL compilation error: In-list contains more than 50 non-constant values #37493

Closed rowanmoul closed 1 day ago

rowanmoul commented 5 months ago

Connector Name

destination-snowflake

Connector Version

3.6.0

What step the error happened?

During the sync

Relevant information

After upgrading the Snowflake destination connector to 3.7.0, we started getting failed syncs saying we couldn't put more than 50 non-constant values in an "in-list", which I take to mean a WHERE x IN (thing1, thing2) clause. Walking back the versions, this appears to have started in version 3.6.0. Before the upgrade we were running 3.5.14 without issue, and downgrading to that version allows syncs to finish without error. This suggests that something changed between 3.5.14 and 3.6.0 that makes the queries generated during sync invalid.

One other interesting note: when we reduce the number of tables that we are replicating from our PostgreSQL source to 50 or fewer, the sync succeeds on 3.6.0+.
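To illustrate the limit described above, here is a minimal sketch of how a query generator could keep every in-list at or below 50 entries by issuing one statement per batch. It is not the connector's actual code or fix: the table name, column name, and the `build_batched_queries` helper are hypothetical, and whether bound parameters count as constant values in Snowflake is outside the scope of this sketch.

```python
from typing import Iterator, List, Sequence, Tuple

# Limit quoted in the Snowflake error message in this issue.
IN_LIST_LIMIT = 50


def chunked(values: Sequence[str], size: int = IN_LIST_LIMIT) -> Iterator[List[str]]:
    """Yield consecutive slices of `values`, each with at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(values), size):
        yield list(values[start:start + size])


def build_batched_queries(
    table: str, column: str, names: Sequence[str]
) -> List[Tuple[str, List[str]]]:
    """Return (sql, params) pairs, one per batch of at most IN_LIST_LIMIT names.

    `table` and `column` are hypothetical identifiers supplied by the caller;
    they are interpolated as-is, so they must come from trusted code.
    """
    queries = []
    for batch in chunked(names):
        placeholders = ", ".join("?" for _ in batch)
        sql = f"SELECT * FROM {table} WHERE {column} IN ({placeholders})"
        queries.append((sql, batch))
    return queries


if __name__ == "__main__":
    stream_names = [f"stream_{i}" for i in range(120)]
    for sql, params in build_batched_queries("hypothetical_state_table", "name", stream_names):
        print(len(params), "values in this in-list")
```

With 120 stream names this produces three statements, so no single in-list exceeds the limit reported in the error.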

We are running the Airbyte Helm Chart version 0.64.205 at the moment, which lists the app version as 0.57.3

Relevant log output

2024-04-22 18:25:01 destination > INFO main i.a.c.i.d.s.GeneralStagingFunctions(lambda$onCloseFunction$1):162 Cleaning up destination completed.
2024-04-22 18:25:01 destination > INFO main i.a.c.i.d.a.AsyncStreamConsumer(close):253 class io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer closed
2024-04-22 18:25:01 destination > WARN main i.a.c.i.b.IntegrationRunner(stopOrphanedThreads):372 The main thread is exiting while children non-daemon threads from a connector are still active.
Ideally, this situation should not happen...
Please check with maintainers if the connector or library code should safely clean up its threads before quitting instead.
The main thread is: main (RUNNABLE)
 Thread stacktrace: java.base/java.lang.Thread.getStackTrace(Thread.java:2450)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.dumpThread(IntegrationRunner.java:402)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.stopOrphanedThreads(IntegrationRunner.java:376)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.stopOrphanedThreads(IntegrationRunner.java:336)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:192)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.java:125)
        at io.airbyte.cdk.integrations.base.adaptive.AdaptiveDestinationRunner$Runner.run(AdaptiveDestinationRunner.java:88)
        at io.airbyte.integrations.destination.snowflake.SnowflakeDestinationRunner.main(SnowflakeDestinationRunner.java:20)
2024-04-22 18:25:01 destination > WARN main i.a.c.i.b.IntegrationRunner(stopOrphanedThreads):384 Active non-daemon thread: pool-9-thread-5 (TIMED_WAITING)
 Thread stacktrace: java.base/jdk.internal.misc.Unsafe.park(Native Method)
        at java.base/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:269)
        at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1758)
        at java.base/java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:460)
        at java.base/java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1069)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
2024-04-22 18:25:01 destination > ERROR main i.a.c.i.b.AirbyteExceptionHandler(uncaughtException):64 Something went wrong in the connector. See the logs for more details. net.snowflake.client.jdbc.SnowflakeSQLException: SQL compilation error:
In-list contains more than 50 non-constant values: COLLATE_TO_BINARY('gatewayactivitytemplates', 'en-ci'),COLLATE_TO_BINARY('worklistemployees', 'en-ci'),COLLATE_TO_BINARY('attachtotypes', 'en-ci'),...
    at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowExceptionSub(SnowflakeUtil.java:144) ~[snowflake-jdbc-3.14.1.jar:3.14.1]
    at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:77) ~[snowflake-jdbc-3.14.1.jar:3.14.1]
    at net.snowflake.client.core.StmtUtil.pollForOutput(StmtUtil.java:501) ~[snowflake-jdbc-3.14.1.jar:3.14.1]
    at net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:407) ~[snowflake-jdbc-3.14.1.jar:3.14.1]
    at net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:482) ~[snowflake-jdbc-3.14.1.jar:3.14.1]
    at net.snowflake.client.core.SFStatement.executeQueryInternal(SFStatement.java:199) ~[snowflake-jdbc-3.14.1.jar:3.14.1]
    at net.snowflake.client.core.SFStatement.executeQuery(SFStatement.java:133) ~[snowflake-jdbc-3.14.1.jar:3.14.1]
    at net.snowflake.client.core.SFStatement.execute(SFStatement.java:769) ~[snowflake-jdbc-3.14.1.jar:3.14.1]
    at net.snowflake.client.core.SFStatement.execute(SFStatement.java:677) ~[snowflake-jdbc-3.14.1.jar:3.14.1]
    at net.snowflake.client.jdbc.SnowflakeStatementV1.executeInternal(SnowflakeStatementV1.java:320) ~[snowflake-jdbc-3.14.1.jar:3.14.1]
    at net.snowflake.client.jdbc.SnowflakeStatementV1.execute(SnowflakeStatementV1.java:392) ~[snowflake-jdbc-3.14.1.jar:3.14.1]
    at com.zaxxer.hikari.pool.ProxyStatement.execute(ProxyStatement.java:94) ~[HikariCP-5.1.0.jar:?]
    at com.zaxxer.hikari.pool.HikariProxyStatement.execute(HikariProxyStatement.java) ~[HikariCP-5.1.0.jar:?]
    at io.airbyte.cdk.db.jdbc.JdbcDatabase.lambda$executeWithinTransaction$1(JdbcDatabase.java:57) ~[io.airbyte.airbyte-cdk.java.airbyte-cdk-core-0.27.1.jar:?]
    at io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase.execute(DefaultJdbcDatabase.java:47) ~[io.airbyte.airbyte-cdk.java.airbyte-cdk-core-0.27.1.jar:?]
    at io.airbyte.cdk.db.jdbc.JdbcDatabase.executeWithinTransaction(JdbcDatabase.java:54) ~[io.airbyte.airbyte-cdk.java.airbyte-cdk-core-0.27.1.jar:?]
    at io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler.commitDestinationStates(JdbcDestinationHandler.java:380) ~[io.airbyte.airbyte-cdk.java.airbyte-cdk-db-destinations-0.27.1.jar:?]
    at io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper.prepareSchemasAndRunMigrations(DefaultTyperDeduper.java:142) ~[io.airbyte.airbyte-cdk.java.airbyte-cdk-typing-deduping-0.27.1.jar:?]
    at io.airbyte.cdk.integrations.destination.staging.GeneralStagingFunctions.lambda$onStartFunction$0(GeneralStagingFunctions.java:46) ~[io.airbyte.airbyte-cdk.java.airbyte-cdk-db-destinations-0.27.1.jar:?]
    at io.airbyte.commons.concurrency.VoidCallable.call(VoidCallable.java:15) ~[io.airbyte.airbyte-cdk.java.airbyte-cdk-dependencies-0.27.1.jar:?]
    at io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.start(AsyncStreamConsumer.kt:188) ~[io.airbyte.airbyte-cdk.java.airbyte-cdk-core-0.27.1.jar:?]
    at io.airbyte.cdk.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:298) ~[io.airbyte.airbyte-cdk.java.airbyte-cdk-core-0.27.1.jar:?]
    at io.airbyte.cdk.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:289) ~[io.airbyte.airbyte-cdk.java.airbyte-cdk-core-0.27.1.jar:?]
    at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:190) ~[io.airbyte.airbyte-cdk.java.airbyte-cdk-core-0.27.1.jar:?]
    at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.java:125) ~[io.airbyte.airbyte-cdk.java.airbyte-cdk-core-0.27.1.jar:?]
    at io.airbyte.cdk.integrations.base.adaptive.AdaptiveDestinationRunner$Runner.run(AdaptiveDestinationRunner.java:88) ~[io.airbyte.airbyte-cdk.java.airbyte-cdk-core-0.27.1.jar:?]
    at io.airbyte.integrations.destination.snowflake.SnowflakeDestinationRunner.main(SnowflakeDestinationRunner.java:20) ~[io.airbyte.airbyte-integrations.connectors-destination-snowflake.jar:?]

Stack Trace: net.snowflake.client.jdbc.SnowflakeSQLException: SQL compilation error:
In-list contains more than 50 non-constant values: COLLATE_TO_BINARY('gatewayactivitytemplates', 'en-ci'),COLLATE_TO_BINARY('worklistemployees', 'en-ci'),COLLATE_TO_BINARY('attachtotypes', 'en-ci'),...
    at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowExceptionSub(SnowflakeUtil.java:144)
    at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:77)
    at net.snowflake.client.core.StmtUtil.pollForOutput(StmtUtil.java:501)
    at net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:407)
    at net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:482)
    at net.snowflake.client.core.SFStatement.executeQueryInternal(SFStatement.java:199)
    at net.snowflake.client.core.SFStatement.executeQuery(SFStatement.java:133)
    at net.snowflake.client.core.SFStatement.execute(SFStatement.java:769)
    at net.snowflake.client.core.SFStatement.execute(SFStatement.java:677)
    at net.snowflake.client.jdbc.SnowflakeStatementV1.executeInternal(SnowflakeStatementV1.java:320)
    at net.snowflake.client.jdbc.SnowflakeStatementV1.execute(SnowflakeStatementV1.java:392)
    at com.zaxxer.hikari.pool.ProxyStatement.execute(ProxyStatement.java:94)
    at com.zaxxer.hikari.pool.HikariProxyStatement.execute(HikariProxyStatement.java)
    at io.airbyte.cdk.db.jdbc.JdbcDatabase.lambda$executeWithinTransaction$1(JdbcDatabase.java:57)
    at io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase.execute(DefaultJdbcDatabase.java:47)
    at io.airbyte.cdk.db.jdbc.JdbcDatabase.executeWithinTransaction(JdbcDatabase.java:54)
    at io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler.commitDestinationStates(JdbcDestinationHandler.java:380)
    at io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper.prepareSchemasAndRunMigrations(DefaultTyperDeduper.java:142)
    at io.airbyte.cdk.integrations.destination.staging.GeneralStagingFunctions.lambda$onStartFunction$0(GeneralStagingFunctions.java:46)
    at io.airbyte.commons.concurrency.VoidCallable.call(VoidCallable.java:15)
    at io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.start(AsyncStreamConsumer.kt:188)
    at io.airbyte.cdk.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:298)
    at io.airbyte.cdk.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:289)
    at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:190)
    at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.java:125)
    at io.airbyte.cdk.integrations.base.adaptive.AdaptiveDestinationRunner$Runner.run(AdaptiveDestinationRunner.java:88)
    at io.airbyte.integrations.destination.snowflake.SnowflakeDestinationRunner.main(SnowflakeDestinationRunner.java:20)

2024-04-22 18:25:02 replication-orchestrator > Try to close a destination which is already close
2024-04-22 18:25:02 replication-orchestrator > writeToDestination: exception caught
java.net.SocketException: Broken pipe
    at java.base/sun.nio.ch.SocketDispatcher.write0(Native Method) ~[?:?]
    at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:62) ~[?:?]
    at java.base/sun.nio.ch.NioSocketImpl.tryWrite(NioSocketImpl.java:394) ~[?:?]
    at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:410) ~[?:?]
    at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:440) ~[?:?]
    at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:819) ~[?:?]
    at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1195) ~[?:?]
    at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:309) ~[?:?]
    at java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:381) ~[?:?]
    at java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:357) ~[?:?]
    at java.base/sun.nio.cs.StreamEncoder.lockedWrite(StreamEncoder.java:158) ~[?:?]
    at java.base/sun.nio.cs.StreamEncoder.write(StreamEncoder.java:139) ~[?:?]
    at java.base/java.io.OutputStreamWriter.write(OutputStreamWriter.java:219) ~[?:?]
    at java.base/java.io.BufferedWriter.implFlushBuffer(BufferedWriter.java:178) ~[?:?]
    at java.base/java.io.BufferedWriter.flushBuffer(BufferedWriter.java:163) ~[?:?]
    at java.base/java.io.BufferedWriter.implWrite(BufferedWriter.java:334) ~[?:?]
    at java.base/java.io.BufferedWriter.write(BufferedWriter.java:313) ~[?:?]
    at java.base/java.io.Writer.write(Writer.java:278) ~[?:?]
    at io.airbyte.workers.internal.VersionedAirbyteMessageBufferedWriter.write(VersionedAirbyteMessageBufferedWriter.java:39) ~[io.airbyte-airbyte-commons-worker-0.57.3.jar:?]
    at io.airbyte.workers.internal.DefaultAirbyteDestination.acceptWithNoTimeoutMonitor(DefaultAirbyteDestination.java:138) ~[io.airbyte-airbyte-commons-worker-0.57.3.jar:?]
    at io.airbyte.workers.internal.DefaultAirbyteDestination.accept(DefaultAirbyteDestination.java:131) ~[io.airbyte-airbyte-commons-worker-0.57.3.jar:?]
    at io.airbyte.workers.general.BufferedReplicationWorker.writeToDestination(BufferedReplicationWorker.java:450) ~[io.airbyte-airbyte-commons-worker-0.57.3.jar:?]
    at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithTimeout$5(BufferedReplicationWorker.java:263) ~[io.airbyte-airbyte-commons-worker-0.57.3.jar:?]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2024-04-22 18:25:02 replication-orchestrator > writeToDestination: done. (forDest.isDone:false, isDestRunning:true)
2024-04-22 18:25:02 replication-orchestrator > readFromDestination: exception caught
java.lang.IllegalStateException: Destination process is still alive, cannot retrieve exit value.
    at com.google.common.base.Preconditions.checkState(Preconditions.java:512) ~[guava-32.1.3-jre.jar:?]
    at io.airbyte.workers.internal.DefaultAirbyteDestination.getExitValue(DefaultAirbyteDestination.java:210) ~[io.airbyte-airbyte-commons-worker-0.57.3.jar:?]
    at io.airbyte.workers.general.BufferedReplicationWorker.readFromDestination(BufferedReplicationWorker.java:492) ~[io.airbyte-airbyte-commons-worker-0.57.3.jar:?]
    at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$2(BufferedReplicationWorker.java:235) ~[io.airbyte-airbyte-commons-worker-0.57.3.jar:?]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2024-04-22 18:25:02 replication-orchestrator > readFromDestination: done. (writeToDestFailed:true, dest.isFinished:false)
2024-04-22 18:25:02 replication-orchestrator > processMessage: done. (fromSource.isDone:false, forDest.isClosed:true)
2024-04-22 18:25:02 replication-orchestrator > (pod: airbyte / source-postgres-read-341-0-nolfl) - Closed all resources for pod
2024-04-22 18:25:02 replication-orchestrator > Total records read: 1982 (878 KB)


marcosmarxm commented 5 months ago

Thanks for reporting the issue, @rowanmoul. I added it to the connector backlog for further investigation in the next sprints.

rowanmoul commented 5 months ago

Please let me know if I can help dig into the problem further (e.g., turning on more detailed logging, trying a debug version of the connector, etc.). I should still be able to reliably reproduce this issue on our dev server.

stephane-airbyte commented 4 months ago

@rowanmoul can you attach the full logs to this issue?

rowanmoul commented 4 months ago

I'd rather not post them publicly, as they contain many of our table names. Can I send the log file to you on Slack?

stephane-airbyte commented 4 months ago

Thank you, @rowanmoul, for providing the logs on Slack. What you posted in the description contains all the relevant information. We'll need to push a new version of the Snowflake connector with a bit more logging so we can see exactly which query is failing. The initial assessment is that the issue is probably caused by the high number of streams in your connection. If you want to get unblocked quickly, a workaround would be to split the connection into several connections with fewer than 50 streams each. I know it's not ideal, though.
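As a rough sketch of the workaround above, the stream list could be partitioned into evenly sized groups, each small enough for its own connection. The `split_streams` helper and the default of 49 streams per connection (taken from "fewer than 50") are assumptions made for illustration; this does not call any Airbyte API and only plans the grouping.

```python
import math
from typing import List, Sequence


def split_streams(streams: Sequence[str], max_per_connection: int = 49) -> List[List[str]]:
    """Split `streams` into the fewest groups of at most `max_per_connection`,
    keeping group sizes as even as possible and preserving order."""
    if max_per_connection < 1:
        raise ValueError("max_per_connection must be at least 1")
    if not streams:
        return []
    n_groups = math.ceil(len(streams) / max_per_connection)
    base, extra = divmod(len(streams), n_groups)
    groups: List[List[str]] = []
    start = 0
    for i in range(n_groups):
        # The first `extra` groups take one additional stream.
        size = base + (1 if i < extra else 0)
        groups.append(list(streams[start:start + size]))
        start += size
    return groups


if __name__ == "__main__":
    tables = [f"table_{i}" for i in range(120)]
    for number, group in enumerate(split_streams(tables), start=1):
        print(f"connection {number}: {len(group)} streams")
```

Balancing the groups, rather than filling each one to the maximum, leaves headroom in every connection if more tables are added later.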

rowanmoul commented 4 months ago

Right now our workaround is running Snowflake destination connector 3.5.14, which does not seem to have this problem with more than 50 streams. When you have a new version with more logging, let me know and I should be able to reproduce the issue just by upgrading the connector.

evantahler commented 4 months ago

Will be solved by https://github.com/airbytehq/airbyte/pull/38585