databricks / iceberg-kafka-connect

Apache License 2.0
218 stars 47 forks source link

[OutOfMemoryError: Java heap space] after starting a sink task #227

Open almir-magazord opened 7 months ago

almir-magazord commented 7 months ago

Hello!

We started to use this connector to sink data from Kafka to S3 in Iceberg format.
The Kafka topic has 375,522 messages wth 231 MiB on size.

The connector config looks as follows:

{
    "name": "kafka-to-s3-order",
    "config": {
        "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
        "tasks.max": "1",
        "topics": "order",                    
                "transforms": "timestampConverter",
                "transforms.timestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
                "transforms.timestampConverter.field": "order_datetime",
                "transforms.timestampConverter.target.type": "Date",
                "transforms.timestampConverter.format": "yyyy-MM-dd",

        "iceberg.control.commit.threads": 1,

        "iceberg.table.kafka.order.id-columns": "database,order_id",
        "iceberg.table.kafka.order.partition-by": "order_datetime,database",

        "iceberg.tables": "kafka.order",
        "iceberg.tables.cdc-field": "__op",

        "iceberg.tables.upsert-mode-enabled": true,
        "iceberg.tables.auto-create-enabled": true,        
        "iceberg.tables.evolve-schema-enabled": true,

        "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "iceberg.catalog.warehouse": "s3a://kafka-test",
        "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",

        "iceberg.tables.write-props.write.metadata.delete-after-commit.enabled": true,
        "iceberg.tables.write-props.write.metadata.previous-versions-max": 1,

        "iceberg.tables.auto-create-props.write.metadata.delete-after-commit.enabled": true,
        "iceberg.tables.auto-create-props.write.metadata.previous-versions-max": 1,
        "iceberg.tables.auto-create-props.write.parquet.compression-codec": "snappy"
   }
}

We can see the table created in AWS Glue, a lot of folders in S3 (data and metadata)... but some minutes after the sink task starts, we get this error:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.base/java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:79)
    at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.<init>(CodecFactory.java:158)
    at org.apache.parquet.hadoop.CodecFactory.createCompressor(CodecFactory.java:219)
    at org.apache.parquet.hadoop.CodecFactory.getCompressor(CodecFactory.java:202)
    at org.apache.iceberg.parquet.ParquetWriter.<init>(ParquetWriter.java:90)
    at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:337)
    at org.apache.iceberg.parquet.Parquet$DeleteWriteBuilder.buildEqualityWriter(Parquet.java:855)
    at org.apache.iceberg.data.GenericAppenderFactory.newEqDeleteWriter(GenericAppenderFactory.java:187)
    at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.newWriter(BaseTaskWriter.java:378)
    at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.newWriter(BaseTaskWriter.java:371)
    at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.openCurrent(BaseTaskWriter.java:303)
    at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.<init>(BaseTaskWriter.java:265)
    at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.<init>(BaseTaskWriter.java:255)
    at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.<init>(BaseTaskWriter.java:373)
    at org.apache.iceberg.io.BaseTaskWriter$BaseEqualityDeltaWriter.<init>(BaseTaskWriter.java:121)
    at io.tabular.iceberg.connect.data.BaseDeltaTaskWriter$RowDataDeltaWriter.<init>(BaseDeltaTaskWriter.java:89)
    at io.tabular.iceberg.connect.data.PartitionedDeltaWriter.route(PartitionedDeltaWriter.java:73)
    at io.tabular.iceberg.connect.data.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:77)
    at io.tabular.iceberg.connect.data.PartitionedDeltaWriter.write(PartitionedDeltaWriter.java:36)
    at io.tabular.iceberg.connect.data.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:37)
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:69)
    at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$5(Worker.java:197)
    at io.tabular.iceberg.connect.channel.Worker$$Lambda$1101/0x00000008411dd040.accept(Unknown Source)
    at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4390)
    at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:195)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:184)
    at io.tabular.iceberg.connect.channel.Worker$$Lambda$1100/0x00000008411ddc40.accept(Unknown Source)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:171)
    at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:156)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)

The docker instance where the sink is running has 8 GB of RAM with 6 GB allocated to Java Heap Space.

What could be causing this error? Thanks!

tabmatfournier commented 7 months ago

Are you succeeding in committing to the table at all, or are you OOMing before that?

While looking at S3, can you get an estimate of how many files are being written at once? This is on the list to get a custom jmx metric on in the future to make this easier.

almir-magazord commented 7 months ago

Are you succeeding in committing to the table at all, or are you OOMing before that?

While looking at S3, can you get an estimate of how many files are being written at once? This is on the list to get a custom jmx metric on in the future to make this easier.

We have orders on the Kafka topic for every day since year 2011. Since we are partitioning by day and database (customer), there are a lot of folders (and files) created on S3. The OOM happens in the middle of process... on Kafka side, we can see the Sink connector "Rebalancing".... "Healthy"... and some minutes after, It went into error state. Looking into the consumer groups, the "lag" metrics show to us that no one message was consumed.... (lag = total messages).

To make another test, changed the partitioning fields from order_datetime,database to __source_ts_ms,database. Now we have only few partitions and no erros on sink. It's working... but we prefer to have this data partitioned by order_datetime.

It seems the connector is making to much operations in memory and with small files before sync... (at least is what I'm thinking).

What could I try to deal with a bigger number of partitions?

Edit: after talking with a more experienced data engeneering, we concluded I'm partitioning my data to much. Changed partitioning by day + database to year + database. No errors after changing this. But despite my mistake, maybe a good idea checking why this error occured.

tabmatfournier commented 7 months ago

My theory is when you are cunrching through the backlog, you are creating a lot of FileAppenders. I don't know your exact setting but take the simplest case of a single target table and topic with one partition with no fan out. You would have an appender per partition in the messages it is going through. Writers are closed every five minutes. If you go over 50 partitions/customers in the records in that five minute period, you will have 50 open writers since the way Iceberg works is you need a distinct file per partition.

Each writer buffers some amount of data and flushes periodically --there are some settings to control this, including the parquet row size. As those fill up, it's a lot of heap being used up when you have many open writers.

Unfortunately if you want to do this you need to run Kafka Connect w/ more memory.

almir-magazord commented 7 months ago

My theory is when you are cunrching through the backlog, you are creating a lot of FileAppenders. I don't know your exact setting but take the simplest case of a single target table and topic with one partition with no fan out. You would have an appender per partition in the messages it is going through. Writers are closed every five minutes. If you go over 50 partitions/customers in the records in that five minute period, you will have 50 open writers since the way Iceberg works is you need a distinct file per partition.

Each writer buffers some amount of data and flushes periodically --there are some settings to control this, including the parquet row size. As those fill up, it's a lot of heap being used up when you have many open writers.

Unfortunately if you want to do this you need to run Kafka Connect w/ more memory.

This makes a lot of sense. We reduced the partition granularity and the error disappeared.

Thanks!