snowflakedb / snowflake-kafka-connector

Snowflake Kafka Connector (Sink Connector)
Apache License 2.0

schematization=true, error Insert Row Error , Columns not present in the table shouldn't be specified #878

Closed marystory closed 5 days ago

marystory commented 1 month ago

I am testing the schematization feature for the Snowpipe Streaming ingestion method locally. I have tried it with JSON and Avro value serialization. In both cases, the destination table does not exist and is supposed to be created by the Kafka connector.

I get the error below:

ERROR [snowflake-sink-connector|task-0] [SF_KAFKA_CONNECTOR] Insert Row Error message:The given row cannot be converted to the internal format: Extra columns: ["EMAIL", "AGE", "NAME"]. Columns not present in the table shouldn't be specified, rowIndex:1 (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel:94)
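For anyone triaging a larger log, here is a minimal sketch that pulls the offending column names out of this message. It assumes the list after "Extra columns:" is always printed as a JSON-style array, as in the line above; the function name `extra_columns` is made up for illustration and is not part of the connector.

```python
import json
import re

# Matches the bracketed column list in the "Extra columns" message.
EXTRA_COLUMNS_RE = re.compile(r"Extra columns: (\[[^\]]*\])")


def extra_columns(log_line: str) -> list[str]:
    """Return the column names listed after 'Extra columns:', or [] if absent."""
    match = EXTRA_COLUMNS_RE.search(log_line)
    if match is None:
        return []
    # Assumption: the list is a JSON-style array of double-quoted names.
    return json.loads(match.group(1))


line = (
    "Insert Row Error message:The given row cannot be converted to the internal "
    'format: Extra columns: ["EMAIL", "AGE", "NAME"]. Columns not present in the '
    "table shouldn't be specified, rowIndex:1"
)
print(extra_columns(line))  # ['EMAIL', 'AGE', 'NAME']
```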

Setup: kafka_2.13-3.2.3 for Kafka Connect, with snowflake-kafka-connector-2.2.2

name=snowflake-sink-connector
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
snowflake.ingestion.method=SNOWPIPE_STREAMING
tasks.max=1
topics=user
snowflake.topic2table.map=user:user
buffer.flush.time=20
buffer.count.records=10000
buffer.size.bytes=5000000

errors.tolerance=all
errors.log.enable=true

key.converter.schemas.enable=false
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# Snowflake configurations
# The URL for accessing your Snowflake account
snowflake.url.name=<>
snowflake.database.name=<>
snowflake.schema.name=INGESTION
snowflake.user.name=<>
snowflake.role.name=<>
snowflake.private.key=<>
snowflake.private.key.passphrase=<>

snowflake.enable.schematization=true
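A rough sanity check for a properties file like the one above could look like the sketch below. The helper names (`parse_properties`, `schematization_findings`) are hypothetical, and the rules are assumptions based on this thread: schematization is being used with SNOWPIPE_STREAMING, it needs structured values (so a StringConverter for values is flagged), and the connector warns when `errors.tolerance` is set without `errors.deadletterqueue.topic.name`. It is not an official validator.

```python
def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def schematization_findings(props: dict[str, str]) -> list[str]:
    """Return human-readable findings; an empty list means nothing was flagged."""
    findings = []
    if props.get("snowflake.enable.schematization", "false").lower() != "true":
        findings.append("snowflake.enable.schematization is not true")
    if props.get("snowflake.ingestion.method", "").upper() != "SNOWPIPE_STREAMING":
        findings.append("snowflake.ingestion.method is not SNOWPIPE_STREAMING")
    # Assumption: schematization needs structured values (JSON, Avro, ...).
    if props.get("value.converter", "").endswith("StringConverter"):
        findings.append("value.converter is a StringConverter")
    # Mirrors the connector's own warning about a missing dead letter queue.
    if props.get("errors.tolerance") == "all" and not props.get(
        "errors.deadletterqueue.topic.name"
    ):
        findings.append("errors.tolerance=all but no dead letter queue topic is set")
    return findings


sample = """
snowflake.ingestion.method=SNOWPIPE_STREAMING
errors.tolerance=all
value.converter=io.confluent.connect.avro.AvroConverter
# Snowflake configurations
snowflake.enable.schematization=true
"""
for finding in schematization_findings(parse_properties(sample)):
    print(finding)
```

With the settings posted here, only the dead letter queue finding is reported, which suggests the properties themselves are not the obvious culprit.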

Schematization should be able to create the table with my schema. So I am confused about the error I got. I get the same error even when I create the destination table manually and set its ENABLE_SCHEMA_EVOLUTION to true.

Any guidance on whether I am missing a setup step or using the wrong versions? I am using a Redpanda cluster and Kafka Connect.

Thanks

sfc-gh-mbobowski commented 1 month ago

Hello @marystory, thank you for trying out the schema evolution feature!

I don't see any error within your config and v2.2.2 should definitely support schematization. Please provide information on:

dthuering commented 1 month ago

Hello @sfc-gh-mbobowski

I ran into the same issue today. It happens when I create the table (with record_metadata only) with a tag before the connector is set up. When the table does not exist, it works.

I am using:

Best, Danny

Here is the log:

2024-07-15 13:26:39,237 INFO [connect-structured|task-2] [SF_KAFKA_CONNECTOR] Successfully called insertRows for channel:T.DATA.STRUCTURED.STRUCTURED_7, buffer:StreamingBuffer{numOfRecords=5, bufferSizeBytes=3829, firstOffset=0, lastOffset=4}, insertResponseHasErrors:true, needToResetOffset:false (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel) [task-thread-connect-structured-2]
2024-07-15 13:26:39,238 ERROR [connect-structured|task-2] WorkerSinkTask{id=connect-structured-2} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Error inserting Records using Streaming API with msg:The given row cannot be converted to the internal format: Extra columns: ["RECORDID", "TODOS"]. Columns not present in the table shouldn't be specified, rowIndex:0 (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-connect-structured-2]
org.apache.kafka.connect.errors.DataException: Error inserting Records using Streaming API with msg:The given row cannot be converted to the internal format: Extra columns: ["RECORDID", "TODOS"]. Columns not present in the table shouldn't be specified, rowIndex:0
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.handleInsertRowsFailures(TopicPartitionChannel.java:791)  
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertBufferedRecords(TopicPartitionChannel.java:557) 
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertBufferedRecordsIfFlushTimeThresholdReached(TopicPartitionChannel.java:526)  
at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.insert(SnowflakeSinkServiceV2.java:293)
at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:304)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581) 
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)    
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)   
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203) 
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)   
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243) 
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)    
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)   
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)    
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)    
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: net.snowflake.ingest.utils.SFException: The given row cannot be converted to the internal format: Extra columns: ["RECORDID", "TODOS"]. Columns not present in the table shouldn't be specified, rowIndex:0
at net.snowflake.ingest.streaming.internal.AbstractRowBuffer.verifyInputColumns(AbstractRowBuffer.java:424) 
at net.snowflake.ingest.streaming.internal.AbstractRowBuffer$ContinueIngestionStrategy.insertRows(AbstractRowBuffer.java:162)   
at net.snowflake.ingest.streaming.internal.AbstractRowBuffer.insertRows(AbstractRowBuffer.java:469) 
at net.snowflake.ingest.streaming.internal.ParquetRowBuffer.insertRows(ParquetRowBuffer.java:37)    
at net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestChannelInternal.insertRows(SnowflakeStreamingIngestChannelInternal.java:387) 
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel$InsertRowsApiResponseSupplier.get(TopicPartitionChannel.java:665) 
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel$InsertRowsApiResponseSupplier.get(TopicPartitionChannel.java:625)
at dev.failsafe.Functions.lambda$toCtxSupplier$11(Functions.java:243)
at dev.failsafe.Functions.lambda$get$0(Functions.java:46)   
at dev.failsafe.internal.FallbackExecutor.lambda$apply$0(FallbackExecutor.java:51)  
at dev.failsafe.SyncExecutionImpl.executeSync(SyncExecutionImpl.java:182)   
at dev.failsafe.FailsafeExecutor.call(FailsafeExecutor.java:438)    
at dev.failsafe.FailsafeExecutor.get(FailsafeExecutor.java:115) 
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertRowsWithFallback(TopicPartitionChannel.java:619)    
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertBufferedRecords(TopicPartitionChannel.java:545)
... 14 more

sfc-gh-xhuang commented 1 month ago

@dthuering Since you created the table outside of the kafka connector, did you ALTER TABLE SET ENABLE_SCHEMA_EVOLUTION = TRUE on it?

snowflake.enable.schematization
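For a table created outside the connector, the statement in question can be issued from a script. The sketch below only builds the SQL and hands it to a cursor; `MY_DB`, `MY_SCHEMA` and `MY_TABLE` are placeholders, the function names are made up for illustration, and `cursor` is assumed to be any DB-API style cursor (for example one obtained from snowflake-connector-python). As far as I know, the connector's role also needs the EVOLVE SCHEMA or OWNERSHIP privilege on the table for evolution to kick in, so treat this as one piece of the setup rather than the whole fix.

```python
import re

# Accept only simple unquoted identifiers so nothing unexpected reaches the SQL.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def enable_schema_evolution_sql(database: str, schema: str, table: str) -> str:
    """Build the ALTER TABLE statement that turns schema evolution on."""
    for part in (database, schema, table):
        if not _IDENTIFIER.fullmatch(part):
            raise ValueError(f"unsupported identifier: {part!r}")
    return (
        f"ALTER TABLE {database}.{schema}.{table} "
        "SET ENABLE_SCHEMA_EVOLUTION = TRUE"
    )


def enable_schema_evolution(cursor, database: str, schema: str, table: str) -> None:
    """Run the statement on a DB-API style cursor supplied by the caller."""
    cursor.execute(enable_schema_evolution_sql(database, schema, table))


# Placeholder names; substitute the real database, schema and table.
print(enable_schema_evolution_sql("MY_DB", "MY_SCHEMA", "MY_TABLE"))
```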

dthuering commented 1 month ago

@sfc-gh-xhuang No, I did not. It works when I do. Thank you! 👍

sfc-gh-xhuang commented 1 month ago

@marystory Can you also recheck if the table was created with ENABLE_SCHEMA_EVOLUTION?

marystory commented 1 month ago

Thanks for reviewing

@sfc-gh-xhuang Yes, as mentioned above in my post, I get the error in both of these scenarios

@sfc-gh-mbobowski the error posted above is from the connector logs. Here is the rest of the log. No data was loaded into snowflake.

Detailed logs
```
[2024-07-11 18:57:27,495] INFO [snowflake-sink-connector|task-0] [SF_KAFKA_CONNECTOR] Migrate OffsetToken response for table:random, sourceChannel:snowflake_sink_connector_225951378_users_0, destinationChannel:users_0 is:ChannelMigrateOffsetTokenResponseDTO{responseCode=51, responseMessage='Source Channel does not exist for Offset Migration'} (com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1:46)
[2024-07-11 18:57:27,496] INFO [snowflake-sink-connector|task-0] [SF_KAFKA_CONNECTOR] Opening a channel with name:users_0 for table name:random (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel:46)
[2024-07-11 18:57:28,111] INFO [snowflake-sink-connector|task-0] [SF_INGEST] Open channel request succeeded, channel=users_0, table=TESTDB.INGESTION.random, clientSequencer=3, rowSequencer=0, client=KC_CLIENT_snowflake_sink_connector_225951378_0 (net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal:58)
[2024-07-11 18:57:28,111] INFO [snowflake-sink-connector|task-0] [SF_INGEST] Channel=USERS_0 created for table=RANDOM (net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestChannelInternal:58)
[2024-07-11 18:57:28,302] INFO [snowflake-sink-connector|task-0] [SF_KAFKA_CONNECTOR] Fetched offsetToken for channelName:TESTDB.INGESTION.RANDOM.USERS_0, offset:null (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel:46)
[2024-07-11 18:57:28,306] INFO [snowflake-sink-connector|task-0] [SF_KAFKA_CONNECTOR] TopicPartitionChannel:TESTDB.INGESTION.RANDOM.USERS_0, offset token is NULL, will rely on Kafka to send us the correct offset instead (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel:46)
[2024-07-11 18:57:28,307] INFO [snowflake-sink-connector|task-0] [SF_KAFKA_CONNECTOR] task opened with 1 partitions, execution time: 2344 milliseconds (com.snowflake.kafka.connector.SnowflakeSinkTask:46)
[2024-07-11 18:57:28,316] INFO [snowflake-sink-connector|task-0] [Consumer clientId=connector-consumer-snowflake-sink-connector-0, groupId=connect-snowflake-sink-connector] Node 0 was unable to process the fetch request with (sessionId=1073741852, epoch=2312): FETCH_SESSION_ID_NOT_FOUND. (org.apache.kafka.clients.FetchSessionHandler:523)
[2024-07-11 18:57:31,598] INFO [snowflake-sink-connector|task-0] [SF_KAFKA_CONNECTOR] Fetched offsetToken for channelName:TESTDB.INGESTION.RANDOM.USERS_0, offset:null (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel:46)
[2024-07-11 18:57:41,775] INFO [snowflake-sink-connector|task-0] [SF_KAFKA_CONNECTOR] Fetched offsetToken for channelName:TESTDB.INGESTION.RANDOM.USERS_0, offset:null (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel:46)
[2024-07-11 18:57:51,792] INFO [snowflake-sink-connector|task-0] [SF_KAFKA_CONNECTOR] Successfully called insertRows for channel:TESTDB.INGESTION.RANDOM.USERS_0, buffer:StreamingBuffer{numOfRecords=2, bufferSizeBytes=544, firstOffset=0, lastOffset=1}, insertResponseHasErrors:true, needToResetOffset:false (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel:46)
[2024-07-11 18:57:51,795] ERROR [snowflake-sink-connector|task-0] [SF_KAFKA_CONNECTOR] Insert Row Error message:The given row cannot be converted to the internal format: Extra columns: ["EMAIL", "AGE", "NAME"]. Columns not present in the table shouldn't be specified, rowIndex:0 (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel:94)
[2024-07-11 18:57:51,795] ERROR [snowflake-sink-connector|task-0] [SF_KAFKA_CONNECTOR] Insert Row Error message:The given row cannot be converted to the internal format: Extra columns: ["EMAIL", "AGE", "NAME"]. Columns not present in the table shouldn't be specified, rowIndex:1 (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel:94)
[2024-07-11 18:57:51,795] WARN [snowflake-sink-connector|task-0] [SF_KAFKA_CONNECTOR] Although config:errors.tolerance is set, Dead Letter Queue topic:errors.deadletterqueue.topic.name is not set. (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel:82)
```

I am planning to test it on a Confluent Kafka cluster in the next few days to see whether I hit the same issue.

sfc-gh-mbobowski commented 1 month ago

@marystory thank you for posting the logs.

I dug into the code, and the connector behaves as if the schema evolution feature is not enabled. The extra-columns error should be caught and trigger an ALTER TABLE statement, but that does not happen. I will add more logging to the schematization code, because right now logging there is very poor and we are practically blind even at debug level.
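To make the expected flow concrete, here is a toy model of it: an insert fails with extra columns, the table is altered, and the insert is retried. This is not the connector's code. `ToyTable`, `ExtraColumnsError` and `insert_with_evolution` are invented for illustration, and `add_columns` merely stands in for the ALTER TABLE step. When evolution is off, the original error surfaces unchanged, which matches the symptom reported in this thread.

```python
class ExtraColumnsError(Exception):
    """Raised when a row carries columns the table does not have."""

    def __init__(self, columns):
        super().__init__(f"Extra columns: {columns}")
        self.columns = columns


class ToyTable:
    """In-memory stand-in for a destination table (hypothetical)."""

    def __init__(self, columns, schema_evolution_enabled):
        self.columns = set(columns)
        self.schema_evolution_enabled = schema_evolution_enabled
        self.rows = []

    def insert(self, row):
        extra = sorted(set(row) - self.columns)
        if extra:
            raise ExtraColumnsError(extra)
        self.rows.append(row)

    def add_columns(self, columns):
        # Stands in for an ALTER TABLE ... ADD COLUMN statement.
        self.columns.update(columns)


def insert_with_evolution(table, row):
    """Insert a row, evolving the schema once if evolution is enabled."""
    try:
        table.insert(row)
    except ExtraColumnsError as err:
        if not table.schema_evolution_enabled:
            raise  # behaves as if schematization were off
        table.add_columns(err.columns)
        table.insert(row)


table = ToyTable(["RECORD_METADATA"], schema_evolution_enabled=True)
insert_with_evolution(table, {"RECORD_METADATA": {}, "NAME": "a", "AGE": 1})
print(sorted(table.columns))
```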

What I would do is:

If you find anything new I will try to help. The version with additional logging will most probably be released in the next month.

sfc-gh-mbobowski commented 2 weeks ago

@marystory could you try it out with the 2.4.0 release? It contains more verbose logging.

sfc-gh-mbobowski commented 5 days ago

Closing due to inactivity.