damballa / parkour

Hadoop MapReduce in idiomatic Clojure.
Apache License 2.0

Use existing output directory #17

Closed stanfea closed 9 years ago

stanfea commented 9 years ago

Hi,

I need to be able to run an incremental job that updates a folder on HDFS, so I have to be able to reuse the output folder. I've seen this solution, from http://stackoverflow.com/questions/7713316/how-to-overwrite-reuse-the-exisitng-output-path-for-hadoop-jobs-again-and-agin:

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class OverwriteTestOutputFormat<K, V> extends TextOutputFormat<K, V>
{
    @Override
    public RecordWriter<K, V>
    getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException
    {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator = conf.get("mapred.textoutputformat.separator", "\t");
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed)
        {
            Class<? extends CompressionCodec> codecClass =
                getOutputCompressorClass(job, GzipCodec.class);
            codec = ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        // The second argument is the overwrite flag: replace the file if it exists.
        FSDataOutputStream fileOut = fs.create(file, true);
        if (!isCompressed)
        {
            return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
        }
        else
        {
            return new LineRecordWriter<K, V>(
                new DataOutputStream(codec.createOutputStream(fileOut)),
                keyValueSeparator);
        }
    }
}

The FSDataOutputStream fileOut = fs.create(file, true); call is what sets the overwrite flag to true, so an existing output file gets replaced.

I'm using ::mr/sink-as dux/prefix-keys, and I've hunted around the code; the relevant bits seem to be in the parkour.remote.dux namespace. Could you give me a hint?

Thank you very much,

Stefan

llasram commented 9 years ago

There's a check in FileOutputFormat which ensures, during job initialization, that the specified top-level output directory doesn't exist, so I don't think this approach will work. Fortunately, I don't think it's necessary. I've added an answer to your linked SO question describing what I believe is the best solution to this problem: http://stackoverflow.com/questions/7713316/how-to-overwrite-reuse-the-exisitng-output-path-for-hadoop-jobs-again-and-agin/29654818#29654818
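
For context, the check in question is FileOutputFormat's checkOutputSpecs, which fails the job up front when the configured output directory already exists. A rough Clojure paraphrase of its effect (not Hadoop's actual source):

(import '(org.apache.hadoop.mapreduce.lib.output FileOutputFormat)
        '(org.apache.hadoop.mapred FileAlreadyExistsException))

;; Paraphrase of the pre-flight check: a pre-existing output directory
;; aborts job submission before a single task runs.
(defn check-output-specs! [fs job]
  (let [out-dir (FileOutputFormat/getOutputPath job)]
    (when (.exists fs out-dir)
      (throw (FileAlreadyExistsException.
              (str "Output directory " out-dir " already exists"))))))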

stanfea commented 9 years ago

I do need it to overwrite existing files, in case we want to reprocess the same input files. I've removed the check from FileOutputFormat.java, but I still need to set overwrite to true via FSDataOutputStream fileOut = fs.create(file, true);
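
(Rather than patching Hadoop's FileOutputFormat.java itself, the same check could be skipped by overriding checkOutputSpecs in the subclass. A minimal gen-class sketch, assuming AOT compilation and a hypothetical example package for the Java class above:)

;; Extend the overwriting format above and no-op the output-directory
;; existence check. Class and package names here are hypothetical.
(ns example.overwrite-format
  (:gen-class
   :name example.OverwritingTextOutputFormat
   :extends example.OverwriteTestOutputFormat))

(defn -checkOutputSpecs
  [this job]
  ;; deliberately skip FileOutputFormat's FileAlreadyExistsException check
  nil)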

I see this in parkour.remote.dux:

(defn record-writer*
  {:tag `IRecordWriter}
  [^TaskAttemptContext context]
  (reify IRecordWriter
    (write [_ key val]
      (throw (ex-info "Dux output tuple written to non-named output"
                      {:key key, :val val})))
    (close [_ context] #_pass)))

but I'm not sure how to supply the fileOut the way the Java code does with return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);?

Thanks!

llasram commented 9 years ago

If you've got a modified TextOutputFormat with the overwrite behavior you want, then that's all you need. I had to refresh my memory of how this code works, but the record writer implementation in parkour.remote.dux is just a stub to catch accidental attempts at normal (non-demultiplexed) output. The actually relevant code is in the parkour.io.dux/new-rw function, which just delegates to the wrapped OutputFormat. Just make a dseq specifying your OutputFormat class and you should be good to go.
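
Something like the following is the shape of it. This is a hedged sketch only: the exact parkour.io.dsink/dsink signature should be checked against the parkour source, whether a bare fn-of-Job is accepted as a config step should be verified against parkour.cstep, and OverwriteTestOutputFormat's example package is hypothetical:

(require '[parkour.io.dsink :as dsink])
(import '(org.apache.hadoop.mapreduce Job)
        '(org.apache.hadoop.mapreduce.lib.output FileOutputFormat)
        '(org.apache.hadoop.fs Path)
        'example.OverwriteTestOutputFormat)

;; Sketch: a dsink built from a configuration-step function that
;; installs the custom OutputFormat and sets the output path.
(defn overwrite-dsink [path]
  (dsink/dsink
   (fn [^Job job]
     (doto job
       (.setOutputFormatClass OverwriteTestOutputFormat)
       (FileOutputFormat/setOutputPath (Path. path))))))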

That said, I will briefly try to dissuade you from this approach. Creating a custom output format with so much non-standard behavior adds cognitive overhead and may be brittle across future changes in core Hadoop. You can get the same effect either by managing collections of output directories as I suggested (and deleting the ones you want to "overwrite"), or, if you really want a single directory, by writing job output to temporary directories and then replace-moving the results into your final aggregate directory.
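
For the second option, the replace-move step is just plain FileSystem interop; a rough sketch (function and directory names are mine, not parkour's):

(import '(org.apache.hadoop.fs FileStatus FileSystem Path))

;; Move every file the job wrote under tmp-dir into final-dir,
;; deleting any previous version of the same file first.
(defn replace-move! [^FileSystem fs ^Path tmp-dir ^Path final-dir]
  (doseq [^FileStatus status (.listStatus fs tmp-dir)]
    (let [src (.getPath status)
          dst (Path. final-dir (.getName src))]
      (when (.exists fs dst)
        (.delete fs dst false))
      (.rename fs src dst))))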

stanfea commented 9 years ago

Thanks for your input :) I'll go with tmp dirs and aggregating the output.