Closed wlandau closed 4 years ago
Thanks so much for this reprex, Amanda. I think your example ran smoothly because `make()` does not actually attempt to reload the connection from the cache. Below, I force it to happen (note the "load 2 items: this_con, seeded" message).
library(drake)
library(DBI)
library(tidyverse)
# Have drake treat quoted strings in plan commands as literals rather than
# as file names.
pkgconfig::set_config("drake::strings_in_dots" = "literals")
# Funs: helper functions used by the plan below.
# Open a connection to the SQLite database file "db" (a path relative to the
# working directory) through the RSQLite driver.
connect_to_db <- function() {
dbConnect(RSQLite::SQLite(), "db")
}
# Write the built-in `mtcars` data frame to the "mtcars" table, replacing any
# existing table (overwrite = TRUE).
seed_db <- function(conn) {
dbWriteTable(conn, "mtcars", mtcars, overwrite = TRUE)
}
# Read the whole "mtcars" table and return it as a tibble.
get_res <- function(conn) {
dbGetQuery(conn, "SELECT * FROM mtcars") %>%
as_tibble()
}
# Apply log() to every column of `tbl` (map_dfr() binds the results into a
# tibble) and overwrite the "mtcars" table with that result.
# Returns the value of dbWriteTable(); the database is modified in place.
update_db_records <- function(tbl, conn) {
new <- tbl %>%
map_dfr(log)
dbWriteTable(conn,
"mtcars",
new,
append = FALSE,
overwrite = TRUE
)
}
# Assert that the "mtcars" table still holds the original data.
# NOTE: reads the global `con`; it takes no connection argument.
test_is_original <- function() {
testthat::expect_equal(
mtcars %>%
as_tibble(),
get_res(con)
)
}
# Assert that the "mtcars" table holds log-transformed mtcars.
# NOTE: also reads the global `con`.
test_is_log <- function() {
testthat::expect_equal(
mtcars %>%
map_dfr(log),
get_res(con)
)
}
# Create db and write mtcars table
con <- connect_to_db()
seed_db(con)
test_is_original()
# Connection for updating is a target in the plan: `this_con` stores a live
# connection object, and both downstream targets use it.
plan <- drake_plan(
this_con = connect_to_db(),
seeded = get_res(this_con),
processed_data =
update_db_records(seeded, this_con)
)
# Do the planned database work.
# In this first make() every target is built in the same session, so the
# connection returned by `this_con` is used directly and all targets succeed.
make(plan, verbose = 4)
#> cache /tmp/RtmpCzTQKd/reprex2c0718c63f5/.drake
#> analyze environment
#> analyze 8 imports: seed_db, test_is_original, update_db_records, test_is...
#> analyze 3 targets: this_con, seeded, processed_data
#> construct graph edges
#> construct vertex attributes
#> construct graph
#> import get_res
#> import update_db_records
#> import RSQLite::SQLite
#> import connect_to_db
#> target this_con
#> store this_con
#> target seeded
#> store seeded
#> target processed_data
#> store processed_data
# Just remove processed_data
# so the next make() loads the connection from the cache.
clean(processed_data)
# `this_con` and `seeded` are now skipped and loaded from the on-disk .drake
# cache. The reloaded connection is no longer usable, so `processed_data`
# fails with "external pointer is not valid".
make(plan, verbose = 4)
#> cache /tmp/RtmpCzTQKd/reprex2c0718c63f5/.drake
#> analyze environment
#> analyze 8 imports: seed_db, test_is_original, update_db_records, test_is...
#> import get_res
#> import update_db_records
#> import RSQLite::SQLite
#> import connect_to_db
#> skip this_con
#> skip seeded
#> load 2 items: this_con, seeded
#> target processed_data
#> fail processed_data
#> Error: Target `processed_data` failed. Call `diagnose(processed_data)` for details. Error message:
#> external pointer is not valid
Created on 2018-10-17 by the reprex package (v0.2.1)
Interestingly, this is not a problem if you use an in-memory cache.
library(drake)
library(DBI)
library(tidyverse)
# Have drake treat quoted strings in plan commands as literals rather than
# as file names.
pkgconfig::set_config("drake::strings_in_dots" = "literals")
# Funs: helper functions used by the plan below.
# Open a connection to the SQLite database file "db" (a path relative to the
# working directory) through the RSQLite driver.
connect_to_db <- function() {
dbConnect(RSQLite::SQLite(), "db")
}
# Write the built-in `mtcars` data frame to the "mtcars" table, replacing any
# existing table (overwrite = TRUE).
seed_db <- function(conn) {
dbWriteTable(conn, "mtcars", mtcars, overwrite = TRUE)
}
# Read the whole "mtcars" table and return it as a tibble.
get_res <- function(conn) {
dbGetQuery(conn, "SELECT * FROM mtcars") %>%
as_tibble()
}
# Apply log() to every column of `tbl` (map_dfr() binds the results into a
# tibble) and overwrite the "mtcars" table with that result.
# Returns the value of dbWriteTable(); the database is modified in place.
update_db_records <- function(tbl, conn) {
new <- tbl %>%
map_dfr(log)
dbWriteTable(conn,
"mtcars",
new,
append = FALSE,
overwrite = TRUE
)
}
# Assert that the "mtcars" table still holds the original data.
# NOTE: reads the global `con`; it takes no connection argument.
test_is_original <- function() {
testthat::expect_equal(
mtcars %>%
as_tibble(),
get_res(con)
)
}
# Assert that the "mtcars" table holds log-transformed mtcars.
# NOTE: also reads the global `con`.
test_is_log <- function() {
testthat::expect_equal(
mtcars %>%
map_dfr(log),
get_res(con)
)
}
# Create db and write mtcars table
con <- connect_to_db()
seed_db(con)
test_is_original()
# Connection for updating is a target in the plan
plan <- drake_plan(
this_con = connect_to_db(),
seeded = get_res(this_con),
processed_data =
update_db_records(seeded, this_con)
)
# Do the planned database work.
# Use an in-memory storr cache instead of the default on-disk .drake cache.
cache <- storr::storr_environment()
# The log below lists `cache` itself among the analyzed imports.
make(plan, verbose = 4, cache = cache)
#> analyze environment
#> analyze 9 imports: seed_db, test_is_original, update_db_records, cache, ...
#> analyze 3 targets: this_con, seeded, processed_data
#> construct graph edges
#> construct vertex attributes
#> construct graph
#> import get_res
#> import update_db_records
#> import RSQLite::SQLite
#> import connect_to_db
#> target this_con
#> store this_con
#> target seeded
#> store seeded
#> target processed_data
#> store processed_data
# Just remove processed_data
# so the next make() loads the connection from the cache.
clean(processed_data, cache = cache)
# With the in-memory cache, the connection loaded for `processed_data` still
# works, and the target is stored without error.
make(plan, verbose = 4, cache = cache)
#> analyze environment
#> analyze 9 imports: seed_db, test_is_original, update_db_records, cache, ...
#> import get_res
#> import update_db_records
#> import RSQLite::SQLite
#> import connect_to_db
#> skip this_con
#> skip seeded
#> load 2 items: this_con, seeded
#> target processed_data
#> store processed_data
Created on 2018-10-17 by the reprex package (v0.2.1)
So it looks like database connection objects loaded from storage no longer work. Since imported objects always come directly from your environment (or the `envir` argument to `make()`), I think it would be safer to set up the connection outside the plan.
Another limitation of both approaches is deployment to a cluster, where the connection object is invoked on a different computer from the one where it was originally created. Below, the connection object is delivered to SGE nodes over a ZeroMQ socket. In that situation, each target would unfortunately need to reconnect to the database all over again.
library(drake)
library(DBI)
library(tidyverse)
# Funs: helper functions used by the plan below.
# Open a connection to the SQLite database file "db" (a path relative to the
# working directory) through the RSQLite driver.
connect_to_db <- function() {
dbConnect(RSQLite::SQLite(), "db")
}
# Write the built-in `mtcars` data frame to the "mtcars" table, replacing any
# existing table (overwrite = TRUE).
seed_db <- function(conn) {
dbWriteTable(conn, "mtcars", mtcars, overwrite = TRUE)
}
# Read the whole "mtcars" table and return it as a tibble.
get_res <- function(conn) {
dbGetQuery(conn, "SELECT * FROM mtcars") %>%
as_tibble()
}
# Apply log() to every column of `tbl` (map_dfr() binds the results into a
# tibble) and overwrite the "mtcars" table with that result.
# Returns the value of dbWriteTable(); the database is modified in place.
update_db_records <- function(tbl, conn) {
new <- tbl %>%
map_dfr(log)
dbWriteTable(conn,
"mtcars",
new,
append = FALSE,
overwrite = TRUE
)
}
# Assert that the "mtcars" table still holds the original data.
# NOTE: reads the global `con`; it takes no connection argument.
test_is_original <- function() {
testthat::expect_equal(
mtcars %>%
as_tibble(),
get_res(con)
)
}
# Assert that the "mtcars" table holds log-transformed mtcars.
# NOTE: also reads the global `con`.
test_is_log <- function() {
testthat::expect_equal(
mtcars %>%
map_dfr(log),
get_res(con)
)
}
# Create db and write mtcars table
con <- connect_to_db()
seed_db(con)
test_is_original()
# Connection for updating is a global object: drake analyzes `con` as an
# import (see "import con" in the log) instead of building it as a target.
plan <- drake_plan(
seeded = get_res(con),
seeded2 = seeded,
processed_data =
update_db_records(seeded, con)
)
# Use the SGE scheduler for clustermq workers.
options(clustermq.scheduler = "sge")
# Run targets on 2 clustermq workers with caching = "master".
# The connection object sent to the workers is not usable there, so `seeded`
# fails with "external pointer is not valid".
make(plan, verbose = 4, jobs = 2, parallelism = "clustermq", caching = "master")
#> cache <CENSORED>/.drake
#> analyze environment
#> analyze 13 imports: ld, wd, seed_db, test_is_original, update_db_records, td,...
#> analyze 3 targets: seeded, seeded2, processed_data
#> construct graph edges
#> construct vertex attributes
#> construct graph
#> import con
#> import get_res
#> import update_db_records
#> Submitting 2 worker jobs (ID: 6944) ...
#> target seeded
#> Target seeded messages:
#> Loading required package: RSQLite
#> fail seeded
#> Error: Target `seeded` failed. Call `diagnose(seeded)` for details. Error message:
#> external pointer is not valid
#> Execution halted
#> <CENSORED> has registered the job-array task 42922204.1 for deletion
#> <CENSORED> has registered the job-array task 42922204.2 for deletion
Great examples! They illustrate the issue well.
One use case outside of the cluster scenario where you might want to save a db connection as a target: if some condition is met, do something expensive like making a copy of the entire db and create the connection from the copy; otherwise, connect to the original db. I think in the cluster situation most people would expect to need to reconnect on each machine, but maybe a solution to the db-copying use case would be portable to the cluster use case?
I'm not sure how or if `drake` would be able to handle connections as targets, but for now, indicating in the documentation that connections should be made outside of the plan seems like the right move.
Now that you mention copies, what if we have a `drake` workflow that creates a new database from scratch? Without a database to start with, we might really want that connection to be a target.
If our connection could refresh itself automatically, this might be safer. (Maybe a connection object wrapped in an `R6` class with an active field?) But then we'd have a target that changes itself unpredictably, which could invalidate downstream targets that depend on it. (Related: https://github.com/ropensci/drake/issues/345#issuecomment-399571383.)
Problems like https://github.com/ropensci/drake/issues/345 tend to go away the more we focus on functions rather than objects, so I wonder if the right direction is to make it easier to reconnect and disconnect.
library(drake)
library(dplyr)
# Hack: wrap every command in a braced block that opens a connection named
# `con`, registers dbDisconnect() via on.exit(), and then runs the original
# command. The modified plan is printed as R code by drake_plan_source().
# NOTE(review): relies on `command` being a character column of the plan in
# this drake version; confirm before reusing with newer versions.
drake_plan(
seeded = get_res(con),
processed_data = update_db_records(seeded, con)
) %>%
mutate(
command = paste(
"{con <- connect_to_db()\non.exit(dbDisconnect(con))\n",
command, "}"
)
) %>%
drake_plan_source()
#> drake_plan(
#> seeded = {
#> con <- connect_to_db()
#> on.exit(dbDisconnect(con))
#> get_res(con)
#> },
#> processed_data = {
#> con <- connect_to_db()
#> on.exit(dbDisconnect(con))
#> update_db_records(seeded, con)
#> },
#> strings_in_dots = "literals"
#> )
Created on 2018-10-19 by the reprex package (v0.2.1)
Of course, a hack that pastes code together is unsatisfying, so maybe `drake` itself does require modification. Several months back, I implemented a `hook` argument to `make()`, which is a function that executes around a target's command. I did not exactly follow through with it, but with more development, maybe we could make something like this possible.
library(drake)
# Proposed hook for make(): open a connection, register dbDisconnect() with
# on.exit(), then force the target's command (`code`).
# NOTE(review): `con` is local to db_hook. `code` is a promise evaluated in the
# environment where it was created, so the command probably cannot see
# this `con`.
db_hook <- function(code){
con <- connect_to_db()
on.exit(dbDisconnect(con))
force(code)
}
# The commands refer to `con` and expect the hook to provide it.
plan <- drake_plan(
seeded = get_res(con),
processed_data = update_db_records(seeded, con)
)
# Ask make() to run each target's command through db_hook.
make(plan, hook = db_hook)
I only worry that lexical scoping might not make `con` available to the `code`.
The following worked when I tried it.
# Funs
# ...
# Run `code` with a fresh connection named `con` in scope.
# substitute() captures the unevaluated expression, and eval() evaluates it in
# db()'s own frame (eval's default `envir`), where the local `con` lives.
# on.exit() disconnects when db() returns. Returns the value of `code`.
db <- function(code){
con <- connect_to_db()
on.exit(dbDisconnect(con))
eval(substitute(code))
}
# Seed the database once, outside the plan, then close the connection.
con <- connect_to_db()
seed_db(con)
dbDisconnect(con)
# Each command gets its own short-lived connection through db().
plan <- drake_plan(
seeded = db(get_res(con)),
processed_data = db(update_db_records(seeded, con))
)
make(plan)
Maybe this could be part of our recommendation? Given how easy it turned out to be, I have mixed feelings about `make(hook = ...)`.
Okay interesting! The second solution seems preferable to the first because it requires saving a connection, which people inevitably will want to be able to access later in order to reconnect, list tables, etc.
I think in the case of the second solution, though, you'd still need to call `get_res`, because saving the return value of `db` gives you the return value of `dbWriteTable`, which is `TRUE` in the success case.
Were you able to get the second solution to execute on the db? `processed_data` here doesn't seem to have gotten `log`ged.
# Funs
# ...
# Evaluate `code` in db()'s own frame, where a fresh connection `con` is
# defined; on.exit() disconnects when db() returns. Returns the value of
# `code` itself.
db <- function(code) {
con <- connect_to_db()
on.exit(dbDisconnect(con))
eval(substitute(code))
}
# Seed the database once, outside the plan, then close the connection.
con <- connect_to_db()
seed_db(con)
dbDisconnect(con)
# `is_processed` stores the value of update_db_records(), i.e. the return
# value of dbWriteTable() (TRUE below), not the processed data.
# NOTE: `processed_data` does not reference `is_processed`, so no dependency
# forces it to run after the update. The log below shows it was built before
# `is_processed`, which is why it still holds the original data.
plan <- drake_plan(
seeded = db(get_res(con)),
is_processed = db(update_db_records(seeded, con)),
processed_data = db(get_res(con))
)
# Clear previously built targets so everything is rebuilt.
clean()
make(plan)
#> target seeded
#> target processed_data
#> target is_processed
loadd(is_processed)
is_processed
#> [1] TRUE
loadd(processed_data)
processed_data
#> # A tibble: 32 x 11
#> mpg cyl disp hp drat wt qsec vs am gear carb
#> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
#> 1 21 6 160 110 3.9 2.62 16.5 0 1 4 4
#> 2 21 6 160 110 3.9 2.88 17.0 0 1 4 4
#> 3 22.8 4 108 93 3.85 2.32 18.6 1 1 4 1
#> 4 21.4 6 258 110 3.08 3.22 19.4 1 0 3 1
#> 5 18.7 8 360 175 3.15 3.44 17.0 0 0 3 2
#> 6 18.1 6 225 105 2.76 3.46 20.2 1 0 3 1
#> 7 14.3 8 360 245 3.21 3.57 15.8 0 0 3 4
#> 8 24.4 4 147. 62 3.69 3.19 20 1 0 4 2
#> 9 22.8 4 141. 95 3.92 3.15 22.9 1 0 4 2
#> 10 19.2 6 168. 123 3.92 3.44 18.3 1 0 4 4
#> # ... with 22 more rows
# No error is reported: processed_data still equals the original, un-logged
# mtcars.
testthat::expect_equal(mtcars %>% as_tibble(), processed_data)
Created on 2018-10-21 by the reprex package (v0.2.0).
Okay interesting! The second solution seems preferable to the first because it requires saving a connection, which people inevitably will want to be able to access later in order to reconnect, list tables, etc.
Just so I am clear, are we talking about the `db()` function with `on.exit()`? I did not realize we were saving a connection there.
I think in the case of the second solution, though, you'd still need to get_res because saving the return value of db gives you the return of dbWriteTable which is TRUE in the success case.
Were you able to get the second solution to execute on the db? processed_data here doesn't seem to have gotten logged.
Thanks for catching that. I did not intend to return `TRUE` for `processed_data`. I read through your code way too hastily the first time.
With some minor adjustments, I actually do get the correct `processed_data`. As you said, we should make sure `db()` returns the value of `get_res(con)`.
library(drake)
library(DBI)
library(tidyverse)
# Open a connection to the SQLite database file "db" (a path relative to the
# working directory) through the RSQLite driver.
connect_to_db <- function() {
dbConnect(RSQLite::SQLite(), "db")
}
# Write the built-in `mtcars` data frame to the "mtcars" table, replacing any
# existing table (overwrite = TRUE).
seed_db <- function(conn) {
dbWriteTable(conn, "mtcars", mtcars, overwrite = TRUE)
}
# Read the whole "mtcars" table and return it as a tibble.
get_res <- function(conn) {
dbGetQuery(conn, "SELECT * FROM mtcars") %>%
as_tibble()
}
# Apply log() to every column of `tbl` and overwrite the "mtcars" table with
# the result. Returns the value of dbWriteTable().
update_db_records <- function(tbl, conn) {
new <- tbl %>%
map_dfr(log)
dbWriteTable(
conn,
"mtcars",
new,
append = FALSE,
overwrite = TRUE
)
}
# TRUE if the table holds exactly the log-transformed mtcars. identical()
# applies no numeric tolerance.
is_log <- function(con) {
identical(
mtcars %>%
map_dfr(log),
get_res(con)
)
}
# Evaluate `code` in db()'s own frame, where a fresh connection `con` is
# defined. Then return the full "mtcars" table as read after `code` ran,
# instead of the value of `code`. on.exit() disconnects when db() returns.
db <- function(code) {
con <- connect_to_db()
on.exit(dbDisconnect(con))
eval(substitute(code))
get_res(con)
}
# `seeded` holds the table contents right after seeding. `processed_data`
# references `seeded`, so drake builds it afterwards, and it holds the table
# as it looks after the log update.
plan <- drake_plan(
seeded = db(seed_db(con)),
processed_data = db(update_db_records(seeded, con))
)
make(plan)
#> target seeded
#> target processed_data
# Check the database state outside the plan with a fresh connection.
con <- connect_to_db()
is_log(con)
#> [1] TRUE
dbDisconnect(con)
Created on 2018-10-21 by the reprex package (v0.2.1)
A couple of other potential issues:
In `drake`'s code analysis, if `con` is an object in the environment, it will be automatically detected as a dependency.
library(drake)
# An unrelated global object that happens to be named `con`.
con <- "constant"
# drake's static code analysis sees the symbol `con` in these commands and
# detects the global `con` as a dependency of both targets.
plan <- drake_plan(
seeded = db(seed_db(con)),
processed_data = db(update_db_records(seeded, con))
)
config <- drake_config(plan)
# Plot the dependency graph.
vis_drake_graph(config)
Created on 2018-10-21 by the reprex package (v0.2.1)
Then if this imposter `con` is missing in the next `make()`, the targets will be invalidated.
library(drake)
# Same plan, but with no global `con` in the environment this time. Compared
# with the previous graph, the `con` dependency has disappeared, which
# invalidates the targets.
plan <- drake_plan(
seeded = db(seed_db(con)),
processed_data = db(update_db_records(seeded, con))
)
config <- drake_config(plan)
# Plot the dependency graph.
vis_drake_graph(config)
Created on 2018-10-21 by the reprex package (v0.2.1)
Maybe we could advise `ignore(con)`? It does the job, but it's inconvenient. It is possible to add a global `ignore` argument to `make()` to exclude symbols like `con`, but I would prefer not to add extra complexity to `drake` itself.
library(drake)
# A global object named `con` that should not become a dependency.
con <- "contract"
# Wrapping `con` in ignore() hides it from drake's code analysis, so the
# global `con` is not tracked as a dependency.
plan <- drake_plan(
seeded = db(seed_db(ignore(con))),
processed_data = db(update_db_records(seeded, ignore(con)))
)
config <- drake_config(plan)
# Plot the dependency graph.
vis_drake_graph(config)
Created on 2018-10-21 by the reprex package (v0.2.1)
A potential safeguard is to add a phony target called `con` to force `drake` to ignore imports of the same name. It's technically a hack, but it does not seem to do harm.
library(drake)
# Have drake treat quoted strings in plan commands as literals.
pkgconfig::set_config("drake::strings_in_dots" = "literals")
# A global object named `con` that should not become a dependency.
con <- "constabulary"
# The phony target `con` shares the global's name, so the `con` in the
# commands refers to the target instead of the global import.
plan <- drake_plan(
con = "phony",
seeded = db(seed_db(con)),
processed_data = db(update_db_records(seeded, con))
)
# As the message below shows, make() unloads the global `con` from the
# environment because it has the same name as a target.
make(plan)
#> Unloading targets from environment:
#> con
#> ...
For many use cases, I am not sure we want to keep copying the contents of databases and caching them as targets. It seems like the data itself really belongs back in the database. From testimonies like this one, `drake` unsurprisingly slows down with so many large intermediate results. (Incidentally, this use case motivated `drake`'s new "hasty" mode.)
What if we returned a hash instead? Maybe a time stamp if that is too expensive? I use `fastdigest()` below to set each target's value to be the fingerprint of the intended result.
library(drake)
library(DBI)
library(tidyverse)
# Have drake treat quoted strings in plan commands as literals rather than
# as file names.
pkgconfig::set_config("drake::strings_in_dots" = "literals")
# Open a connection to the SQLite database file "db" (a path relative to the
# working directory) through the RSQLite driver.
connect_to_db <- function() {
dbConnect(RSQLite::SQLite(), "db")
}
# Write the built-in `mtcars` data frame to the "mtcars" table, replacing any
# existing table (overwrite = TRUE).
seed_db <- function(con) {
dbWriteTable(con, "mtcars", mtcars, overwrite = TRUE)
}
# Read the whole "mtcars" table and return it as a tibble.
get_res <- function(con) {
dbGetQuery(con, "SELECT * FROM mtcars") %>%
as_tibble()
}
# Read the current table, apply log() to every column, and overwrite the
# "mtcars" table with the result. Unlike the earlier version, this reads the
# data from the database itself instead of taking it as an argument.
update_db_records <- function(con) {
new <- get_res(con) %>%
map_dfr(log)
dbWriteTable(
con,
"mtcars",
new,
append = FALSE,
overwrite = TRUE
)
}
# TRUE if the table holds exactly the log-transformed mtcars. identical()
# applies no numeric tolerance.
is_log <- function(con) {
identical(
mtcars %>%
map_dfr(log),
get_res(con)
)
}
# Evaluate `code` with a local connection `con` in scope, then return a hash
# (fastdigest) of the full table as it looks afterwards. The target stores
# only this fingerprint, not the data. on.exit() disconnects when db()
# returns.
db <- function(code) {
con <- connect_to_db()
on.exit(dbDisconnect(con))
eval(substitute(code))
fastdigest::fastdigest(get_res(con))
}
# `con` is a phony target, so the symbol `con` in the commands is not tracked
# as a dependency on a global object.
# NOTE(review): `processing_step` does not mention `seeding_step`, so drake
# records no dependency between the two steps.
plan <- drake_plan(
con = "phony",
seeding_step = db(seed_db(con)),
processing_step = db(update_db_records(con))
)
make(plan)
#> target con
#> target seeding_step
#> target processing_step
# Check the database state outside the plan with a fresh connection.
con <- connect_to_db()
is_log(con)
#> [1] TRUE
dbDisconnect(con)
Created on 2018-10-21 by the reprex package (v0.2.1)
Just so I am clear, are we talking about the db() function with on.exit()? I did not realize we were saving a connection there.
Yep my bad, you're totally right.
Nice hashing solution! I think you're right that storing the result of `get_res` itself isn't the right way to go, as presumably the user can query the database at any time.
It may also be useful for people to expand `db`, as I have here, so it can return a different result than `get_res` when `get_res` is expensive.
library(drake)
library(DBI)
library(tidyverse)
# Have drake treat quoted strings in plan commands as literals rather than
# as file names.
pkgconfig::set_config("drake::strings_in_dots" = "literals")
# Open a connection to the SQLite database file "db" (a path relative to the
# working directory) through the RSQLite driver.
connect_to_db <- function() {
dbConnect(RSQLite::SQLite(), "db")
}
# Write the built-in `mtcars` data frame to the "mtcars" table, replacing any
# existing table (overwrite = TRUE).
seed_db <- function(con) {
dbWriteTable(con, "mtcars", mtcars, overwrite = TRUE)
}
# Read the whole "mtcars" table and return it as a tibble.
get_res <- function(con) {
dbGetQuery(con, "SELECT * FROM mtcars") %>%
as_tibble()
}
# Read the current table, apply log() to every column, and overwrite the
# "mtcars" table with the result.
update_db_records <- function(con) {
new <- get_res(con) %>%
map_dfr(log)
dbWriteTable(
con,
"mtcars",
new,
append = FALSE,
overwrite = TRUE
)
}
# TRUE if the table holds exactly the log-transformed mtcars. identical()
# applies no numeric tolerance.
is_log <- function(con) {
identical(
mtcars %>%
map_dfr(log),
get_res(con)
)
}
# A cheaper observation than get_res(): only 3 rows of the table, as a tibble.
# NOTE(review): the query has no ORDER BY, so SQL does not guarantee which
# 3 rows come back.
mt_3 <- function(con) {
dbGetQuery(con, "SELECT * FROM mtcars
LIMIT 3") %>%
as_tibble()
}
# Evaluate `code` with a local connection `con` in scope, then return a hash
# (fastdigest) of `query(con)`. `query` is a function of the connection that
# chooses what to fingerprint. on.exit() disconnects when db() returns.
db <- function(code, query) {
con <- connect_to_db()
on.exit(dbDisconnect(con))
eval(substitute(code))
fastdigest::fastdigest(query(con))
}
# Each step fingerprints only the 3 rows returned by mt_3().
# NOTE(review): `processing_step` does not mention `seeding_step`, so drake
# records no dependency between the two steps.
plan <- drake_plan(
con = "phony",
seeding_step = db(seed_db(con),
query = mt_3),
processing_step = db(update_db_records(con),
query = mt_3)
)
# Clear previously built targets so everything is rebuilt.
clean()
make(plan)
#> target con
#> target seeding_step
#> target processing_step
# Check the database state outside the plan with a fresh connection.
con <- connect_to_db()
is_log(con)
#> [1] TRUE
dbDisconnect(con)
Created on 2018-10-22 by the reprex package (v0.2.1)
Hmm... I think we are getting into custom metaprogramming (similar to this) and it is a great place for it. If we pass both the query and the task, then the connection object will no longer show up in the code analysis.
Another thing I forgot: even if we use hashes instead of in-memory data, the command of `processing_step` should still mention `seeding_step` somewhere. Otherwise, the dependency relationship is lost.
Latest reprex:
library(drake)
library(DBI)
library(tidyverse)
# Have drake treat quoted strings in plan commands as literals rather than
# as file names.
pkgconfig::set_config("drake::strings_in_dots" = "literals")
# Open a connection to the SQLite database file "db" (a path relative to the
# working directory) through the RSQLite driver.
connect_to_db <- function() {
dbConnect(RSQLite::SQLite(), "db")
}
# Write the built-in `mtcars` data frame to the "mtcars" table, replacing any
# existing table (overwrite = TRUE).
seed_db <- function(con) {
dbWriteTable(con, "mtcars", mtcars, overwrite = TRUE)
}
# Read the whole "mtcars" table and return it as a tibble.
get_res <- function(con) {
dbGetQuery(con, "SELECT * FROM mtcars") %>%
as_tibble()
}
# Read the current table, apply log() to every column, and overwrite the
# "mtcars" table with the result.
update_db_records <- function(con) {
new <- get_res(con) %>%
map_dfr(log)
dbWriteTable(
con,
"mtcars",
new,
append = FALSE,
overwrite = TRUE
)
}
# TRUE if the table holds exactly the log-transformed mtcars. identical()
# applies no numeric tolerance.
is_log <- function(con) {
identical(
mtcars %>%
map_dfr(log),
get_res(con)
)
}
# A cheaper observation than get_res(): only 3 rows of the table, as a tibble.
# NOTE(review): the query has no ORDER BY, so SQL does not guarantee which
# 3 rows come back.
mt_3 <- function(con) {
dbGetQuery(con, "SELECT * FROM mtcars
LIMIT 3") %>%
as_tibble()
}
# Open a connection, call `execute(con)` for its side effects on the
# database, and return a hash (fastdigest) of `observe(con)`. Both arguments
# are functions of the connection, so no `con` symbol appears in the plan's
# commands. `...` is never used here: it only lets a command mention
# upstream targets so that drake's static analysis records them as
# dependencies. on.exit() disconnects when db() returns.
db <- function(execute, observe, ...) {
con <- connect_to_db()
on.exit(dbDisconnect(con))
execute(con)
fastdigest::fastdigest(observe(con))
}
# Passing `seeding_step` through db()'s `...` makes drake's static code
# analysis record it as a dependency of `processing_step`.
plan <- drake_plan(
seeding_step = db(seed_db, mt_3),
processing_step = db(update_db_records, mt_3, seeding_step)
)
make(plan)
#> target seeding_step
#> target processing_step
config <- drake_config(plan)
# Plot the dependency graph.
vis_drake_graph(config)
# Check the database state outside the plan with a fresh connection.
con <- connect_to_db()
is_log(con)
#> [1] TRUE
dbDisconnect(con)
Created on 2018-10-22 by the reprex package (v0.2.1)
Cool, so just to make sure I understand: `processing_step` is now treating `seeding_step` as a dependency because you're passing `seeding_step` to `db` in `...`?
Yes, exactly. That's the magic of static code analysis. I attempt an explanation here, but I could probably do more to demystify it, possibly citing `CodeDepends` and this section of Advanced R.
Awesome. Well, it seems to me like your `db` function covers a lot of the issues I originally raised while remaining concise. It's up to you, of course, whether you want to flesh it out and include it in the package, or whether you think it's better to provide it as an example and have people implement it themselves in whatever way best matches their use case.
Might be worth stress testing the approach against a large database and/or one that has high latency. Maybe with a more expensive version of `update_db_records` or through many iterations of `update_db_records`. (From the benchmarks in the visualization, it's a bit hard for me to tell what the effect of re-connecting at each step would be with many more db operations.)
Awesome. Well it seems to me like your db function covers a lot of the issues I originally raised while remaining concise. Up to you of course if you want to flesh it out and include it in the package or if you think better to provide as an example and have people implement themselves in whatever way best matches their use case.
I prefer to write about it in the manual rather than include it in the package itself. Maybe one example with two versions of `db()`: one that returns a hash while we're manipulating a large database, and another that returns a subset of the data for summarization and post-processing in downstream targets. Maybe a `drake_example("database")` too (ref: https://github.com/wlandau/drake-examples/issues/6).
Might be worth stress testing the approach against a large database and/or one that has high latency. Maybe with a more expensive version of an update_db_records or through many iterations of an update_db_records. (A bit hard for me to tell from the benchmarks on the visualization what the effect would be of re-connecting at each step would be with many more db operations.)
Absolutely. Maybe benchmarks of `connect_to_db()` itself too.
Sounds good! Let me know if you could use any more help from me on this and thanks for the interesting conversation.
You are welcome. I really appreciate your help nailing down some good practices. I think the next thing is to identify a large public database that we are allowed to use for teaching `drake`. Also cc @AlexAxthelm.
That sounds great. I should have a chance to work on this soon, since it's overlapping more with work.
Another issue: pure functions and immutability. For the sake of reproducibility, Make-like pipeline tools usually require a guarantee that the act of building a target does not change any dependencies upstream. `drake` is working towards this with `make(lock_envir = TRUE)` (ref: https://github.com/ropensci/drake/issues/619, https://github.com/ropensci/drake/pull/621).
Immutability makes database work tricky. We usually want to do a lot of in-place operations so we can avoid deep copies of large tables.
Advice from @edgararuiz: look at how Spark manages its pipelines. The section on ML pipelines seems relevant.
If we write a new chapter on this, `data.table` probably deserves a mention too.
Since `drake` now supports efficient formats for large data, we could turn this into a chapter on big data and warn about immutability when it comes to databases.
What is the recommended way of working with a large `data.table`? I've been running `copy()` at the beginning of functions to preserve the original, but I'm hitting memory limits now, so I need to edit in place. Let's say I have an object `dt` that is edited in place by one part of the plan but is also used to build another target. Should I run `rm()` on the original reference after it's been edited so that `drake` reloads it, or what do you recommend?
You might be interested in the manual's chapter on memory management. The TL;DR is to choose an appropriate `memory_strategy` and activate `garbage_collection` in `make()`. And for speed, if you are not doing this already, I recommend using the `"fst_dt"` format for storage, e.g. `drake_plan(x = target(generate_large_dt(), format = "fst_dt"))`.
Thanks! I did not know about the `fst_dt` format. The server is still on drake 7.2, but now I might be able to get the admin to update... :)
For databases in general, I think d996dc25f1bf95712eae8ef2b60f3607a84a047b is sufficient for the manual for now. If more comes to light here or https://github.com/ropensci/drake/issues/1046, we can add and amend as we go.
For #20, it would be great to align on best practices for working with database connection objects. @aedobbyn and @AlexAxthelm have much more experience with databases than I do. From @aedobbyn in https://github.com/ropensci/drake/issues/552: