nielsbasjes / splittablegzip

Splittable Gzip codec for Hadoop
Apache License 2.0

Fails on spark with "The provided InputSplit bytes which is too small" #4

Closed AbdullaevAPo closed 3 years ago

AbdullaevAPo commented 4 years ago

Hi! Thank you for this great library. We used it to process our large input gz files, but we ran into a problem.

java.lang.IllegalArgumentException: The provided InputSplit (786432000;786439029] is 7029 bytes which is too small. (Minimum is 65536)

In our company we use HDP 2.6 with Spark 2.3. I tried to find a minimum split parameter for Spark, but spark.hadoop.mapreduce.input.fileinputformat.split.minsize doesn't work. Only the spark.sql.files.maxPartitionBytes setting really has an effect. Could you give me some advice on what I can do? Or maybe it's possible to fix this in the library?
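
For reference, this is roughly how we pass these settings to our job (just a sketch; the values are the ones mentioned above):

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        # Forwarded to the Hadoop configuration, but seemingly ignored by the
        # DataFrame file-splitting code:
        .config("spark.hadoop.mapreduce.input.fileinputformat.split.minsize", 65536)
        # The only knob that visibly changes the split size for us:
        .config("spark.sql.files.maxPartitionBytes", 134217728)
        .getOrCreate()
    )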

nielsbasjes commented 4 years ago

First of all I assume you are using version 1.3. I had to check my code, and apparently this limit of 65536 stems from the io.file.buffer.size setting in your environment. The default value in my code is 4096 bytes. What I think is happening is that your file is split into multiple pieces by the framework and the last piece is very small. It seems to me the part creating the splits is using a different minimum split size than what is defined in io.file.buffer.size.

Apparently when I wrote this (a long time ago) I explicitly stated that this should not happen.

In my test code I even have: fail("Test definition error: The last split must be the same or larger as the other splits.");

Note that my code only handles the splits that have been provided. It does not create the splits.

nielsbasjes commented 4 years ago

@AbdullaevAPo I'm no Spark expert so I was wondering: Can you please provide me with a way to reproduce the problem you are seeing?

nielsbasjes commented 4 years ago

At this point my guess is that the spark.hadoop.mapreduce.input.fileinputformat.split.minsize you mentioned (and perhaps some related settings too) must have a value that is compatible with the io.file.buffer.size my library looks at.

Based on the limited information I have right now, my guess is that you need spark.hadoop.mapreduce.input.fileinputformat.split.minsize >= io.file.buffer.size
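
As an illustration only (not a verified fix), one way to enforce that relation from a PySpark driver could look like the sketch below. Note that _jsc is internal PySpark API, and whether the DataFrame reader honours the minsize at all is exactly what is unclear here.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Internal handle to the live Hadoop Configuration.
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()

    # The buffer size this codec uses as its minimum split size (4096 by default).
    buffer_size = hadoop_conf.getInt("io.file.buffer.size", 4096)

    # Suggested relation: never ask for splits smaller than that buffer size.
    hadoop_conf.setLong("mapreduce.input.fileinputformat.split.minsize", buffer_size)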

nielsbasjes commented 4 years ago

@AbdullaevAPo Have you been able to experiment with the settings I mentioned? Or perhaps you have a (small) way for me to reproduce this?

nielsbasjes commented 4 years ago

I'm closing this as you are not responding to any of my questions.

guyshemer commented 3 years ago

Hi @nielsbasjes, I tried using your codec recently and bumped into the same exception as described in this issue. We are using Spark 3.0.1 on top of Hadoop 3.1.3. The value of the io.file.buffer.size property on my cluster is the default (65536).

I tried your tip regarding the size of spark.hadoop.mapreduce.input.fileinputformat.split.minsize, and even tried setting spark.hadoop.mapreduce.input.fileinputformat.split.minsize.per.rack and spark.hadoop.mapreduce.input.fileinputformat.split.minsize.per.node, but it seems that the Spark engine ignores those parameters when setting the split size.

The only parameter that actually affects the split size is spark.sql.files.maxPartitionBytes. When I choose a relatively small value, it determines the size of the split precisely and causes the failure because the last split is too small. When I use the default value of this property (134217728), or some other big enough number (my gzipped test file is ~200MB), the split mechanism manages to pick a split size on its own that lets the job succeed.

Since the cluster needs to process gzip files without size limitations, I would prefer not to rely on the "max size" property, because I'm afraid I'll bump into a scenario where, again, the last split is too small. I would rather use a "min size" configuration, which I can count on not to produce a split size that fails my job.

Do you have any clue why the spark cluster ignores that "min size" value?

nielsbasjes commented 3 years ago

Hi @guyshemer ,

The main problem here is that I myself do not have any experience in using Spark; the documentation around Spark usage was kindly provided by @nchammas (perhaps he knows this).

At the time I created this code I used it in conjunction with good old MapReduce, which has the setting mapreduce.input.fileinputformat.split.minsize to ensure the splits don't go below that threshold.

Do note that because a compressed file outputs more bytes than are read from disk, it is essential to have a lower limit on the split size (4 KiB by default). So at this point I'm really curious whether Spark is capable of guaranteeing a lower limit on a split size at all. For this tool that capability is essential, and my code (which was based on how Hadoop MapReduce does things) assumes this limit to be the io.file.buffer.size setting.

So I downloaded the Spark source code and found this: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala#L51

What I see here is that the code determines the maximum split size (partially based on spark.sql.files.maxPartitionBytes) and then combines the provided files into partitions (which can be multiple small files).

The way I read this code, it seems you may actually run into the scenario where the last split is 1 byte.
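
To make my reading of that Scala concrete, here is a rough Python rendition (this is my interpretation, not the actual Spark code, and the numbers in the example are made up):

    # My reading of FilePartition.scala: the maximum split size is derived from
    # spark.sql.files.maxPartitionBytes (and a per-core estimate), and each file
    # is then simply chopped into pieces of that size. Nothing enforces a
    # minimum, so the last piece can be as small as 1 byte.

    def max_split_bytes(max_partition_bytes, open_cost_in_bytes,
                        default_parallelism, file_sizes):
        total_bytes = sum(size + open_cost_in_bytes for size in file_sizes)
        bytes_per_core = total_bytes // default_parallelism
        return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))

    def split_file(file_size, split_size):
        # Chop the file into split_size pieces; the last one gets whatever is left.
        return [(offset, min(split_size, file_size - offset))
                for offset in range(0, file_size, split_size)]

    # A file that is one byte larger than the maximum split size ends with a
    # split of exactly 1 byte:
    print(split_file(1001, 1000))   # [(0, 1000), (1000, 1)]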

I'm reopening this as it seems to be a Spark specific problem.

nielsbasjes commented 3 years ago

I created a gzipped file, and if I set maxPartitionBytes to exactly 1 byte less than the size of the file at hand I get:

The provided InputSplit (562686;562687] is 1 bytes which is too small. (Minimum is 65536)

Going to submit an enhancement request at the Spark side.
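
For anyone who wants to repeat this, a sketch of the reproduction in PySpark (the codec class is the one from this project; the file name is a placeholder for whatever gzipped test file is at hand, and file_size is its size in bytes):

    from pyspark.sql import SparkSession

    file_size = 562687   # size in bytes of the gzipped test file

    spark = (
        SparkSession.builder
        .config("spark.hadoop.io.compression.codecs",
                "nl.basjes.hadoop.io.compress.SplittableGzipCodec")
        # Exactly 1 byte less than the gzipped input file:
        .config("spark.sql.files.maxPartitionBytes", file_size - 1)
        .getOrCreate()
    )

    # Forcing an action materializes the splits and triggers the
    # "InputSplit ... is too small" IllegalArgumentException shown above.
    spark.read.text("testfile.txt.gz").count()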

nielsbasjes commented 3 years ago

I submitted https://issues.apache.org/jira/browse/SPARK-33534 with a proposed enhancement for Spark.

nielsbasjes commented 3 years ago

I have documented this problem: https://github.com/nielsbasjes/splittablegzip/blob/master/README-Spark.md

I'm closing this issue because there is nothing for me to fix in my code.

nchammas commented 3 years ago

The main problem here is that I myself do not have any experience in using Spark; the documentation around Spark usage was kindly provided by @nchammas (perhaps he knows this).

From what I could tell when I last looked into this, there is no way to set the minimum split size, so I added this comment to the usage notes:

        # I don't think Spark DataFrames offer an equivalent setting for
        # mapreduce.input.fileinputformat.split.minsize.

I think filing SPARK-33534 is the best we can do for now.
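
For completeness, here is a minimal sketch of how the codec and the maxPartitionBytes workaround fit together in PySpark. The codec class is this project's; the rest is illustrative, and README-Spark.md remains the authoritative reference.

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        # Register the splittable gzip codec with Hadoop:
        .config("spark.hadoop.io.compression.codecs",
                "nl.basjes.hadoop.io.compress.SplittableGzipCodec")
        # There is no DataFrame equivalent of
        # mapreduce.input.fileinputformat.split.minsize, so the only lever is
        # the maximum partition size; keep it large enough that the last split
        # of any input file cannot drop below io.file.buffer.size.
        .config("spark.sql.files.maxPartitionBytes", 134217728)
        .getOrCreate()
    )

    df = spark.read.csv("data/*.csv.gz")   # placeholder path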