delta-rho / RHIPE

R and Hadoop Integrated Programming Environment

reduce issue when incrementally building lists of reduce.values #34

Closed. hafen closed this issue 8 years ago.

hafen commented 8 years ago

@saptarshiguha, could you take a look at this and see if you can replicate it?

The following does not work correctly:

# write key: i, value: i for i = 1, ..., 10000
rhwrite(lapply(1:10000, function(x) list(x, x)), file = "/tmp/rhtest")

# map that collects "1" as key and a single-row data-frame
map <- expression({
  for(i in seq_along(map.values)) {
    rhcollect("1", data.frame(key = map.keys[[i]], val = map.values[[i]], stringsAsFactors = FALSE))
  }
})

reduce <- expression(
  pre = {
    adata <- list()
  },
  reduce = {
    adata[[length(adata) + 1]] <- reduce.values
  },
  post = {
    adata <- do.call(rbind, unlist(adata, recursive = FALSE))
    rhcollect(reduce.key, adata)
  }
)

b <- rhwatch(map = map, reduce = reduce, input = "/tmp/rhtest", output = "/tmp/rhtestout")

b <- rhread("/tmp/rhtestout")

length(unique(b[[1]][[2]]$val))
# 6000 (should be 10000)

However, if you change the reduce to the following:

reduce <- expression(
  pre = {
    adata <- list()
  },
  reduce = {
    adata <- c(adata, reduce.values)
  },
  post = {
    adata <- do.call(rbind, adata)
    rhcollect(reduce.key, adata)
  }
)

then you get the correct result. This is really strange: things like this used to work, and a lot of your template functions in rhoptions() use this same pattern. I don't know if something in R's internals has changed or what. Here's my session info:

> sessionInfo()
R version 3.2.2 (2015-08-14)
Platform: x86_64-apple-darwin13.4.0 (64-bit)
Running under: OS X 10.11.1 (El Capitan)

locale:
[1] en_US.UTF-8/en_US.UTF-8/en_US.UTF-8/C/en_US.UTF-8/en_US.UTF-8

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] Rhipe_0.75.1.6   rJava_0.9-7      codetools_0.2-14 colorout_1.1-0  

loaded via a namespace (and not attached):
[1] data.table_1.9.6 chron_2.3-47  
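To check whether the two reduce variants above differ under plain R semantics, here is a minimal local sketch. `run_reduce_locally()` is a hypothetical helper, not part of RHIPE: it assumes the reducer is driven as "run `pre` once, run `reduce` once per chunk of `reduce.values`, then run `post`", and it replaces `rhcollect()` with a function that just records the key/value pair. Under those assumptions both variants return all 25 rows, which is consistent with the later finding in this thread that the problem lies in RHIPE's C code rather than in R.

```r
# Hypothetical local harness, NOT part of RHIPE: evaluates the pre/reduce/post
# blocks of a reduce expression against in-memory chunks of values.
run_reduce_locally <- function(reduce_expr, key, batches) {
  collected <- list()
  env <- new.env()
  # Stand-in for rhcollect(): record (key, value) pairs instead of emitting them
  env$rhcollect <- function(k, v) {
    collected[[length(collected) + 1]] <<- list(k, v)
    invisible(NULL)
  }
  env$reduce.key <- key
  eval(reduce_expr[["pre"]], env)
  # Each batch plays the role of one reduce.values chunk for the same key
  for (vals in batches) {
    env$reduce.values <- vals
    eval(reduce_expr[["reduce"]], env)
  }
  eval(reduce_expr[["post"]], env)
  collected
}

# Single-row data frames for keys 1..25, split into chunks of 10, 10 and 5
rows <- lapply(1:25, function(i) data.frame(key = i, val = i))
batches <- split(rows, ceiling(seq_along(rows) / 10))

# Variant 1: append each chunk as one list element, flatten in post
reduce_index <- expression(
  pre = { adata <- list() },
  reduce = { adata[[length(adata) + 1]] <- reduce.values },
  post = {
    adata <- do.call(rbind, unlist(adata, recursive = FALSE))
    rhcollect(reduce.key, adata)
  }
)
# Variant 2: concatenate chunks with c(), no flattening needed in post
reduce_concat <- expression(
  pre = { adata <- list() },
  reduce = { adata <- c(adata, reduce.values) },
  post = {
    adata <- do.call(rbind, adata)
    rhcollect(reduce.key, adata)
  }
)

out_index <- run_reduce_locally(reduce_index, "1", batches)
out_concat <- run_reduce_locally(reduce_concat, "1", batches)
length(unique(out_index[[1]][[2]]$val))   # 25
length(unique(out_concat[[1]][[2]]$val))  # 25
```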

hafen commented 8 years ago

To add to the mystery: after talking with @jrounds, we thought it might have to do with the do.call(rbind, unlist(..., recursive = FALSE)) step. But check out this example, where the post block skips that step entirely:

# write key: i, value: i for i = 1, ..., 10000
rhwrite(lapply(1:10000, function(x) list(x, x)), file = "/tmp/rhtest")

# map that collects "1" as key and a single-row data-frame
map <- expression({
  for(i in seq_along(map.values)) {
    rhcollect("1", data.frame(key = map.keys[[i]], val = map.values[[i]], stringsAsFactors = FALSE))
  }
})

reduce <- expression(
  pre = {
    adata <- list()
  },
  reduce = {
    adata[[length(adata) + 1]] <- reduce.values
  },
  post = {
    rhcollect(reduce.key, adata)
  }
)

b <- rhwatch(map = map, reduce = reduce, input = "/tmp/rhtest", output = "/tmp/rhtestout")
b <- rhread("/tmp/rhtestout")

head(b[[1]][[2]][[1]])
# [[1]]
#    key  val
# 1 4000 4000
# [[2]]
#    key  val
# 1 3999 3999
# [[3]]
#    key  val
# 1 3998 3998

head(b[[1]][[2]][[2]])
# [[1]]
#    key  val
# 1 4000 4000
# [[2]]
#    key  val
# 1 3999 3999
# [[3]]
#    key  val
# 1 3998 3998

length(b[[1]][[2]][[1]])
# 6000

length(b[[1]][[2]][[2]])
# 4000

In this case, it looks like on the first pass through reduce$reduce, adata[[1]] is set to reduce.values, which has length 6000. On the second pass, reduce.values has length 4000, and both adata[[1]][1:4000] and adata[[2]] seem to end up set to this new reduce.values.

To check this, see:

digest::digest(b[[1]][[2]][[1]])
# "32b71ff2deb98db18ecb67b7b2935dd7"
digest::digest(b[[1]][[2]][[2]])
# "e1597289afb05fdd6a47a3493f20b50e"
digest::digest(b[[1]][[2]][[1]][1:4000])
# "e1597289afb05fdd6a47a3493f20b50e"

This confirms that adata[[1]][1:4000] is identical to adata[[2]]. Perhaps this has to do with R only making copies when it thinks it absolutely needs to, so that the assignment to adata[[1]] stores a reference to reduce.values instead of a copy, and that shared object is then overwritten on the second pass. Perhaps some logic in R's internals has changed that needs to be reflected in RHIPE's C code. Or maybe I'm way off :).
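The digest comparison above can also be done in base R, without the digest package. A minimal sketch: `shares_prefix()` is a hypothetical helper name, and the batches below are synthetic, scaled down from 6000/4000 rows to 6/4, with the corruption simulated by hand rather than produced by RHIPE.

```r
# shares_prefix() is a hypothetical helper (not part of RHIPE or digest):
# TRUE when the first length(b) elements of a are identical to b, i.e. the
# "adata[[1]][1:4000] is the same as adata[[2]]" symptom.
shares_prefix <- function(a, b) {
  n <- length(b)
  n > 0 && n <= length(a) && identical(a[seq_len(n)], b)
}

# Synthetic batches, scaled down from 6000/4000 rows to 6/4
batch2 <- lapply(4:1, function(i) data.frame(key = i, val = i))
batch1_ok <- lapply(10:5, function(i) data.frame(key = i, val = i))
# Simulated corruption: batch 2 overwrote the first 4 slots of batch 1
batch1_bad <- c(batch2, batch1_ok[5:6])

shares_prefix(batch1_ok, batch2)   # FALSE
shares_prefix(batch1_bad, batch2)  # TRUE

# Duplicated values across all stored rows are the same red flag as
# length(unique(...)) returning 6000 instead of 10000
vals_bad <- vapply(c(batch1_bad, batch2), function(d) d$val, integer(1))
anyDuplicated(vals_bad) > 0        # TRUE
```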

hafen commented 8 years ago

Here's a hacky solution if you want to write reduces this way. You can force a copy of reduce.values by appending NULL to it:

reduce <- expression(
  pre = {
    adata <- vector("list", length = 2)
    idx <- 1
  },
  reduce = {
    adata[[idx]] <- c(reduce.values, NULL)
    idx <- idx + 1
  },
  post = {
    rhcollect(reduce.key, adata)
  }
)

If you use this, you will get the correct result. It's ugly, though, and doesn't get to the root of the problem, but I think it does confirm that it's a copying issue.
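Why appending NULL helps can be seen in plain R: `c()` returns a freshly built top-level list, and appending `NULL` adds no elements, so the values are unchanged while the container is new (the data frames inside are not deep-copied). A minimal sketch with made-up values; the reduce below is a variant of the workaround that grows the list instead of preallocating two slots, the same form that appears commented out in the test code at the end of this thread.

```r
# c(x, NULL) yields a new top-level list with the same contents as x
vals <- lapply(1:3, function(i) data.frame(key = i, val = i))
copied <- c(vals, NULL)
identical(copied, vals)  # TRUE: same contents
length(copied)           # 3: NULL contributes no elements

# Workaround variant that does not assume a fixed number of reduce calls
reduce <- expression(
  pre = { adata <- list() },
  reduce = { adata[[length(adata) + 1]] <- c(reduce.values, NULL) },
  post = { rhcollect(reduce.key, adata) }
)
```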

saptarshiguha commented 8 years ago

So, use a combiner:

b <- rhwatch(map = map, reduce = structure(reduce, combine = TRUE), input = "/tmp/rhtest", output = "/tmp/rhtestout")

This works. That said, I'm not sure why it doesn't work when a combiner is not used.

They should both work exactly the same.
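For readers unfamiliar with the `structure(reduce, combine = TRUE)` idiom above: in plain R, `structure()` only attaches an attribute named `combine` to the reduce expression and leaves its pre/reduce/post blocks unchanged. A small sketch; that `rhwatch()` checks this attribute to decide whether to also run the reducer as a combiner is an assumption inferred from the call above.

```r
reduce <- expression(
  pre = { adata <- list() },
  reduce = { adata[[length(adata) + 1]] <- reduce.values },
  post = { rhcollect(reduce.key, adata) }
)
# structure() returns a copy of 'reduce' with an extra "combine" attribute;
# rhwatch() presumably inspects attr(reduce, "combine") (assumption)
reduce_comb <- structure(reduce, combine = TRUE)
attr(reduce_comb, "combine")  # TRUE
names(reduce_comb)            # "pre" "reduce" "post"
```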

(Sent by email in reply to hafen's comment of Fri, Dec 4, 2015, 3:55 PM: https://github.com/tesseradata/RHIPE/issues/34#issuecomment-162112954)

saptarshiguha commented 8 years ago

Also, this is less a RHIPE problem than something in R. For example, consider:

reduce <- expression(
  pre = {
    adata <- list()
  },
  reduce = {
    adata[[length(adata) + 1]] <- reduce.values
  },
  post = {
    adata <- do.call(rbind, unlist(adata, recursive = FALSE))
    rhcollect(reduce.key, adata)
  }
)
hdfs.setwd("/user/sguha/tmp/")
b <- rhwatch(map = map, reduce = reduce, input = "rhtest", output = "rhtestout")

Now read it in:

adata <- rhread("rhtestout")[[1]][[2]]

When a combiner is not used, adata is what you had when you first ran the do.call(rbind, ...) step in the MR job. So let's do that step in the console:

adata2 <- do.call(rbind, unlist(adata, recursive = FALSE))

You'll find the same issue:

head(adata2)
#    key  val
# 1 6001 6001
# 2 6002 6002
# 3 6003 6003
# 4 6004 6004
# 5 6005 6005
# 6 6006 6006
# ...
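To make the console step concrete: `unlist(..., recursive = FALSE)` removes only the outer level of nesting (one element per reduce call), so each one-row data frame survives intact, and `do.call(rbind, ...)` then stacks them. A small plain-R sketch with made-up batches; if the stored batches were already corrupted, this step would simply reproduce the duplicates.

```r
# adata holds one element per reduce call, each a list of one-row data frames
batch1 <- lapply(1:3, function(i) data.frame(key = i, val = i))
batch2 <- lapply(4:5, function(i) data.frame(key = i, val = i))
adata <- list(batch1, batch2)

flat <- unlist(adata, recursive = FALSE)  # one level flattened: 5 data frames
length(flat)                              # 5
adata2 <- do.call(rbind, flat)            # stack into a single data frame
nrow(adata2)                              # 5
```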

(Sent by email as a follow-up to the Fri, Dec 4, 2015, 4:58 PM comment above.)

hafen commented 8 years ago

But in this case, the data written out by the MR job is bad, and the result has nothing to do with calling do.call(rbind, ...) in R:

> adata <- rhread("rhtestout")[[1]][[2]]
> head(adata[[2]], 2)
[[1]]
   key  val
1 4000 4000
[[2]]
   key  val
1 3999 3999

> head(adata[[1]], 2)
[[1]]
   key  val
1 4000 4000
[[2]]
   key  val
1 3999 3999

Which means something is going wrong in RHIPE, right?

saptarshiguha commented 8 years ago

Hmm, I'm getting something else:

b = rhread("rhtestout")
lapply(b[[1]][[2]], length)
# [[1]]
# [1] 6000
#
# [[2]]
# [1] 4000

(Sent by email in reply to hafen's comment of Fri, Dec 4, 2015, 8:14 PM: https://github.com/tesseradata/RHIPE/issues/34#issuecomment-162121827)

saptarshiguha commented 8 years ago

Oops, I think I used the wrong validation. Let me look into it more...

(Sent by email as a follow-up to the Thu, Dec 10, 2015, 4:39 PM comment above.)

saptarshiguha commented 8 years ago

Yeah, confirmed. No idea why this is so. It appears a copy is not being made inside the C source.

By the way, your hacky version can be shortened:

reduce = {

But it's still a bandage. Will investigate soon.
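The copy-on-modify guarantee that this thread relies on can be seen in plain R: storing `vals` in a list does not duplicate it right away, but R duplicates before any later modification, so `adata[[1]]` keeps its original contents. A minimal sketch with made-up values; C code that writes directly into an object already referenced elsewhere bypasses this check, which would produce the kind of aliasing seen here (the thread does not spell out which part of RHIPE's C source was responsible).

```r
vals <- list("a", "b", "c")
adata <- list()
adata[[1]] <- vals      # adata[[1]] and vals may share memory at this point
vals[[1]] <- "changed"  # R duplicates before writing because vals is shared
adata[[1]][[1]]         # "a": the stored element is unaffected
vals[[1]]               # "changed"
```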

(Sent by email as a follow-up to the Thu, Dec 10, 2015, 4:40 PM comment above.)

saptarshiguha commented 8 years ago

Submitted a pull request to fix this.

saptarshiguha commented 8 years ago

Adding this test code:

library(data.table)  # for data.table() in the checks below

hdfs.setwd("/user/sguha/tmp/")
rhdel("rhtest")
rhwrite(lapply(1:25, function(x) list(x, x)), file = "rhtest")

# map that collects "1" as key and a single-row data-frame
map <- expression({
  Map(function(k, v) {
    rhcollect("1", data.frame(key = k, val = v, stringsAsFactors = FALSE))
  }, map.keys, map.values)
})
reduce <- expression(
  pre = {
    adata <- list()
  },
  reduce = {
    adata[[length(adata) + 1]] <- reduce.values  # c(reduce.values, NULL)
  },
  post = {
    # adata <- do.call(rbind, unlist(adata, recursive = FALSE))
    rhcollect(reduce.key, adata)
  }
)
# A small rhipe_reduce_buff_size should split the 25 values for key "1"
# across several reduce calls
b <- rhwatch(map = map, reduce = reduce, input = "rhtest", output = "rhtestout",
  mapred = list(mapred.reduce.tasks = 1, rhipe_reduce_buff_size = 10))

# Each stored batch should contain distinct keys
data.table(do.call(rbind, b[[1]][[2]][[1]]))
data.table(do.call(rbind, b[[1]][[2]][[2]]))
data.table(do.call(rbind, b[[1]][[2]][[3]]))

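Instead of eyeballing the three tables, the test could assert that every key appears exactly once across all stored batches. A minimal sketch: `all_keys_present()` is a hypothetical helper, and it assumes `b[[1]][[2]]` holds the list of batches as in the test above. The check is demonstrated here on synthetic batches, including one where a later batch has been copied over the start of an earlier one.

```r
# all_keys_present() is a hypothetical helper: flatten the stored batches,
# stack them, and require each expected key exactly once.
all_keys_present <- function(batches, expected) {
  rows <- do.call(rbind, unlist(batches, recursive = FALSE))
  setequal(rows$key, expected) && !anyDuplicated(rows$key)
}

# Synthetic stand-in for b[[1]][[2]]: 25 keys in chunks of 10, 10 and 5
rows <- lapply(1:25, function(i) data.frame(key = i, val = i))
good <- unname(split(rows, ceiling(seq_along(rows) / 10)))
bad <- good
bad[[1]][1:5] <- good[[3]]  # mimic a later batch overwriting an earlier one

all_keys_present(good, 1:25)  # TRUE
all_keys_present(bad, 1:25)   # FALSE

# On the cluster, after rhwatch(): all_keys_present(b[[1]][[2]], 1:25)
```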
hafen commented 8 years ago

I believe this was fixed here: https://github.com/tesseradata/RHIPE/commit/3e6f3353708cb99824bb2fe2505ccd1b45b3df40