getindata / kafka-connect-iceberg-sink


Connector Fails with "No space left on device" Error #24

Closed dilverse closed 1 year ago

dilverse commented 1 year ago

Hi,

I am trying to run the Iceberg sink connector in K8s. I tried both the Nessie and Hive catalog based connectors and ran into the same "No space left on device" error after a few minutes. In the sample setup, the producer creates 1000 records per second.

Here is the connector configuration for the Nessie catalog:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: iceberg-hive-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster
  annotations:
    strimzi.io/restart: "true"
spec:
  class: com.getindata.kafka.connect.iceberg.sink.IcebergSink
  tasksMax: 1
  config:
    task.max: '1'
    topics: nyc-avro-topic
    flush.size: '10000'
    timezone: UTC
    schema.compatibility: NONE
    behavior.on.null.values: ignore
    auto.register.schemas: false
    schema.registry.url: http://my-cluster-kafka-schema-registry-cp-schema-registry:8081
    value.converter: io.confluent.connect.avro.AvroConverter
    key.converter: org.apache.kafka.connect.storage.StringConverter
    partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner
    value.converter.schema.registry.url: http://my-cluster-kafka-schema-registry-cp-schema-registry:8081
    table.namespace: "kib-data"
    table.prefix: ""
    table.auto-create: true
    table.write-format: "parquet"
    iceberg.catalog-impl: org.apache.iceberg.nessie.NessieCatalog
    iceberg.uri: http://NESSIE_URL/api/v1
    iceberg.ref: "dev"
    iceberg.authentication.type: "NONE"
    iceberg.fs.defaultFS: "s3a://warehouse/"
    iceberg.warehouse: "s3a://warehouse/kib"
    # Minio S3
    iceberg.io-impl: "org.apache.iceberg.aws.s3.S3FileIO"
    iceberg.s3.endpoint: https://MINIO_ENDPOINT
    iceberg.fs.s3.path-style-access: true
    iceberg.fs.s3.access-key-id: 'ACCESS_KEY'
    iceberg.fs.s3.secret-access-key: 'SECRET_KEY'
    consumer.override.max.poll.records: 10000

The CPU and memory usage keep spiking as time progresses, and after a while the task fails with the stack trace below. I'm not sure why this occurs. I also set the broker's log.retention.ms to 10000 (an illustrative Strimzi snippet for this setting follows the stack trace), since I found in a bunch of places that the broker could be running out of space, but that doesn't seem to be the case here. To validate, I even tried a different connector (io.confluent.connect.s3.S3SinkConnector), which works fine; I was able to store almost ~112M records into the S3 bucket.

... Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Failed to write json to file: s3a://warehouse/kib/kib-data/nyc_avro_topic/metadata/02641-082efb95-6282-483a-baa0-beca77663794.metadata.json (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-iceberg-debezium-sink-connector-0]
org.apache.iceberg.exceptions.RuntimeIOException: Failed to write json to file: s3a://warehouse/kib/kib-data/nyc_avro_topic/metadata/02641-082efb95-6282-483a-baa0-beca77663794.metadata.json
        at org.apache.iceberg.TableMetadataParser.internalWrite(TableMetadataParser.java:128)
        at org.apache.iceberg.TableMetadataParser.overwrite(TableMetadataParser.java:110)
        at org.apache.iceberg.BaseMetastoreTableOperations.writeNewMetadata(BaseMetastoreTableOperations.java:162)
        at org.apache.iceberg.nessie.NessieTableOperations.doCommit(NessieTableOperations.java:159)
        at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:133)
        at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:357)
        at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:402)
        at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:212)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)
        at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:331)
        at com.getindata.kafka.connect.iceberg.sink.tableoperator.IcebergTableOperator.addToTablePerSchema(IcebergTableOperator.java:171)
        at com.getindata.kafka.connect.iceberg.sink.tableoperator.IcebergTableOperator.addToTable(IcebergTableOperator.java:69)
        at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.accept(IcebergChangeConsumer.java:51)
        at com.getindata.kafka.connect.iceberg.sink.IcebergSinkTask.put(IcebergSinkTask.java:38)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: No space left on device
        at java.base/java.io.FileOutputStream.writeBytes(Native Method)
        at java.base/java.io.FileOutputStream.write(FileOutputStream.java:354)
        at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:123)
        at org.apache.iceberg.relocated.com.google.common.io.CountingOutputStream.write(CountingOutputStream.java:52)
        at org.apache.iceberg.aws.s3.S3OutputStream.write(S3OutputStream.java:200)
        at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:233)
        at java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:303)
        at java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:281)
        at java.base/sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
        at java.base/java.io.OutputStreamWriter.write(OutputStreamWriter.java:208)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.WriterBasedJsonGenerator._flushBuffer(WriterBasedJsonGenerator.java:2034)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.writeRawLong(WriterBasedJsonGenerator.java:632)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.writeRaw(WriterBasedJsonGenerator.java:565)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.util.DefaultPrettyPrinter.writeObjectFieldValueSeparator(DefaultPrettyPrinter.java:310)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.JsonGeneratorImpl._verifyPrettyValueWrite(JsonGeneratorImpl.java:224)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.WriterBasedJsonGenerator._verifyValueWrite(WriterBasedJsonGenerator.java:935)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.writeNumber(WriterBasedJsonGenerator.java:766)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator.writeNumberField(JsonGenerator.java:2184)
        at org.apache.iceberg.SnapshotParser.toJson(SnapshotParser.java:62)
        at org.apache.iceberg.TableMetadataParser.toJson(TableMetadataParser.java:223)
        at org.apache.iceberg.TableMetadataParser.internalWrite(TableMetadataParser.java:125)
        ... 25 more
2023-03-01 15:26:17,044 ERROR [iceberg-debezium-sink-connector|task-0] WorkerSinkTask{id=iceberg-debezium-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-iceberg-debezium-sink-connector-0]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.iceberg.exceptions.RuntimeIOException: Failed to write json to file: s3a://warehouse/kib/kib-data/nyc_avro_topic/metadata/02641-082efb95-6282-483a-baa0-beca77663794.metadata.json
        at org.apache.iceberg.TableMetadataParser.internalWrite(TableMetadataParser.java:128)
        at org.apache.iceberg.TableMetadataParser.overwrite(TableMetadataParser.java:110)
        at org.apache.iceberg.BaseMetastoreTableOperations.writeNewMetadata(BaseMetastoreTableOperations.java:162)
        at org.apache.iceberg.nessie.NessieTableOperations.doCommit(NessieTableOperations.java:159)
        at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:133)
        at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:357)
        at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:402)
        at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:212)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)
        at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:331)
        at com.getindata.kafka.connect.iceberg.sink.tableoperator.IcebergTableOperator.addToTablePerSchema(IcebergTableOperator.java:171)
        at com.getindata.kafka.connect.iceberg.sink.tableoperator.IcebergTableOperator.addToTable(IcebergTableOperator.java:69)
        at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.accept(IcebergChangeConsumer.java:51)
        at com.getindata.kafka.connect.iceberg.sink.IcebergSinkTask.put(IcebergSinkTask.java:38)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        ... 10 more
Caused by: java.io.IOException: No space left on device
        at java.base/java.io.FileOutputStream.writeBytes(Native Method)
        at java.base/java.io.FileOutputStream.write(FileOutputStream.java:354)
        at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:123)
        at org.apache.iceberg.relocated.com.google.common.io.CountingOutputStream.write(CountingOutputStream.java:52)
        at org.apache.iceberg.aws.s3.S3OutputStream.write(S3OutputStream.java:200)
        at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:233)
        at java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:303)
        at java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:281)
        at java.base/sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
        at java.base/java.io.OutputStreamWriter.write(OutputStreamWriter.java:208)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.WriterBasedJsonGenerator._flushBuffer(WriterBasedJsonGenerator.java:2034)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.writeRawLong(WriterBasedJsonGenerator.java:632)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.writeRaw(WriterBasedJsonGenerator.java:565)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.util.DefaultPrettyPrinter.writeObjectFieldValueSeparator(DefaultPrettyPrinter.java:310)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.JsonGeneratorImpl._verifyPrettyValueWrite(JsonGeneratorImpl.java:224)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.WriterBasedJsonGenerator._verifyValueWrite(WriterBasedJsonGenerator.java:935)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.writeNumber(WriterBasedJsonGenerator.java:766)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator.writeNumberField(JsonGenerator.java:2184)
        at org.apache.iceberg.SnapshotParser.toJson(SnapshotParser.java:62)
        at org.apache.iceberg.TableMetadataParser.toJson(TableMetadataParser.java:223)
        at org.apache.iceberg.TableMetadataParser.internalWrite(TableMetadataParser.java:125)
        ... 25 more
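(For reference, in a Strimzi-managed cluster a broker override such as log.retention.ms is normally set in the Kafka custom resource under spec.kafka.config. A minimal illustrative fragment; the cluster name and namespace here are assumptions and the rest of the resource is omitted:)

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster       # assumed cluster name
  namespace: kafka
spec:
  kafka:
    config:
      # broker-level retention override, 10000 ms as mentioned above
      log.retention.ms: 10000
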
shepherd44 commented 1 year ago

In my case, the storage space for kafka-connect's /tmp directory was insufficient when writing to S3 with gzip compression (Strimzi's tmpDirSizeLimit defaults to 5Mi). The stack trace above shows Iceberg's S3OutputStream writing through a local FileOutputStream, i.e. data is staged on local disk before being uploaded to S3, which is what fills /tmp. To check the usage of the kafka-connect tmp storage, you can run:

df -h
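When Connect runs on Kubernetes, the same check can be done from outside the container; a sketch, assuming the Connect pod is named my-connect-cluster-connect-0 in the kafka namespace:

kubectl exec -n kafka my-connect-cluster-connect-0 -- df -h /tmp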

If the /tmp volume is not large enough, you can increase its size by setting the tmpDirSizeLimit parameter in the KafkaConnect resource to a larger value, for example:

...
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect
spec:
  template:
    pod:
      tmpDirSizeLimit: 1Gi
...
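A rough usage note (the manifest file name is an assumption): re-apply the updated KafkaConnect resource, let Strimzi roll the Connect pods, and then re-check /tmp with df -h as above.

kubectl apply -n kafka -f kafka-connect.yaml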

dilverse commented 1 year ago


This fixed the issue for me. Thanks @shepherd44!