Closed: piccolbo closed this issue 9 years ago
Same problem I had with outer joins. I ended up running a quick job to get all the columns and another one to do an explicit conversion. The workaround is returning list(as.data.frame(...)). Then there is no crash, but you are building a list, and you can postprocess the list the way you want. It's not ideal, but with the way serialization is set up since 3.0 it's unlikely this usage will be supported.

This is opinion, but I think supporting it was a stretch from the start, because the idea behind rmr2 distributed data structures is that one is storing or creating a data frame, matrix, vector or list. There is no variable-column data frame that I know of in R, so why should it be supported in the distributed version? Yes, in 2.x you could do this because there was an implicit rbind_fill kind of operation behind the scenes, but it's not something that was deliberately supported, nor, I think, ever documented. I am guilty of assuming it myself in implementing outer joins, but two wrongs don't make a right.

Besides that, there is the problem of implementing this. When serializing, we need to distribute metadata sparingly because the overhead can be crippling. If the map key has many distinct values, the data needs to be split into small chunks, possibly one row each, so that it can travel to the correct reducer. If we send the metadata (class and colnames) with each chunk, then everything is easy, including reassembling a data frame from variable-column parts. But the space overhead can be 120x without trying too hard (single integer column, single-row groups) and can be made +Inf if you pick very long column names. That made performance unpredictable and caused grief to users. In 3.0 there are data chunks with metadata chunks interspersed among them, but no longer associated 1:1. The overhead can be kept under control, but the metadata chunks all have to be identical. To relax this constraint, you'd have to add a metadata id to each chunk, send the metadata chunks reliably to each reducer, then re-associate them with the data and build a data frame. It's doable, but it's a project, and a challenging one if you add the requirement to do it with little or no performance penalty. How's the workaround looking for you: OK, ugly, blocker?
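For concreteness, a minimal sketch of that workaround on the reduce side (the helper is made up; only the list(as.data.frame(...)) wrapping is the actual suggestion):

```r
library(rmr2)

# Sketch of the workaround: wrap the (possibly variable-column) data frame
# in a list, so rmr2 serializes a list of data frames rather than a data
# frame whose columns differ from key to key.
reduce <- function(k, vv) {
  res <- build.partial.result(k, vv)     # hypothetical helper; may produce
                                         # different columns for different keys
  keyval(k, list(as.data.frame(res)))    # the list(...) wrapping avoids the crash
}
```

Reading the output back with from.dfs then yields a list of data frames that can be post-processed however you like, e.g. with an rbind_fill-style merge.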
That makes a lot of sense. Are the "chunk" sizes also determined by the keyval.length/nrow arguments, or is that a separate mechanism? I should really just review the code changes.
This issue only seems to appear upon reading the values, either with from.dfs or in subsequent mapreduce jobs. I understand that the chunks need to be consistent internally, but isn't this an issue of combining different chunks? Or is it that they're being serialized within a single chunk without failing? If it's the latter, it seems like a good idea to throw the error on write, rather than on read.
On Tue, Sep 16, 2014 at 10:54 AM, Jamie F Olson notifications@github.com wrote:
That makes a lot of sense. Are the "chunk" sizes also determined by keyval.length/nrow arguments or is that a separate mechanism? I should really just review the code changes.
Yes, when key is NULL
This issue only seems to appear upon reading the values, either with from.dfs or subsequent mapreduce jobs. I understand that the chunks need to be consistent internally, but isn't this an issue of combining different chunks? Or is it that they're being serialized within a single chunk without failing? If it's the latter, it seems like a good idea to throw the error on write, rather than read.
Yes, failing early with a decent error message would be great progress already. The problem is that we have concurrent processes each writing to their own partition: one is writing mtcars and the other is writing mtcars[, rev(names(mtcars))], unbeknownst to the other. So within a single reducer rmr could do a better job of detecting a change in metadata and throwing the appropriate error, but let's say that that's implemented and the problem doesn't occur there. Now on the read side, you find multiple partitions with inconsistent metadata. We could read them one at a time and then throw the appropriate error. Seems more feasible now that I've written it down.
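As a sketch of what that read-side check could look like (illustrative only, not rmr2 internals):

```r
# Hypothetical read-side check: before combining the partitions, verify that
# every part carries the same column metadata, and fail with an informative
# error instead of crashing the session.
combine.parts <- function(parts) {
  ref <- names(parts[[1]])
  for (p in parts[-1]) {
    if (!identical(names(p), ref))
      stop("inconsistent metadata across partitions: expected columns ",
           paste(ref, collapse = ", "), ", found ",
           paste(names(p), collapse = ", "))
  }
  do.call(rbind, parts)
}
```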
So only one reducer actually writes out the metadata for the whole job's output? Or are there multiple concurrent write processes for a single reduce task?
Every reducer is writing metadata for a subset of keys. There is really no way I know of for a reducer to know it is different from the others.
On Tue, Sep 16, 2014 at 11:07 AM, Jamie F Olson notifications@github.com wrote:
So only one reducer actually writes out the metadata for the whole job's output? Or are there multiple concurrent write processes for a single reduce task?
So then which concurrent processes are writing to the same chunk? Is the issue that the reducer writes multiple keyvals into a single chunk?
On Tue, Sep 16, 2014 at 12:14 PM, Jamie F Olson notifications@github.com wrote:
So then which concurrent processes are writing to the same chunk?
That cannot happen AFAIK
Is the issue that the reducer writes multiple keyvals into a single chunk?
The issue is that each reducer writes one copy of the metadata every, say, 100 data items. The association between data item and metadata is lost. Let me try with an example: on disk we have
data: col1, col2, col3
data: col1, col2, col3
data: col1, col2, col3, col4
data: col1, col2, col3, col4, col5
data: col1, col2, col3
metadata: col1, col2, col3 ??
data: etc
Where I put the ?? I don't know what I should write: metadata that's compatible with the first, second or third data row? Simplifying assumption: each row has some number, type and names of cols, and that's all the metadata there is. Even with a single reducer, I have this problem. With multiple reducers writing their own partitions incommunicado, I have the same problem, just harder to solve. My answer was: this is logically a data frame, there's one version of the metadata, aka attributes, for everybody, and that's it. That makes sense, but it is not compatible with rmr 2.0, where the layout was like
metadata
data
metadata
data
metadata
data
Even if the data was a single row, the metadata had to be there. Does it make sense now?
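To make the rbind_fill point above concrete, a small base-R illustration (not rmr2 code, column names made up):

```r
library(plyr)  # for rbind.fill

a <- data.frame(col1 = 1, col2 = 2, col3 = 3)
b <- data.frame(col1 = 4, col2 = 5, col3 = 6, col4 = 7)

# One shared set of metadata for everybody: plain rbind is the natural
# reassembly operation, and it fails when the parts disagree on columns.
try(rbind(a, b))

# Per-part metadata makes an rbind.fill-style reassembly possible, padding
# missing columns with NA; this is roughly what rmr 2.x did implicitly.
rbind.fill(a, b)
```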
Opened on a report by @jamiefolson
When the reduce function returns different columns for different values, the job completes successfully, but any attempt to work with the results will lead to a fatal error crashing the R session:
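The original reproduction snippet is not included here; a minimal sketch of the kind of job being described, assuming the local backend and made-up columns, would look something like this:

```r
library(rmr2)
rmr.options(backend = "local")   # assumption: local backend, for illustration

input <- to.dfs(keyval(rep(1:2, each = 5), data.frame(x = 1:10)))

out <- mapreduce(
  input,
  reduce = function(k, vv) {
    # the problematic pattern: different columns for different keys
    if (k == 1) keyval(k, data.frame(a = sum(vv$x)))
    else        keyval(k, data.frame(a = sum(vv$x), b = mean(vv$x)))
  })

# the job itself completes; per the report, the failure shows up when the
# result is read back
from.dfs(out)
```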
This may seem like an odd thing to want to do, but for sequences of operations it is convenient to construct the result incrementally and return partial results even when the full result cannot be calculated.