tabular-io / iceberg-kafka-connect

Apache License 2.0
192 stars, 41 forks

Error java.io.IOException: No space left on device #114

Closed haripriyarhp closed 11 months ago

haripriyarhp commented 11 months ago

Hi, I have the connector running on k8s. It runs fine for the initial few commits and then always fails with the error below. Setting JVM options on the Connect worker did not help. These are the parameters used:

      connector.class: io.tabular.iceberg.connect.IcebergSinkConnector
      topics: test-topic-1
      iceberg.tables: db.iceberg_connector_table
      iceberg.tables.auto-create-enabled: true
      iceberg.catalog.catalog-impl: org.apache.iceberg.aws.glue.GlueCatalog
      iceberg.catalog.warehouse: s3://bucketname/object_key
      iceberg.catalog.io-impl: org.apache.iceberg.aws.s3.S3FileIO
      iceberg.control.commit.interval-ms: 30000
      value.converter.schemas.enable: true
      key.converter: org.apache.kafka.connect.storage.StringConverter
      value.converter: io.confluent.connect.avro.AvroConverter 
      value.converter.schema.registry.url: http://localhost:8081 
      value.converter.value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy

    org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
    Caused by: java.io.UncheckedIOException: Failed to flush row group
        at org.apache.iceberg.parquet.ParquetWriter.flushRowGroup(ParquetWriter.java:221)
        at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:254)
        at org.apache.iceberg.io.DataWriter.close(DataWriter.java:82)
        at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.closeCurrent(BaseTaskWriter.java:314)
        at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.close(BaseTaskWriter.java:341)
        at org.apache.iceberg.io.UnpartitionedWriter.close(UnpartitionedWriter.java:47)
        at org.apache.iceberg.io.BaseTaskWriter.complete(BaseTaskWriter.java:96)
        at io.tabular.iceberg.connect.data.IcebergWriter.flush(IcebergWriter.java:130)
        at io.tabular.iceberg.connect.data.IcebergWriter.complete(IcebergWriter.java:145)
        at io.tabular.iceberg.connect.channel.Worker.lambda$receive$2(Worker.java:116)
        at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:273)
        at java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1779)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
        at io.tabular.iceberg.connect.channel.Worker.receive(Worker.java:116)
        at io.tabular.iceberg.connect.channel.Channel.lambda$consumeAvailable$2(Channel.java:129)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at io.tabular.iceberg.connect.channel.Channel.consumeAvailable(Channel.java:119)
        at io.tabular.iceberg.connect.channel.Worker.process(Worker.java:105)
        at io.tabular.iceberg.connect.IcebergSinkTask.processControlEvents(IcebergSinkTask.java:134)
        at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:121)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
        ... 11 more
    Caused by: java.io.IOException: No space left on device
        at java.base/sun.nio.ch.FileDispatcherImpl.write0(Native Method)
        at java.base/sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:62)
        at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:132)
        at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:97)
        at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:67)
        at java.base/sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:288)
        at java.base/java.nio.channels.Channels.writeFullyImpl(Channels.java:74)
        at java.base/java.nio.channels.Channels.writeFully(Channels.java:96)
        at java.base/java.nio.channels.Channels$1.write(Channels.java:171)
        at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
        at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
        at org.apache.iceberg.relocated.com.google.common.io.CountingOutputStream.write(CountingOutputStream.java:54)
        at org.apache.iceberg.aws.s3.S3OutputStream.write(S3OutputStream.java:201)
        at java.base/java.io.OutputStream.write(OutputStream.java:127)
        at org.apache.parquet.io.DelegatingPositionOutputStream.write(DelegatingPositionOutputStream.java:56)
        at org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
        at org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:903)
        at org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:848)
        at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:310)
        at org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:458)
        at org.apache.iceberg.parquet.ParquetWriter.flushRowGroup(ParquetWriter.java:213)
        ... 35 more

bryanck commented 11 months ago

S3FileIO stages files on local disk before uploading them; by default it uses java.io.tmpdir, which is typically /tmp. If you are using Strimzi, for example, that mount is very small. You can override the staging directory by setting iceberg.catalog.s3.staging-dir.
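As a sketch, the override can be added alongside the connector properties above (the path here is illustrative, not required; it just needs to point at a writable location with enough free space):

```yaml
      # Redirect S3FileIO's local staging area away from the small /tmp mount.
      # /mnt/data/iceberg-staging is an example path, not a required location.
      iceberg.catalog.s3.staging-dir: /mnt/data/iceberg-staging
```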

haripriyarhp commented 11 months ago

Hi, thanks for the suggestion. It works now. :)

okayhooni commented 10 months ago

@bryanck @haripriyarhp

I got the same issue on a Kafka Connect cluster deployed by the Strimzi Kafka operator, and I noticed the /tmp directory was only 5 MB, as @bryanck said.

But when I inspected the KafkaConnect custom resource, there was no option to mount additional volumes other than ConfigMap and Secret volumes via the spec.externalConfiguration.volumes key.

How can I provide a sufficiently large volume for S3FileIO on a Kafka Connect cluster deployed by the Strimzi operator?

Is the only option to keep using the /tmp volume with a larger tmpDirSizeLimit? (But its medium is hard-coded to Memory...)

Paths other than /tmp/ cannot be used by the Iceberg sink connector because the container has no write privilege on them.

/ $ df -h
Filesystem      Size  Used Avail Use% Mounted on
overlay          80G   14G   67G  17% /
tmpfs            64M     0   64M   0% /dev
tmpfs            31G     0   31G   0% /sys/fs/cgroup
tmpfs           5.0M  5.0M     0 100% /tmp
/dev/nvme0n1p1   80G   14G   67G  17% /etc/hosts
shm              64M     0   64M   0% /dev/shm
tmpfs           8.0G   12K  8.0G   1% /run/secrets/kubernetes.io/serviceaccount
tmpfs           8.0G  4.0K  8.0G   1% /run/secrets/eks.amazonaws.com/serviceaccount
tmpfs            31G     0   31G   0% /proc/acpi
tmpfs            31G     0   31G   0% /sys/firmware
/ $ ls -lr
total 0
drwxr-xr-x   1 root root  17 Mar 22  2023 var
drwxr-xr-x   1 root root  19 Mar 22  2023 usr
drwxrwxrwt   4 root root 100 Oct 27 11:20 tmp
dr-xr-xr-x  13 root root   0 Oct 27 10:48 sys
drwxr-xr-x   2 root root   6 Jun 21  2021 srv
lrwxrwxrwx   1 root root   8 Jun 21  2021 sbin -> usr/sbin
drwxr-xr-x   1 root root  21 Mar 24  2023 run
dr-xr-x---   3 root root 213 Mar 22  2023 root
dr-xr-xr-x 248 root root   0 Oct 27 10:49 proc
drwxr-xr-x   1 root root  19 Oct 27 10:48 opt
drwxr-xr-x   2 root root   6 Jun 21  2021 mnt
drwxr-xr-x   2 root root   6 Jun 21  2021 media
drwx------   2 root root   6 Mar 22  2023 lost+found
lrwxrwxrwx   1 root root   9 Jun 21  2021 lib64 -> usr/lib64
lrwxrwxrwx   1 root root   7 Jun 21  2021 lib -> usr/lib
drwxr-xr-x   1 root root  19 Mar 24  2023 home
drwxr-xr-x   1 root root  64 Mar 24  2023 etc
drwxr-xr-x   5 root root 360 Oct 27 10:49 dev
dr-xr-xr-x   2 root root   6 Jun 21  2021 boot
lrwxrwxrwx   1 root root   7 Jun 21  2021 bin -> usr/bin
/mnt $ touch test.txt
touch: cannot touch 'test.txt': Permission denied
Caused by: java.io.IOException: Permission denied
    at java.base/java.io.UnixFileSystem.createFileExclusively(Native Method)
    at java.base/java.io.File.createTempFile(File.java:2170)
    at org.apache.iceberg.aws.s3.S3OutputStream.newStream(S3OutputStream.java:220)
    at org.apache.iceberg.aws.s3.S3OutputStream.<init>(S3OutputStream.java:151)
    at org.apache.iceberg.aws.s3.S3OutputFile.createOrOverwrite(S3OutputFile.java:70)
    ... 35 more
bryanck commented 10 months ago

This is a limitation of the Strimzi operator; I do not believe you can mount additional volumes on a Kafka Connect container. There have been some issues opened on this, and I'd definitely like to see it added. You may want to open one as well, or comment on an existing ticket.

For now, try setting iceberg.catalog.s3.staging-dir to /home/kafka/iceberg-staging.
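In the connector configuration, that would look something like the sketch below (assuming the directory exists or can be created; /home/kafka is writable in Strimzi's Connect image):

```yaml
      # Stage Parquet files under the kafka user's home directory
      # instead of the tiny RAM-backed /tmp mount.
      iceberg.catalog.s3.staging-dir: /home/kafka/iceberg-staging
```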

mattssll commented 7 months ago

Here is how you can increase the size of /tmp for a pod (note the setting lives under spec.template.pod):

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: strimzi-connect-cluster
  namespace: kafka-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  template:
    pod:
      tmpDirSizeLimit: "5Gi"
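After the change rolls out, you can confirm the new limit from inside a Connect pod, for example with a quick check like this (a sketch; run it via kubectl exec or a shell in the container):

```shell
# Show the size, usage, and backing filesystem of /tmp.
# On a Strimzi Connect pod this should report a tmpfs sized to tmpDirSizeLimit.
df -h /tmp
```

Seeing `tmpfs` in the output confirms the space is RAM-backed rather than on disk.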
bryanck commented 7 months ago

Keep in mind that increasing the size of /tmp can use more memory, as it is a tmpfs (RAM-backed) mount.