RevolutionAnalytics / rmr2

A package that allows R developers to use Hadoop MapReduce

rmr2 mapreduce does not produce any output #179

Open byu777 opened 8 years ago

byu777 commented 8 years ago

I tried the following simple script on rmr2 in Cloudera Quickstart 5.7.0 but mapreduce does not generate any results. Here is the script:

small.ints <- to.dfs(1:10)
out <- mapreduce(input = small.ints, map = function(k, v) keyval(v, v^2))
from.dfs(out)

Here is the output:

> small.ints <- to.dfs(1:10)
16/08/07 20:14:42 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
16/08/07 20:14:42 INFO compress.CodecPool: Got brand-new compressor [.deflate]
Warning message:
S3 methods ‘gorder.default’, ‘gorder.factor’, ‘gorder.data.frame’, ‘gorder.matrix’, ‘gorder.raw’ were declared in NAMESPACE but not found 
> out <- mapreduce(input = small.ints, map = function(k, v) keyval(v, v^2))
16/08/07 20:14:48 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob543400947433267521.jar tmpDir=null
16/08/07 20:14:49 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/08/07 20:14:49 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/08/07 20:14:50 INFO mapred.FileInputFormat: Total input paths to process : 1
16/08/07 20:14:50 INFO mapreduce.JobSubmitter: number of splits:2
16/08/07 20:14:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1470447912721_0016
16/08/07 20:14:50 INFO impl.YarnClientImpl: Submitted application application_1470447912721_0016
16/08/07 20:14:50 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1470447912721_0016/
16/08/07 20:14:50 INFO mapreduce.Job: Running job: job_1470447912721_0016
16/08/07 20:15:00 INFO mapreduce.Job: Job job_1470447912721_0016 running in uber mode : false
16/08/07 20:15:00 INFO mapreduce.Job:  map 0% reduce 0%
16/08/07 20:15:12 INFO mapreduce.Job:  map 50% reduce 0%
16/08/07 20:15:13 INFO mapreduce.Job:  map 100% reduce 0%
16/08/07 20:15:13 INFO mapreduce.Job: Job job_1470447912721_0016 completed successfully
16/08/07 20:15:13 INFO mapreduce.Job: Counters: 30
    File System Counters
        FILE: Number of bytes read=0
        FILE: Number of bytes written=236342
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=1001
        HDFS: Number of bytes written=244
        HDFS: Number of read operations=14
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Job Counters 
        Launched map tasks=2
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=19917
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=19917
        Total vcore-seconds taken by all map tasks=19917
        Total megabyte-seconds taken by all map tasks=9958500
    Map-Reduce Framework
        Map input records=3
        Map output records=0
        Input split bytes=208
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=115
        CPU time spent (ms)=1300
        Physical memory (bytes) snapshot=239222784
        Virtual memory (bytes) snapshot=2127200256
        Total committed heap usage (bytes)=121503744
    File Input Format Counters 
        Bytes Read=793
    File Output Format Counters 
        Bytes Written=244
16/08/07 20:15:13 INFO streaming.StreamJob: Output directory: /tmp/file10106a0b36b6
> from.dfs(out)
$key
NULL

$val
NULL

to.dfs and from.dfs do work on their own, as the following round trip shows:

> small.ints <- to.dfs(1:10)
16/08/07 07:15:34 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
16/08/07 07:15:34 INFO compress.CodecPool: Got brand-new compressor [.deflate]
> out <- from.dfs(small.ints)
16/08/07 07:15:44 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
16/08/07 07:15:44 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
> out
$key
NULL

$val
 [1]  1  2  3  4  5  6  7  8  9 10

byu777 commented 8 years ago

I have figured this out. I had installed rmr2 from within RStudio, and somehow the library was not available to the script, even though the mapreduce function appeared to run successfully. To my surprise, one of the logs said that rmr2 was not found, yet the job still gave me a _SUCCESS!

I eventually installed rmr2 fresh in R (using sudo R), with the required packages, reshape2 and caTools, and everything seems to work fine now.
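
The symptom described above (rmr2 loads in the interactive session but is reported missing in the job logs) is consistent with the package living in a per-user library that the job cannot see. The following is a minimal sketch for checking where a package is installed, assuming that this is the cause. `package_location` is a hypothetical helper written for illustration; it uses only base R and is not part of rmr2.

```r
# Hypothetical helper (not part of rmr2): report where a package is
# installed and whether that directory is one of R's system-wide libraries.
package_location <- function(pkg) {
  path <- find.package(pkg, quiet = TRUE)
  if (length(path) == 0) {
    # Not found in any library on this session's .libPaths()
    return(list(installed = FALSE, library = NA_character_, system_wide = NA))
  }
  # Library directory that contains the package
  lib <- normalizePath(dirname(path[1]), mustWork = FALSE)
  # .Library and .Library.site are the libraries shared by all users
  system_libs <- normalizePath(c(.Library, .Library.site), mustWork = FALSE)
  list(installed = TRUE, library = lib, system_wide = lib %in% system_libs)
}

print(package_location("rmr2"))
```

If `system_wide` comes back `FALSE`, the package is probably only visible to the user who installed it, which would explain why a fresh install under `sudo R` helped.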

thecodeflash commented 7 years ago

Hey @byu777, I am facing the same problem. Can you please help me out with this? I tried installing rmr2 using R CMD, but the output is still the same.

Surender1984 commented 7 years ago

Hi byu777/VJ-Vikvy, any luck solving this issue? I am also facing the same issue.

thecodeflash commented 7 years ago

Hey @Surender1984, please install the rmr2 package using:

sudo R CMD INSTALL rmr2.tar.gz

Before doing that, install all the required packages as the root user. This should solve your issue. Please let me know if you run into any problems.
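
Before running the install command, it can help to see which required packages are still missing from the libraries shared by all users. The sketch below assumes that "installed as root" means "present in R's system-wide libraries". `missing_system_wide` is a hypothetical helper, and the package vector contains only the two dependencies named in this thread; the full list would come from the rmr2 DESCRIPTION file.

```r
# Hypothetical helper: return the packages from `pkgs` that are NOT
# installed in R's system-wide libraries (.Library and .Library.site).
missing_system_wide <- function(pkgs) {
  system_libs <- unique(c(.Library, .Library.site))
  system_libs <- system_libs[dir.exists(system_libs)]
  installed <- rownames(utils::installed.packages(lib.loc = system_libs))
  setdiff(pkgs, installed)
}

# Only the dependencies mentioned in this thread; extend as needed.
required <- c("reshape2", "caTools")
print(missing_system_wide(required))
```

Anything the helper prints would need to be installed from a root R session (for example one started with `sudo R`) before installing rmr2 itself.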

Also, I think rmr2 and rhdfs are dead and we need to switch to Spark. What is your opinion on this?

sharwinbobde commented 6 years ago

@byu777 Problem still not solved for me :(

Code:

#########################################################################
#########################################################################
Sys.setenv(HADOOP_HOME="/home/sharwin/Programs/hadoop-2.7.5") 
Sys.setenv(HADOOP_CMD="/home/sharwin/Programs/hadoop-2.7.5/bin/hadoop")
Sys.setenv(HADOOP_STREAMING="/home/sharwin/Programs/hadoop-2.7.5/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar")
Sys.setenv(JAVA_HOME="/usr/lib/jvm/java-9-oracle")

library(rJava)
library(rhdfs)
library(rmr2)
library(reshape2)
library(caTools)

hdfs.init() 
# Clear previous output
hdfs.rmr('/test/out')

#============================================================

map <- function(k,lines) {
     words.list <- strsplit(lines, '\\s')
     words <- unlist(words.list)
     return( keyval(words, 1) )
}

reduce <- function(word, counts) {
     keyval(word, sum(counts))
}

wordcount <- function (input, output=NULL) {
     mapreduce(input=input, output=output, input.format="text", map=map, reduce=reduce)
}

## read text files from folder /test/data
hdfs.root <- '/test'
hdfs.data <- file.path(hdfs.root, 'data')

## save result in folder /test/out
hdfs.out <- file.path(hdfs.root, 'out')

## Submit job
out <- wordcount(hdfs.data, hdfs.out) 

## Fetch results from HDFS
results <- from.dfs(out)
results.df <- as.data.frame(results, stringsAsFactors=FALSE)
colnames(results.df) <- c('word', 'count')

head(results.df)

thecodeflash commented 6 years ago

@sharwinbobde, please install the rmr2 package using:

sudo R CMD INSTALL rmr2.tar.gz

Before doing that, install all the required packages as the root user. This should solve your issue. Please let me know if you run into any problems.