RevolutionAnalytics / RHadoop

RHadoop
https://github.com/RevolutionAnalytics/RHadoop/wiki
763 stars 278 forks source link

Hadoop streaming issue (apparently) with mapreduce #21

Closed kagharpure closed 12 years ago

kagharpure commented 13 years ago

Hi all,

Firstly, I'd like to apologize for not sticking to gfm conventions: it's almost midnight, my brain is nowhere near the functional level required to figure something new out and I'm posting here for the first time. Now:

I am somewhat familiar with R (~1.5 years), fairly new to Hadoop and very new to RHadoop. I've recently been trying to run a couple of simple linear regression codes written using RHadoop; however, I seem to be running repeatedly into what seems to be a hadoop streaming issue. Before I jump into it, I shall explain the codes and where I'm running them a little.

Both of the codes mentioned above expect an input csv on the HDFS with no headers and only containing values for the predictor variables and the dependent variable, which must be the last variable (counting left to right)

Since I'm not entirely sure yet of the way RHadoop works, I wrote both codes with different assumptions on RHadoop's working. One code is based on the assumption that text files will be processed line-by-line by the mapper and the second one is a logical rip-off from an equivalent Rhipe code, where the assumption is that for each mapper there will exist (magically) a list of values (a la map.values in Rhipe).

First code (assuming mapper processes line-by-line):

### Script: RHadoopReg.R ###
# load required libraries
library(rhdfs)
library(rmr)

# define function rHdp.lm
rHdp.lm <- function(input,output = NULL){

    # define mapper as function
    map <- function(k,v){
        # split input line on commas
        val <- strsplit(v, ',')[[1]] 

        # extract predictors and bind
        #1 to the right
        x <- c(1,as.numeric(val[-length(val)]))

        # extract dependent
        y <- as.numeric(val[length(val)])

        # compute outer of x and x
        # for m predictors, this should
        # yield a (m + 1)x(m + 1) matrix
        val1 <- outer(x,x)

        # multiply x with dependent
        # this should yield a vector
        # of length (m + 1)
        val2 <- x*y

        # cbind val1 and val2 and write
        # out with NULL key
        keyval('dummy_key',cbind(val1,val2))
    }

    # define reducer as function
    reduce <- function(k,vv){

        # initialize val as zero
        val <- 0

        # loop through each (matrix)
        # element of vv and take matrix
        # sums
        for(i in 1:length(vv)){
            val = val + as.numeric(vv[[i]])
        }

        # extract t(x)*x
        x <- val[,-dim(val)[2]]

        # extract t(x)*y
        y <- val[,dim(val)[2]]

        # use solve to get betas
        betas <- solve(as.matrix(x),as.matrix(y))

        # write out with key betas
        keyval('betas',betas)
    }

    # define as a map-reduce job
    mapreduce(input = input, 
            output = output, 
            map = map,
            reduce = reduce)
}

rHdp.lm('/user/hadoop/regData','/user/hadoop/rTestRegOut')

Second code (assuming mappers process lists of values):

### Script: RHadoopReg1.R ###
# load required libraries
library(rhdfs)
library(rmr)

# define function rHdp.lm1
rHdp.lm1 <- function(input,output = NULL){

    # define mapper as function
    map <- function(k,v){
        # split input line on commas
        datMat <- do.call('rbind',lapply(v, function(r){
            as.numeric(strsplit(r,',')[[1]])
        }))

        # extract dependent
        y <- as.matrix(datMat[,dim(datMat)[2]])

        # extract predictors and bind
        #1 to the right
        x <- cbind(rep(1,dim(datMat)[1]),as.matrix(datMat[,-dim(datMat)[2]]))

        # compute crossprod of x and x
        # for m predictors, this should
        # yield a (m + 1)x(m + 1) matrix
        val1 <- crossprod(as.matrix(x),as.matrix(x))

        # multiply x with dependent
        # this should yield a vector
        # of length (m + 1)
        val2 <- crossprod(as.matrix(x),as.matrix(y))

        # cbind val1 and val2 and write
        # out with NULL key
        keyval('temp_out',cbind(val1,val2))
    }

    # define reducer as function
    reduce <- function(k,vv){

        # initialize val as zero
        val <- 0

        # loop through each (matrix)
        # element of vv and take matrix
        # sums
        for(i in 1:length(vv)){
            val = val + as.numeric(vv[[i]])
        }

        # extract t(x)*x
        x <- val[,-dim(val)[2]]

        # extract t(x)*y
        y <- val[,dim(val)[2]]

        # use solve to get betas
        betas <- solve(as.matrix(x),as.matrix(y))

        # write out with key betas
        keyval('betas',betas)
    }

    # define as a map-reduce job
    mapreduce(input = input, 
            output = output, 
            map = map,
            reduce = reduce)
}

