RevolutionAnalytics / RHadoop
https://github.com/RevolutionAnalytics/RHadoop/wiki

Problem in output.format #132

Closed SudipSinha closed 11 years ago

SudipSinha commented 12 years ago

I tried running this simple piece of code. [Here "iris.csv" can be replaced by any CSV file in HDFS.]

Code

mapreduce(
  input = "iris.csv", output = NULL,
  input.format  = make.input.format("csv", sep=","),
  output.format = make.output.format("csv", sep=","),
  vectorized = TRUE, structured = TRUE
)

The code exited with error code 1; I've put the logs below. If I remove the output.format argument, the code works. I'm confused that such simple code does not work for me.

My system configuration is as follows: CentOS; 8GB RAM; CDH3u4; rmr v1.3.1

Runtime Output

packageJobJar: [/tmp/Rtmpsjr7nh/rmr-local-env, /tmp/Rtmpsjr7nh/rmr-global-env, /tmp/Rtmpsjr7nh/rhstr.mapf673b9d9ebd, /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/hadoop-unjar7199379222409918582/] [] /tmp/streamjob7688681986774163414.jar tmpDir=null
12/09/21 11:33:45 WARN snappy.LoadSnappy: Snappy native library is available
12/09/21 11:33:45 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/09/21 11:33:45 INFO snappy.LoadSnappy: Snappy native library loaded
12/09/21 11:33:45 INFO mapred.FileInputFormat: Total input paths to process : 1
12/09/21 11:33:45 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local]
12/09/21 11:33:45 INFO streaming.StreamJob: Running job: job_201209211122_0003
12/09/21 11:33:45 INFO streaming.StreamJob: To kill this job, run:
12/09/21 11:33:45 INFO streaming.StreamJob: /usr/local/hadoop/hadoop/bin/hadoop job -Dmapred.job.tracker=localhost:54311 -kill job_201209211122_0003
12/09/21 11:33:45 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201209211122_0003
12/09/21 11:33:46 INFO streaming.StreamJob: map 0% reduce 0%
12/09/21 11:34:09 INFO streaming.StreamJob: map 100% reduce 100%
12/09/21 11:34:09 INFO streaming.StreamJob: To kill this job, run:
12/09/21 11:34:09 INFO streaming.StreamJob: /usr/local/hadoop/hadoop/bin/hadoop job -Dmapred.job.tracker=localhost:54311 -kill job_201209211122_0003
12/09/21 11:34:09 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201209211122_0003
12/09/21 11:34:09 ERROR streaming.StreamJob: Job not successful. Error: NA
12/09/21 11:34:09 INFO streaming.StreamJob: killJob...
Streaming Command Failed!
Error in mr(map = map, reduce = reduce, combine = combine, in.folder = if (is.list(input)) { :
  hadoop streaming failed with error code 1

Task logs from Job tracker

stdout logs

stderr logs

Loading required package: Rcpp
Loading required package: RJSONIO
Loading required package: methods
Loading required package: itertools
Loading required package: iterators
Loading required package: digest
Error in data.frame(..., check.names = FALSE) :
  arguments imply differing number of rows: 0, 75
Calls: ... write.table -> is.data.frame -> cbind -> cbind -> data.frame
Execution halted
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:572)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:136)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1177)
    at org.apache.hadoop.mapred.Child.main(Child.java:264)

syslog logs

2012-09-21 11:37:50,319 INFO org.apache.hadoop.util.NativeCodeLoader: Loaded the native-hadoop library
2012-09-21 11:37:50,438 INFO org.apache.hadoop.filecache.TrackerDistributedCacheManager: Creating symlink: /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/jars/rmr-local-env <- /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/attempt_201209211122_0003_m_000001_3/work/rmr-local-env
2012-09-21 11:37:50,447 INFO org.apache.hadoop.filecache.TrackerDistributedCacheManager: Creating symlink: /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/jars/.job.jar.crc <- /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/attempt_201209211122_0003_m_000001_3/work/.job.jar.crc
2012-09-21 11:37:50,448 INFO org.apache.hadoop.filecache.TrackerDistributedCacheManager: Creating symlink: /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/jars/job.jar <- /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/attempt_201209211122_0003_m_000001_3/work/job.jar
2012-09-21 11:37:50,449 INFO org.apache.hadoop.filecache.TrackerDistributedCacheManager: Creating symlink: /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/jars/rmr-global-env <- /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/attempt_201209211122_0003_m_000001_3/work/rmr-global-env
2012-09-21 11:37:50,451 INFO org.apache.hadoop.filecache.TrackerDistributedCacheManager: Creating symlink: /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/jars/rhstr.mapf67208bd2e5 <- /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local/taskTracker/hadoop/jobcache/job_201209211122_0003/attempt_201209211122_0003_m_000001_3/work/rhstr.mapf67208bd2e5
2012-09-21 11:37:50,516 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=MAP, sessionId=
2012-09-21 11:37:50,648 INFO org.apache.hadoop.util.ProcessTree: setsid exited with exit code 0
2012-09-21 11:37:50,651 INFO org.apache.hadoop.mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@6244b0f8
2012-09-21 11:37:50,773 WARN org.apache.hadoop.io.compress.snappy.LoadSnappy: Snappy native library is available
2012-09-21 11:37:50,773 INFO org.apache.hadoop.io.compress.snappy.LoadSnappy: Snappy native library loaded
2012-09-21 11:37:50,780 INFO org.apache.hadoop.mapred.MapTask: numReduceTasks: 1
2012-09-21 11:37:50,786 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
2012-09-21 11:37:50,811 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
2012-09-21 11:37:50,811 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
2012-09-21 11:37:50,838 INFO org.mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
2012-09-21 11:37:50,839 INFO org.apache.hadoop.streaming.PipeMapRed: PipeMapRed exec [/usr/bin/Rscript, rhstr.mapf67208bd2e5]
2012-09-21 11:37:50,859 INFO org.apache.hadoop.streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
2012-09-21 11:37:50,860 INFO org.apache.hadoop.streaming.PipeMapRed: R/W/S=10/0/0 in:NA [rec/s] out:NA [rec/s]
2012-09-21 11:37:51,566 INFO org.apache.hadoop.streaming.PipeMapRed: MRErrorThread done
2012-09-21 11:37:51,567 INFO org.apache.hadoop.streaming.PipeMapRed: PipeMapRed failed!
2012-09-21 11:37:51,592 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2012-09-21 11:37:51,595 WARN org.apache.hadoop.mapred.Child: Error running child
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:572)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:136)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1177)
    at org.apache.hadoop.mapred.Child.main(Child.java:264)
2012-09-21 11:37:51,598 INFO org.apache.hadoop.mapred.Task: Runnning cleanup for the task

piccolbo commented 12 years ago

Thanks for the thorough report; it gave me a very good starting point. This is a bug in rmr. The problem is that when the structured option is on and the key is NULL, the conversion-to-data-frame routine returns a 0x0 data frame instead of just NULL. The output format tries to convert k and v to data frames (here they already are) and then does a cbind before writing out, which fails because the two data frames have different numbers of rows (recycling is not applied uniformly in R, and it would fail in this case anyway). So the workarounds are:
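The mechanism described above can be reproduced in a plain R session, without Hadoop (a minimal sketch; the 75-row value is chosen only to mirror the row count in the task logs):

```r
# A NULL key converted to a data frame comes back 0x0 instead of NULL.
k <- data.frame()   # 0 rows, 0 columns
v <- iris[1:75, ]   # a 75-row value, matching the task logs

# cbind() on data frames dispatches to data.frame(), which cannot
# reconcile 0 rows with 75 rows; try() captures the same error the
# task logs show: "arguments imply differing number of rows: 0, 75"
res <- try(cbind(k, v), silent = TRUE)
```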

  1. Upgrade to rmr2. Unless you are in production, it seems like a good idea. It's available for download (albeit marked experimental) or from git as the rmr-2.0 branch. It's a lot better for structured data, no question, but the API is not backwards compatible, so some porting is required. I ported all my examples and I don't have any scars to show for it, but if you have lots of code it may take some time. By the way, all my examples are shorter, faster and more readable. I verified that rmr2 doesn't have this problem.
  2. Make sure the key being written out is not NULL, by writing your own mapper or reducer, whichever runs last.
  3. Write your own output format that can handle that. This is what we have now:

    csv.output.format = function(...) function(k, v, con, vectorized)
      write.table(file = con,
                  x = if(is.null(k)) v else cbind(k, v),
                  ...,
                  row.names = FALSE,
                  col.names = FALSE)

    Just replace that if with a more detailed check.

  4. Wait for me to issue a hotfix. With the next release imminent and some workarounds available, I am not sure we are going to do one. I will get back to you on this.
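Workaround 2 could look like the sketch below, assuming the rmr v1.3 API from the original report. The constant dummy key and the pass-through mapper are illustrative assumptions (and whether keyval takes extra arguments under vectorized mode is not shown in this thread), so treat it as a sketch, not a tested fix:

```r
library(rmr)   # rmr v1.3, as in the original report

mapreduce(
  input = "iris.csv", output = NULL,
  input.format  = make.input.format("csv", sep = ","),
  output.format = make.output.format("csv", sep = ","),
  # Emit an explicit, non-NULL key so the csv output format never
  # has to cbind() a 0x0 key data frame onto the values.
  map = function(k, v) keyval(1, v),
  vectorized = TRUE, structured = TRUE
)
```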

I hope this helps

Antonio

piccolbo commented 11 years ago

rmr-2.0.2 is now the stable release.