snowflakedb / snowflake-kafka-connector

Snowflake Kafka Connector (Sink Connector)
Apache License 2.0

Performance tuning in Amazon MSK #661

Closed vtstanescu closed 1 year ago

vtstanescu commented 1 year ago

Hello!

Short description

This is more of a question, not sure if it's an actual issue or not. Q: What are the guidelines for tuning the performance of the Snowflake Kafka Connector when we have consuming lag issues?

Kafka Connect configuration

We are using the Snowflake Kafka Connector in Amazon MSK Connect, a managed Kafka Connect service offering from AWS with the following connector configuration:

connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=2
topics=topic1,topic2,topic3,topic4,topic5

buffer.flush.time=300
buffer.size.bytes=209715200
buffer.count.records=200000

snowflake.url.name=<redacted>
snowflake.database.name=<redacted>
snowflake.schema.name=<redacted>
snowflake.user.name=<redacted>
snowflake.private.key=<redacted>
snowflake.private.key.passphrase=<redacted>

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter

We also use a worker configuration:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
config.providers.secretManager.param.aws.region=<redacted>
config.providers=secretManager

The compute capacity in MSK Connect is currently configured with 2 workers, each with 2 MSK Connect Units (MCUs), which means each worker has 2 vCPUs & 8 GiB memory, so across the whole connector we have 4 vCPUs & 16 GiB memory.

As can be seen from the connector configuration, we consume data from 5 topics with a buffer flush configuration of 5 minutes / 200 MiB / 200k records. We have a 2-broker Kafka cluster and each topic has 2 partitions.

Producers' throughput

The producer side of Kafka is a backend application that writes to the topics with the following estimated throughputs (last 2 weeks' data):

  topic1 - avg 2.15 MiB/s, max 3.28 MiB/s; messages: avg 323/s, max 398/s (the message rate is not an actual Kafka topic metric, but comes from our backend)
  topic2 - avg 1.14 MiB/s, max 1.83 MiB/s
  topic3 - avg 14.5 KiB/s, max 47.7 KiB/s
  topic4 - can be ignored; it rarely sees data, and when it does the volume and message size are very low
  topic5 - avg 34 KiB/s, max 389 KiB/s

As can be seen from the Amazon MSK metrics, we don't seem to be hitting the throughput limits of the cluster, brokers, or topic partitions. Nonetheless, when the producers' peak occurs, the Snowflake Kafka Connector is lagging on only one topic, topic1, the biggest in our application.

It's worth mentioning that we don't see issues with the compute capacity of the connector, not even during producer peaks. We can try to enable more metrics (per-topic and per-partition) if it helps narrow down the issue, but we don't think this is a problem with Kafka, but rather with Kafka Connect. We are also looking to try the latest Snowflake Kafka Connector and the Snowpipe Streaming ingestion method, instead of Snowpipe.


sfc-gh-rcheng commented 1 year ago

@sfc-gh-japatel has experience working with MSK, though it may be different from what you are trying to do. Jay, can you take a look?

sfc-gh-wfateem commented 1 year ago

Hey @vtstanescu, I think the question is a bit open-ended, but I have a few thoughts. For starters, when you say there's a consumer lag, how big a lag are we talking about, and is it constant or continuously increasing?

Just looking at your metrics along with your configurations, I can understand why you might be seeing a lag, and it's not necessarily an issue. Take the following metric from your producer for topic1:

topic1 - avg 2.15 MiB/s, max 3.28 MiB/s; messages: avg 323/s max 398/s

Here are the relevant configurations for your Snowflake Kafka Connector:

buffer.flush.time=300
buffer.size.bytes=209715200
buffer.count.records=200000

For simplicity, let's say you're consistently producing 323 records per second. At that rate, and considering the buffer.count.records parameter alone, the connector is only going to start writing out your output file after 200000 / 323 = 619 seconds, i.e. ~10 minutes.

At that rate, your buffer.flush.time parameter is your safety net, so it should be writing out the output file every 300 seconds (5 minutes). We're not going to commit the offsets until your entire batch of records is written out to a file, and then we confirm that this was ingested by Snowpipe at the backend.

If you think about that message that first comes into your topic, and we say that's immediately read by Kafka Connect and passed to the Snowflake Kafka Connector, you can't expect that offset to be committed until 5 minutes have passed at least. So you can see why it would be natural to have a lag here.

To bring down that latency, you would need to decrease the flush time and the number of records. The downside is that you will end up driving up your costs significantly, since you're writing out a lot more files. Not to mention it can increase disk I/O activity on your nodes because of how frequently you're writing files out to disk.
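Purely as an illustration of that trade-off (these values are my own example, not a recommendation, and would need to be validated against your workload), a lower-latency buffer configuration could look something like:

buffer.flush.time=60
buffer.count.records=50000
buffer.size.bytes=209715200

The smaller the flush time and record count, the more (and smaller) files you write out, and the more Snowpipe ingestion requests you pay for.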

In order to really get a good idea of a benchmark for a particular environment, I suggest you start off by creating a single connector, with a single task that reads from a single topic with a single partition. You can then run your load tests to see how much a single task can handle, according to your requirements. As I mentioned previously, depending on your configurations you can expect a certain degree of consumer lag naturally. That performance and latency should be significantly better when you switch to SNOWPIPE_STREAMING as the ingestion method.
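A minimal sketch of that benchmarking setup (the topic name here is hypothetical, and everything not shown is carried over from your existing connector configuration):

connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=1
# hypothetical single-partition topic created just for the load test
topics=benchmark_topic
# credentials, converters and buffer.* settings as in the production configuration above

You can then increase the producer rate against that one partition until the consumer lag starts growing, which gives you the per-task ceiling for your message sizes and settings.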

sfc-gh-wfateem commented 1 year ago

@vtstanescu I hope some of what I provided earlier was helpful, but let us know if you still have any questions. Also, in case you're using SNOWPIPE as the ingestion method, please be sure to use v1.9.3.

vtstanescu commented 1 year ago

Hi @sfc-gh-wfateem!

Thank you for the in-depth explanation. I have a couple of things I'd like to address, while fully aware they might not point to an issue with the Snowflake Kafka Connector. Nonetheless, they could be helpful for others having similar issues.

Also, in case you're using SNOWPIPE as the ingestion method, please be sure to use v1.9.3.

First of all, why is this? Are there any improvements worth considering, besides the option to use Snowpipe Streaming? We are currently using 1.8.1 (we had to roll back to it from 1.8.2 because of #517) and haven't upgraded since.

UPDATE: I reviewed the CHANGELOG for all releases since 1.8.1 and I now understand the reason for the recommendation:

  1. Anything between 1.6.9 & 1.9.0 is a beta release
  2. 1.9.2 is broken for Snowpipe without streaming.

Secondly, since Friday we have been dealing with an incident and are still troubleshooting the root cause: our connector started to lag on our biggest topic, on only one partition. Snippet from kafka-consumer-groups.sh --describe:

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
topic1          0          51536759603     51767715037     230955434
topic1          1          51764259226     51764380960     121734   

This lag increased gradually (see SumOffsetLag metric graph from Amazon CloudWatch) and it doesn't seem to be able to recover from it.

[Screenshot: SumOffsetLag metric graph from Amazon CloudWatch, 2023-07-04 22:04]

What I'm looking to understand at this point is:

  1. What's the recommendation for the Snowflake Kafka Connector in regard to how many topics it should handle? We currently have 6 topics in the topics property of the connector. Is the rule of thumb to have a task per partition, i.e. tasks.max = sum([len(topic.partitions) for topic in topics]), along with appropriate connector compute capacity? Or is it recommended to have separate connectors for each topic? Though I don't expect this to be the case, how should you choose your connector compute capacity in that scenario?
  2. From the Snowflake documentation it looks like the buffer.* properties are per partition (as I'd expect). In that case, to estimate the required capacity, say for connector memory, should we use a formula like connector_memory = tasks.max * buffer.size.bytes * headroom_margin_percent, where headroom_margin_percent covers the connector's other memory needs, say a value of 1.2 - 1.3 (20-30% memory headroom)? (A rough worked sketch follows right after this list.)
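A rough back-of-the-envelope sketch of that formula using the values from this thread (my own reading of it, not an official sizing rule):

connector_memory ≈ tasks.max * buffer.size.bytes * headroom_margin_percent
                 ≈ 2 * 200 MiB * 1.25
                 ≈ 500 MiB

If the buffers really are held per partition, a worst case where all 12 partitions hold a full buffer at the same time would be closer to 12 * 200 MiB * 1.25 ≈ 3 GiB, which is why the per-partition interpretation matters for sizing.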

We are now trying to increase the tasks.max of the connector, as we realized we might have untapped potential in our setup. I.e. we have 1 MSK Connect connector configured with provisioned capacity as follows: 2 workers * 2 MCU/worker (MSK Connect Units - one MCU provides 1 vCPU & 4 GiB memory), which translates to 2 * (2 vCPUs & 8 GiB) = 4 vCPUs & 16 GiB memory across the connector, but we only had tasks.max = 2. We are now going to try 4 workers * 2 MCU/worker and tasks.max = 8 to see if we recover from our lag. We'll keep this post updated with our findings, but we are open to any suggestions.

sfc-gh-wfateem commented 1 year ago

Hi @vtstanescu

You'll want to look at the number of partitions rather than the number of topics. How many partitions you can handle is not a straightforward question to answer and it's going to depend on a lot of factors. To illustrate using an extreme example, someone may have 100 partitions that only have incoming messages at the rate of 1 message/second, while someone else may have 1 single partition with a rate of 10,000,000 messages/second. The example with one single partition will be the higher load, and it's not necessarily true that your task will be able to keep up with that rate, which is why I mentioned that each user needs to do a benchmark test for their particular environment.

The ideal scenario is that you have a single task per partition while having already done some kind of preliminary benchmarking to determine the optimal rate of incoming messages per partition. If you determine that 1,000,000 messages per second is the optimal rate in your scenario, then given that previous example I provided, you'll want to scale out that single partition to 10 partitions to even out that load of 10 million messages per second and then increase the number of tasks from 1 to 10.

In your case, however, it does look like you had things running fine from the 24th of June until something happened between the 29th and the 30th. It's difficult to say what that is, but given that pattern in the chart, I would say you are likely experiencing several rebalances around the times where you see an increase in lag; once rebalancing completes, data is consumed and processed again, which is where you see a slight drop in lag, and so on. If you're hitting frequent rebalancing while having a constant rate of incoming messages, then you can expect the lag to continuously increase over time.

Having one single connector is fine, but you could also consider one connector per topic in order to spread out the number of consumer groups. The benefit of doing that is that if you have a problem with one single topic, a consumer group rebalance would only impact one connector (one consumer group), while the others can continue processing without any interruption.

vtstanescu commented 1 year ago

Getting back with some results from our experience; we have yet to do a benchmark in a separate environment, but the live performance is interesting to see nonetheless.

Check the 1st graph which shows BytesInPerSec & BytesOutPerSec (per partition - 5min average):

  1. The huge red line is BytesOutPerSec after the scale-out I described in my last message; KC was catching up on the lagging partition. After getting through this lag we reduced our KC connector back to 2 workers (2 MCUs each => 4 vCPUs, 16 GiB across the connector) with tasks.max set to 4. This means we have a 3:1 ratio of partitions/task (as we have 6 topics with 2 partitions each => 12 partitions total). We do this to keep cost down, as not all our topics produce at the same rate as the highest one.
  2. BytesIn gets bigger than BytesOut and, as a consequence, OffsetLag increases (see the second graph). My conclusion here is that one partition gets less time in a task since that task handles other partitions too, though I'd expect the lag of the 2 partitions of the same topic to stay close to each other over time; that's not the case (again, see the second graph).
  3. The dip is us recreating the connector to increase the worker count (to 3, from 2) and tasks.max (to 6, from 4), in an attempt to cope with the increased production rate. The dip is in BytesOut: only the consumer is down, the producer is still going, so BytesIn doesn't drop (it actually increases with our traffic pattern).
  4. After the connector starts, we can see it consumes at the same rate data is produced, although it is quite far behind and would need to consume more to catch up. It looks like the ceiling is around 1.4 MiB/s. For reference, this topic gets over 7k messages/sec/partition at peaks, while the lows can be half of that, at 3.5k messages/sec/partition.
  5. BytesIn starts to drop as we receive less traffic, but BytesOut remains consistent as KC still has catching up to do.

BytesIn & BytesOut PerSec (5 minutes average)


Offset Lag


MessagesInPerSec (5 minutes average)


sfc-gh-wfateem commented 1 year ago

Thanks for sharing @vtstanescu. So what conclusions are you drawing out of this test, and what question(s)/issue(s) do you need help with in this issue? I want to be sure we understand what the purpose of this issue is and what it is we're trying to tackle here.

vtstanescu commented 1 year ago

I think I'm looking to understand 2 things here:

  1. What are the limits of the connector? And is there a way to estimate the number of tasks, compute capacity and topic partitions needed to process a certain throughput? Given the ceiling I see above, it looks to me like we might need to increase the number of partitions, though a conservative Kafka estimate is that a partition can provide 10 MB/s of throughput, and we don't see anywhere near that on the consumer side. Looking at the MessagesInPerSec metric, it looks like buffer.count.records would be the first threshold hit to flush a batch to Snowpipe, not buffer.flush.time or buffer.size.bytes. Could this be our problem, though I don't see how?
  2. Could the README of the project have more details about the inner workings of the connector? Details like how many tasks are created, based on what parameters, and what a task does. These would be helpful for someone facing a performance issue, like the one I presented, to pinpoint a bottleneck and then change certain parameters based on data, rather than trial and error. The trial-and-error approach seems to be what we need to do in our case: test throughput in a separate environment and tweak parameters until we find what works, but the tweaking doesn't seem to be driven by data & knowledge, just by trying combinations. Maybe I don't see/understand what I should look at to know which parameters to tweak to achieve the desired performance, so that might be the end goal of what I want to get out of this issue.

sfc-gh-wfateem commented 1 year ago

@vtstanescu What is your throughput target? Is that 10 MB/s then? And is the expectation here to process that using one single task? Concerning your data, how large are your messages (average, and maximum size)? Are the messages in Avro format or plain String? What converters are you using to serialize and deserialize your data? Can you give us an example of what your message looks like structure-wise? Is your MSK cluster on AWS in the same region as your Snowflake account? If not, what regions are involved here?

Finally, for all these metrics you have provided, are we still talking about the ingestion method SNOWPIPE, or are we talking about SNOWPIPE_STREAMING?

If you haven't tried SNOWPIPE_STREAMING then can I ask you to run another quick test and let us know if you see an improvement in throughput and whether or not that meets your expectations?

Refer to the following documentation: https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-kafka

Trying to briefly address some of your questions:

Looking at the MessagesInPerSec metric, it looks like the buffer.count.records would be the first to be hit in order to flush a batch to Snowpipe, not the buffer.flush.time or buffer.size.bytes. Can this be our problem, though I don't see how?

The connector will flush out the data from memory based on whichever parameter was satisfied first (per partition). You previously mentioned the following configurations:

buffer.flush.time=300
buffer.size.bytes=209715200
buffer.count.records=200000

You're going to flush out that data after 300 seconds if you never produce 200,000 records or manage to read 209715200 bytes (~200 MiB) within that time.

The charts you provided show that at most you have spiked to 1.6 MB/s for a very short period of time. At that rate, you would flush out that data after roughly 130 seconds. The Messages/sec chart tells me that you are in fact more likely to be hitting that buffer.count.records value of 200k first. If at peak you're receiving 7.5k messages per second, then you would reach 200k messages after roughly 27 seconds.
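Putting the three thresholds side by side at those observed peak rates (rough arithmetic only, using the per-partition figures above):

buffer.flush.time     -> 300 s
buffer.size.bytes     -> 209715200 B / 1.6 MB/s ≈ 130 s
buffer.count.records  -> 200000 / 7500 msg/s    ≈ 27 s  (hit first)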

Details like how many tasks are created, based on what parameters, and what a task does?

A big part of this is based on Kafka Connect. Your tasks.max parameter is going to dictate how many consumers you have running in your sink connector's consumer group, whether that's Snowflake connector or anything else like the S3 connector. So in theory, you're going to have one task that consumes from one single partition (or more). I recommend you read the first comment in the following Jira: https://issues.apache.org/jira/browse/KAFKA-13136

A task here is going to be Kafka Connect SinkTask: https://kafka.apache.org/20/javadoc/org/apache/kafka/connect/sink/SinkTask.html

Every sink connector implements that interface; in our case, the Snowflake SinkTask implementation is here: https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java#L54

The process of reading and deserializing messages is handled by the Kafka Connect layer using the Kafka Consumer running in the worker node. That data is passed along to us in the put() method which we then process (buffering in memory, and flushing out to file, moving the file to Snowflake, and sending a request to Snowpipe to ingest the file).

vtstanescu commented 1 year ago

Q&A

Q: What is your throughput target? Is that 10 MB/s then?
A: We don't need that much, but we can see that at peaks we produce ~1.6 MB/s per partition, and this will grow (up to a point where we start using more partitions for the topic in question). So we expect the consumer (SF Kafka Connector) to keep up with that while we are within the 10 MB/s partition throughput. Again, the 10 MB/s is a conservative, generally expected throughput of a Kafka partition, not our current throughput.

Q: And is the expectation here to process that using one single task?
A: If we are talking about the 10 MB/s per partition, it would be nice to know that the SF Kafka Connector can process that much as long as Kafka can give it that throughput. Understanding there will be some overhead flushing to Snowpipe, we expect the ceiling to be lower, but not the 1.4 MB/s we see in our production environment.

Q: Concerning your data, how large are your messages (average, and maximum size)?
A: For the topic that has this issue, the average message size is 1.1 KiB, the maximum goes up to 5.65 KiB, and the minimum is 0.8 KiB. I got these figures from the Snowpipe table using LENGTH(RECORD_CONTENT). The maximum value seems to be an outlier; I'll check on that separately with the dev team.

Q: Are the messages in Avro format or plain String? What converters are you using to serialize and deserialize your data? Can you give us an example of what your message looks like structure-wise?
A: Messages are one-line JSONs to keep message size to a minimum. The key.converter is org.apache.kafka.connect.storage.StringConverter, though we don't key our messages, so they are round-robin balanced between the 2 partitions. The value.converter is com.snowflake.kafka.connector.records.SnowflakeJsonConverter.

Q: Is your MSK cluster on AWS in the same region as your Snowflake account? If not, what regions are involved here?
A: All infrastructure (MSK, MSK Connect & Snowflake account) is in the same AWS region. We don't use AWS PrivateLink to connect to Snowflake.

Q: Finally, for all these metrics you have provided, are we still talking about the ingestion method SNOWPIPE, or are we talking about SNOWPIPE_STREAMING? If you haven't tried SNOWPIPE_STREAMING then can I ask you to run another quick test and let us know if you see an improvement in throughput and whether or not that meets your expectations?
A: It's SNOWPIPE, without streaming. We are going to try it, but I'm waiting on a dev environment in Snowflake to test it out before rolling it into production. I'll share more info after I complete that test.

KC Tasks assignment and MSK Connect

Thank you for sharing the links regarding how KC creates and assigns tasks. This has been very helpful for me (with less than 1 year of experience operating Kafka & KC). The comments in that ticket, https://issues.apache.org/jira/browse/KAFKA-13136, made me wonder whether this could be an issue in MSK Connect, which uses version 2.7.1 of the Kafka client library. Amazon doesn't support other versions for MSK Connect, though for MSK (Kafka clusters) they support more versions (we currently use 3.3.1). Back to the issue, I found the following interesting log messages (as a reminder, we use v1.8.1 of the Snowflake Kafka Connector):

[Worker-0178b67d48ee8ac28] [2023-07-07 13:39:23,213] INFO [REDACTED_MSK_CONNECT_CONNECTOR_NAME|task-3] [SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:3]:open, TopicPartition number: 0 (com.snowflake.kafka.connector.SnowflakeSinkTask:244)
[Worker-0178b67d48ee8ac28] [2023-07-07 13:39:23,239] INFO [REDACTED_MSK_CONNECT_CONNECTOR_NAME|task-0] [SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:0]:open, TopicPartition number: 6 (com.snowflake.kafka.connector.SnowflakeSinkTask:244)
[Worker-00c15c99fba9659ce] [2023-07-07 13:39:23,215] INFO [REDACTED_MSK_CONNECT_CONNECTOR_NAME|task-4] [SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:4]:open, TopicPartition number: 0 (com.snowflake.kafka.connector.SnowflakeSinkTask:244)
[Worker-00c15c99fba9659ce] [2023-07-07 13:39:23,238] INFO [REDACTED_MSK_CONNECT_CONNECTOR_NAME|task-1] [SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:1]:open, TopicPartition number: 6 (com.snowflake.kafka.connector.SnowflakeSinkTask:244)
[Worker-0d928ae7cb870fd2b] [2023-07-07 13:39:23,214] INFO [REDACTED_MSK_CONNECT_CONNECTOR_NAME|task-2] [SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:2]:open, TopicPartition number: 0 (com.snowflake.kafka.connector.SnowflakeSinkTask:244)
[Worker-0d928ae7cb870fd2b] [2023-07-07 13:39:23,215] INFO [REDACTED_MSK_CONNECT_CONNECTOR_NAME|task-5] [SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:5]:open, TopicPartition number: 0 (com.snowflake.kafka.connector.SnowflakeSinkTask:244)

I cross-checked this realization with kafka-consumer-groups.sh --describe --members --verbose and kafka-consumer-groups.sh --describe --state, and I think we pinpointed our problem: no matter how big we make tasks.max, the default RangeAssignor of the consumer group will not assign partitions to more tasks than the highest partition count of any single topic consumed by the connector. RangeAssignor assigns each topic's partitions independently, so with 6 topics of 2 partitions each, only 2 tasks ever get partitions (6 topic-partitions each), which matches the log above where tasks 0 and 1 open 6 topic-partitions while tasks 2-5 open none.

This seems to be a limitation of MSK Connect, which uses v2.7.1 of the Kafka client library. We have another consumer group, used by an AWS Lambda event source mapping (ESM), and that one uses the cooperative-sticky strategy. AWS seems to point this out here: https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-troubleshooting.html

We will try the CooperativeStickyAssignor to see if this improves our performance, assuming the Snowflake Kafka Connector's poor performance was caused by the fact that one task had to deal with 6 partitions.
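The plan is to set this via Kafka Connect's per-connector consumer override, assuming the MSK Connect worker configuration allows client config overrides. A rough sketch of the two pieces involved (not yet verified on MSK Connect):

# worker configuration
connector.client.config.override.policy=All

# connector configuration (the eager StickyAssignor would use org.apache.kafka.clients.consumer.StickyAssignor instead)
consumer.override.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor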

vtstanescu commented 1 year ago

We fixed our issue by using a different assignor than the default RangeAssignor. We ended up with the StickyAssignor (eager rebalancing) instead of the CooperativeStickyAssignor, which does incremental cooperative rebalancing. That's because the Kafka Connect framework in 2.7.1 has a bug that causes it to stop storing offsets in the consumer group when an incremental cooperative rebalancing strategy is used. See the Kafka bug report for more details.

Unfortunately, Amazon only supports this version of the Kafka client library in their MSK Connect service. From the Snowflake Kafka Connector side, everything looks fine; our performance figures look good now.

sfc-gh-wfateem commented 1 year ago

Thanks for the update @vtstanescu. Can you share what your throughput looks like now and how many tasks you're using?

vtstanescu commented 1 year ago

The overall throughput increased, as the producer is also producing more data; the key takeaway is that the connector is keeping up with the production rate, and we have no more lag (beyond what the buffer values we use imply). We are currently operating with 6 tasks, so we have 2 partitions/task across all the topics the connector subscribes to.

We'll be looking to test a few things in the near future:

jsrcodes commented 1 year ago

Hi @vtstanescu, thanks for this discussion. We are also running into a similar issue, but in our case we use Snowpipe Streaming. I suspect that in our case the lower retention and the frequent network I/O to shift rolled segments to remote storage are likely affecting the brokers' egress to consumers. The retention settings we use: segment.ms = 10 minutes, local.retention.ms = 15 minutes, remote storage retention = 3 days.

May I know your EBS storage volume per broker? We currently set it to 80 GB, which is why we went with such a short local retention.

vtstanescu commented 1 year ago

@jsrcodes Sounds like you are looking at the Kafka broker as the problem, not the Snowflake KC connector, so it's outside the scope of this issue. In our case, we didn't touch the segment configuration, so it's Kafka's default (7 days). That being said, we allocate far more storage to the brokers.