damballa / parkour

Hadoop MapReduce in idiomatic Clojure.
Apache License 2.0

Read simultaneously from multiple inputs #18

Closed stanfea closed 9 years ago

stanfea commented 9 years ago

Sorry, I hope I'm not being a bother.

In the docs: https://github.com/damballa/parkour/blob/master/doc/multi-io.md

You mention under Multiple inputs:

> The second means that tasks (usually map tasks) read simultaneously from multiple inputs e.g. for performing joins. Parkour does not as yet have any special support for this type of multiple input.

I thought I could resolve this like this:

(-> [(pg/input (mra/dseq [A] A-path))
     (pg/input (mra/dseq [B] B-path))]
    (pg/map #'tokeyvalue-mapper)
    (pg/partition (mra/shuffle [:string [A B]]))
    (pg/reduce #'calc-reducer)
    (pg/output :calc (mra/dsink [C] C-path))
    (pg/execute conf "JOB"))

But I don't want a reducer aggregating all the data. I'd like a mapper, so I can process each [A B] dataset and sink that, but a map node can't take a partition node as input.

Is there any way to get this "Read simultaneously from multiple inputs" functionality to work?

Thanks,

Stefan

ps: The tokeyvalue-mapper does V => K,V where K is a date string extracted from V.

llasram commented 9 years ago

Check out the Hadoop CompositeInputFormat class in the org.apache.hadoop.mapreduce.lib.join package. I haven't done this myself, but you should be able to write a dseq configuring it, then use that as the (to Parkour) single input to your graph API job. Do note the limitations regarding sortedness and identical partitioning, which are unavoidable for this sort of map-side join. The documentation on CompositeInputFormat seems to be a bit sparse, but there are a few more details and an example in Hadoop: The Definitive Guide.
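To illustrate why sortedness matters for a map-side join, here is a minimal sketch in plain Clojure of the merge step such a join relies on. It is not Hadoop's or Parkour's code: `merge-join` is a hypothetical name, and the sketch assumes each input is a seq of [key value] pairs sorted ascending by key, with at most one record per key.

```clojure
(defn merge-join
  "Inner-joins two seqs of [k v] pairs in a single forward pass.
  Assumes both seqs are sorted ascending by k and have unique keys.
  Returns a lazy seq of [k [va vb]] for keys present in both."
  [as bs]
  (lazy-seq
    (when (and (seq as) (seq bs))
      (let [[ka va] (first as)
            [kb vb] (first bs)
            c       (compare ka kb)]
        (cond
          ;; Left key is smaller: it cannot match anything later on the right.
          (neg? c) (merge-join (rest as) bs)
          ;; Right key is smaller: skip it for the same reason.
          (pos? c) (merge-join as (rest bs))
          ;; Keys match: emit the joined record and advance both sides.
          :else    (cons [ka [va vb]]
                         (merge-join (rest as) (rest bs))))))))

(def joined
  (merge-join [["2015-01-01" :a1] ["2015-01-02" :a2] ["2015-01-04" :a4]]
              [["2015-01-02" :b2] ["2015-01-03" :b3] ["2015-01-04" :b4]]))
```

Because each side is only ever advanced, a key that appears out of order would be skipped silently, which is why both inputs must be sorted (and split into matching partitions) before the join runs.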

Alternatively, if one of your datasets is small enough to fit in memory, you can ship it via the distributed cache as a dval and do the join exactly as you would write a normal Clojure function to "join" a collection against a map.
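As a rough sketch of that second approach, the join itself can be an ordinary Clojure function. The names here are hypothetical, and `lookup` stands in for the small dataset after it has been loaded into a plain map keyed by the join key (how it gets there, e.g. from a dval, is not shown).

```clojure
(defn join-against-map
  "Inner-joins a seq of [k v] records against an in-memory map of k -> v.
  Records whose key is absent from `lookup` are dropped."
  [lookup records]
  (for [[k v] records
        ;; Use a sentinel so that nil or false values in `lookup` still join.
        :let  [other (get lookup k ::missing)]
        :when (not= other ::missing)]
    [k [v other]]))

(def small-side
  {"2015-01-01" :b1
   "2015-01-03" :b3})

(def joined-in-memory
  (join-against-map small-side
                    [["2015-01-01" :a1]
                     ["2015-01-02" :a2]
                     ["2015-01-03" :a3]]))
```

This version needs no sorting or matching partitions on the large side, at the cost of holding the whole small dataset in memory in every task.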

HTH, and no bother!

stanfea commented 9 years ago

Hey,

I've implemented it like this:

(defn dseq
  "Distributed sequence for composite avro inputs"
  [[ks vs] & paths]
  (fn [^Job job]
    (let [jobconf (.getConfiguration job)
          fmt (CompositeInputFormat/compose
               "inner"
               ClojureAvroKeyValueInputFormat
               (fs/path-array paths))]
      (.set jobconf "mapred.join.expr" fmt)
      (AvroJob/setInputKeySchema job (avro/parse-schema ks))
      (AvroJob/setInputValueSchema job (avro/parse-schema vs))
      (doto job
        (AvroJob/setDataModelClass ClojureData)
        (.setInputFormatClass CompositeInputFormat)))))

and I'm using it like this

(-> (composite-avro/dseq [:string [A B]] A-path B-path)
    (pg/input)
    (pg/map #'mapper)
    (pg/output :out (mra/dsink [:string C] C-path))
    (pg/execute conf "JOB"))

Unfortunately the AvroKey doesn't seem to get unpacked to a string, and it's complaining:

INFO  org.apache.hadoop.mapred.MapTask: Processing split: org.apache.hadoop.mapreduce.lib.join.CompositeInputSplit@5d0587f2
INFO  org.apache.hadoop.mapred.LocalJobRunner: map task executor complete.
WARN  org.apache.hadoop.mapred.LocalJobRunner: job_local73519654_0001
java.lang.Exception: java.lang.ClassCastException: org.apache.avro.mapred.AvroKey cannot be cast to org.apache.hadoop.io.WritableComparable
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.ClassCastException: org.apache.avro.mapred.AvroKey cannot be cast to org.apache.hadoop.io.WritableComparable
    at org.apache.hadoop.mapreduce.lib.join.WrappedRecordReader.next(WrappedRecordReader.java:191)
    at org.apache.hadoop.mapreduce.lib.join.WrappedRecordReader.nextKeyValue(WrappedRecordReader.java:179)
    at org.apache.hadoop.mapreduce.lib.join.WrappedRecordReader.initialize(WrappedRecordReader.java:90)
    at org.apache.hadoop.mapreduce.lib.join.CompositeRecordReader.initialize(CompositeRecordReader.java:100)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:545)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:783)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Any ideas on how to get this to work?

Thanks!

llasram commented 9 years ago

Hmm. I don't know that this is the only problem, but it appears that CompositeInputFormat only works with objects which implement WritableComparable. The AvroKey and AvroValue objects used by the Avro input formats do not, and hence just aren't compatible. I'm unfortunately not sure there's a good work-around without re-writing lots of code.

stanfea commented 9 years ago

Right, I saw that. I think I'll drop Avro for this use case in the meantime, as I don't have time, but I'd like to come back to this.

Thanks :)