In many Hadoop production environments you get gzipped files as the raw input. Usually these are Apache HTTPD logfiles. When putting these gzipped files into Hadoop you are stuck with exactly 1 map task per input file. In many scenarios this is fine.
However, when a lot of work is done in this very first map task, it may very well be advantageous to divide the work over multiple tasks, even if there is a penalty for this scaling out.
This addon for Hadoop makes this possible.
I did benchmarking jobs to see how this solution scales and performs. The software and the results can be found in the Benchmark folder.
In general we can say that you win as long as there is unused capacity in your cluster.
First of all, this only works with Hadoop 1.1.0 and up because it depends on the presence of the SplittableCompressionCodec interface.
I tested it with Hortonworks 2.1.2 and Cloudera CDH 4.5.0.
The source code can be downloaded via GitHub:
https://github.com/nielsbasjes/splittablegzip
For normal projects you can simply use the prebuilt version from Maven Central. When using Maven, add this to your project:
<dependency>
  <groupId>nl.basjes.hadoop</groupId>
  <artifactId>splittablegzip</artifactId>
  <version>1.3</version>
</dependency>
On CDH the build fails if the native gzip libraries cannot be loaded for running the unit tests. To fix this you need to install the native package and set the environment so the build can find the libraries for your platform, i.e. do something like this before starting the build or loading your IDE:
export LD_LIBRARY_PATH=/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64
Place the jar file in the classpath of your Hadoop installation.
Enable this codec and make sure the regular GzipCodec is NOT used. This can be done by changing the io.compression.codecs property to something like this:
org.apache.hadoop.io.compress.DefaultCodec, nl.basjes.hadoop.io.compress.SplittableGzipCodec, org.apache.hadoop.io.compress.BZip2Codec
Set the split size to something that works in your situation. This can be done by setting the appropriate values for mapreduce.input.fileinputformat.split.minsize and/or mapreduce.input.fileinputformat.split.maxsize.
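As an illustration (this snippet is not from the project's documentation; the 256 MiB split size and the job name are arbitrary example values), setting both properties from Java job-setup code could look roughly like this:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SplittableGzipJobSetup {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            // Register the splittable codec and leave out the regular GzipCodec.
            conf.set("io.compression.codecs",
                    "org.apache.hadoop.io.compress.DefaultCodec,"
                  + "nl.basjes.hadoop.io.compress.SplittableGzipCodec,"
                  + "org.apache.hadoop.io.compress.BZip2Codec");

            // Example split size of 256 MiB; tune this for your own situation.
            conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);

            Job job = Job.getInstance(conf, "gzipped input example");
            // ... set mapper, reducer, input/output formats and paths as usual ...
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

The same values can of course also be set cluster-wide in the Hadoop configuration files, or passed with -D on the command line when the job uses ToolRunner.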
For each "split" the gzipped input file is read from the beginning of the file till the point where the split starts, thus reading, decompressing and discarding (wasting!) everything that is before the start of the split.
FACT: Files compressed with the Gzip codec are NOT SPLITTABLE. Never have been, never will be.
This codec offers a trade off between "spent resources" and "scalability" when reading Gzipped input files by simply always starting at the beginning of the file.
So in general this "splittable" Gzip codec will WASTE CPU time, filesystem IO (HDFS) and probably other system resources (network) too, in order to reduce the "wall clock" time in some real-life situations.
Assume you have a heavy map phase for which the input is a 1GiB Apache httpd logfile. Now assume this map takes 60 minutes of CPU time to run. Then this task will take 60 minutes to run because all of that CPU time must be spent on a single CPU core ... Gzip is not splittable!
This codec will waste CPU power by always starting from the start of the gzipped file and discard all the decompressed data until the start of the split has been reached.
Decompressing a 1GiB Gzip file usually takes only a few (2-4) minutes.
So if a "60 minutes" input file is split into 4 equal parts then:
Because all tasks run in parallel the running time in this example would be 18 minutes (i.e. the worst split time) instead of the normal 60 minutes. We have wasted about 6 minutes of CPU time and completed the job in about 30% of the original wall clock time.
The overall advice is to EXPERIMENT with the settings and do benchmarks.
Remember that the wasted CPU time and IO are real: you only win as long as there is unused capacity in your cluster.
A possible optimum:
Upload the input files into HDFS with a block size that is equal to (or a few bytes bigger than) the file size.
hadoop fs -Ddfs.block.size=1234567890 -put access.log.gz /logs
This has the effect that every node that has "a piece of the file" always has "the entire file". This ensures that a node needs no network IO to read the file, provided the file is locally available.
Always remember that there are alternative approaches, for example decompressing the gzipped input and recompressing it with a splittable codec such as bzip2 before offering it to Hadoop.
There were two major hurdles that needed to be solved to make this work:
The reported position depends on the read blocksize.
If you read information in "records", getBytesRead() returns a value that jumps in increments: only after a new disk block has been read will getBytesRead() return a new value. Here "read" means read from disk and loaded into the decompressor; it does NOT yet mean that the uncompressed information was consumed.
The solution employed is that when we get close to the end of the split we switch to a "crawling" mode: the disk reads are reduced to 1 byte, making the position reporting 1-byte accurate as well. This was implemented in the ThrottleableDecompressorStream.
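A minimal sketch of this crawling idea (this is not the actual ThrottleableDecompressorStream code; the class name, method and threshold below are assumptions for illustration only):

    import java.io.IOException;
    import java.io.InputStream;

    // Illustrative sketch: once the compressed position gets close to the end of the
    // split, every underlying disk read is limited to a single byte, so the bytes-read
    // counter advances in 1-byte steps and the split boundary can be located precisely.
    class CrawlingReadSketch {
        private static final int CRAWL_DISTANCE = 128; // hypothetical threshold in bytes

        static int readFromDisk(InputStream in, byte[] buffer,
                                long compressedBytesReadSoFar, long splitEnd) throws IOException {
            int maxLength = (splitEnd - compressedBytesReadSoFar) < CRAWL_DISTANCE
                    ? 1              // crawl mode: 1-byte reads near the end of the split
                    : buffer.length; // normal mode: read a full block at a time
            return in.read(buffer, 0, maxLength);
        }
    }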
The input is compressed.
If you read 1 byte (uncompressed) you do not always see an increase in the reported getBytesRead(). This is because the value reported by getBytesRead() is about the file size on disk (= compressed), and the compressed file has fewer bytes than the uncompressed data. This makes it impossible to make two splits meet accurately. The solution is based on the idea that we report the position as accurately as possible, but when we get really close to the end of the split we stop reporting the truth and start lying about the position. The lie we use to cross the split boundary is that every uncompressed byte read is reported as a 1 byte increase in position. This was implemented using a simple state machine with 3 states that determine what position is reported through getPos(); the state is essentially selected on the distance to the end of the split. These states are (see the sketch after this list):
REPORT: Normally read the bytes and report the actual disk position through getPos().
HOLD: When very close to the end of the split, stop changing the reported file position for a while.
SLOPE: Once we are beyond the end, report a 1 byte increase in getPos() for every uncompressed byte that was read from the stream.
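To make the three states concrete, here is a minimal sketch of the position reporting (again not the real codec code; the field names and the hold distance are illustrative assumptions):

    // Illustrative sketch of the REPORT/HOLD/SLOPE position reporting (not the real codec).
    class PositionReportingSketch {
        enum State { REPORT, HOLD, SLOPE }

        private static final long HOLD_DISTANCE = 64; // hypothetical: where we stop telling the truth

        private final long splitEnd;        // end of this split, in compressed bytes
        private long realCompressedPos = 0; // actual compressed bytes read from disk
        private long heldPos = 0;           // position frozen when entering HOLD
        private long slopeBytes = 0;        // uncompressed bytes counted since entering SLOPE
        private State state = State.REPORT;

        PositionReportingSketch(long splitEnd) {
            this.splitEnd = splitEnd;
        }

        // Called after every read: 'compressed' bytes came from disk, 'uncompressed' bytes left the stream.
        void update(long compressed, long uncompressed) {
            realCompressedPos += compressed;
            switch (state) {
                case REPORT:
                    if (splitEnd - realCompressedPos < HOLD_DISTANCE) {
                        state = State.HOLD;          // very close to the end: freeze the reported position
                        heldPos = realCompressedPos;
                    }
                    break;
                case HOLD:
                    if (realCompressedPos >= splitEnd) {
                        state = State.SLOPE;         // certainly beyond the real end: start the controlled lie
                    }
                    break;
                case SLOPE:
                    slopeBytes += uncompressed;      // 1 uncompressed byte counts as 1 byte of position
                    break;
            }
        }

        // What the record reader sees as the current position.
        long getPos() {
            switch (state) {
                case REPORT: return realCompressedPos;    // the truth
                case HOLD:   return heldPos;              // frozen
                default:     return heldPos + slopeBytes; // the lie that lets splits meet reliably
            }
        }
    }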
The overall effect is that the position reported near the end of the split no longer represents the actual position, and this makes the position usable for reliably splitting the input stream. The actual point where the file is split is shifted a bit towards the end of the file (we're talking bytes, not even KiB), and this shift depends on the compression level of the data in the stream. If we start lying too early the split may happen a byte too early, and then the last split may lose the last record(s). That is why we hold for a while and only start the slope at the moment we are certain we are beyond the indicated "end". To ensure a split starts at exactly the spot where the previous split ends, we find the start of a split by running over the "part that must be discarded" as if it were a split.
Originally this feature was submitted to be part of the core of Hadoop.
See: Gzip splittable (HADOOP-7076).
This idea was conceived and implemented by Niels Basjes.