RHadoop
https://github.com/RevolutionAnalytics/RHadoop/wiki

Complete support for structured data #102

Closed piccolbo closed 12 years ago

piccolbo commented 12 years ago

To be developed here: how to continue developing the structured data features to achieve an "uninterrupted structured path" from input to output, for both convenience and speed, using data frames.

piccolbo commented 12 years ago

A lot of stuff went into 1.3, so I wonder what is left to do. But we want to wrap this up in 1.4 and maybe start looking into some efficiency issues (fewer copies, etc.).

piccolbo commented 12 years ago

Guiding examples (which are like use cases, but much simpler)

from.dfs(mapreduce(to.dfs(some.data.frame), map = to.map(identity), structured = TRUE), structured = TRUE)

returns something all.equal to some.data.frame, and

from.dfs(mapreduce(to.dfs(some.data.frame), map = to.map(identity), reduce = to.reduce(identity), structured = TRUE), structured = TRUE)

does the same.
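The contract these examples express — a structured round trip that returns the original data frame — can be illustrated in plain R without Hadoop. The chunking below is only a stand-in for what to.dfs/from.dfs do; the fake.* helper names are hypothetical, not part of rmr:

```r
# Sketch of the intended round-trip contract, in base R only.
# to.dfs conceptually chunks a data frame into key-value pairs;
# from.dfs(..., structured = TRUE) reassembles them. We mimic that
# with split()/rbind() (hypothetical helpers, not the rmr implementation).
fake.to.dfs <- function(df, chunk.size = 2) {
  chunks <- split(df, ceiling(seq_len(nrow(df)) / chunk.size))
  lapply(seq_along(chunks), function(i) list(key = i, val = chunks[[i]]))
}
fake.from.dfs <- function(kvs) {
  out <- do.call(rbind, lapply(kvs, `[[`, "val"))
  rownames(out) <- NULL
  out
}
some.data.frame <- data.frame(x = 1:5, y = letters[1:5], stringsAsFactors = FALSE)
round.trip <- fake.from.dfs(fake.to.dfs(some.data.frame))
isTRUE(all.equal(some.data.frame, round.trip))  # TRUE
```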

ryangarner commented 12 years ago

I really love this feature! Having the ability to use the apply function in the map and reduce step is really powerful.

piccolbo commented 12 years ago

Could you sketch some code (it doesn't have to run)? Just put it between triple backticks for proper formatting, i.e. ``` on its own line before and after. Thanks.

ryangarner commented 12 years ago

I computed the mean of a field using two running sums.

library(rmr)
fields <- c('Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 
            'CRSDepTime', 'ArrTime', 'CRSArrTime', 'UniqueCarrier',
            'FlightNum', 'TailNum', 'ActualElapsedTime', 'CRSElapsedTime',
            'AirTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance',
            'TaxiIn', 'TaxiOut', 'Cancelled', 'CancellationCode',
            'Diverted', 'CarrierDelay', 'WeatherDelay', 'NASDelay',
            'SecurityDelay', 'LateAircraftDelay') 
field <- "ArrDelay"
input <- "/user/airline/sample.csv"
mapper <- function(key, value) {
  # Per-chunk partial sum of the field and count of non-NA records
  running.sum <- sum(value[, field], na.rm = TRUE)
  num.records <- sum(!is.na(value[, field]))
  value <- c(running.sum, num.records)
  keyval(1, value)
}
reducer <- function(key, value) {
  # Column-wise totals of the partial sums and counts
  value <- apply(value, 2, sum)
  keyval(key, value)
}
out <- mapreduce(input = input,
                 input.format = make.input.format(mode = "text", format = "csv", sep = ",", col.names = fields),
                 map = mapper,
                 reduce = reducer,
                 combine = TRUE,
                 vectorized = TRUE,
                 structured = TRUE)
airline <- from.dfs(out, structured = TRUE) 
airline <- data.frame(airline$val)
names(airline) <- c("running.sum", "num.records")              
mean <- sum(airline$running.sum) / sum(airline$num.records)

It took about 35 seconds to compute the mean for arrival delay with 10 million records.
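The arithmetic behind this job — summing per-chunk running sums and non-NA counts, then dividing — can be checked locally in plain R, with the chunking merely simulating the mapper splits (the toy data here is made up):

```r
# Verify that combining per-chunk running sums and counts
# reproduces the overall mean, with NA records excluded.
x <- c(12, NA, -3, 7, NA, 20, 5)                      # toy ArrDelay values
chunks <- split(x, rep(1:3, length.out = length(x)))  # simulate mapper inputs
partial <- sapply(chunks, function(v)
  c(running.sum = sum(v, na.rm = TRUE), num.records = sum(!is.na(v))))
total <- rowSums(partial)                     # reducer: sum the partials
total["running.sum"] / total["num.records"]   # equals mean(x, na.rm = TRUE)
```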

piccolbo commented 12 years ago

Thanks a lot, good stuff. Are you saying that it just works the way it is? Nothing to improve? My take here is:

  1. Having to list the fields by hand is cumbersome and will hardly generalize to another data set. Not your fault, this is covered in #72.
  2. Can we guess the right thing for vectorized and structured? You may have noticed that even if you turn off structured when reading from csv the map function still gets a data frame. Is it possible to generalize this behavior? Are there other cases where the right thing is clear from the input? I have some ideas scribbled down but I need to clean them up a bit.
  3. Can we preserve col names, other attributes that get lost when processing data frames? I think that's totally doable, we just have to avoid some redundant data conversions that are there more for historical reasons than anything else.
ryangarner commented 12 years ago
  1. Having to list the fields by hand is cumbersome and will hardly generalize to another data set. Not your fault, this is covered in #72.
    • I'm not too bothered by this as the data I work with doesn't have a header to begin with. Thus, I have to create the global variable fields to pass to the input.format function. Since format = "csv" uses read.table, I just pass the argument col.names = fields.
  2. Can we guess the right thing for vectorized and structured? You may have noticed that even if you turn off structured when reading from csv the map function still gets a data frame. Is it possible to generalize this behavior? Are there other cases where the right thing is clear from the input? I have some ideas scribbled down but I need to clean them up a bit.
    • With vectorized = FALSE, the map function gets data frames with 1 record instead of 1,000 records. If structured = FALSE, the map function still gets data frames, because the input.format function passes data frames with 1 record for vectorized = FALSE and 1,000 records for vectorized = TRUE. On the other hand, structured = TRUE matters for the reduce function if you plan to manipulate a data frame with apply, as I do in the example above, because the map function sends lists of data frames to the reduce function. Thus, the behavior of structured = TRUE (or structured = list(reduce = TRUE)) is similar to do.call("rbind", list.of.data.frames). Since I'm working with structured data to begin with and using the vectorized feature, I think structured = TRUE should be the default in general.
  3. Can we preserve col names, other attributes that get lost when processing data frames? I think that's totally doable, we just have to avoid some redundant data conversions that are there more for historical reasons than anything else.
    • The column names do stay preserved for me. In the example below, I compute running sums on DayOfWeek, ArrDelay, and DepDelay.
library(rmr)
fields <- c('Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 
            'CRSDepTime', 'ArrTime', 'CRSArrTime', 'UniqueCarrier',
            'FlightNum', 'TailNum', 'ActualElapsedTime', 'CRSElapsedTime',
            'AirTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance',
            'TaxiIn', 'TaxiOut', 'Cancelled', 'CancellationCode',
            'Diverted', 'CarrierDelay', 'WeatherDelay', 'NASDelay',
            'SecurityDelay', 'LateAircraftDelay')
mapper <- function(key, value) {
  # Keep only the three fields of interest, coerce them to numeric,
  # then emit their per-chunk running sums
  df <- subset(value, select = c("DayOfWeek", "ArrDelay", "DepDelay"))
  df <- as.data.frame(apply(df, 2, as.numeric))
  df <- apply(df, 2, sum, na.rm = TRUE)
  keyval(1, df)
}
reducer <- function(key, value) {
  # Column-wise totals across all mappers
  df <- apply(value, 2, sum)
  keyval(key, df)
}
out <- mapreduce(input = input,
                 input.format = make.input.format(mode = "text", format = "csv", sep = ",", col.names = fields),
                 map = mapper,
                 reduce = reducer,
                 combine = TRUE,
                 vectorized = TRUE,
                 structured = TRUE)
airline <- from.dfs(out, structured = TRUE)$val
airline
  DayOfWeek ArrDelay DepDelay
1  39496487 75506379 73400204
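The rbind-like behavior described in point 2 can be mimicked in plain R; this snippet just shows that do.call("rbind", ...) glues per-mapper data frames together while keeping column names, which is why they survive into the reduce step (the numbers here are made up):

```r
# Each element stands for one mapper's output: a 1-row data frame of partial sums.
partials <- list(
  data.frame(DayOfWeek = 10, ArrDelay = 200, DepDelay = 150),
  data.frame(DayOfWeek = 12, ArrDelay = 180, DepDelay = 160)
)
combined <- do.call("rbind", partials)  # analogous to structured = TRUE for reduce input
apply(combined, 2, sum)                 # named totals: DayOfWeek, ArrDelay, DepDelay
```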
piccolbo commented 12 years ago

On 1 and 3 you are right. For 3 I had a different scenario in mind: when you return a vectorized keyval, keyval(df1, df2, vectorized = T), in a map function, it is serialized with typedbytes and some of the metadata is lost. That was a compromise in favor of speed in the first place, so it will take some work to recover more usability. As for making vectorized and structured the defaults, it seems like you are far from the only RHadoop user who deals mostly with structured data. On the other hand, the unstructured case is more general (maybe not very convenient, but it won't break), and it's always hard to reverse defaults. Will think hard about it.

piccolbo commented 12 years ago

Today I put in a number of small improvements:

  1. to.dfs does a better job of converting matrices, data frames and atomic vectors to key-value pairs. It chunks them in larger pieces than before and uses split to do so, which keeps all the various metadata in place.
  2. from.dfs tries hard to put the pieces back together as they were when you set "structured" to TRUE.
  3. the "structured" option is no longer a fancy name for "make everything a data.frame". It will keep matrices as matrices and atomic vectors as atomic vectors, and it should work that way everywhere this option is available. It's not passing all the tests right now, so some debugging is in order.
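A small base-R illustration of the split-based chunking mentioned in point 1: split keeps column names, classes and factor levels on each chunk, so reassembly can restore the original object (the chunk size here is arbitrary):

```r
# Chunk a data frame with split(); each piece keeps names, classes and factor levels.
df <- data.frame(id = 1:6, grp = factor(c("a", "b", "a", "b", "a", "b")))
chunk.id <- ceiling(seq_len(nrow(df)) / 3)   # two chunks of 3 rows each
pieces <- split(df, chunk.id)
sapply(pieces, function(p) identical(sapply(p, class), sapply(df, class)))  # all TRUE
restored <- do.call(rbind, pieces)
rownames(restored) <- NULL
isTRUE(all.equal(df, restored))  # TRUE
```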