rHdp.lm1('/user/hadoop/regData/','/user/hadoop/rTestRegOut1')

I've tried running these codes both on a 14-node hadoop cluster, with nodes running hadoop-0.20.2.append on either Ubuntu Lucid or Maverick, and a single node installation, running on Ubuntu Lucid which itself runs on VMPlayer on Windows 7.

Now, upon sourcing either of these two codes in R, I repeatedly keep running into the following error:

> source("RHadoopReg.R")
packageJobJar: [/tmp/RtmpIKd3Fr/rhstr.map287d3049, /tmp/RtmpIKd3Fr/rhstr.reduce20ae0aad, /tmp/RtmpIKd3Fr/rmrParentEnv, /tmp/RtmpIKd3Fr/rmrLocalEnv, /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/hadoop-unjar7948228214583466863/] [] /tmp/streamjob2980539178006618836.jar tmpDir=null
11/10/20 10:51:33 INFO mapred.FileInputFormat: Total input paths to process : 1
11/10/20 10:51:34 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local]
11/10/20 10:51:34 INFO streaming.StreamJob: Running job: job_201110200316_0004
11/10/20 10:51:34 INFO streaming.StreamJob: To kill this job, run:
11/10/20 10:51:34 INFO streaming.StreamJob: /usr/local/hadoop/hadoop/bin/../bin/hadoop job  -Dmapred.job.tracker=localhost:54311 -kill job_201110200316_0004
11/10/20 10:51:35 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201110200316_0004
11/10/20 10:51:36 INFO streaming.StreamJob:  map 0%  reduce 0%
11/10/20 10:52:38 INFO streaming.StreamJob:  map 100%  reduce 0%
11/10/20 10:52:46 INFO streaming.StreamJob:  map 0%  reduce 0%
11/10/20 10:53:17 INFO streaming.StreamJob:  map 100%  reduce 100%
11/10/20 10:53:17 INFO streaming.StreamJob: To kill this job, run:
11/10/20 10:53:17 INFO streaming.StreamJob: /usr/local/hadoop/hadoop/bin/../bin/hadoop job  -Dmapred.job.tracker=localhost:54311 -kill job_201110200316_0004
11/10/20 10:53:17 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201110200316_0004
11/10/20 10:53:17 ERROR streaming.StreamJob: Job not Successful!
11/10/20 10:53:17 INFO streaming.StreamJob: killJob...
Streaming Job Failed!
Error in rhstream(map = map, reduce = reduce, reduceondataframe = reduceondataframe,  : 
  hadoop streaming failed with error code 1

Further exploring the trace on the jobtracker url, this is what I find (for each attempt):

java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1 at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311) at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545) at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:132) at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57) at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307) at org.apache.hadoop.mapred.Child.main(Child.java:170) 

While this trace seems somewhat common on Google, I cannot seem to find anything specifically about it in an RHadoop context.

Additionally, I've also been observing some strange behavior from rhdfs.

This set of commands run fine:

> to.dfs(lapply(1:100, function(i){eps = rnorm(1,sd=10);keyval(i, list(x=c(i,i+eps),y=2*(eps > 0) - 1))}),"/user/hadoop/logreg")
> a <- from.dfs("/user/hadoop/logreg")

However, when I try the same trick with a file already on the HDFS (put there previously using hadoop fs -copyFromLocal), I seem to run into a strange error:

> b <- from.dfs("/user/hadoop/regData")
Error in file(con, "r") : cannot open the connection

I initially suspected that this might have something to do with permissions and I was partially right; when I tried the first set of commands as root, I got the following error:

> to.dfs(lapply(1:100, function(i){eps = rnorm(1,sd=10);keyval(i, list(x=c(i,i+eps),y=2*(eps > 0) - 1))}),"/user/hadoop/logreg")
put: org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="hadoop":hadoop:supergroup:rwxr-xr-x
[1] "/user/hadoop/logreg"

The other set of commands also gave the same error as the first when run as root:

