RevolutionAnalytics / RHadoop

RHadoop
https://github.com/RevolutionAnalytics/RHadoop/wiki

Number of map tasks launched is less than the available map slots #110

Closed souvikbanerjee closed 12 years ago

souvikbanerjee commented 12 years ago

Hi,

I am facing a problem: the number of map tasks launched from R is less than the available map slots; in fact it is always 2. Here is the setup in brief. I have a small cluster of 2 nodes: one node is the master (running all the daemons: namenode, datanode, tasktracker, jobtracker) and the other is a slave node (running datanode and tasktracker). Each machine has 2 map slots (configured by setting mapred.tasktracker.map.tasks.maximum = 2). Now, when I launch a test job using the Hadoop command /usr/local/hadoop-1.0.2/bin/hadoop jar /usr/local/hadoop-1.0.2/hadoop-examples-1.0.2.jar pi 4 10 it launches 4 map tasks simultaneously and all the slots are used.

Just a note that I have installed R on both machines, and the rmr package is installed on both as well. I launched R on the master node and tested the following simple code:

    library("rmr")
    small.ints = to.dfs(1:10)
    out = mapreduce(input = small.ints, map = function(k, v) keyval(v, v^2))

It launches only two map tasks. Note that the map tasks are assigned to both master and slave; by this I mean one task is assigned to the master and the other to the slave.

I tried a more complex script as well. I wrote a custom function to calculate the distance between two vectors, for which I had to compare some 153 vectors, each against 7 other vectors. It launches plenty of jobs (fine with me, it submits 153 streaming jobs to the cluster), but each job only launches two map tasks.

If required I can paste the code over here. Please advise.

Thanks.

piccolbo commented 12 years ago

How many map tasks would you like to see with an input of 10 records? Input size is an important factor in how Hadoop operates.

souvikbanerjee commented 12 years ago

Hi,

Thanks a lot for your quick reply.

For 10 records, 2 map tasks is fine. But for the real example I am working on, I need to launch one map task per operation, because each operation is huge: it can take more than 15 minutes to compute, and I had to set the timeout to 30 minutes, otherwise my tasks were getting killed. Apart from the vector-comparison calculation, I launched another job that works on an input of 150 records. The records are just file names; the files are processed, a matrix is formed, and I run calculations on top of that matrix. So here the input size is very small (each record contains only the file name, not the actual file content) but the calculation is huge, and if I could use all the available map slots I could make it much faster. If it scales, I will add more nodes to complete the calculation faster.

Thanks once again.

piccolbo commented 12 years ago

That's an incorrect use of Hadoop. Job input has to be the bulk data, not filenames. This is not fixable; it is fundamental to how Hadoop works. A timeout of 30 minutes should have sounded the alarm. Launch one job per file. Or give me an idea of what your input and desired outputs are and I'll think about it.

Antonio


souvikbanerjee commented 12 years ago

Hi,

I completely agree with you. What I am looking for is to use Hadoop for parallel computation; I am using R more for computation than for handling big data. Had it been only huge data and a small computation, I could have opted for writing an application in Java. In my case the input is only file names: a list of files (maybe image files, maybe sound files). The file sizes are a few KB (~100 KB). I want to read those files and then do some computation-heavy operations, say run an fft on all the channels of a sound file. I haven't kept those files in HDFS.

I have a vector in which I keep the filenames. An example:

filenamelist.csv contains a list of 150 files; the format is file_name, file_path.

    somedata <- as.matrix(read.table("filenamelist.csv", header = TRUE, sep = ","))
    somefile <- character(length(somedata[, 1]))
    for (i in 1:length(somedata[, 1])) {
      # compose the absolute path of each file
      somefile[i] <- paste(somedata[i, 2], somedata[i, 1], sep = "/")
    }
    input <- to.dfs(somefile)

The files themselves are actually not in HDFS; only the vector containing the filenames is put into HDFS by to.dfs(somefile).

I want my mapreduce to work on this input. The computation happens on each file and takes on the order of 30 seconds per file.

So ideally, if I have 4 map slots available in the cluster, I would like to use all of them and process 4 files in each map wave. And if 150 map tasks can be launched, then I can add nodes to the cluster and compute more quickly.

But what is happening is that only 2 map tasks are launched. Each map task then has to handle around 75 files, so it takes more than 30 minutes, which is why I had to increase mapred.task.timeout to more than 30 minutes.

On top of that, I have 2 other map slots sitting idle (one on each machine).

I hope I have been able to describe the problem clearly.

I know that Hadoop works based on the input size. I have posted this to get some idea of how I can solve this problem, or a workaround for it. There is nothing to be fixed here other than the way I am thinking about or writing the code.

If you can share some thoughts, that would be great. Would placing the files in HDFS and lowering the HDFS block size help? In that case I also have a question: in the map task I read the sound file using loadSample("/root/sample.wav"). Can loadSample (from the sound package) read from HDFS? How do I specify an HDFS path to loadSample or to similar functions that read image files?
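
One note on that last question: loadSample from the sound package reads ordinary local file paths, so it cannot open an HDFS location directly. A minimal sketch of one common workaround, assuming the job input records are HDFS paths and leaving the real per-file computation as a placeholder, is to pull each file to local scratch space inside the map function before reading it:

    library(rmr)
    library(sound)

    # Assumption: the wav files were uploaded to HDFS (e.g. under /tmp/testfiles)
    # and the job input is the vector of their HDFS paths.
    hdfs.paths <- to.dfs(c("/tmp/testfiles/sample1.wav", "/tmp/testfiles/sample2.wav"))

    out <- mapreduce(
      input = hdfs.paths,
      map = function(k, v) {
        local.copy <- file.path(tempdir(), basename(v))
        # copy the file from HDFS to the task's local disk, then read it normally
        system(paste("hadoop fs -get", v, local.copy))
        wav <- loadSample(local.copy)
        # ... do the real per-file computation on 'wav' here (placeholder) ...
        keyval(v, TRUE)
      })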

Thanks once again.

souvikbanerjee commented 12 years ago

Just wanted to post that what you wrote in the earlier post (launch one job per file) is exactly what I am looking for. If you can help me achieve that, I'll be grateful.

Thanks.

piccolbo commented 12 years ago

I would suggest reading the paper and the code for the record-setting computation of pi (the code is in the distribution). That is one serious Hadoop computation with a single number as input and should give you some ideas. Your input is so small that how you read it probably has no consequence for performance, other than Hadoop thinking it has very little input to deal with (the names only). The setting you mentioned is the right one, but it is a tasktracker setting, so it can't be configured by the client, and Hadoop is under no obligation to use all the slots. Besides that, you may have better luck asking in Hadoop circles, simply because there are more pairs of eyes there and the range of applications people have tackled is bigger than for rmr alone.

Antonio


piccolbo commented 12 years ago

for (fname in filenames) {mapreduce(input = fname, ....)}

But you knew that already. The problem seems to be that you have only a sequential algorithm for each file, so you would like to start the 150 jobs in parallel. We don't have features for that in rmr at this time.

Antonio
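
A slightly fuller shape of that loop, purely as a sketch: the output path and heavy.work() are hypothetical placeholders, and each fname is assumed to be an HDFS path in a format rmr can read.

    library(rmr)

    # One streaming job per input file, launched sequentially from the client.
    for (fname in filenames) {
      mapreduce(
        input  = fname,
        output = paste("/tmp/out", basename(fname), sep = "/"),  # hypothetical output location
        map    = function(k, v) keyval(fname, heavy.work(v)))    # heavy.work() is a placeholder
    }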


souvikbanerjee commented 12 years ago

Sorry to bother you again. For the pi estimator, when I run /usr/local/hadoop-1.0.2/bin/hadoop jar /usr/local/hadoop-1.0.2/hadoop-examples-1.0.2.jar pi 4 10

it launches 4 map tasks simultaneously and all the slots are used. But here too the input file size is not huge, right? It is using Hadoop for parallel computation.

But it has a way to specify the number of map tasks. While writing this comment I received your comment, which points to the pi calculation as well.

So is there any example from RHadoop (rmr) for pi calculation?

piccolbo commented 12 years ago


No, but there is in Java. Would love to see how you solve that with RHadoop.

Antonio
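
For reference, the Java pi example forces one map per requested task by generating a separate small input file for each map. A rough sketch of the same Monte Carlo idea in rmr might look like the following; n.tasks and n.per.task are made-up numbers for illustration, and, as discussed above, Hadoop may still group a tiny input into fewer mappers:

    library(rmr)

    # Rough sketch only: one input record per intended map task, each map
    # simulating n.per.task random points; the reduce step sums the hit counts.
    n.tasks    <- 4
    n.per.task <- 1e6

    pi.job <- mapreduce(
      input  = to.dfs(1:n.tasks),
      map    = function(k, v) {
        x <- runif(n.per.task)
        y <- runif(n.per.task)
        # count of points falling inside the quarter circle
        keyval(1, sum(x^2 + y^2 <= 1))
      },
      reduce = function(k, vv) keyval(k, sum(unlist(vv))))

    # Reading the total back with from.dfs(pi.job) and computing
    # 4 * hits / (n.tasks * n.per.task) gives the estimate; the exact shape of
    # the from.dfs() return value varies across rmr versions.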



souvikbanerjee commented 12 years ago

Okay, let me dig into this. But since it is possible from Java, I think it can be done from R as well. I'll have to try it out. Meanwhile, if you come across any useful hint or update in this regard, please do let me know.

Thanks a lot for your time and effort.

piccolbo commented 12 years ago

> Okay, let me dig into this. But since it is possible from Java, I think it can be done from R as well.

Well, there are some features that are missing in streaming (e.g. multiple outputs) and some that cannot be accessed from rmr (custom partitioners). But I think you have a fighting chance, and I know other users who have a very similar problem; I will try to put you in touch.

> Meanwhile, if you come across any useful hint or update in this regard, please do let me know.

I sure will.

A



souvikbanerjee commented 12 years ago

> I know other users who have a very similar problem, I will try to put you in touch.

That would be really great. Hope to hear from you soon.

Thanks.

piccolbo commented 12 years ago

Some ideas from Saar, with his permission:

If I understand correctly, the problem is that the data, as captured by file names, is not 'big enough' to make Hadoop assign enough map tasks, but if the data were actually in HDFS, Hadoop would assign more mappers. I'll skip options that I don't really understand (like the backend.parameters list with "mapred.map.tasks=4") and get straight to the rmr-centered ideas:

  1. If the case is "not-so-many-small-files" (not sure if the 150 files number is real or not), maybe to.dfs can work as a way to make sure the "right" amount of data is available in HDFS. Something like:

    listOfFileNames <- as.list(c("file1.csv", "file2.csv"))

    # read each file on the client and pair its contents with its file name
    dataKeyValList <- lapply(X = listOfFileNames, FUN = function(x) {
      key <- x
      value <- read.table(x, header = TRUE, sep = ",")
      return(list(key, value))
    })

    myData <- to.dfs(dataKeyValList)

There may be enough data there to entice Hadoop to launch more mappers...

  2. If there are lots and lots of files, too many for the whole data set to be shipped into HDFS using to.dfs in this way, it's a bit harder, but doable as a map-reduce job if the files are row-centric (for example, CSV files), as follows:
     • Copy all the data files to one folder on HDFS.
     • The map-reduce job takes the collection of files (not file names) as its input.
     • The mapper works on one line of input at a time. After parsing a single row of data, that row becomes a value. The key is the file name, accessed with inputFileName <- basename(Sys.getenv("map_input_file")).
     • The reducer gets a file name and a list of rows of data, and has to (a) bind the rows together to reassemble the dataset of that single file, and (b) do the heavy computation.
     • This will potentially mess up the order of rows in the incoming data. Overcoming that is probably easiest by making sure the original data has row numbers and then using a simple sort within the reducer.
     • This solution relies on our ability to determine the number of reducers. If we can determine the number of reducers, there is probably an even easier way to achieve this...
  3. (Too simple to be true, and it depends on our ability to determine the number of reducer jobs, even though we can't directly determine the number of map jobs.) A rough sketch follows after this list.
     • Use to.dfs to send a list of file names into HDFS, and make this list the input of a map-reduce job.
     • Have the mapper read each file, making the file name the key and the content of the file the value.
     • Have the reducer do the heavy lifting, after making sure that 'enough' reducers are set up correctly.

Hope this all makes sense. Let me know if you have questions, or if any of these seems more promising than others.
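
As one possible reading of the third idea (the sketch mentioned in the list above), with hypothetical names for the file reader and the heavy per-file computation, and assuming the number of reducers can be raised enough to spread the work:

    library(rmr)

    # somefile: the vector of file paths built earlier in the thread.
    # read.one.file() and heavy.computation() are placeholders, e.g. loadSample()
    # for wav files or read.table() for csv files, plus the real analysis.
    input <- to.dfs(somefile)

    out <- mapreduce(
      input  = input,
      map    = function(k, v) keyval(v, read.one.file(v)),        # key = file name, value = contents
      reduce = function(k, vv) keyval(k, heavy.computation(vv[[1]])))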

souvikbanerjee commented 12 years ago

Thanks to all of you. I'll go through the details, try to evaluate all of them, and share the results with you. Allow me a couple of days and I'll surely get back to you with my observations.

> If the case is "not-so-many-small-files" (not sure if the 150 files number is real or not)

No, it's not a real number. For a test cluster of just 2 small machines, I opted to test with this number of files. The idea is to scale to use all 4 available slots with just 150 files, because processing these 150 files takes hours.

> Copy all the data files to one folder on HDFS. The map-reduce job takes the collection of files (not file names) as its input.

Say I have copied all the files to a directory in HDFS; let's assume that using -copyFromLocal I have uploaded the files to a directory called /tmp/testfiles. Can I then use that HDFS folder name as the input to the mapreduce function?

Maybe I can try this: mapreduce(input = "/tmp/testfiles", map = function(k, v) {...})

Will this work?

I tried one more example. Here it goes: I created two sets of vectors. The first set contains 150 vectors of length 10 each; the second set contains 10 vectors of length 10 each.

Then I wrote rmr code to do some calculation over the 150 x 10 vector combinations. The vectors are put into HDFS (using to.dfs).

So I have a for loop iterating over the larger set, and inside it I call the mapreduce function to operate on the smaller set (10 vectors).

There I observe 150 streaming jobs being submitted, each with 2 map tasks. If I do the calculation without map-reduce, I can obviously complete the job in much less time, since that eliminates the overhead of launching 150 streaming jobs. Maybe by reversing the loop I'll test whether it launches more map tasks; that would give a useful heads-up.
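
If the reversed loop is attempted, one rough shape for it might be the following; big.set, small.set and the distance expression are made-up names for illustration, and whether Hadoop actually splits the 150 records across more mappers still depends on the input size:

    library(rmr)

    # The 150-vector set becomes the job input (150 records for Hadoop to split),
    # and the 10-vector set is looped over on the client side.
    big.input <- to.dfs(big.set)          # big.set: list of 150 vectors

    results <- lapply(small.set, function(s)
      mapreduce(
        input = big.input,
        # k identifies the big-set vector (explicit keys may need to be supplied
        # when writing big.set with to.dfs); the distance is a placeholder.
        map   = function(k, v) keyval(k, sqrt(sum((v - s)^2)))))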

If you have any views on this, please share.

Just allow me a couple of days and I'll share more with you. Thanks once again.