Aiven-Open / cloud-storage-connectors-for-apache-kafka

Repository containing Cloud Storage Connectors for Apache Kafka®
Apache License 2.0

Heap OOM issue when trying to sink a Debezium snapshot to GCS #259

Open mkherlakian opened 2 years ago

mkherlakian commented 2 years ago

I'm running into an issue where the connector's JVM exits with an OOM error when sinking a topic freshly snapshotted from a PG database through Debezium. The setup:

This is all running on Aiven.

Topic A successfully sinks into GCS. The parquet file gets uploaded and all the data that we expect is there. Topic B consistently runs OOM.

We've tried a variety of values for file.max.records, ranging from 50 to 1000, and for offset.flush.interval.ms, the lowest being 50 ms, but we still experience the OOMs.

We believe part of the issue is that, since this starts with a PG snapshot, the timestamps of the 1M records already in the topic are all within an hour of each other. The connector's grouping logic would therefore consider the entire topic's content to be part of a single group. If the GCS connector behaves the same as the S3 one, this could be an indication: https://help.aiven.io/en/articles/4775651-kafka-outofmemoryerror-exceptions. However, we would've expected file.max.records to compensate for this.
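To make the suspicion concrete, here is a minimal sketch (hypothetical, not the connector's actual code) of how buffering one list of records per expanded file name behaves when every record's timestamp falls in the same hour; all class and method names below are made up for illustration:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupingSketch {
    // Mirrors the {{timestamp:unit=...}} parts of file.name.template;
    // {{start_offset}} is left out because it is derived from the group itself.
    static final DateTimeFormatter HOUR =
        DateTimeFormatter.ofPattern("yyyyMMdd-HH").withZone(ZoneOffset.UTC);

    // One in-memory buffer per expanded file name.
    static final Map<String, List<byte[]>> buffers = new HashMap<>();

    static void accept(String topic, int partition, long timestampMs, byte[] value) {
        String group = topic + "/" + partition + "-"
            + HOUR.format(Instant.ofEpochMilli(timestampMs));
        // A snapshot's 1M records all share the same hour, so under this model
        // they all land in one list that has to fit in heap until it is flushed.
        buffers.computeIfAbsent(group, k -> new ArrayList<>()).add(value);
    }
}

Under this model, file.max.records would be expected to cap each buffer, which is why its apparent lack of effect is the surprising part.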

Also, while upgrading plans is an option, we'd like to understand what knobs to turn to control memory utilization. Full cleaned-up config attached:

{
  "name": "gcs",
  "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
  "tasks.max": "1",

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://<karapace-service>.aivencloud.com:10034",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "<user:pass>",

  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://<karapace>.aivencloud.com:10034",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "<user:pass>",

  "topics": "pg.public.A,pg.public.B",

  "gcs.credentials.json": "<GCP_CREDENTIALS>",
  "gcs.bucket.name": "data-lake",
  "file.name.template": "{{topic}}/{{partition}}-{{start_offset}}-{{timestamp:unit=yyyy}}{{timestamp:unit=MM}}{{timestamp:unit=dd}}-{{timestamp:unit=HH}}.parquet.gz",

  "file.compression.type": "gzip",
  "file.max.records": "200",
  "format.output.type": "parquet",
  "format.output.fields": "key,offset,timestamp,value",

  "offset.flush.interval.ms": 50, //tried different values here, none seem to have an effect
}
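For reference, with illustrative values (partition 0, a chunk starting at offset 0, records timestamped in hour 14 of 2023-05-01), the template above would expand to something like pg.public.A/0-0-20230501-14.parquet.gz.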

Any insight into what might be happening?

mkokho commented 2 years ago

Thank you, this sounds like a tricky problem to reproduce. Could you also share the configuration of the Kafka cluster where the GCS connector is running?

mkherlakian commented 2 years ago

@mkokho that might be a little trickier because it's fully managed by Aiven. Here are all the details I have access to:

3-node cluster, 1 CPU per node, 600 GB storage. From the logs, it looks like the connectors start with a 768 MB heap. Other than us increasing the max message size (we do have some rows that contain blobs), I believe everything else is the default config.

We also tested with a dataset of 1M rows with no blobs, where the size per message is predictable, and ended up with the same issue...
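For a rough sense of scale (illustrative arithmetic, assuming an average serialized record size of about 1 KB): 1M records held in a single group would be on the order of 1 GB of buffered data before any Parquet/gzip encoding, which already exceeds the 768 MB heap the workers appear to start with.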