Aiven-Open / cloud-storage-connectors-for-apache-kafka

Repository containing Cloud Storage Connectors for Apache Kafka®
Apache License 2.0

[File size flush interval] Best practices for reducing network costs #255

Open liraric opened 1 year ago

liraric commented 1 year ago

Hi all, hope everything is fine. We are currently working on a feature that uses this connector to dump Debezium CDC messages. Our deployed connector runs with default values, and as far as I can tell, the only ways to control flushing are time (offset.flush.interval.ms, which defaults to 60 seconds) and the number of records per flushed file (file.max.records; I didn't see a default value, so I'm assuming it writes as many records as possible to the file during the flush interval window).

Our current cost for writing these files is pretty large, so I was wondering if you have any best practices for tuning the connector to reduce network costs when writing to GCS.

Thanks for any help! Cheers

chadleeshaw commented 1 year ago

I am also struggling with flush intervals and file sizes. I want to try and only flush every ten minutes or when the heap is filled up. I currently have these settings:

connector.class=io.aiven.kafka.connect.gcs.GcsSinkConnector
topics=MyTopic
tasks.max=4
format.output.fields=key,value,offset,timestamp
gcs.bucket.name=MyBucket
gcs.credentials.path=MyCreds
file.name.timestamp.timezone=America/Denver
format.output.type=jsonl
file.name.template={{topic}}/{{timestamp:unit=yyyy}}{{timestamp:unit=MM}}{{timestamp:unit=dd}}/{{timestamp:unit=HH}}/{{topic}}-{{partition}}-{{start_offset}}.gz
file.max.records=1000000
offset.flush.interval.ms=600000
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

But I'm still flushing every couple of minutes even though I have 32GB of heap... I don't get it. I'm getting about 250K messages (77MB) compressed to 7MB.

Looks like someone stated that the flush interval has to be set on the worker, not in the connector settings: https://github.com/Aiven-Open/commons-for-apache-kafka-connect/issues/263 https://docs.confluent.io/platform/current/connect/references/allconfigs.html

chadleeshaw commented 1 year ago

Looks like you have to set offset.flush.interval.ms inside the worker.properties file. Setting it inside the connector config will not work.
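For anyone landing here, a minimal sketch of the split, assuming the worker is started with a file named worker.properties (the file name and the ten-minute value are only illustrative, and every other worker setting is left as it is). offset.flush.interval.ms is a Kafka Connect worker-level setting, so it applies to every connector running on that worker:

```properties
# worker.properties (Kafka Connect worker config, NOT the connector config)
# Assumption: ten minutes is just the example value from this thread.
# The worker commits offsets on this interval for all connectors it runs.
offset.flush.interval.ms=600000
```

The connector config would then keep only the connector-level options, for example:

```properties
# Connector config: offset.flush.interval.ms is dropped here because
# it has no effect at this level, as described above.
connector.class=io.aiven.kafka.connect.gcs.GcsSinkConnector
file.max.records=1000000
```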

Setsushin commented 7 months ago

Hi guys, I also ran into this issue. After increasing offset.flush.interval.ms in the worker.properties file, the sink speed became normal. But it also has a big effect on other topics/connectors: their speed becomes slow as well. So I wonder if there is a better solution or a plan to improve this. Thanks!