damballa / parkour

Hadoop MapReduce in idiomatic Clojure.
Apache License 2.0
257 stars · 19 forks

chaining and multiplexing issue when no data to write #15

Closed sroee closed 9 years ago

sroee commented 9 years ago

Hi, when chaining one job's output to another's input, and the output has 0 records to write (in the multiplexing case), the output file won't be created, and the chained input will fail the whole job.

In my case:

(let [[err-node data-node] (-> node
                               (pg/output :err (seqf/dsink [BytesWritable BytesWritable])
                                          :data (text/dsink)))]
  [(pg/input data-node) (pg/input err-node)])

When there are no items to write to :err, the folder won't be created, and the input immediately after it fails with the following error:

15/02/03 19:11:27 ERROR security.UserGroupInformation: PriviledgedActionException as:Roee cause:org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/tmp/Roee-1422963858120-743895824-parkour-transient/t-15354
15/02/03 19:11:27 ERROR parkour.tool: Uncaught exception: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/tmp/Roee-1422963858120-743895824-parkour-transient/t-15354
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/tmp/Roee-1422963858120-743895824-parkour-transient/t-15354
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:235)
    at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:55)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:252)
    at parkour.remote.mux$input_format$reify__12695$fn__12697.invoke(mux.clj:72)
    at clojure.core.reducers$mapcat$fn__3638$fn__3639.invoke(reducers.clj:178)
    at clojure.lang.ArrayChunk.reduce(ArrayChunk.java:63)
    at clojure.core.protocols$fn__6051.invoke(protocols.clj:98)
    at clojure.core.protocols$fn__6015$G__6010__6024.invoke(protocols.clj:19)
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31)
    at clojure.core.protocols$fn__6036.invoke(protocols.clj:54)
    at clojure.core.protocols$fn__5989$G__5984__6002.invoke(protocols.clj:13)
    at clojure.core.reducers$folder$reify__3608.coll_reduce(reducers.clj:126)
    at clojure.core$reduce.invoke(core.clj:6177)
    at clojure.core$into.invoke(core.clj:6229)
    at parkour.remote.mux$input_format$reify__12695.getSplits(mux.clj:66)
    at parkour.hadoop.ProxyInputFormat.getSplits(ProxyInputFormat.java:20)
    at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:1054)
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1071)
    at org.apache.hadoop.mapred.JobClient.access$700(JobClient.java:179)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:983)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:936)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:394)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:936)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:550)
    at parkour.graph$run_job$fn__6012.invoke(graph.clj:363)
    at parkour.graph$run_job.invoke(graph.clj:361)
    at parkour.graph$eval6031$fn__6032$fn__6033.doInvoke(graph.clj:393)
        ...
llasram commented 9 years ago

Definitely a bug. This might be somewhat tricky to fix while still preserving lazy file-creation. Any ideas or proposals @sroee before I start exploring potential solutions?

sroee commented 9 years ago

Well, I think the output is better off staying lazy, but could the input instead:

  1. fall back to some empty collection? or
  2. stop execution of the task that follows it?

For example, for option 2:

(let [[err-node data-node] (-> node
                               (pg/output :err (seqf/dsink [BytesWritable BytesWritable])
                                          :data (text/dsink)))]
  [(-> (pg/input data-node)
       (pg/map .. will happen...)
       ...)
   (-> (pg/input err-node)
       (pg/map .. will not be executed)
       ...)])

Though for my needs, solution 1 sounds good enough.
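
To illustrate what option 1 might look like, here is a rough sketch (not Parkour's actual API): a hypothetical helper that checks whether the sink's backing path exists before wiring up the chained input, using Hadoop's standard `FileSystem` API. The `path` argument and the helper name are illustrative assumptions; only `pg/input` comes from the snippets above.

```clojure
(import '[org.apache.hadoop.conf Configuration]
        '[org.apache.hadoop.fs FileSystem Path])

(defn input-or-nil
  "Hypothetical sketch: return `(pg/input node)` only when the backing
  `path` for the node's sink actually exists; otherwise return nil so the
  caller can substitute an empty branch. Assumes the caller knows the
  sink's path, which Parkour does not currently expose this way."
  [^Configuration conf node path]
  (let [fs (FileSystem/get conf)]
    (when (.exists fs (Path. ^String path))
      (pg/input node))))
```

The caller would then filter out nil branches before running the graph, which is roughly the "fall back to an empty collection" behavior of option 1.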

sroee commented 9 years ago

BTW my version of parkour is 0.6.1

llasram commented 9 years ago

It turns out this is pretty difficult to handle. We want to be able to detect and error on missing input paths, but (a) Hadoop's handling of 0-split inputs skips directory creation, and (b) Parkour's current multi-job graph support doesn't carry any state between jobs. This is still solvable, but it's going to take a bit of design (re)thinking.
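
In the meantime, one possible workaround (a sketch only, not part of Parkour) is to pre-create the expected output directory between the two jobs, so that a zero-record sink doesn't leave the downstream `FileInputFormat` looking at a missing path. This uses only Hadoop's standard `FileSystem` API; the directory value passed in would be whatever transient path the first job was configured to write.

```clojure
(import '[org.apache.hadoop.conf Configuration]
        '[org.apache.hadoop.fs FileSystem Path])

(defn ensure-output-dir!
  "Create `dir` (and any missing parents) if it does not already exist,
  so a downstream FileInputFormat does not fail with
  InvalidInputException when the upstream job wrote zero records."
  [^Configuration conf ^String dir]
  (let [fs (FileSystem/get conf)
        p  (Path. dir)]
    (when-not (.exists fs p)
      (.mkdirs fs p))))
```

This only papers over the symptom for a manually chained pair of jobs; it doesn't help inside a single multiplexed graph run, which is why the real fix needs to live in Parkour itself.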

llasram commented 9 years ago

It turns out I had a separate bug plus some Hadoop version differences obscuring this issue. Fixed in the develop branch; I'll push a new release with the fix in the next few days.

llasram commented 9 years ago

Parkour 0.6.2 is now released and includes the fix for this issue.

sroee commented 9 years ago

thanks!