snowflakedb / snowflake-kafka-connector

Snowflake Kafka Connector (Sink Connector)
Apache License 2.0

Consumer offsets on Kafka lagging behind #921

Closed: colinsmetz closed this issue 1 month ago

colinsmetz commented 2 months ago

Hello,

My team and I are using the Snowflake sink connector with the SNOWPIPE_STREAMING ingestion method. We sometimes notice that the consumer offsets on Kafka are not up to date. In the logs we see lines like `Fetched offsetToken for channelName:xxx, offset:<offset>`, where the offset matches the end offset of the Kafka topic, and the connector does seem to have finished delivering the data. But the committed offsets on Kafka are lower, and they can stay like that for a very long time, until a new record arrives.

I have found the documentation on exactly-once semantics, but while it clarifies a few things, I am not sure whether this behaviour is expected.

We use the consumer offsets to check easily that all our sink connectors (including connectors of other types) are on track. So even though the data is delivered correctly, this makes our monitoring harder.
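For reference, the check described above comes down to comparing each partition's log-end offset with the offset committed by the connector's consumer group. Below is a minimal, hypothetical Java sketch of that arithmetic (`ConsumerLag` and its methods are illustrative names, not part of any connector or Kafka API); fetching the offsets themselves is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of any connector API.
final class ConsumerLag {
    private ConsumerLag() {}

    // Lag per partition = log-end offset minus committed offset.
    // Kafka stores the committed offset as the *next* offset to consume,
    // so a group that has committed everything has a lag of 0.
    static Map<Integer, Long> perPartition(Map<Integer, Long> endOffsets,
                                           Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            // Assumption: a partition without a committed offset is counted from offset 0.
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - committed));
        }
        return lag;
    }

    // Sum of the per-partition lag values.
    static long total(Map<Integer, Long> lagPerPartition) {
        long sum = 0;
        for (long l : lagPerPartition.values()) {
            sum += l;
        }
        return sum;
    }
}
```

In practice the two maps could be filled from the Kafka `Admin` client (`listConsumerGroupOffsets` for the committed offsets, `listOffsets` with `OffsetSpec.latest()` for the end offsets). By default a sink connector consumes as the group `connect-<connector name>`.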

Am I missing something? Is this a bug? Can something be done to prevent this?

sfc-gh-wtrefon commented 1 month ago

Hi @colinsmetz, the Kafka connector uses internal buffers when consuming messages. The offset in `Fetched offsetToken for channelName:xxx, offset:<offset>` that matched the end offset of the Kafka topic is the offset of the in-memory buffer. It can take a while until the buffer is flushed and the corresponding consumer offset in Kafka and the offset in Snowflake are updated.

You can reduce the lag by using a single internal buffer (parameter: `snowflake.streaming.enable.single.buffer`).
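A sketch of the properties mentioned in this thread, written as the `Map<String, String>` that Kafka Connect passes to a connector's `start` method. The helper class is hypothetical. Connection settings (account URL, user, private key, database, schema) are deliberately omitted. Check the connector documentation for your version before relying on these keys.

```java
import java.util.Map;

// Hypothetical helper: collects only the settings discussed in this issue.
final class SnowflakeSinkConfig {
    private SnowflakeSinkConfig() {}

    static Map<String, String> lowLagStreamingConfig() {
        return Map.of(
            "connector.class", "com.snowflake.kafka.connector.SnowflakeSinkConnector",
            // Ingestion method used by the reporter.
            "snowflake.ingestion.method", "SNOWPIPE_STREAMING",
            // Use a single internal buffer, as suggested above, to reduce offset lag.
            "snowflake.streaming.enable.single.buffer", "true",
            // Buffer flush interval in seconds (value taken from the reporter's setup).
            "buffer.flush.time", "120");
    }
}
```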

In your case I recommend using the JMX metrics. We expose metrics such as `offsets.processed-offset`, `offsets.latest-consumer-offset` and `offsets.persisted-in-snowflake-offset`. For your use case you should use `offsets.processed-offset`, which is the offset of the buffer.
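A minimal sketch of reading such metrics with the JDK's own JMX API. It assumes that the connector's metrics are registered as MBeans whose object names contain the metric name (for example `processed-offset`) and that each MBean exposes its value through an attribute. The object-name pattern and the attribute name are assumptions: Dropwizard-style gauges use `Value`, but verify both with a tool such as JConsole first. `OffsetMetricsReader` is a hypothetical name.

```java
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Hypothetical helper for reading connector metrics over JMX.
final class OffsetMetricsReader {
    private OffsetMetricsReader() {}

    // Returns the given attribute of every MBean that matches objectNamePattern
    // and whose canonical object name contains metricName, keyed by that name.
    static Map<String, Object> read(MBeanServerConnection conn, String objectNamePattern,
                                    String metricName, String attribute) {
        Map<String, Object> values = new TreeMap<>();
        try {
            for (ObjectName name : conn.queryNames(new ObjectName(objectNamePattern), null)) {
                if (name.getCanonicalName().contains(metricName)) {
                    values.put(name.getCanonicalName(), conn.getAttribute(name, attribute));
                }
            }
        } catch (JMException | IOException e) {
            throw new IllegalStateException("JMX query failed", e);
        }
        return values;
    }
}
```

To read from a running Connect worker, obtain the connection with `JMXConnectorFactory.connect(new JMXServiceURL("service:jmx:rmi:///jndi/rmi://<host>:<port>/jmxrmi")).getMBeanServerConnection()`, with remote JMX enabled on the worker. Then call something like `read(conn, "<connector metrics domain>:*", "processed-offset", "Value")`, where the domain is again an assumption to confirm.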

colinsmetz commented 1 month ago

Hi @sfc-gh-wtrefon, thanks for the reply!

> It can take a while until the buffer is flushed and corresponding consumer offset in kafka and offset in Snowflake are being updated.

How long should we wait? We have `buffer.flush.time` set to 120 (2 minutes), yet once no new records arrive on the Kafka topic, the consumer lag persists for hours.

> In your case I recommend using the jmx metrics. We expose metrics like offsets.processed-offset, offsets.latest-consumer-offset, offsets.persisted-in-snowflake-offset. In your case you should use offsets.processed-offset which is the offset of the buffer.

Thanks, we'll have a look at those.

sfc-gh-wtrefon commented 1 month ago

Hi @colinsmetz, the offset commit to Kafka is controlled by the Kafka Connect framework, and we cannot change it. When the commit is called, the data is still in the buffer, so we cannot advance the offset in Kafka. The next commit is called only when new data comes in, which is why you can see a lag lasting hours.
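The mechanism described in this comment can be illustrated with a deliberately simplified, single-partition model. This is a hypothetical sketch, not the connector's code. It assumes that a commit can only advance Kafka to what has already been persisted, and that, as stated above, no further commit happens until new data arrives.

```java
// Hypothetical single-partition model of a buffering sink task (not connector code).
final class BufferedSinkModel {
    private long processedOffset = -1;  // highest offset accepted into the in-memory buffer
    private long persistedOffset = -1;  // highest offset flushed to Snowflake
    private long committedOffset = 0;   // next offset to consume, as stored in the consumer group

    // A record is accepted into the buffer.
    void put(long offset) {
        processedOffset = offset;
    }

    // The buffer is flushed, e.g. once the flush interval has elapsed.
    void flush() {
        persistedOffset = processedOffset;
    }

    // Framework-driven commit: only offsets already persisted may be committed.
    void commit() {
        committedOffset = persistedOffset + 1;
    }

    // Lag as seen by Kafka-based monitoring.
    long kafkaLag(long logEndOffset) {
        return logEndOffset - committedOffset;
    }

    long processed() { return processedOffset; }

    long persisted() { return persistedOffset; }
}
```

Suppose offsets 0 to 9 are put, a commit happens before the flush, and the flush happens afterwards. Then `kafkaLag(10)` stays at 10 even though `persisted()` is already 9. The Kafka lag only shrinks when a new record triggers the next commit.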

sfc-gh-lshcharbaty commented 1 month ago

Hi @colinsmetz, closing this issue due to inactivity. If the problem persists, feel free to reopen the issue.