cotedm opened this issue 7 years ago
Hi,
I am trying to analyze the heap size for the s3-sink connector. We have 2 K8s pods (replicas) running with 6 different registered connectors. Each connector points to a different topic, each with 16 partitions. As we have tasks.max = 16, I guess each pod would have 8 tasks running in parallel (assuming the tasks are distributed evenly among the pods). I am seeing that the heap size of that container is always ~7 GB in stage (not much data coming in) and ~12 GB in prod (with the same config), and I am not sure why it settles on such a high, constant amount of memory. My understanding was that each pod should take roughly (5 MB (s3.part.size) + ~2 MB (consumer.max.partition.fetch.bytes)) * 8 (partitions) * 6 (connectors) ≈ 340 MB, but the container is taking 7 GB of memory.
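A minimal sketch of that expectation, just to make the arithmetic explicit; the even split of 8 partitions per pod is an assumption about how Connect balances the tasks, and the ~2 MB fetch size is the approximate figure mentioned above:

```java
public class ExpectedHeapPerPod {
    public static void main(String[] args) {
        long s3PartSizeMb = 5;          // s3.part.size = 5242880 bytes
        long maxPartitionFetchMb = 2;   // approx. consumer.max.partition.fetch.bytes
        int connectors = 6;             // one topic per connector
        int partitionsPerPod = 8;       // 16 partitions split across 2 pods (assumed even split)

        long expectedMb = (s3PartSizeMb + maxPartitionFetchMb) * partitionsPerPod * connectors;
        System.out.println("Expected per-pod buffer memory: ~" + expectedMb + " MB"); // ~336 MB
    }
}
```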
I am seeing it flush every 5 minutes (rotate.schedule.interval.ms). We don't have much data coming in, so we see files of at most ~10 KB (around 10 records) flushed every 5 minutes.
Below is the configuration:

```json
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"partition.duration.ms": "300000",
"s3.region": "us-east-1",
"topics.dir": "data/live",
"schema.compatibility": "NONE",
"flush.size": "50000",
"topics": "commerce.activity.subscription",
"timezone": "UTC",
"tasks.max": "16",
"s3.part.size": "5242880",
"locale": "US",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"name": "event-s3-sink.commerce.activity.subscription",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "peopleconnect-prod-datalake-ldz",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH'00'",
"rotate.schedule.interval.ms": "300000"
}
```
Thanks in advance.
Niket
Hi, I am facing a similar problem. Any updates here?
hi @niketanand123
You said:

> 6 registered different connectors. Each connector is pointing to a different topic, each with 16 partitions.
So you have 2 worker nodes, each worker node runs 6 connectors, and each connector processes 6 topics and reads from 8 partitions of each topic. Is that correct?
If so, and if you are using the default partitioning scheme, your calculations for one worker node would look like this:
6 connectors * 6 topics * 8 partitions * (s3.part.size + consumer.max.partition.fetch.bytes)

which is

6 * 6 * 8 * (5 MB + 2 MB) = 2016 MB
So in short, you also need to multiply your result by 6 (topics), as the connector creates a file for each topic/partition pair.
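A minimal sketch of that calculation, using the per-worker topic/partition counts from the interpretation above (not measured values):

```java
public class PerWorkerEstimate {
    public static void main(String[] args) {
        int connectors = 6;
        int topicsPerConnector = 6;   // per the interpretation in this comment
        int partitionsPerTopic = 8;   // partitions read on this worker
        long s3PartSizeMb = 5;        // s3.part.size
        long fetchSizeMb = 2;         // consumer.max.partition.fetch.bytes (approx.)

        long perWorkerMb = (long) connectors * topicsPerConnector * partitionsPerTopic
                * (s3PartSizeMb + fetchSizeMb);
        System.out.println("Estimated buffer memory per worker: " + perWorkerMb + " MB"); // 2016 MB
    }
}
```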
As for the remaining ~5 GB that you see, I'd advise taking a heap dump of the worker node and seeing what takes that amount of RAM.
@sergeykranga can you help me? I have about 3400 partitions on 30 different topics. My s3.part.size is 5 MB. I have 30 tasks (1 task for each topic) across 2 worker nodes. What should my min and max heap size be for Kafka Connect?
@sxganapa it depends on your partitioning scheme. If the default is used, then Kafka Connect allocates at least n_of_topics * n_of_topic_partitions * s3.part.size.
In your case, in the worst-case scenario, you will need at least 30 * 3400 * 5 MB = 510000 MB (510 GB) of RAM (😱). If this is what you really would like to do, you'd probably want to use more worker nodes. Still a lot though imho.
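As a sketch of how that worst case translates into a worker count, assuming buffers spread evenly across workers; the 16 GB heap per worker is a hypothetical figure for illustration, not a recommendation:

```java
public class WorkerCountEstimate {
    public static void main(String[] args) {
        int topics = 30;
        int partitionsPerTopic = 3400;     // per the numbers given above
        long s3PartSizeMb = 5;
        long heapPerWorkerMb = 16 * 1024;  // hypothetical 16 GB heap per worker

        long totalBufferMb = (long) topics * partitionsPerTopic * s3PartSizeMb;   // 510000 MB
        long workersNeeded = (totalBufferMb + heapPerWorkerMb - 1) / heapPerWorkerMb;

        System.out.println("Worst-case buffer memory: " + totalBufferMb + " MB");
        System.out.println("Workers needed at " + heapPerWorkerMb + " MB heap each: " + workersNeeded); // 32
    }
}
```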
I have been researching for a while now, trying to figure out a programmatic way of calculating potential RAM consumption given some variables (number of topics, partitions, s3.part.size, etc.), but so far, while the results are consistent with one of our production clusters, they do not reflect reality for another cluster, so there is more to it than I understand.
So the above is a rough estimate and does not include RAM that other Kafka-related processes allocate.
What's not cool, I think, is that kafka-connect needs at least 5 MB of memory per file (the minimum part size allowed for an S3 multi-part upload). I'd really like it if it were possible to tell it not to use a multi-part upload when the file size is less than 5 MB; then for each file we could configure it to allocate, say, 500 KB of RAM. That would make it much easier to run in prod environments where there are many topics with dozens or hundreds of partitions. What's also sad is that even if you just have a dozen 1 KB events per hour, it still allocates the full 5 MB of RAM for a single file.
@sergeykranga thank you. I am using TimeBasedPartitioner with wall clock as the timestamp extractor. Also, is the 510 GB across all nodes or just one node? And is that the correct max heap requirement?
@sxganapa 510 GB is for all worker nodes. It is not the max heap, it is the amount of RAM the connector allocates for s3.part.size buffers (basically, this is how much memory it allocates for the S3 files it will upload). I'd add at least 1/2 of that for other things that need memory (so you'd end up with roughly 765 GB of RAM). I'd start from there and see how it goes.
Just to give an example, at my place, there is 1 topic with 60 partitions. In my case I have a custom partitioning scheme that is also based on the client number in kafka events, and it allocates s3.part.size per client number, per partition, per topic.
So in my worst case, if it reads 200 records from each partition (200 * 60 total records), and there are 200 unique client numbers in each batch, then it would need 200 unique client numbers * 60 partitions * 1 topic * s3.part.size (5 MB) = 60000 MB (60 GB) of RAM.
We run 6 worker nodes, so we'd expect each node to use 10 GB of RAM. In reality we see ~20 GB of RAM usage. I'm still working out where the rest of the RAM usage comes from.
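A sketch of that worst case, with the expected per-node share for 6 workers; the cardinality figures are the ones quoted in this comment, and the even split across workers is an assumption:

```java
public class CustomPartitionerWorstCase {
    public static void main(String[] args) {
        int topics = 1;
        int partitionsPerTopic = 60;
        int clientNumbersPerBatch = 200;  // unique output partitions per Kafka partition, worst case
        long s3PartSizeMb = 5;
        int workerNodes = 6;

        // One open file, and therefore one s3.part.size buffer, per topic, Kafka partition
        // and client number that appears in the current batch.
        long totalMb = (long) topics * partitionsPerTopic * clientNumbersPerBatch * s3PartSizeMb;
        long perWorkerMb = totalMb / workerNodes;

        System.out.println("Worst-case buffer memory: " + totalMb + " MB");   // 60000 MB
        System.out.println("Expected per worker: " + perWorkerMb + " MB");    // 10000 MB
    }
}
```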
@niketanand123 I guess I am seeing the same situation at my place. RAM usage on the worker node reaches 98.5% and stays at that mark all the time.
After some researching and talking with people who know more about JVM memory management, what I think happens is that Kafka Connect still only allocates what it needs, and no more. The reason we see higher RAM usage is that the JVM itself does not release the used memory back to the OS and keeps it in case that RAM is needed in the future (I found this page helpful in understanding the general problem: https://www.geekyhacker.com/2019/01/04/jvm-does-not-release-memory/).
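One way to see this distinction from the JVM's point of view is to compare the heap the process has committed (what the OS sees as resident) with what is actually in use. A minimal sketch using the standard MemoryMXBean; to inspect the Connect worker itself you would read the same java.lang:type=Memory bean over JMX rather than run this in a separate process:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

public class HeapReport {
    public static void main(String[] args) {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();

        // "committed" is what the JVM has reserved from the OS and typically does not give back;
        // "used" is what live (plus not-yet-collected) objects actually occupy.
        System.out.printf("heap used:      %d MB%n", heap.getUsed() / (1024 * 1024));
        System.out.printf("heap committed: %d MB%n", heap.getCommitted() / (1024 * 1024));
        System.out.printf("heap max:       %d MB%n", heap.getMax() / (1024 * 1024));
    }
}
```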
You can verify the actual RAM usage by forcing a full GC. For that, I usually ssh into the worker node in question and run:
```sh
free -m                        # save the current usage
jmap -histo:live `pidof java`  # or take the pid of the worker process by any other means
free -m                        # this should now show more available RAM
```
In my case, RAM usage drops from 98.5% to 67.7% (and grows back to 98.5% within a day or so). I'd like to hear about the results in your case :)
From looking at a heap dump, it looks like the connector may allocate s3.part.size 3 times for each topic and partition. Depending on whether you have store.kafka.headers and store.kafka.keys enabled, this may change, because it looks like the extra allocation only happens when these options are true. Is that your experience too?
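If that heap-dump observation holds (I have not verified it in the connector's code), its effect on the earlier estimates would just be a per-topic-partition multiplier, roughly along these lines; the partition count below is the 6 connectors * 8 partitions example from earlier in the thread:

```java
public class BufferMultiplier {
    public static void main(String[] args) {
        boolean storeKafkaKeys = true;     // store.kafka.keys
        boolean storeKafkaHeaders = true;  // store.kafka.headers
        long s3PartSizeMb = 5;
        int topicPartitionsOnWorker = 48;  // e.g. 6 connectors * 8 partitions

        // Assumption based on the heap-dump observation above: one buffer for values,
        // plus one each for keys and headers when those options are enabled.
        int buffersPerTopicPartition = 1
                + (storeKafkaKeys ? 1 : 0)
                + (storeKafkaHeaders ? 1 : 0);

        long estimateMb = (long) buffersPerTopicPartition * topicPartitionsOnWorker * s3PartSizeMb;
        System.out.println("Estimated buffer memory: " + estimateMb + " MB");  // 720 MB with both enabled
    }
}
```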
Ideally, we'd want to be able to tell each Kafka Connect machine what tasks to run, without having to spin up separate Kafka Connect clusters. Currently, it looks like on a cluster with 2 instances, 2 tasks from 1 connector sometimes run on a single instance, leaving the other cluster instance handling other trivial connectors with low RAM usage. The result is that RAM usage is not evenly balanced across the cluster. Maybe we're missing a specific configuration option?
There should be some documentation added about how to run this connector operationally. Draft formula for sizing the heap of a worker that runs this connector:
Heap max size >= s3.part.size * #active partitions on a single node + 100MB
The 100MB is for things the JVM constantly needs and is static. The #active partitions on a single node is the result of the assignment of partitions to tasks. In a balanced state, this should be at most roughly ceil(#partitions / #connect nodes). A concrete example:
- s3.part.size = 100MB
- total number of partitions = 10
- number of tasks = 4
- number of workers = 4
In this case, if all tasks are evenly balanced across all workers, you would need to set the heap size to 100MB * 3 + 100MB = 400MB. However, if you set it like this and one worker fails, the remaining workers will need to pick up that worker's tasks, and thus extra partitions.
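A small sketch of that draft formula, including the failover headroom mentioned above; the names and the 100 MB constant come straight from the comment, and nothing here is an official sizing rule:

```java
public class HeapSizing {
    // Draft formula from the comment: heap >= s3.part.size * #active partitions on the node + 100 MB
    static long heapMb(long s3PartSizeMb, int totalPartitions, int connectWorkers, int failedWorkersToTolerate) {
        int liveWorkers = connectWorkers - failedWorkersToTolerate;
        int partitionsPerWorker = (int) Math.ceil((double) totalPartitions / liveWorkers);
        long jvmOverheadMb = 100;  // static JVM overhead from the draft formula
        return s3PartSizeMb * partitionsPerWorker + jvmOverheadMb;
    }

    public static void main(String[] args) {
        // Concrete example from above: s3.part.size = 100 MB, 10 partitions, 4 workers.
        System.out.println(heapMb(100, 10, 4, 0) + " MB"); // 400 MB when perfectly balanced
        System.out.println(heapMb(100, 10, 4, 1) + " MB"); // 500 MB if one worker's partitions must be absorbed
    }
}
```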
The S3 connector does not split records into parts. Thus, transferring records of 1GB each with s3.part.size = 100MB won't work. Nonetheless, such a requirement seems rather extreme for regular use. Normally, reasonable values for the part size should fit several (whole) records. Based on that, a single file containing several records is uploaded to S3 using a multi-part upload, where each part has a size approximately equal to s3.part.size (depending on how many whole records fit in s3.part.size), plus, potentially, a last part with the remaining records of the file.
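To illustrate that last paragraph, here is a rough sketch of how whole records could pack into multi-part upload parts of at most s3.part.size; the packing logic is a simplification for illustration, not the connector's actual code, and a record larger than the part size simply cannot be placed:

```java
import java.util.ArrayList;
import java.util.List;

public class PartPacking {
    // Packs whole records into parts of at most partSize bytes; records are never split across parts.
    static List<Long> packIntoParts(long[] recordSizes, long partSize) {
        List<Long> parts = new ArrayList<>();
        long current = 0;
        for (long record : recordSizes) {
            if (record > partSize) {
                throw new IllegalArgumentException("a record larger than s3.part.size cannot be uploaded");
            }
            if (current + record > partSize) {  // close the current part and start a new one
                parts.add(current);
                current = 0;
            }
            current += record;
        }
        if (current > 0) {
            parts.add(current);                 // the (possibly smaller) last part
        }
        return parts;
    }

    public static void main(String[] args) {
        long partSize = 5 * 1024 * 1024;                      // 5 MB, the S3 multi-part minimum
        long[] records = {2_000_000, 2_000_000, 2_000_000};   // three ~2 MB records
        System.out.println(packIntoParts(records, partSize)); // [4000000, 2000000]
    }
}
```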