> a <- from.dfs("/user/hadoop/regData/regTestData.csv")
get: org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=EXECUTE, inode="regData":hadoop:supergroup:rw-r--r--
Error in if (file.info(tmp)[1, "isdir"]) { : 
  missing value where TRUE/FALSE needed

Still clinging to the idea that the issue may be due to permissions, I checked the permissions on the files involved in both sets of commands. However, much to my chagrin, they were both the same (discovered using hadoop fs -ls):

-rw-r--r--   1 hadoop supergroup       4698 2011-10-20 11:16 /user/hadoop/logreg
-rw-r--r--   1 hadoop supergroup      15226 2011-10-20 11:13 /user/hadoop/regData

I have now come to one of the following four conclusions:

1) This is due to a very stupid error on my part 2) This is due to some incompatibility issues between hadoop and RHadoop 3) I should be specifying some parameters that I am not (a la the mapred list in Rhipe) 4) None of the above, this is just plain beyond me

Therefore, I now find myself posting here what's ultimately quite a long post, in the hope that someone will know exactly (or at least vaguely) what's going wrong and point me in the right direction.

Thanks,

Krish.

piccolbo commented 13 years ago

Hi and thanks for your report. Rhipe and rmr are very different in the API and the guts of the implementation, so watch out. Probably better to start mentally from your knowledge of R and Hadoop and not think of Rhipe at all. One problem in both versions of your program is that in rmr the parsing of the input is abstracted away, so that you don't have to repeat the same boilerplate code in every mapper. The goal is to have map(k,v) get two legitimate R objects as arguments. To do that, we use a default format, which is <json object>\t<json object>\n. Of course that doesn't cut it for everyone, so you can specify a parsing function as a parameter to mapreduce, textinputformat. That function takes a line of text and returns a key-value pair. Since you did not specify that function, the input has to be in the default format and that's probably the beginning of your problems -- haven't checked the rest of the code, one thing at a time. The good news is, I added support for csv in dev yesterday. Probably buggy but I'd love some help in getting it in good shape for release and you wouldn't have to invent the wheel again. You need to build the package from src if you go down that route. So you'd just do

mapreduce(your other args here, textinputformat = csvtextinputformat(options describing your specific csv dialect here, as in read.table))

Also I would recommend (beg for?) two things: first read the manual. We are trying very hard to keep it up to date and usable and it is an immense help if people read it and help me improve it rather than retyping select parts of the manual as replies here. People think that to help an open source project they have to understand i-node internals, but the fact is just giving feedback on the manual is so important while maybe unglamorous. The other thing, I know you are after your problem, but it seems to me you'd want to build up to that through simpler jobs, like: mapreduce(in, out, textinputformat = csvtextinputformat(...)), default mapper and reducers and default everything else. This will read your input and write it out unchanged, only going from csv to json. Not very useful but you make sure the parsing is right, check, move on to learn how mappers work, check that works, and so on. At least, that's what works for me. If you are familiar with google Hangout we can have a pair programming session (you need to be confortable with screen sharing of course). Contact me as +piccolbo on google plus and we'll take it from there.

kagharpure commented 13 years ago

Hi Antonio,

Thanks so much for the blazingly fast reply :) Fair, I didn't actually work my way up on this one - guess I made a mistake in assuming that RHadoop and Rhipe couldn't be too dissimilar. I would definitely be willing to build the latest dev branch and play around with the csv format and help whichever way I could.

