Closed JavierMonton closed 2 weeks ago
I have also been getting a strange offset WARN recently:
com.snowflake.kafka.connector.internal.streaming.BufferedTopicPartitionChannel "skipping current record - expected offset 0 but received 0. The current offset stored in Snowflake: 0"
IMHO "expected offset 0 but received 0" does not make sense; it does not look like an issue at all to me.
I do not know whether it could be related to the bundle change (2024_08), but I was about to open an issue about the offset WARNs as well.
It seems that the first record offset for 2024_07 is 0 or null, while for 2024_08 it is -1 or null instead.
EDIT:
I confirm what I said, the offset in 2024_08 starts from -1, by doing: SELECT SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN('-table-', '-channel-', '-1'); The first record is ingested correctly.
In 2024_07 I was doing: SELECT SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN('-table-', '-channel-', '0');
Hi @JavierMonton, thanks for reaching out. Let me go through your comment step by step:
The rowCount is always 1, and this message is produced around 12.000 times per minute. We had this issue in the past and the message was produced even more than that. The error seems to appear after restarting the connector, but we don't know how to fix it.
The BufferedTopicPartitionChannel buffers rows in memory and flushes them when the threshold is reached. Normally, we would like to flush multiple rows at the same time, but that is not possible because each row can introduce a new column, so we insert them into the ingest-sdk buffer one by one (the SDK does some internal validation logic regarding the schema).
preEndOffset=58193701, curStartOffset=58193703, curEndOffset=58193703, rowCount=1.
That line essentially means that the last inserted offset (stored in the channel on the Snowflake side) is 58193701, and that the current call to the ingest-sdk insertRows inserts the rows between offsets (58193703, 58193703). The API supports inserting multiple rows in batches, which is why the message needs to be decoded a bit. The function that verifies offsets is defined at https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java#L63-L73.
It expects every offset to be exactly 1 higher than the previous one. In your case, offset 58193702 is missing, which is fine: Kafka does not guarantee that consecutive offsets differ by 1, only that they increase. So I'll discuss it with the team, and most probably I'll provide a better verification function (one that only checks that offsets are increasing).
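To make the difference between the two checks concrete, here is a minimal Python sketch. It is not the connector's actual Java code; the function names `strictly_consecutive` and `increasing_only` are hypothetical, and only the four parameter names are taken from the log line quoted above.

```python
def strictly_consecutive(prev_end, cur_start, cur_end, row_count):
    """Strict check (assumed behaviour): the batch must start at prev_end + 1
    and contain exactly one row per offset in [cur_start, cur_end]."""
    return cur_start == prev_end + 1 and cur_end - cur_start + 1 == row_count


def increasing_only(prev_end, cur_start, cur_end, row_count):
    """Relaxed check (assumed behaviour): offsets only have to move forward.
    row_count is ignored because gaps inside the batch are allowed."""
    return prev_end < cur_start <= cur_end


# Values from the log line in this thread: offset 58193702 is missing.
print(strictly_consecutive(58193701, 58193703, 58193703, 1))  # strict check fails
print(increasing_only(58193701, 58193703, 58193703, 1))       # relaxed check passes
```

With the strict variant, every gap left by compaction or an aborted transaction triggers the warning; the relaxed variant only flags offsets that go backwards or repeat.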
We believe this started to happen when we moved from version 2.2.2 to version 2.4.0. It stopped when we moved to version 2.4.1 but it started to happen again.
Actually, the OffsetTokenVerificationFunction was introduced in 2.2.1
Hi @simonepm.
I confirm what I said, the offset in 2024_08 starts from -1, by doing: SELECT SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN('-table-', '-channel-', '-1'); The first record is ingested correctly. In 2024_07 I was doing: SELECT SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN('-table-', '-channel-', '0');
Let me circle back on that, as I'm unaware of any changes that could have caused that.
Thanks @sfc-gh-achyzy. To add context: I still need to fully understand how SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN works, because it also happens that when I set it to '0' for all the partitions of a topic, only a subset of the topic partitions (e.g. 6/10) is sent to Snowflake again after the Kafka connector is restarted. I guess it is the interaction with the offset topic that makes it hard to keep things in sync with the CHANNELS counter.
@simonepm If you check https://github.com/snowflakedb/snowflake-kafka-connector/blob/1302e366c0f7012e2bc72d3ab9fc9eab3543dcf5/src/main/java/com/snowflake/kafka/connector/internal/streaming/channel/TopicPartitionChannel.java#L10 you'll see that we assume the offsetToken must be "-1" or null to be treated as non-existent on the Snowflake side. In that case we fall back to Kafka for the current offset. For now, the logic assumes the next message is at offset +1 (which is not always true if you use transactions and some of them were rolled back).
TL;DR: reset to -1 or null. The logic I've mentioned is defined at https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L387-L388
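A rough Python sketch of the rule described above, under stated assumptions: the function name `resolve_start_offset` and its parameters are hypothetical and do not come from the connector, which implements this logic in Java. It only illustrates why a token of '0' and a token of '-1' behave differently.

```python
NO_OFFSET = -1  # sentinel meaning "no offset stored on the Snowflake side"


def resolve_start_offset(offset_token, kafka_offset):
    """Return the offset to resume from (illustrative only).

    offset_token: channel offset token as a string, or None.
    kafka_offset: offset Kafka would hand out for this partition.
    """
    if offset_token is None or int(offset_token) == NO_OFFSET:
        # Nothing stored in Snowflake: fall back to Kafka's offset.
        return kafka_offset
    # A stored token means that offset was already ingested,
    # so the next expected record is token + 1.
    return int(offset_token) + 1


print(resolve_start_offset("-1", 0))  # falls back to the Kafka offset
print(resolve_start_offset("0", 0))   # offset 0 counts as already ingested
```

Under this reading, resetting the token to '0' marks offset 0 as already ingested, so the first record is skipped, whereas '-1' or null lets the connector start from whatever Kafka reports.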
Thanks for confirming that!
Hi @sfc-gh-achyzy, many thanks for the explanation. I assume some false positives can happen, for example when reading from a compacted topic, where some offsets might be missing.
We are getting this message a lot, see these examples:
preEndOffset=57088836, curStartOffset=57088838, curEndOffset=57088838, rowCount=1.
preEndOffset=57088834, curStartOffset=57088836, curEndOffset=57088836, rowCount=1.
preEndOffset=57088832, curStartOffset=57088834, curEndOffset=57088834, rowCount=1.
but after your explanation, I looked for these missing offsets, and we are indeed missing them in our source topic.
So I assume the warning is fine.
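For anyone who wants to run the same cross-check against their source topic, here is a small Python sketch that extracts the skipped offsets from such log lines. The helper `missing_offsets` is hypothetical, and the regular expression assumes the message contains the `preEndOffset=..., curStartOffset=...` fragment exactly as quoted in this thread.

```python
import re

# Matches the fragment quoted in this thread; the full log format is assumed.
PATTERN = re.compile(r"preEndOffset=(\d+), curStartOffset=(\d+)")


def missing_offsets(log_line):
    """Return the offsets skipped between preEndOffset and curStartOffset."""
    match = PATTERN.search(log_line)
    if match is None:
        return []
    prev_end, cur_start = int(match.group(1)), int(match.group(2))
    # Every offset strictly between the two values was never inserted.
    return list(range(prev_end + 1, cur_start))


lines = [
    "preEndOffset=57088836, curStartOffset=57088838, curEndOffset=57088838, rowCount=1.",
    "preEndOffset=57088834, curStartOffset=57088836, curEndOffset=57088836, rowCount=1.",
]
for line in lines:
    print(missing_offsets(line))
```

The resulting offsets can then be looked up in the source topic; if they do not exist there (compaction, aborted transactions), the warning is a false positive.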
From our side, this warning becomes unmanageable because it's produced too many times, but now I understand the reason. For now, we have suppressed the warning from the KC logger properties, but it'd be very nice if there was a way of disabling this check.
In any case, thanks a lot for the help! I'll keep looking for new releases :)
Hi @JavierMonton, @sfc-gh-achyzy prepared a fix for that. The delivery will be aligned with our release train.
The fix was released in 2.5.0. Closing the issue.
Hello,
We are having an issue with the connector where it is creating thousands of warning messages like:
The rowCount is always 1, and this message is produced around 12.000 times per minute. We had this issue in the past and the message was produced even more than that. The error seems to appear after restarting the connector, but we don't know how to fix it.
We are not sure if this means that we are losing messages or if the warning can be ignored, but even if it can be ignored, the number of logs it produces is massive.
This is our configuration: Connector version: 2.4.1
The Kafka cluster is an MSK, and we use Confluent's Kafka Connect docker. We believe this started to happen when we moved from version 2.2.2 to version 2.4.0. It stopped when we moved to version 2.4.1 but it started to happen again.
We have tried stopping and recreating the connector, as well as deleting the Snowflake table and starting the connector again, but it still happens on some tables.
Is there any suggestion about how to deal with this? We can try to troubleshoot it, but we are not sure how right now.
Thanks.