raphaelhamonnais opened 6 years ago
I was able to remove the OOM errors by correctly setting the configuration. Given that I'm using a Record timestamp, setting rotate.interval.ms is very important, because the connector actually uses the timestamp returned by the timestamp extractor to calculate whether or not it has to flush the buffers. Therefore, when processing 24h of late data partitioned by Record timestamp, if I set rotate.interval.ms to 10 minutes, the connector will flush every 10 record-time minutes, meaning every second or so of wall-clock time. This keeps the connector from having too many buffers allocated at the same time when the throughput is lower than flush.size and when it reads all of the late data in less than rotate.schedule.interval.ms.
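The record-time rotation described above can be sketched as a small simulation. This is a simplified model of the rotate.interval.ms condition, not the connector's actual code; the function name and numbers are illustrative:

```python
# Simplified model of record-time rotation (rotate.interval.ms):
# the connector tracks a base timestamp for the current file and
# rotates once a record's extracted timestamp is at least
# rotate.interval.ms past it. Illustrative sketch only.

ROTATE_INTERVAL_MS = 10 * 60 * 1000  # 10 minutes of *record* time

def simulate_flushes(record_timestamps_ms):
    """Count flushes triggered by the record-time rotation condition."""
    flushes = 0
    base_ts = None
    for ts in record_timestamps_ms:
        if base_ts is None:
            base_ts = ts
        elif ts - base_ts >= ROTATE_INTERVAL_MS:
            flushes += 1   # rotate: upload the current file
            base_ts = ts   # start a new file
    return flushes

# 24h of late data, one record per minute: record time advances 24h
# even if the connector reads it all in seconds of wall-clock time,
# so rotation fires every 10 record-minutes (the final partial buffer
# is not flushed by this condition).
late_data = [minute * 60 * 1000 for minute in range(24 * 60)]
print(simulate_flushes(late_data))  # -> 143
```

This is why, with a 10-minute rotate.interval.ms, replaying a day of late data produces flushes every second or so of wall-clock time: the condition depends only on record timestamps, not on the clock.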
Conclusion: the documentation https://docs.confluent.io/current/connect/connect-storage-cloud/kafka-connect-s3/docs/configuration_options.html would really benefit from more detailed explanations and examples.
I was finally able to understand how the Heap Size usage works in the Kafka Connect S3 Connector:

- The connector buffers data per partitioned path. How paths are partitioned depends on the partitioner.class parameter; partition.duration.ms will then determine the duration of each partitioned path.
- The S3 Connector allocates a buffer of s3.part.size bytes per Kafka partition (for all topics read) and per partitioned path.
- Example: with 20 Kafka partitions, timestamp.extractor set to Record, partition.duration.ms set to 1h and s3.part.size set to 50 MB:
  - the connector needs at least 20 * 50 MB = 1 GB of Heap Size;
  - with timestamp.extractor set to Record, messages whose timestamp corresponds to an earlier hour than the one at which they are read will be buffered in that earlier hour's buffer. Therefore, in reality, the connector needs a minimum of 20 * 50 MB * 2h = 2 GB of memory, because there are always late events, and more if there are events with a lateness greater than 1 hour;
  - this does not apply when timestamp.extractor is set to Wallclock, because there will virtually never be late events as far as Kafka Connect is concerned.
- The connector flushes (i.e. uploads to S3) when one of the following conditions is met:
  - rotate.schedule.interval.ms time has passed (wall-clock time);
  - rotate.interval.ms time has passed in terms of timestamp.extractor time. If timestamp.extractor is set to Record, 10 minutes of Record time can pass in less or more than 10 minutes of actual time: when reading late data, if rotate.interval.ms is set to 10 minutes then this condition will trigger every second (as it should), whereas when the flow of messages stops, it only triggers again once a message arrives whose timestamp shows that rotate.interval.ms has passed since the condition last triggered;
  - flush.size messages have been read in less than min(rotate.schedule.interval.ms, rotate.interval.ms). As with rotate.interval.ms, this condition might never trigger if there are not enough messages.
- Conclusion: you should provision Kafka partitions * s3.part.size of Heap Size at the very least. If you use the Record timestamp for partitioning, you should multiply it by max lateness in milliseconds / partition.duration.ms to account for the buffers kept open during max lateness in milliseconds.
- On top of that, the connector uses consumer.max.partition.fetch.bytes bytes per partition when it reads from Kafka.
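The sizing rule above can be turned into a quick back-of-the-envelope calculator. A minimal sketch, assuming the example numbers from this comment and the default consumer fetch size of 1 MiB; the function name is illustrative and the formula is this comment's heuristic, not an official one:

```python
# Back-of-the-envelope heap estimate based on the rule above:
#   heap >= kafka_partitions * s3.part.size * open_windows + fetch buffers
# where open_windows is the current partition.duration.ms window plus
# one per window of lateness. Heuristic only: real heap usage also
# includes the consumer, the framework, and other objects.

def min_heap_bytes(kafka_partitions, s3_part_size,
                   max_lateness_ms=0, partition_duration_ms=None,
                   max_partition_fetch_bytes=1048576):
    buffers = kafka_partitions * s3_part_size
    if max_lateness_ms and partition_duration_ms:
        # current window + one extra window per duration of lateness
        windows = 1 + -(-max_lateness_ms // partition_duration_ms)
        buffers *= windows
    fetch = kafka_partitions * max_partition_fetch_bytes
    return buffers + fetch

# The example from this comment: 20 partitions, 50 MB part size,
# 1h windows, up to 1h of lateness -> ~2 GB of part buffers.
mb = 1024 * 1024
estimate = min_heap_bytes(20, 50 * mb,
                          max_lateness_ms=3_600_000,
                          partition_duration_ms=3_600_000)
print(estimate // mb)  # -> 2020
```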
I'll let you close the issue if my answer about memory usage is correct 🙂
RE: the documentation would really benefit from more detailed explanations and examples.
PR's are welcome - https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/docs/configuration_options.rst
@raphaelhamonnais Thank you! That was really helpful.
I'm curious about the following point:
Is this a bug or by design? We are currently running into an issue with this when we process a lot of historic data. We try to write data based on time partitioning by day. Even though the records are all ordered by time, the connector will create a new buffer for each day and not commit until we actually reach rotate.schedule.interval.ms
or flush.size
. We will run out of memory within minutes if we run this against our historic data in Kafka. From my point of view, it would make sense to commit the buffer whenever we see a new entry that is outside the current rotate.interval.ms
frame, or to keep the latest buffer to handle late arrivals.
I'd love to contribute here, if that change of logic is appreciated.
Hi @nicolasbaer, I'm glad this was helpful for you 🙂.
As for the rotate.interval.ms, this condition might never trigger if there are not enough messages
The important part of this point is "if there are not enough messages": if there are not enough messages flowing in to show that rotate.interval.ms
has passed since the last time the condition triggered, it will never trigger the flushing. To my understanding, this is designed correctly.
What is the value of your rotate.interval.ms
setting?
Edit: could you give your S3 Connector JSON settings?
@raphaelhamonnais This should reflect the relevant parts from our configuration:
"s3.part.size": "5242880",
"flush.size": "100",
"rotate.schedule.interval.ms": "86400000",
"partition.duration.ms": "86400000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"timestamp.extractor": "Record",
For the test we have a single topic with 16 partitions. If I understand the settings above correctly, it should keep the following size in memory: 16 partitions * 5 MB s3.part.size + 16 partitions * ~2 MB consumer.max.partition.fetch.bytes
However, when we do not reach the flush.size
setting and, let's say, worst case write a single line per time partition (in this case a day), the memory consumption would actually be: 100 flush.size * 16 partitions * 5 MB s3.part.size + 16 partitions * ~2 MB consumer.max.partition.fetch.bytes
.
I think you are missing the rotate.interval.ms
parameter, which allows Kafka Connect to rotate when it sees a message that goes past rotate.interval.ms
since the last message it has seen.
If you set it for instance to 10 minutes and send a message with a Record timestamp equal to 12pm and right after another message with a Record timestamp equal to 12:11pm, it should trigger the flushing.
In general when using Record
timestamp, you should always set the rotate.interval.ms
parameter in addition to the rotate.schedule.interval.ms
parameter.
Could you try to set it and tell me how it goes?
However, when we do not reach the flush.size setting and let's say worst case write a single line per time partition (in this case a day), the memory consumption would actually be: 100 flush.size * 16 partitions * 5 MB s3.part.size + 16 partitions * ~2 MB consumer.max.partition.fetch.bytes
I think this is normal, because flush.size
is equal to 100, so it will flush after seeing 100 messages. But you send 100 messages with timestamps falling in different partition.duration.ms
windows (here, in different days), and thus it will require as many buffers as the partition.duration.ms
windows it finds. And because there is no rotate.interval.ms
specified, it will never flush unless it sees 100+ messages in a window or rotate.schedule.interval.ms
of real wall-clock time passes. Does that make sense?
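A simplified model of the behavior described here (one buffer kept open per partition.duration.ms window seen, per Kafka partition, until flush.size or a rotation triggers); names and numbers are illustrative, not connector code:

```python
# Without rotate.interval.ms, one buffer stays open for every distinct
# partition.duration.ms window encountered, until that window collects
# flush.size records or rotate.schedule.interval.ms wall-clock time
# passes. Simplified per-Kafka-partition model; names are illustrative.

DAY_MS = 86_400_000  # partition.duration.ms = 1 day, as in the config above

def open_buffers(record_timestamps_ms, flush_size=100,
                 partition_duration_ms=DAY_MS):
    counts = {}
    for ts in record_timestamps_ms:
        window = ts // partition_duration_ms
        counts[window] = counts.get(window, 0) + 1
    # windows that never reach flush.size keep their buffer allocated
    return sum(1 for c in counts.values() if c < flush_size)

# A year of historic data with one record per day: no day ever reaches
# flush.size = 100, so 365 buffers stay open on this Kafka partition.
history = [day * DAY_MS for day in range(365)]
print(open_buffers(history))  # -> 365
```

Setting rotate.interval.ms makes each of these stale buffers flush as soon as a record from a later window arrives, which is what keeps memory bounded when replaying history.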
@raphaelhamonnais thanks again. You're right, I got somewhat confused with partition.duration.ms
and forgot to configure rotate.interval.ms
. Now it seems that my memory issue is gone.
@nicolasbaer Glad to know it worked out 🙂.
Hi,
I am trying to analyze the heap size for the s3-sink connector. We have 2 K8s pods (replicas) running with 6 different registered connectors. Each connector points to a different topic, each with 16 partitions. As we have tasks.max = 16, I guess each pod would have 8 tasks running in parallel (assuming even distribution of tasks among the pods). I am seeing that the heap size of the container is always ~7 GB in stage (not much data coming in) and ~12 GB in prod (with the same config). I am not sure why it takes such a constant, high amount of memory. My understanding was that each pod should take (5 MB (s3.part.size) + ~2 MB (consumer.max.partition.fetch.bytes)) * 8 (partitions) * 6 (connectors) ≈ 340 MB, but I'm seeing the container take 7 GB of memory.
I am seeing it is flushing every 5 min (rotate.schedule.interval.ms). We don't have much data coming so seeing max 10KB (around 10 records) files getting flushed every 5 min.
Below is the configuration:
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"partition.duration.ms": "300000",
"s3.region": "us-east-1",
"topics.dir": "data/live",
"schema.compatibility": "NONE",
"flush.size": "50000",
"topics": "commerce.activity.subscription",
"timezone": "UTC",
"tasks.max": "16",
"s3.part.size": "5242880",
"locale": "US",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"name": "event-s3-sink.commerce.activity.subscription",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "peopleconnect-prod-datalake-ldz",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH'00'",
"rotate.schedule.interval.ms": "300000"
}
Thanks in advance.
Niket
Niket, have you tried taking a heap dump?
No, I haven't, as those are running in K8s pods. Since there is nothing unusual about the setup of this s3-sink connector, I am trying to learn what heap size one should set and why my currently running connectors take so much memory.
Are you not allowed to kubectl exec into the container?
Do you observe the same if using your local machine to join the connector group?
When I run Kafka Connect (pointing to prod Kafka) with one connector in standalone mode locally, I see that the heap is not so high and it goes down after a while (5 min), but in PROD it is running with 6 connectors and the heap size is constantly high, though stable. I will try to run all 6 connectors locally to see the heap size.
Thanks
On Sat, Feb 8, 2020 at 2:40 PM Jordan Moore notifications@github.com wrote:
Do you observe the same if using your local machine to join the connector group?
@OneCricketeer @raphaelhamonnais Thanks for this detailed explanation. Could you also tell how exactly did you configure the heap size for the connector? Is configuring KAFKA_HEAP_OPTS with the required memory enough?
- name: KAFKA_HEAP_OPTS
value: -Xms256m -Xmx4g
by exporting KAFKA_HEAP_OPTS
indeed
we generally set it to be a bit less than the memory allowed to the Pod (in k8s world) to avoid OOMs
Thank you very much both of you. It works now.
set it to be a bit less than the memory allowed to the Pod
If you use Java11, this is done automatically for you.
If you use Java11, this is done automatically for you.
🙂 🙂 ... soon 🤞
Hi @raphaelhamonnais , thanks for the post.
I'm using "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner"
with "rotate.interval.ms": "60000"
and "path.format": "'date'=YYYY-MM-dd/'hour'=HH", and in fact 1 minute's worth of Kafka messages is stored in each S3 file, but there are as many files as partitions, so I end up with <#partitions * 60> files in each hour directory. Is there a way to get one S3 file with one minute's worth of messages from all the topic's partitions?
Is there a way to get one S3 file with one minute worth of messages of all the topic's partitions?
There isn't @pablocavalieri because there's one consumer thread per partition, as per standard Kafka consumer group protocol.
Hello,
As this is still open, I'll jump on it! @raphaelhamonnais I'm curious about this: "The S3 Connector will allocate a buffer of s3.part.size Bytes per Kafka partition (for all topics read) and per partitioned paths". What is, for you, the partitioned path?
Here is my configuration:
I'm in a nominal situation where there is no late data. I plan to have 50 MB files (s3.part.size).
But of course, I face OOM issues (that's why I'm here!). For now, what I calculate for the heap size is: (6 x 4 x 50 MB + 6 x 4 x 2 MB) x 2 (50% of heap size) ≈ 2.5 GB. But with 3 GB in Xmx it still goes out of memory.
I don't get the point... Do I need to multiply it by 80?
Thank you !
Hey @remy-voodoo

What I mean by "partitioned paths" is how your end data is partitioned. Some examples:

1. Say my data is partitioned by YYYY/MM/DD/HH
, meaning that data files are written under the hour they belong to. In this case there is 1 partitioned path open per hour, and having rotate.interval.ms
set correctly is needed to avoid OOMs (see https://github.com/confluentinc/kafka-connect-storage-cloud/issues/177#issuecomment-421944366).
2. Say that in addition to partitioning by YYYY/MM/DD/HH
, I'm also reading an int value from the payload and adding a modulo 64 suffix:
YYYY/MM/DD/HH/mod-1
, YYYY/MM/DD/HH/mod-2
, YYYY/MM/DD/HH/mod-3
, etc. In this case, the number of partitioned paths just got multiplied by 64.

How are your output files partitioned exactly?
I plan to have 50MB files (s3.part.size)
That setting doesn't control flushed file sizes
Personally, we run connect with over 8G xmx because we keep adding more and more topics
Thanks for your replies! @OneCricketeer @raphaelhamonnais I finally succeeded in not getting OOMs anymore. But now I have huge lag!
One example:
- Producer: topic1, 6 partitions, 2.5k messages/sec, 400 KB/sec, constant producing workflow
- Kafka Connect consumer: 6 Kafka Connect servers, tasks.max = 6, ~80 KB/sec, triggered every 3 min
The Kafka Connect config is as follows:
"flush.size": 2000000,
"s3.part.size": 52428800,
"rotate.interval.ms": "300000",
"rotate.schedule.interval.ms": "300000",
I don't understand the behavior of the connector...
- Why do I have such lag when a classic consumer can handle the workload? (Meaning that the number of partitions is enough.)
- Does the heap size influence the lag? (Kafka Connect server monitoring shows that all the servers' memory usage stays at 3 GB constantly.)
- Why does Kafka Connect act like it's flushing data rather than consuming and storing it in memory? What's the config controlling that?
- Why do I no longer get OOMs, but lag now? xD
Seems this issue looks like a StackOverflow discussion now... Sorry for that!
Based on your config, your Kafka Connect consumer should commit files every 5 minutes:
"rotate.interval.ms": "300000",
"rotate.schedule.interval.ms": "300000",
What exactly is a "huge" lag?
The lag was due to my custom partitioner being inefficient, I think... If I use the classic TimeBasedPartitioner it works perfectly fine and is able to commit all my workload -_- Sorry, and thanks for the help @OneCricketeer @raphaelhamonnais
Hello @raphaelhamonnais,
As you said yesterday: "Say that in addition of partitioning by YYYY/MM/DD/HH, I'm also reading an int value from the payload, and adding a modulo 64 suffix: e.g YYYY/MM/DD/HH/mod-1, YYYY/MM/DD/HH/mod-2, YYYY/MM/DD/HH/mod-3, etc. In this case, the number of partitioned paths just got multiplied by 64". That's what I have, but reversed: field_n/YYYY/MM/DD/HH, where field_n can have 80 different values. So the number of partitioned paths increases, and that's what I need! My partitioner extends TimeBasedPartitioner, adding the extra field which is read from the payload.
Using this partitioner causes my lag (a lot of lag) and strange behavior while consuming the data from the topics. I opened a separate issue; you can find more info there, if you can take a look 🙂
Thanks again for your time.
As far as queries go, Hive/Spark/Presto prefer lower cardinality partition values first
And partitioning doesn't cause lag... Keeping files uncommitted does
Of course it's not caused by the partitioner itself, but if I have latency in the partitioner, it will result in uncommitted files. The partitioner is not so complicated, and the time spent partitioning the data is not huge. The strange thing is that it's when I'm using this partitioner that I see this lag.
I did not get the point on the data cardinality.
The purpose of partitioning (other than path separation) is to have predicate push down in your query engine. Imagine like a funnel, for example, 5 years, 12 months, 365 days, etc. In other words, your field of 80 might make sense between month and year rather than after the day
Oh yes, as I was saying before, my field is at the highest level in fact: field_n/YYYY/MM/DD/HH. My field makes sense at all times.
Hello! We have following s3 connector configuration:
flush.size: 100
format.class: io.confluent.connect.s3.format.json.JsonFormat
key.converter: org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable: false
transforms: createKey
transforms.createKey.fields: id
transforms.createKey.type: org.apache.kafka.connect.transforms.ValueToKey
locale: en
partition.duration.ms: 600000
partitioner.class: io.confluent.connect.storage.partitioner.TimeBasedPartitioner
prefix.add.prefix: event=
prefix.remove.regex: kafka-connect-
path.format: "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH"
rotate.interval.ms: 600000
rotate.schedule.interval.ms: 600000
s3.bucket.name: $DATALAKE_S3_BUCKET
s3.compression.type: none
topics.dir: raw
s3.part.size: 5242880
s3.region: eu-central-1
schema.compatibility: NONE
storage.class: io.confluent.connect.s3.storage.S3Storage
timestamp.extractor: RecordField
timestamp.field: updated_at
timezone: UTC
topics.regex: kafka-connect-
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: true
As far as I understood from the previous comments, if we ingest historical data, rotate.interval.ms
should be set to avoid an OOM.
@raphaelhamonnais Could you please have a look and share your input if this configuration makes sense ?
Also does it make sense to set partition.duration.ms: 600000
?
Thanks!
As far as I understood from the previous comments, if we ingest historical data, rotate.interval.ms should be set to avoid an OOM.
Yes, although in your case, because you have flush.size: 100
, you're going to commit every 100 payloads read, which should be fine unless you have really big payloads.
Your setting looks fine to me. You can try it out by resetting your consumer offset to ~2 days in the past (or less if you have less retention in your Kafka topic(s)) and see what happens. You should get a higher workload than usual, so you might need to bump your Kafka Connect cluster size a bit, but looking at the config, you shouldn't have OOMs.
Not sure what I need to set for my heap size. There are about 38 topics and 3400 partitions. I have set rotate.interval.ms to 20 minutes and rotate.schedule.interval.ms to -1. I have 1 connector and 3 worker nodes for these 38 topics. These are low-volume topics; we get ~300 MB of data in a day. Also, my "s3.part.size": "5242880". Can someone help me?
Are you currently getting OOM? Default heap is 2G... If you need more, then I suggest monitoring it using external tooling
@sxganapa I suggest using Stackoverflow for diagnostic issues
@raphaelhamonnais Hi, thank you for your explanation. It was very helpful!
But I still have confusion about rotate.interval.ms
and rotate.schedule.interval.ms
.
tl;dr - Is there any duplicate message if I use both rotate.interval.ms
and rotate.schedule.interval.ms
?
I'm using s3 sink connector and I set config below.
My requirement
flush.size: 10000
rotate.interval.ms: 3600000 # 1H
timestamp.field: my_timestamp_field
timestamp.extractor: RecordField
partition.duration.ms: 3600000 # 1H
path.format: YYYY/MM/dd/HH
But the connector didn't flush messages to S3 because there aren't enough messages in the broker (fewer than 10000).
How can I fix connector config? Should I use rotate.interval.ms
and rotate.schedule.interval.ms
? Is there any duplicate message if I use both rotate.interval.ms
and rotate.schedule.interval.ms
?
@JaeHyeokLee Yes, you should use rotate.schedule.interval.ms
set to 1H too. There is no duplication between rotate.interval.ms
and rotate.schedule.interval.ms
.
If you have a low throughput, it's totally possible that you didn't see more than 1H pass in record time. Using rotate.schedule.interval.ms
will guarantee that your connector flushes every hour at minute 00
, no matter what.
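The "flush every hour at minute 00" behavior comes from rotate.schedule.interval.ms rotating on wall-clock boundaries aligned to the interval. A minimal sketch of that boundary arithmetic (illustrative, not the connector's implementation):

```python
# rotate.schedule.interval.ms rotates on wall-clock boundaries aligned
# to the interval (in the configured timezone), so a 1h interval always
# flushes at minute 00, independent of record timestamps. Illustrative
# boundary arithmetic, not the connector's implementation.

HOUR_MS = 3_600_000

def next_rotation_ms(now_ms, interval_ms=HOUR_MS):
    """First wall-clock boundary strictly after now_ms."""
    return (now_ms // interval_ms + 1) * interval_ms

# At 12:34:56, the next scheduled flush is at 13:00:00.
now = (12 * 60 + 34) * 60_000 + 56_000
print(next_rotation_ms(now) == 13 * HOUR_MS)  # -> True
```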
@JaeHyeokLee there is good documentation here: https://docs.confluent.io/5.5.3/connect/kafka-connect-s3/index.html#s3-object-uploads Also scroll down to Exactly-once delivery on top of eventual consistency.
As @raphaelhamonnais suggests, it often seems to not be working on low-throughput topics. That is because time is extracted from the records: until there are new records on a partition, time does not advance.
However, with rotate.schedule.interval.ms
the server's time will be used to determine when to flush: daily = 00:00; hourly = 00:00, 01:00, 02:00, etc. The documentation I've shared also says this: setting rotate.schedule.interval.ms
(or combining them) is nondeterministic and will invalidate exactly-once guarantees, leaving the possibility of duplicates.
@raphaelhamonnais @dylanmei
Thanks for the reply. However, I'm not sure whether messages will arrive faster than flush.size or not.
(Because I can't predict server traffic exactly.)
If I set
rotate.interval.ms: 10m
rotate.schedule.interval.ms: 10m
flush.size: 10000
And if more than 10000 messages arrive within 10 minutes, does this config guarantee exactly-once and handle late-arriving data fine (with no duplicate messages)?
I didn't understand the config priority (or interaction) when I use both rotate.interval.ms
and rotate.schedule.interval.ms
Hi @raphaelhamonnais and others, I am using the S3 Sink connector and able to write data to S3 successfully, but I have the questions below. I have 64 partitions in my topic, and I see there are 64 output files in my S3 bucket per hour (one file per partition). Is there a way to reduce the number of output files (without reducing the partitions)? Due to too many files in the output, our downstream jobs suffer while reading and doing transformations.
Also, is there a way to redirect a few records to a different topic based on a filter condition using the S3 sink connector (RegexRouter is not working with the S3 Sink connector)?
You reduce the number of files using the configs mentioned above: flush, rotation, or partition duration. Larger values for any of these will accumulate records longer.
RegexRouter should work fine. What issues are you having with it? Any transform will apply to every record.
@OneCricketeer We have the settings below, as we need to rotate only every 1 hour; we see 64 output files at minimum.
rotate.schedule.interval.ms=3600000
flush.size=125000000
We have the configs below for RegexRouter, but we see a NullPointerException when using it with S3SinkConnector:
transforms=dropPrefix
transforms.dropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.dropPrefix.regex=soe-(.*)
transforms.dropPrefix.replacement=$0
I can see there is an issue with RegexRouter in the link below; I am facing the same issue. https://github.com/confluentinc/kafka-connect-hdfs/issues/236
Hello,

I am currently having a hard time figuring out the correct Heap Size provisioning for an S3 Sink Connector (version 3.3.1) cluster using the timestamp.extractor=Record
property. Here is the connector's configuration:

Note that the com.datadog.kafka.connect.storage.partitioner.DatadogTimeBasedPartitioner
just puts the topic at the end of the result of the generatePartitionedPath
method.

The connector runs fine in real time, but throws OutOfMemory
errors when processing late data, because of the partitioning into different files every 10 minutes (partition.duration.ms
is equal to 10 minutes). Each node has a Heap Size of 12 GiB and, when recovering from late data, there are as many nodes as partitions to read from (a partition is processed by a single node). Note that using the Wallclock
timestamp extractor removes the OOMs, meaning that the file partitioning adds memory usage. Our assumption is that all of the partitioned files are processed in a very short time, leading to a lot of parallel buffering.

From what I read in https://github.com/confluentinc/kafka-connect-storage-cloud/issues/29, here is the minimum Heap Size I should provision when reading 24h of late data:
s3.part.size
* 1 partition >= 7.54 GiB. Thus, there is something that I don't understand in the Heap Size usage of the S3 Connector.

My main guess is that the connector will:
- buffer up to flush.size
messages for each partitioned file (6 * 24 hours' worth of files in my case), or until the rotate.schedule.interval.ms
limit is reached
- flush those flush.size
messages to S3
- start buffering flush.size
new messages again...

I read the documentation (and then read it again) searching for a Heap Size usage explanation but didn't find anything.

I think that reading the events also uses some Heap Size, and that usage would be equal to
6 * 24h * flush.size * sizeOfMessageInBytes
in the worst case, but I'm not sure about that. If that is the case, then flush.size
will have to be adjusted accordingly, considering that only 12 GiB - 7.54 GiB = 4.46 GiB
are available (without taking into account the other objects in the Heap, which use memory as well).

I'd really appreciate it if you could explain the details of the Heap Size usage and what calculation to make in order to process late events with the current partitioning and the Record
timestamp.
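Plugging numbers into the worst-case guess above is straightforward. A sketch with placeholder values, since the actual flush.size and message size are not shown in the thread:

```python
# Plugging numbers into the worst-case guess:
#   open_files * flush.size * avg message size
# open_files = 6 files/hour * 24h of late data (10-minute windows).
# flush_size and avg_message_bytes are placeholders: the real values
# are not shown in this thread.

def worst_case_record_bytes(open_files, flush_size, avg_message_bytes):
    return open_files * flush_size * avg_message_bytes

open_files = 6 * 24        # one 10-minute file per window over 24h
flush_size = 10_000        # placeholder
avg_message_bytes = 1_000  # placeholder (~1 KB per record)
total = worst_case_record_bytes(open_files, flush_size, avg_message_bytes)
print(total)  # -> 1440000000, i.e. ~1.34 GiB on top of the part buffers
```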