However, before all of that, I think I must listen to your first suggestion: to read the manual. And to that end, I'd (rather embarrassedly) like to ask - where exactly can I find it? Also, by this do you mean the documentation one gets upon entering '''?''' in the code?

Thanks,

Krish.

Sagely, he observed: one man’s playstation is another man bakery.

-----Original Message----- From: Antonio Piccolboni [mailto:reply@reply.github.com] Sent: Friday, October 21, 2011 12:44 AM To: Krishnaraj Ajay Gharpure Subject: Re: [RHadoop] Hadoop streaming issue (apparently) with mapreduce (#21)

Hi and thanks for your report. Rhipe and rmr are very different in the API and the guts of the implementation, so watch out. Probably better to start mentally from your knowledge of R and Hadoop and not think of Rhipe at all. One problem in both versions of your program is that in rmr the parsing of the input is abstracted away, so that you don't have to repeat the same boilerplate code in every mapper. The goal is to have map(k,v) get two legitimate R objects as arguments. To do that, we use a default format, which is <json object>\t<json object>\n. Of course that doesn't cut it for everyone, so you can specify a parsing function as a parameter to mapreduce, textinputformat. That function takes a line of text and returns a key-value pair. Since you did not specify that function, the input has to be in the default format and that's probably the beginning of your problems -- haven't checked the rest of the code, one thing at a time. The good news is, I added support f or csv in dev yesterday. Probably buggy but I'd love some help in getting it in good shape for release and you wouldn't have to invent the wheel again. You need to build the package from src if you go down that route. So you'd just do

mapreduce(your other args here, textinputformat = csvtextinputformat(options describing your specific csv dialect here, as in read.table)) ````  Also I would recommend (beg for?) two things: first read the manual. We are trying very hard to keep it up to date and usable and it is an immense help if people read it and help me improve it rather than retyping select parts of the manual as replies here. People think that to help an open source project they have to understand i-node internals, but the fact is just giving feedback on the manual is so important while maybe unglamorous. The other thing, I know you are after your problem, but it seems to me you'd want to build up to that through simpler jobs, like:
mapreduce(in, out, textinputformat = csvtextinputformat(...)), default mapper and reducers and default everything else. This will read your input and write it out unchanged, only going from csv to json. Not very useful but you make sure the parsing is right, check, move on to learn how mappers work, check that works, and so on. At least, that's what works for me. If you are familiar with google Hangout we can have a pair programming session (you need to be confortable with screen sharing of course).  Contact me as +piccolbo on google plus and we'll take it from there.

--
Reply to this email directly or view it on GitHub:
https://github.com/RevolutionAnalytics/RHadoop/issues/21#issuecomment-2473034

 This email message may contain proprietary, private and confidential information. The information transmitted is intended only for the person(s) or entities to which it is addressed. Any review, retransmission, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient is prohibited and may be illegal. If you received this in error, please contact the sender and delete the message from your system.

Mu Sigma takes all reasonable steps to ensure that its electronic communications are free from viruses. However, given Internet accessibility, the Company cannot accept liability for any virus introduced by this e-mail or any attachment and you are advised to use up-to-date virus checking software.
piccolbo commented 13 years ago

We have a [Tutorial] here on the wiki for the warm up plus more advanced docs and examples and for the details, yes, ?<function name> should work and in particular, you have to do a ?mapreduce, I don't see how you can get away without reading that.

kagharpure commented 13 years ago

Hi Antonio,

Sorry about that, 'twas a particularly stupid question. So in answer yes, I have read through the tutorial, and in retrospect I realized I perhaps misunderstood the '''mapreduce''' documentation - working on the assumption that not specifying a '''textinputformat''' would result in the default being used, on top of which I was willing to live with a boilerplate '''strsplit''' as a stopgap.

However, as you've recommended, I'm trying now from the ground-up, beginning with a super-simple identity mapper and reducer in conjunction from the custom-defined csvinputformat from the deptdelay-rmr.R code in the rmr use cases section, just to see what happens to the lines in my csv as I push them through this combination. The code is:

    ### Script: RTestIdentityMapper ###
    # load required libraries
    library(rhdfs)
    library(rmr)

    #define csv input format
    csvinputformat <- function(line){
            keyval(NULL, unlist(strsplit(line, "\\,")))
    }

    #just read input from HDFS and pass it along relatively unmolested
    testIdentityMapper <- function(input,output=NULL){
            mapreduce(input = input,
                    output = output,
                    map = to.map(identity),
                    reduce = to.reduce(identity),
                    textinputformat = csvinputformat)
    }

    testIdentityMapper('/user/hadoop/regData','/user/hadoop/identityTestOut')

The output to this (head + tail) looks like:

    [ null ]        [ [ "0.404032916", "0", "0", "0.67472872", "0.99672565", "0.078959007", "0.309388681", "0.497276351" ],[ "0.235761758", "1", "1", "0.388492866", "0.397239717", "0.210877186", "0.40227439",    "0.586958232" ],.....,[ "0.058177569", "0", "0", "0.156842501", "0.306746016", "0.377249925", "0.628119007", "0.350995397" ] ]

Which looks to me like (not sure, working on pure guesswork here) the identity mapper is reading each line as a vector, and the identity reducer is thus emitting a list of all such vectors associated with the key '''[null]''' - is this even halfway close to correct?

Based on the idea that the above assumption is correct, I then tried the following code:

    ### Script: RTestVectorAdd ###
    # load required libraries
    library(rhdfs)
    library(rmr)

    #define csv input format
    csvinputformat <- function(line){
            keyval(NULL, unlist(strsplit(line, "\\,")))
    }

    testVectorAdd <- function(input,output=NULL){
            map <- function(k,v){
                    # split input line on commas
                    #val <- strsplit(v, ',')[[1]]
                    val <- as.numeric(as.vector(v))
                    keyval("random",val)
            }
            reduce <- function(k,vv){
                    val <- 0
                    for(i in 1:length(vv)){
                            val = val + as.numeric(vv[[i]])
                    }
                    keyval("out",val)
            }
            mapreduce(input = input,
                            output = output,
                            map = map,
                            #map = to.map(identity),
                            reduce = reduce,
                            #reduce = to.reduce(identity),
                            textinputformat = csvinputformat)
    }

    testVectorAdd('/user/hadoop/testVectors','/user/hadoop/vecAdd1Out')

The data I had used was a csv with the two lines:

    1,2,3,4,5,6,7,8,9
    9,8,7,6,5,4,3,2,1

This then gave me the result I was expecting - that is:

    [ "out" ]       [     10,     10,     10,     10,     10,     10,     10,     10,     10 ]

Will now try matrix manipulations in the reducer in my quest to understand RHadoop. Thanks again for all your help! :)

Before I sign off, I would like to ask one more basic doubt (and I've been through the help files and the unit tests to get an idea here, but seemingly to no avail), somewhat with regards to the latter part of my original query. In the '''to.dfs(lapply(...''' example I copied, I'm guessing that since the '''keyval''' function was used, I could then go ahead and use '''from.dfs''' with impunity and expect an object to be returned. However, how can I replicate this sort of behavior with files already on the HDFS? Specifically, How can I convert a flat file on the HDFS into a serialized R object on the HDFS without reading the entire file into memory in the process?

Thanks,

Krish.

Sagely, he observed: one man’s playstation is another man bakery.

-----Original Message----- From: Antonio Piccolboni [mailto:reply@reply.github.com] Sent: Friday, October 21, 2011 1:21 AM To: Krishnaraj Ajay Gharpure Subject: Re: [RHadoop] Hadoop streaming issue (apparently) with mapreduce (#21)

We have a [Tutorial] here on the wiki for the warm up plus more advanced docs and examples and for the details, yes, ? should work and in particular, you have to do a ?mapreduce, I don't see how you can get away without reading that.

Reply to this email directly or view it on GitHub: https://github.com/RevolutionAnalytics/RHadoop/issues/21#issuecomment-2473472

This email message may contain proprietary, private and confidential information. The information transmitted is intended only for the person(s) or entities to which it is addressed. Any review, retransmission, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient is prohibited and may be illegal. If you received this in error, please contact the sender and delete the message from your system.

Mu Sigma takes all reasonable steps to ensure that its electronic communications are free from viruses. However, given Internet accessibility, the Company cannot accept liability for any virus introduced by this e-mail or any attachment and you are advised to use up-to-date virus checking software.

piccolbo commented 13 years ago

Your interpretation on what happened in the first program is correct as far as I can tell. As far as the second question, you need a mapreduce job.

mapreduce(input, output, textinputformat = flatfiletextinputformat)

where I have no idea what you mean by flat file, but that function will capture what you mean and create the objects you need. That will write an equivalent of your input file in output in the JSON-based format. As a matter of modularity, I would move any as.vector and as.numeric to the format function to leave only algorithm logic in the map.

kagharpure commented 13 years ago

Oh, sweet, yeah, moving the '''as.vector''' and '''as.numeric''' functions to the format function would be a very sensible move. Yet again, I cannot thank you enough for all your help, cheers! :)

Sagely, he observed: one man’s playstation is another man bakery.

-----Original Message----- From: Antonio Piccolboni [mailto:reply@reply.github.com] Sent: Friday, October 21, 2011 3:48 AM To: Krishnaraj Ajay Gharpure Subject: Re: [RHadoop] Hadoop streaming issue (apparently) with mapreduce (#21)

Your interpretation on what happened in the first program is correct as far as I can tell. As far as the second question, you need a mapreduce job.

mapreduce(input, output, textinputformat = flatfiletextinputformat) ``` where I have no idea what you mean by flat file, but that function will capture what you mean and create the objects you  need. That will write an equivalent of your input file in output in the JSON-based format. As a matter of modularity, I would move any `as.vector` and `as.numeric` to the format function to leave only algorithm logic in the map.

--
Reply to this email directly or view it on GitHub:
https://github.com/RevolutionAnalytics/RHadoop/issues/21#issuecomment-2475191

 This email message may contain proprietary, private and confidential information. The information transmitted is intended only for the person(s) or entities to which it is addressed. Any review, retransmission, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient is prohibited and may be illegal. If you received this in error, please contact the sender and delete the message from your system.

Mu Sigma takes all reasonable steps to ensure that its electronic communications are free from viruses. However, given Internet accessibility, the Company cannot accept liability for any virus introduced by this e-mail or any attachment and you are advised to use up-to-date virus checking software.
RevolutionAnalytics commented 13 years ago

Hi Krish,

We'll take a look at this issue first thing tomorrow.

Thanks

David

On Thu, Oct 20, 2011 at 11:34 AM, kagharpure < reply@reply.github.com>wrote:

Hi all,

Firstly, I'd like to apologize for not sticking to gfm conventions: it's almost midnight, my brain is nowhere near the functional level required to figure something new out and I'm posting here for the first time. Now:

I am somewhat familiar with R (~1.5 years), fairly new to Hadoop and very new to RHadoop. I've recently been trying to run a couple of simple linear regression codes written using RHadoop; however, I seem to be running repeatedly into what seems to be a hadoop streaming issue. Before I jump into it, I shall explain the codes and where I'm running them a little.

Both of the codes mentioned above expect an input csv on the HDFS with no headers and only containing values for the predictor variables and the dependent variable, which must be the last variable (counting left to right)

Since I'm not entirely sure yet of the way RHadoop works, I wrote both codes with different assumptions on RHadoop's working. One code is based on the assumption that text files will be processed line-by-line by the mapper and the second one is a logical rip-off from an equivalent Rhipe code, where the assumption is that for each mapper there will exist (magically) a list of values (a la map.values in Rhipe).

First code (assuming mapper processes line-by-line):

   ### Script: RHadoopReg.R ###
   # load required libraries
   library(rhdfs)
   library(rmr)

   # define function rHdp.lm
   rHdp.lm <- function(input,output = NULL){

           # define mapper as function
           map <- function(k,v){
                   # split input line on commas
                   val <- strsplit(v, ',')[[1]]

                   # extract predictors and bind
                   # 1 to the right
                   x <- c(1,as.numeric(val[-length(val)]))

                   # extract dependent
                   y <- as.numeric(val[length(val)])

                   # compute outer of x and x
                   # for m predictors, this should
                   # yield a (m + 1)x(m + 1) matrix
                   val1 <- outer(x,x)

                   # multiply x with dependent
                   # this should yield a vector
                   # of length (m + 1)
                   val2 <- x*y

                   # cbind val1 and val2 and write
                   # out with NULL key
                   keyval('dummy_key',cbind(val1,val2))
           }

           # define reducer as function
           reduce <- function(k,vv){

                   # initialize val as zero
                   val <- 0

                   # loop through each (matrix)
                   # element of vv and take matrix
                   # sums
                   for(i in 1:length(vv)){
                           val = val + as.numeric(vv[[i]])
                   }

                   # extract t(x)*x
                   x <- val[,-dim(val)[2]]

                   # extract t(x)*y
                   y <- val[,dim(val)[2]]

                   # use solve to get betas
                   betas <- solve(as.matrix(x),as.matrix(y))

                   # write out with key betas
                   keyval('betas',betas)
           }

           # define as a map-reduce job
           mapreduce(input = input,
                           output = output,
                           map = map,
                           reduce = reduce)
   }

   rHdp.lm('/user/hadoop/regData','/user/hadoop/rTestRegOut')

Second code (assuming mappers process lists of values):

   ### Script: RHadoopReg1.R ###
   # load required libraries
   library(rhdfs)
   library(rmr)

   # define function rHdp.lm1
   rHdp.lm1 <- function(input,output = NULL){

           # define mapper as function
           map <- function(k,v){
                   # split input line on commas
                   datMat <- do.call('rbind',lapply(v, function(r){
                           as.numeric(strsplit(r,',')[[1]])
                   }))

                   # extract dependent
                   y <- as.matrix(datMat[,dim(datMat)[2]])

                   # extract predictors and bind
                   # 1 to the right
                   x <-

cbind(rep(1,dim(datMat)[1]),as.matrix(datMat[,-dim(datMat)[2]]))

                   # compute crossprod of x and x
                   # for m predictors, this should
                   # yield a (m + 1)x(m + 1) matrix
                   val1 <- crossprod(as.matrix(x),as.matrix(x))

                   # multiply x with dependent
                   # this should yield a vector
                   # of length (m + 1)
                   val2 <- crossprod(as.matrix(x),as.matrix(y))

                   # cbind val1 and val2 and write
                   # out with NULL key
                   keyval('temp_out',cbind(val1,val2))
           }

           # define reducer as function
           reduce <- function(k,vv){

                   # initialize val as zero
                   val <- 0

                   # loop through each (matrix)
                   # element of vv and take matrix
                   # sums
                   for(i in 1:length(vv)){
                           val = val + as.numeric(vv[[i]])
                   }

                   # extract t(x)*x
                   x <- val[,-dim(val)[2]]

                   # extract t(x)*y
                   y <- val[,dim(val)[2]]

                   # use solve to get betas
                   betas <- solve(as.matrix(x),as.matrix(y))

                   # write out with key betas
                   keyval('betas',betas)
           }

           # define as a map-reduce job
           mapreduce(input = input,
                           output = output,
                           map = map,
                           reduce = reduce)
   }

   rHdp.lm1('/user/hadoop/regData/','/user/hadoop/rTestRegOut1')

I've tried running these codes both on a 14-node hadoop cluster, with nodes running hadoop-0.20.2.append on either Ubuntu Lucid or Maverick, and a single node installation, running on Ubuntu Lucid which itself runs on VMPlayer on Windows 7.

Now, upon sourcing either of these two codes in R, I repeatedly keep running into the following error:

   > source("RHadoopReg.R")
   packageJobJar: [/tmp/RtmpIKd3Fr/rhstr.map287d3049,

/tmp/RtmpIKd3Fr/rhstr.reduce20ae0aad, /tmp/RtmpIKd3Fr/rmrParentEnv, /tmp/RtmpIKd3Fr/rmrLocalEnv, /usr/local/hadoop/hadoop-datastore/hadoop-hadoop/hadoop-unjar7948228214583466863/] [] /tmp/streamjob2980539178006618836.jar tmpDir=null 11/10/20 10:51:33 INFO mapred.FileInputFormat: Total input paths to process : 1 11/10/20 10:51:34 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop/hadoop-datastore/hadoop-hadoop/mapred/local] 11/10/20 10:51:34 INFO streaming.StreamJob: Running job: job_201110200316_0004 11/10/20 10:51:34 INFO streaming.StreamJob: To kill this job, run: 11/10/20 10:51:34 INFO streaming.StreamJob: /usr/local/hadoop/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:54311 -kill job_201110200316_0004 11/10/20 10:51:35 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201110200316_0004 11/10/20 10:51:36 INFO streaming.StreamJob: map 0% reduce 0% 11/10/20 10:52:38 INFO streaming.StreamJob: map 100% reduce 0% 11/10/20 10:52:46 INFO streaming.StreamJob: map 0% reduce 0% 11/10/20 10:53:17 INFO streaming.StreamJob: map 100% reduce 100% 11/10/20 10:53:17 INFO streaming.StreamJob: To kill this job, run: 11/10/20 10:53:17 INFO streaming.StreamJob: /usr/local/hadoop/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:54311 -kill job_201110200316_0004 11/10/20 10:53:17 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201110200316_0004 11/10/20 10:53:17 ERROR streaming.StreamJob: Job not Successful! 11/10/20 10:53:17 INFO streaming.StreamJob: killJob... Streaming Job Failed! Error in rhstream(map = map, reduce = reduce, reduceondataframe = reduceondataframe, : hadoop streaming failed with error code 1

Further exploring the trace on the jobtracker url, this is what I find (for each attempt):

   java.lang.RuntimeException: PipeMapRed.waitOutputThreads():

subprocess failed with code 1 at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311) at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545) at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:132) at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57) at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307) at org.apache.hadoop.mapred.Child.main(Child.java:170)

While this trace seems somewhat common on Google, I cannot seem to find anything specifically about it in an RHadoop context.

Additionally, I've also been observing some strange behavior from rhdfs.

This set of commands run fine:

   > to.dfs(lapply(1:100, function(i){eps = rnorm(1,sd=10);keyval(i,

list(x=c(i,i+eps),y=2*(eps > 0) - 1))}),"/user/hadoop/logreg")

a <- from.dfs("/user/hadoop/logreg")

However, when I try the same trick with a file already on the HDFS (put there previously using hadoop fs -copyFromLocal), I seem to run into a strange error:

   > b <- from.dfs("/user/hadoop/regData")
   Error in file(con, "r") : cannot open the connection

I initially suspected that this might have something to do with permissions and I was partially right; when I tried the first set of commands as root, I got the following error:

   > to.dfs(lapply(1:100, function(i){eps = rnorm(1,sd=10);keyval(i,

list(x=c(i,i+eps),y=2*(eps > 0) - 1))}),"/user/hadoop/logreg") put: org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="hadoop":hadoop:supergroup:rwxr-xr-x [1] "/user/hadoop/logreg"

The other set of commands also gave the same error as the first when run as root:

   > a <- from.dfs("/user/hadoop/regData/regTestData.csv")
   get: org.apache.hadoop.security.AccessControlException: Permission

denied: user=root, access=EXECUTE, inode="regData":hadoop:supergroup:rw-r--r-- Error in if (file.info(tmp)[1, "isdir"]) { : missing value where TRUE/FALSE needed

Still clinging to the idea that the issue may be due to permissions, I checked the permissions on the files involved in both sets of commands. However, much to my chagrin, they were both the same (discovered using hadoop fs -ls):

   -rw-r--r--   1 hadoop supergroup       4698 2011-10-20 11:16

/user/hadoop/logreg -rw-r--r-- 1 hadoop supergroup 15226 2011-10-20 11:13 /user/hadoop/regData

I have now come to one of the following four conclusions:

1) This is due to a very stupid error on my part 2) This is due to some incompatibility issues between hadoop and RHadoop 3) I should be specifying some parameters that I am not (a la the mapred list in Rhipe) 4) None of the above, this is just plain beyond me

Therefore, I now find myself posting here what's ultimately quite a long post, in the hope that someone will know exactly (or at least vaguely) what's going wrong and point me in the right direction.

Thanks,

Krish.

Reply to this email directly or view it on GitHub: https://github.com/RevolutionAnalytics/RHadoop/issues/21

piccolbo commented 12 years ago

Looks like we can close this one.

kagharpure commented 12 years ago

Hi there,

If you've gotten this mail then I am currently out of office and plan to be so till the 6th of November. If you've mailed regarding:

1) HPC, please talk to Praveenesh/Kritisha

2) MxO, please talk to Ram/Subir

3) MMTEX/MMBUZZ, please talk to Gaurav/SMS

4) General R help, please talk to Bharat/Subir

5) VBA help, I can't help, sorry

6) Anything else - with any luck it can wait

In the off-chance that you need to get in touch with me very urgently, you can always try calling me on my cell at +919986050124, though I shall be traveling a lot, so an immediate telephonic response is not guaranteed. Got that same reason, neither is an immediate e-mail response guaranteed.

Thanks,

Krish.

Sagely, he observed: one man's playstation is another man bakery.


This email message may contain proprietary, private and confidential information. The information transmitted is intended only for the person(s) or entities to which it is addressed. Any review, retransmission, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient is prohibited and may be illegal. If you received this in error, please contact the sender and delete the message from your system.

Mu Sigma takes all reasonable steps to ensure that its electronic communications are free from viruses. However, given Internet accessibility, the Company cannot accept liability for any virus introduced by this e-mail or any attachment and you are advised to use up-to-date virus checking software.

kagharpure commented 12 years ago

Hi Antonio,

Thanks for all your help. Sorry I couldn't respond over the past two weeks, was on vacation (a 6,000 km pilgrimage to watch formula 1). Yes, please close this - I still have many doubts but I do not believe that this is the forum to get them clarified.

Again, thanks for all your help :)

Thanks,

Krish.

Sagely, he observed: one man’s playstation is another man bakery.

-----Original Message----- From: Antonio Piccolboni [mailto:reply@reply.github.com] Sent: Wednesday, November 02, 2011 1:37 AM To: Krishnaraj Ajay Gharpure Subject: Re: [RHadoop] Hadoop streaming issue (apparently) with mapreduce (#21)

Looks like we can close this one.

Reply to this email directly or view it on GitHub: https://github.com/RevolutionAnalytics/RHadoop/issues/21#issuecomment-2595194

This email message may contain proprietary, private and confidential information. The information transmitted is intended only for the person(s) or entities to which it is addressed. Any review, retransmission, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient is prohibited and may be illegal. If you received this in error, please contact the sender and delete the message from your system.

Mu Sigma takes all reasonable steps to ensure that its electronic communications are free from viruses. However, given Internet accessibility, the Company cannot accept liability for any virus introduced by this e-mail or any attachment and you are advised to use up-to-date virus checking software.