Closed alexsanjoseph closed 4 years ago
Thanks for the detailed overview - I have used data.table a bit, but not furrr.
Your numbers are interesting, though. I have seen data.table giving order-of-magnitude improvements, but how does adding in futures make minutes into seconds? I understand it uses multiple cores, but shouldn't the theoretical maximum improvement be limited to the number of cores?
Will try to take a crack over the weekend on this
Also you've understood the code correctly. I think I should be feeling proud of myself ;)
R is singlethreaded and every operation is blocking. In my use case I start with a 10 megabyte table of ~56K rows, each row containing the start and end point of a sequence of numbers I need to work on. It's far too large for me to do the entire process all at once to the entire table, so I need to split it up into chunks of about 1K rows at a time.
So if it takes 10 seconds to do a 1K row chunk then I'm looking at about 10 minutes to do everything serially. If I multithread this using future_map() then instead of a single thread that operates on all 56 chunks consecutively I have 8 fully independent worker threads. Any time one of them finishes a chunk it automagically grabs the next available chunk to work on until the whole process is completed.
At even a pessimistic half efficiency that's a difference of 560 seconds down to 140 seconds. Plus consider the efficiency gains when one of the worker threads hits a difficult job that takes particularly long to complete and the others can simply leapfrog it.
Don't get me wrong, it's not magic. Multithreading can be worse than singlethreaded when you've got lots of tiny operations being performed and it wastes too much time on setup or moving data back and forth between the main and child processes. But in any use case where you're bound by independent, long-blocking, CPU-intensive jobs it works great.
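The estimate above can be written out as a few lines of base R. This is only a back-of-the-envelope sketch using the figures quoted in the comment (56K rows, 1K-row chunks, 10 seconds per chunk, 8 workers, 50% efficiency); the variable names are made up for illustration.

```r
# Figures taken from the comment above; names are illustrative only.
rows           <- 56000
chunk_rows     <- 1000
secs_per_chunk <- 10
workers        <- 8
efficiency     <- 0.5   # the "pessimistic half efficiency" assumption

n_chunks      <- ceiling(rows / chunk_rows)            # number of 1K-row chunks
serial_secs   <- n_chunks * secs_per_chunk             # one thread, chunks run back to back
parallel_secs <- serial_secs / (workers * efficiency)  # effective parallelism = workers * efficiency

cat(n_chunks, "chunks:", serial_secs, "s serial vs", parallel_secs, "s parallel\n")
```

The speedup is still bounded by the number of workers; the gain comes from the per-chunk cost being large relative to the scheduling overhead.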
Thanks for the detailed info, that makes sense.
In compareDF's case I wonder if lots of tiny operations might be the case? Most of the time the number of groups ~ the number of rows, so the overhead might be too much.
But definitely the only way to know is to do some testing and figure out how it works for this specific case.
By "lots of tiny operations" I meant in the context of creating and resolving child processes, I can see how I wasn't that clear about that. Future's big weakness is that it takes time to get all the child processes set up, organized, and the various threads coordinated. A big part of that is that passing data back and forth between parent and child process is done basically by serializing it as if you were writing and reading an RDS file.
Think of it like the difference between the items lapply iterates over versus the code lapply applies within each iteration. If your code takes a long time because each iteration of lapply is a time consuming cpu-bound process then you're right in Furrr's wheelhouse and will see significant gains. On the other hand if each iteration of lapply takes a very short time and there's just a whole bunch of them then any gains from multithreading will be a wash or even outweighed by the overhead.
In your case you have a small number of time consuming iterations that can be done in any order. You won't see any difference on smaller objects, but when you start comparing 700K row ~70 megabyte dataframes it'll definitely show.
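To get a feel for the transfer overhead described above, the serialization round trip can be timed directly in base R. This is a rough sketch, assuming (as the comment says) that data crosses the process boundary in serialized form; the `chunk` data frame is a hypothetical stand-in for whatever is sent to a worker.

```r
# Hypothetical payload standing in for one chunk handed to a worker process.
chunk <- data.frame(id = seq_len(1e5), value = rnorm(1e5))

# serialize(x, NULL) returns the object as a raw vector, using the same
# serialization format that saveRDS() is built on.
payload    <- serialize(chunk, connection = NULL)
round_trip <- unserialize(payload)

bytes_shipped <- length(payload)

# Compare the cost of moving the data with the cost of a trivial piece of work on it.
transfer_time <- system.time(unserialize(serialize(chunk, NULL)))[["elapsed"]]
work_time     <- system.time(sum(chunk$value))[["elapsed"]]

cat("bytes:", bytes_shipped, " transfer:", transfer_time, "s  work:", work_time, "s\n")
```

When the work per iteration is as cheap as `sum()` here, the transfer cost is the larger part and parallelising is unlikely to pay off; the balance shifts as each iteration gets more expensive.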
Turns out I have no idea how to use git but that's ok because this chunk of the rewrite was compact enough to fit in a comment. The below function gets you a comparison_df with - for subtracted rows, + for added rows, and ~ for rows where group_col exists in both objects but not all values are identical. It has an optional argument wideoutput to choose between the traditional row based compare_df() output and a side-by-side version where you see the group_col followed by the old data, then chng_type, then new data.
This also allows for a huge optimization when processing cell-level data. Instead of needing to operate on the entire object you can immediately do a fast keyed filter to only get rows where chng_type="~". On top of this the addition of the newold column preserves information on directionality, meaning it's possible not only to determine that something has changed but also exactly what that change was. Since you know the order of values now you can meaningfully calculate change over time based on the column class. Date/time columns can provide elapsed time, numeric columns change in absolute or percentage, and character/logical columns a simple difference of X->Y.
As for how to go about that there are two main options. Keep the information vertical and split the work up by columns, or put it in side-by-side format and then figure out a parametric way to access column names and match them up based on _old or _new. The latter allows you to chunk objects by row since every row becomes completely independent of the others.
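One possible way to do the side-by-side matching is sketched below in base R. It assumes the wide table uses the _old/_new suffixes described above; the column names (price, seen, status) and the helper `describe_change()` are hypothetical and only there to show one column of each class.

```r
# Hypothetical wide comparison table with one numeric, one date and one character column.
wide <- data.frame(
  id         = c(1, 2),
  price_old  = c(10, 20),
  price_new  = c(12, 15),
  seen_old   = as.Date(c("2020-01-01", "2020-02-01")),
  seen_new   = as.Date(c("2020-01-11", "2020-02-02")),
  status_old = c("open", "open"),
  status_new = c("open", "closed"),
  stringsAsFactors = FALSE
)

# Recover the column stems from the _old suffix; each stem is assumed to have a _new partner.
stems <- sub("_old$", "", grep("_old$", names(wide), value = TRUE))

# Pick the kind of difference from the column class.
describe_change <- function(old, new) {
  if (is.numeric(old) || inherits(old, "Date") || inherits(old, "POSIXct")) {
    new - old                  # absolute change, or elapsed time for dates
  } else {
    paste0(old, "->", new)     # simple X->Y description for character/logical
  }
}

changes <- lapply(stems, function(s) {
  describe_change(wide[[paste0(s, "_old")]], wide[[paste0(s, "_new")]])
})
names(changes) <- stems
```

Because each row carries both its old and new values, `wide` could be split by row and the same code run on each piece independently.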
Theoretically you could combine row-chunking with vertical-operation by creating a unique per-group ID number but I'm not sure it's worth it. Maybe long term as an optional flag for people who want to compare million row dataframes.
Edit: Almost forgot. The performance difference between this and the current CRAN version of compareDF when used on a 1 megabyte dataframe of 14,000 rows is 23 seconds vs 0.079 seconds.
# Requires data.table, plus dplyr for if_else() and full_join()
library(data.table)
library(dplyr)

dtcompare <- function(df_old, df_new, group_col, wideoutput=FALSE) {
  # as.data.table() returns a new data.table (unlike setDT(), which converts by reference),
  # so the := calls below do not touch the caller's objects
  df_old <- as.data.table(df_old, key=group_col)
  df_new <- as.data.table(df_new, key=group_col)

  # add a way to track which rows are from which DF
  df_old[, newold:="old"]
  df_new[, newold:="new"]

  # rbind order doesn't matter since we track with column newold and the key reorders everything anyway
  df_combined <- rbind(df_old, df_new)
  setDT(df_combined)
  setkeyv(df_combined, group_col)

  # first DT op gets only rows with no dupes at all to identify deletions/additions by the newold column
  # first DT op is put inside the "i" clause to operate on the subset without losing the rest of the data
  df_combined[
    df_combined[
      ,
      if(.N==1) .SD,
      by=group_col
    ],
    chng_type:=if_else(newold=="new","+","-")
  ]

  # duplicated() flags the second of a pair of duplicates; with fromLast=TRUE it flags the first
  # this may miss triplicate rows, but we're comparing 2 DFs so that's an error anyway
  # Can't use the if(.N) trick from above for some reason, doesn't write changes to chng_type
  # setdiff(names(), 'newold') allows programmatically using all columns but newold
  df_combined[
    (duplicated(df_combined, by=setdiff(names(df_combined), 'newold')) |
      duplicated(df_combined, fromLast=TRUE, by=setdiff(names(df_combined), 'newold'))),
    chng_type:="="
  ]

  # Anything left with chng_type still NA must be a row that exists in both DFs and is not identical
  # if it is not a deletion, addition, or identical it must be an alteration
  df_combined[is.na(chng_type), chng_type:="~"]

  # if wideoutput is TRUE then output will be:
  # group_col, df_old columns, chng_type, df_new columns
  # otherwise behavior is unchanged
  if (isTRUE(wideoutput)) {
    df_widecomparison <- as.data.table(
      full_join(
        df_combined[df_combined$newold=='old',],
        df_combined[df_combined$newold=='new',],
        by=c(group_col,'chng_type'),
        suffix=c("_old","_new")
      ), key=group_col
    )
    df_widecomparison[, `:=` (newold_new=NULL, newold_old=NULL)]
    return(df_widecomparison)
  } else {
    return(df_combined)
  }
}
[Example output: wideoutput=FALSE or not supplied; wideoutput=TRUE]
Hey nice! Let me check it out over the weekend
@D3SL - Went over your snippet, here are my comments:
Also, the time taken for difference checking varies significantly with the type of differences that are present. I have been trying to do some performance testing with two dataframes of 240K (10MB) and 360K (15MB) rows with ~100K differences between them, and the current implementation was able to find and tabulate the 100K differences in <10 seconds.
I assume your test dataset is significantly different and hence the results you're seeing. There might be cases which are really limiting to the current implementation. I'd appreciate if you could direct me to details of how to download/create the dataset for me to test and see if I can improve on it.
I really appreciate the time you've taken to try it out and give me a workable example. If you believe that you can do an implementation that passes all the current tests in compareDF while doing it faster, I'll be more than glad to merge it into my package. As it stands now, I'm not able to see how I can conceivably bring in this approach without losing other features that are currently provided by compareDF. I understand that your approach solves the specific problem you have, so maybe it makes sense to build it into a separate package that could be helpful to the community?
P.S - I've been trying out this with data.table using the current implementation, but the bottleneck(duplicate finding with rowdiff) doesn't seem to be something that can be easily solved by data.table. So I'm not even sure that the issue even makes sense anymore. I'll wait for your reply before deciding whether to close it or keep it open.
I think I should clarify that wasn't meant to be a finished product, it was a proof of concept to test if data.table could even work for this. I only put an afternoon or two into it and I think most of that was trying to really grok how the magic happened to see what an efficient way to refactor it would be. I don't have any of the other features partly because I still don't fully understand what the lapply calls were doing, and mostly because I was aiming just to recreate that one core component.
From what I could gather the entire package starts by first creating the comparison_df table and everything else is done by processing that in different ways. I was hedging on that being where the overwhelming majority of the runtime was coming from.
I assume your test dataset is significantly different and hence the results you're seeing.
Interestingly it's not, the test set I used for the benchmark had only 5 rows different between the two. The inefficiency may be a product of the key column being 14K rows of doubles instead of something like character or integer data, it could be that R's choking on those being doubles because it's got to do the internal math to get the "full" number to work with. I'll work on putting together a test set that reproducibly shows the difference.
but the bottleneck(duplicate finding with rowdiff) doesn't seem to be something that can be easily solved by data.table.
It's actually so trivial in data.table that it's not even a separate thing you do, it's just how data.table works. Setting the key physically reorders the data in memory and also defines what counts as a unique observation. Just by having the data keyed and the newold column you can determine which rows are identical, added or deleted, or have a value altered.
For example combined_rowdiffs and rowdiffs are both replaced by this:
both_tables[ , if(.N==1) .SD, by=group_col]
The i section is left empty so it takes the entire table. From there it groups the table according to group_col (Student and Division in this example) and counts how many rows are in each group with the special symbol .N. For any group where .N==1 the call returns .SD, the special object holding the subset of data for the current group. In this case any row which has no duplicates according to group_col ends up in the result.
Because j is already being used you have to drop that into the i section of another DT call. Because each filtered row's group_col value appears only once in the combined table, you know it has to be a whole-row subtraction or addition, and the newold column tells you which of those it is.
both_tables[                                   # The parent DT call opens here
  both_tables[, if(.N==1) .SD, by=group_col]   # This entire DT call is the "i" slot of the parent DT call
  ,                                            # The comma means the "i" slot of the parent call is ended
  chng_type:=if_else(newold=="new","+","-")    # The "j" slot of the parent call sets chng_type based on newold
]
Finding rows that are entirely unchanged is even simpler: you just use duplicated() as the filter for i. After that, by process of elimination, any row where chng_type is NA is a row where the group_col exists in both tables but one of its cells was changed somehow.
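The three steps can be tried end to end on a toy table. This is a minimal sketch that assumes only data.table is installed; the tables, the id/value columns and the name `singles` are made up for illustration, and `fifelse()` from data.table is swapped in for dplyr's `if_else()` so the example needs no second package.

```r
library(data.table)

# Toy inputs: id 1 was deleted, id 4 was added, id 3 changed, id 2 is untouched.
old <- data.table(id = c(1, 2, 3), value = c("a", "b", "c"), newold = "old")
new <- data.table(id = c(2, 3, 4), value = c("b", "x", "d"), newold = "new")

both_tables <- rbind(old, new)
setkeyv(both_tables, "id")

# Step 1: groups with a single row exist in only one table, so they are additions or deletions.
singles <- both_tables[, if (.N == 1) .SD, by = "id"]
both_tables[singles, chng_type := fifelse(newold == "new", "+", "-"), on = "id"]

# Step 2: rows identical in every column except the bookkeeping ones are unchanged.
cols <- setdiff(names(both_tables), c("newold", "chng_type"))
unchanged <- duplicated(both_tables, by = cols) |
  duplicated(both_tables, by = cols, fromLast = TRUE)
both_tables[unchanged, chng_type := "="]

# Step 3: whatever is still NA exists in both tables but differs somewhere.
both_tables[is.na(chng_type), chng_type := "~"]

print(both_tables)
```

Passing `on = "id"` makes the join column explicit rather than relying on the key, which is a stylistic choice here and not something the original snippet does.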
Thanks for the detailed explanations. The thinking process seems to be a paradigm shift from the current implementation, but I think you are on to something here, I will take a stab at this.
Meanwhile, I'll wait for you to give me the reproducible test case with the data that is taking an inordinately long time.
@D3SL - I was able to fold in your idea without too many breaking changes. Please take a look at the PR if you have any comments.
https://github.com/alexsanjoseph/compareDF/pull/36
You should also be able to try out the changes in the new branch and see if it has improved the performance as well. (My tests gave me a 5x increase in speed.)
I've also added the create_wide_output as an additional function.
Thanks for the amazing idea and feedback!
Also I'd like to acknowledge your efforts by name, if that's alright. Currently I've used D3SL as the pseudonym
Please take a look at the PR if you have any comments.
Sure, but you might need to link me to a decent tutorial on how to use github first. I've only used it for bug reports until now.
Meanwhile, I'll wait for you to give me the reproducible test case with the data that is taking an inordinately long time.
Going on the theory that the original delay had to do with having a huge number of long doubles I put these two together off a govt website, it's international public data. If I'm right the main branch should take about 15 seconds to run on an i5 8th gen and the DT branch should be almost instant.
Also I'd like to acknowledge your efforts by name, if that's alright. Currently I've used D3SL as the pseudonym
That's fine, this is as much my name as anything else.
Sure, but you might need to link me to a decent tutorial on how to use github first. I've only used it for bug reports until now.
Nevermind - You can just try installing the latest and see if it works fine.
Development version can be installed using devtools
devtools::install_github('alexsanjoseph/compareDF')
I tried using the dataset you posted -
test_new = readxl::read_xlsx("test_new.xlsx")
test_old = readxl::read_xlsx("test_old.xlsx")
system.time({A = compare_df(test_new, test_old, c('start', 'end'))})
Using the older approach takes 1.5 seconds, and with the new one takes 0.35 seconds. Significant improvement, but curious why it took much more time when you tried it (different grouping columns?)
Thanks, I've gone with D3SL for acknowledgements
Fixed in 2.3.0
Sorry I disappeared, haven't had much free time recently. I tried to check whether our using different grouping columns accounted for the runtime difference but I couldn't replicate it, which is weird.
I installed the 2.3 version off github and made up some test data (below) as a sort of worst case scenario. Grouping only by the "key" column I get about a 5 second runtime with compareDF 2.3.0-dev off github and .08sec with my bare data.table comparison. I'm not sure if that's due to an efficiency loss somewhere or because compareDF is just doing a whole lot more.
Probably chucking profvis at this would be the best bet.
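# tibble() comes from the tibble package; fruit is the vector of fruit names shipped with stringr
library(tibble)
library(stringr)
tibble(
  key=runif(2500, min=100000, max=10000000000),
  text=sample(fruit, 2500, replace=TRUE),
  nacol=NA,
  logical=sample(c(TRUE,FALSE),2500,replace=TRUE)
)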
Happy to help with this if you don't mind helping me get up to speed on what's going on under the hood.
If I understand it right this is the heart of the comparison process:
After this is where I get a bit lost on how you get cell-level data on changes. It looks like you use the call to lapply to iterate over each column one at a time, but specifically in interaction with dplyr's group_by so your difference-checking code (range_x, len_unique_x, etc) all operate on each dplyr-group within a column as opposed to the column as a whole.
If I understand this right then even more than data.table I think you should look into furrr. I'd assumed you were doing some kind of cartesian comparison, every row in df_old against every row in df_new, but your code is operating on each column independently. Using future_map instead of lapply within .diff_type_df would mean instead of calling lapply sequentially for each column you could divide that running time by the number of available cores. A four column dataframe on a remotely modern computer could be run in about 1/2 to 1/3rd of the time. To put it in real world numbers I run a similar group-based operation on a ~1 gigabyte dataframe and going from dplyr to data.table took me from 30 minutes to 15 minutes, but switching from regular map to future_map took that down to seconds. And the best part is it's a drop-in replacement: if someone doesn't explicitly set their plan to multiprocess/multicore it transparently works just like regular map.
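The drop-in idea might look roughly like the sketch below. It assumes furrr (and therefore future) is installed; `count_unique()` is a hypothetical per-column function standing in for the real difference-checking code, which is not reproduced here. The multisession plan is used because, as far as I know, recent releases of future have deprecated multiprocess.

```r
library(furrr)  # furrr depends on future, which provides plan()

# Hypothetical per-column check standing in for the real difference logic.
count_unique <- function(column) length(unique(column))

df <- data.frame(a = c(1, 1, 2), b = c("x", "y", "y"), stringsAsFactors = FALSE)

# With no plan set, future_map() runs sequentially and behaves like lapply().
via_lapply <- lapply(df, count_unique)
via_future <- future_map(df, count_unique)

# Opting in to parallel workers is a one-line change made by the caller.
plan(multisession, workers = 2)
via_workers <- future_map(df, count_unique)
plan(sequential)  # restore the default plan and shut the workers down
```

On a frame this small the parallel version will be slower because of worker startup; it is only meant to show that the call site does not change.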