cloudera / poisson_sampling


Only 2 mappers work on data in parallel (3 are submitted by default), and specifying a larger number of reducers via mapred.reduce.tasks=10 still leaves only 2 reduce tasks working in parallel #6

Closed: andrewmilkowski closed this issue 10 years ago

andrewmilkowski commented 10 years ago

Complete source code for fitRandomForest.R (the line relevant to this ticket is the addition of D="mapred.reduce.tasks=10"):

Screenshots are also attached.

Also, I have noticed that although 3 mappers were created, the 3rd mapper began working on the input data ONLY after the first 2 mappers had completed. [Screenshot: 2013-10-01 at 9:11:43 AM]

#!/usr/bin/env Rscript

# Fits a Random Forest to the Bulldozer data via Hadoop

library(rmr2)
library(randomForest)

# PARAM
# A great advantage of RHadoop is that the R environment I'm defining here will be
# packaged and distributed to each mapper/reducer, so there is no need to mess with
# Hadoop configuration variables or distributed cache
frac.per.model <- 0.1
num.models <- 50

# here we manually set the schema for the input data
# printed with dput(names(training.data))
column.names <- c("MachineID", "SalePrice", "ModelID.x", "datasource", "auctioneerID", 
                  "YearMade", "MachineHoursCurrentMeter", "UsageBand", "saledate", 
                  "fiModelDesc.x", "fiBaseModel.x", "fiSecondaryDesc.x", "fiModelSeries.x", 
                  "fiModelDescriptor.x", "ProductSize", "fiProductClassDesc.x", 
                  "state", "ProductGroup.x", "ProductGroupDesc.x", "Drive_System", 
                  "Enclosure", "Forks", "Pad_Type", "Ride_Control", "Stick", "Transmission", 
                  "Turbocharged", "Blade_Extension", "Blade_Width", "Enclosure_Type", 
                  "Engine_Horsepower", "Hydraulics", "Pushblock", "Ripper", "Scarifier", 
                  "Tip_Control", "Tire_Size", "Coupler", "Coupler_System", "Grouser_Tracks", 
                  "Hydraulics_Flow", "Track_Type", "Undercarriage_Pad_Width", "Stick_Length", 
                  "Thumb", "Pattern_Changer", "Grouser_Type", "Backhoe_Mounting", 
                  "Blade_Type", "Travel_Controls", "Differential_Type", "Steering_Controls", 
                  "saledatenumeric", "ageAtSale", "saleYear", "saleMonth", "saleDay", 
                  "saleWeekday", "MedianModelPrice", "ModelCount", "ModelID.y", 
                  "fiModelDesc.y", "fiBaseModel.y", "fiSecondaryDesc.y", "fiModelSeries.y", 
                  "fiModelDescriptor.y", "fiProductClassDesc.y", "ProductGroup.y", 
                  "ProductGroupDesc.y", "MfgYear", "fiManufacturerID", "fiManufacturerDesc", 
                  "PrimarySizeBasis", "PrimaryLower", "PrimaryUpper")

# here we pick the actual variables to use for building the model
# note that randomForest doesn't like missing data, so we'll just
# nix some of those variables
# TODO
model.formula <- SalePrice ~ datasource + auctioneerID + YearMade + saledatenumeric + ProductSize +
                               ProductGroupDesc.x + Enclosure + Hydraulics + ageAtSale + saleYear +
                               saleMonth + saleDay + saleWeekday + MedianModelPrice + ModelCount +
                               MfgYear
# target <- "SalePrice"
# predictors <- c("datasource", )

# here's an input format tailored for the task
bulldozer.input.format <-
    make.input.format(
        "csv",
        sep=",",
        quote="\"",
        row.names=NULL,
        col.names=column.names,
        fill=TRUE,
        na.strings=c("NA"),
        colClasses=c(MachineID="NULL",
                     SalePrice="numeric",
                     YearMade="numeric",
                     MachineHoursCurrentMeter="numeric",
                     ageAtSale="numeric",
                     saleYear="numeric",
                     ModelCount="numeric",
                     MfgYear="numeric",
                     ModelID.x="factor",
                     ModelID.y="factor",
                     fiManufacturerID="factor",
                     datasource="factor",
                     auctioneerID="factor",
                     saledatenumeric="numeric",
                     saleDay="factor",
                     Stick_Length="numeric"))

# MAP function
poisson.subsample <- function(k, input) {
  # this function is used to generate a sample from the current block of data
  generate.sample <- function(i) {
    # generate N Poisson variables
    draws <- rpois(n=nrow(input), lambda=frac.per.model)
    # compute the index vector for the corresponding rows,
    # weighted by the number of Poisson draws
    indices <- rep((1:nrow(input)), draws)
    # emit the rows; RHadoop takes care of replicating the key appropriately
    # and rbinding the data frames from different mappers together for the
    # reducer
    keyval(i, input[indices, ])
  }

  # here is where we generate the actual sampled data
  c.keyval(lapply(1:num.models, generate.sample))

}

# REDUCE function
fit.trees <- function(k, v) {
  # rmr2 rbinds the emitted values, so v is a data frame
  # do.trace=TRUE would print progress to stderr, which can keep the
  # reduce task from timing out; it is disabled here, and the job
  # instead raises mapred.task.timeout in the mapreduce call below
  rf <- randomForest(formula=model.formula, data=v, na.action=na.roughfix, ntree=10, do.trace=FALSE)
  # rf is a list so wrap it in another list to ensure that only
  # one object gets emitted. this is because keyval is vectorized
  keyval(k, list(forest=rf))
}

# backend.parameters=list(hadoop=list(D='mapred.reduce.tasks=10')),

mapreduce(input="/poisson/training.csv",
          backend.parameters=list(
              hadoop=list(D="mapred.task.timeout=3600000",
                          D="mapred.job.name=fitRandomForest",
                          D="mapred.reduce.tasks=10"),
              local=list()),
          input.format=bulldozer.input.format,
          map=poisson.subsample,
          reduce=fit.trees,
          output="/poisson/output")

raw.forests <- values(from.dfs("/poisson/output"))
forest <- do.call(combine, raw.forests)
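The map step is a Poisson approximation to sampling with replacement: in each of the num.models samples, every row independently appears Poisson(frac.per.model) times, so each mapper can sample its own block without knowing the global row count, and a sample holds roughly 10% of the rows. Below is a minimal Python sketch of that map step plus the grouping by key that Hadoop/rmr2 performs before fit.trees. It assumes plain lists of rows stand in for data frames. The names poisson_draw, poisson_subsample and shuffle are hypothetical. Python's standard library has no Poisson sampler, so Knuth's method is used here; it is adequate for a small lambda like 0.1.

```python
import math
import random
from collections import defaultdict


def poisson_draw(lam, rng):
    """Knuth's method for one Poisson(lam) variate (fine for small lam)."""
    threshold = math.exp(-lam)
    k, p = 0, 1.0
    while True:
        p *= rng.random()
        if p <= threshold:
            return k
        k += 1


def poisson_subsample(block, num_models, frac_per_model, rng):
    """Hypothetical mirror of poisson.subsample: (model_id, rows) pairs for one block."""
    out = []
    for model_id in range(1, num_models + 1):
        # One Poisson(frac_per_model) count per row of this block.
        draws = [poisson_draw(frac_per_model, rng) for _ in block]
        # Repeat each row index as often as it was drawn, like rep(1:n, draws) in R.
        indices = [i for i, d in enumerate(draws) for _ in range(d)]
        out.append((model_id, [block[i] for i in indices]))
    return out


def shuffle(pairs):
    """Group values by key, concatenating rows from different mappers (the rbind step)."""
    grouped = defaultdict(list)
    for key, rows in pairs:
        grouped[key].extend(rows)
    return dict(grouped)


# Usage: two "mapper" blocks of 1000 rows each, 50 models, 10% per model.
rng = random.Random(42)
blocks = [list(range(0, 1000)), list(range(1000, 2000))]
pairs = []
for block in blocks:
    pairs.extend(poisson_subsample(block, num_models=50, frac_per_model=0.1, rng=rng))
per_model = shuffle(pairs)
sizes = [len(rows) for rows in per_model.values()]
print(len(per_model))            # one group per model key
print(sum(sizes) / len(sizes))   # roughly 0.1 * 2000 = 200 rows per model
```

In the R script each of the 50 keys becomes one randomForest with ntree=10, so the final combine() yields a forest of 50 × 10 = 500 trees.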

--- MAPPER ---

Mappers (3 by default): only 2 process data initially, and the 3rd mapper starts processing later, so all 3 never run in parallel.

[Screenshot: 2013-10-01 at 9:21:17 AM]

--- REDUCER ---

Reducers: 10 specified, but only 2 are processing data.

[Screenshot: 2013-10-01 at 9:11:43 AM]

Output from top also shows only 2 R processes (one per running reducer). The mapper phase looks similar: while 3 mappers are active, only 2 R processes are running (the 3rd mapper sits idle).

  PID USER    PR NI VIRT RES  SHR  S %CPU %MEM TIME+   COMMAND
31373 mapred  20  0 667m 480m 4284 R 98.0 12.6 7:09.47 R
31370 mapred  20  0 670m 483m 4284 R 93.4 12.6 7:07.21 R

andrewmilkowski commented 10 years ago

Please close this ticket; it was a fundamental user error.

The fix was to specify the following in mapred-site.xml:


  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>4</value>
    <description>The maximum number of map tasks that will be run simultaneously by a task tracker.</description>
  </property>

  <property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>10</value>
    <description>The maximum number of reduce tasks that will be run simultaneously by a task tracker.</description>
  </property>

This allows up to 10 reducers to run at once. In the sample run below I specified 3 reducers (hence 3 R processes), although running this many on a single machine for a test run is just not appropriate, at least not with the size of the input data in this project.
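Why only 2 tasks ran before the fix: in classic MapReduce (MRv1), mapred.reduce.tasks sets how many reduce tasks a job has, while mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum cap how many run at once on each TaskTracker, and both caps default to 2. Tasks beyond the available slots wait for a free slot, which matches the 3rd mapper starting only after the first two finished. Below is a back-of-the-envelope Python sketch of that arithmetic. It assumes a single TaskTracker and tasks of roughly equal duration. concurrent_tasks and waves are hypothetical helpers, not Hadoop APIs.

```python
import math


def concurrent_tasks(num_tasks, slots_per_tracker, num_trackers=1):
    """Tasks that can run at the same time, bounded by total slots in the cluster."""
    return min(num_tasks, slots_per_tracker * num_trackers)


def waves(num_tasks, slots_per_tracker, num_trackers=1):
    """Scheduling rounds needed, assuming every task takes about equally long."""
    return math.ceil(num_tasks / (slots_per_tracker * num_trackers))


# Before the fix: default of 2 slots on the single TaskTracker.
print(concurrent_tasks(3, 2), waves(3, 2))      # 2 2  (3 mappers)
print(concurrent_tasks(10, 2), waves(10, 2))    # 2 5  (10 reducers)
# After the fix: 4 map slots and 10 reduce slots.
print(concurrent_tasks(3, 4), waves(3, 4))      # 3 1
print(concurrent_tasks(10, 10), waves(10, 10))  # 10 1
```

Raising the slot counts only helps while the machine has the cores and memory for it. In the top output each R process holds roughly 480 to 630 MB resident at about 12 to 16% of memory, so 10 concurrent R reducers on this machine would likely swap.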

Swap: 3260408k total, 628824k used, 2631584k free, 226596k cached

  PID USER    PR NI VIRT RES  SHR  S %CPU %MEM TIME+   COMMAND
12662 mapred  20  0 667m 480m 4284 R 93.8 12.6 2:59.71 R
12702 mapred  20  0 662m 475m 4284 R 49.2 12.4 2:09.04 R
12694 mapred  20  0 819m 632m 4392 R 48.9 16.5 2:04.52 R

laserson commented 10 years ago

Glad you found the error. I'll close...

andrewmilkowski commented 10 years ago

No problem, @laserson. After all these tests I now need a new Mac motherboard (joking).

So the only remaining issue is the small input data problem in rmr2 (https://github.com/RevolutionAnalytics/rmr2/issues/69).

If you have any ideas, please let me know; this is about the general stability of the solution (a graceful run).

In addition, I am looking to swap the standard R randomForest implementation for Random Jungle via rjungler (this might shave some cycles, although Revolution R is already precompiled with Intel MKL tight...

https://r-forge.r-project.org/projects/rjungler/