r-dbi / RPostgres

A DBI-compliant interface to PostgreSQL
https://rpostgres.r-dbi.org

Redshift: dbWriteTable() with larger DFs and CSVs #429

Closed kmishra9 closed 3 months ago

kmishra9 commented 1 year ago

Hey there,

I ran into some issues uploading data to Redshift with RPostgres and non-Redshift-specific drivers a couple of years ago, and developed a workaround back then that pushes the data to S3 first and then copies it into Redshift.

That solution relies on the somehow-still-working RedshiftTools package, so while doing some refactoring I was eager to see whether DBI::dbWriteTable had made any progress on this front in the couple of years since, and figured I'd toss a reprex your way with any bugs I saw.


It is "hard" to get data into Redshift in ways that probably shouldn't be hard. Neither dplyr::copy_to() nor DBI::dbWriteTable() works as expected for me, and I'm forced to rely on a hackier workaround than I'd like.

If one outcome of this bug report is to say "You should use dbWriteTable() to upload a CSV instead of a DF", that's totally fine w/ me, but that path should probably be fixed to work w/ schema naming and DBI::Id, just like the "lite" version with the small DF does.

suppressPackageStartupMessages(library(dplyr))
suppressPackageStartupMessages(library(dbplyr))

db <- DBI::dbConnect(
    drv      = RPostgres::Redshift(),
    host     = "xxxxxxxxxxxxxxxxxx",
    port     = "5439",
    user     = keyring::key_get(service = "rs_un_airflow_prod"),
    password = keyring::key_get(service = "rs_pw_airflow_prod"),
    dbname   = 'xxxxxxxxxxxxxxxxxx',
    sslmode  = "require"
)

db2 <- DBI::dbConnect(RSQLite::SQLite(), dbname = ":memory:")

# db3 <- simulate_redshift() # Wish these worked with DBI for reprex purposes but they don't...
# db4 <- simulate_sqlite()

test_df_full <- nycflights13::flights
test_df_lite <- nycflights13::flights %>% slice(1:100)

temp_path_full <- tempfile()
temp_path_lite <- tempfile()

db_path_full_rs <- DBI::Id(schema = 'public', table = 'test_df_full')
db_path_lite_rs <- DBI::Id(schema = 'public', table = 'test_df_lite')
db_path_full_sq <- DBI::Id(schema = NULL, table = 'test_df_full')
db_path_lite_sq <- DBI::Id(schema = NULL, table = 'test_df_lite')

readr::write_csv(x = test_df_full, file = temp_path_full)
readr::write_csv(x = test_df_lite, file = temp_path_lite)

# Writing with the df itself only works for small enough DFs in Redshift
DBI::dbWriteTable(conn = db, name = db_path_full_rs, value = test_df_full, overwrite = T)
#> Error: Failed to prepare query: ERROR:  Statement is too large. Statement Size: 77989149 bytes. Maximum Allowed: 16777216 bytes
DBI::dbWriteTable(conn = db, name = db_path_lite_rs, value = test_df_lite, overwrite = T)

db %>% tbl(db_path_full_rs) %>% tally() # Expected: n = 336,776; Actual: Error
#> Error in `db_query_fields.DBIConnection()`:
#> ! Can't query fields.
#> Caused by error:
#> ! Failed to prepare query: ERROR:  relation "public.test_df_full" does not exist
#> Backtrace:
#>      ▆
#>   1. ├─db %>% tbl(db_path_full_rs) %>% tally()
#>   2. ├─dplyr::tally(.)
#>   3. ├─dplyr::tbl(., db_path_full_rs)
#>   4. └─dplyr:::tbl.DBIConnection(., db_path_full_rs)
#>   5.   ├─dplyr::tbl(...)
#>   6.   └─dbplyr:::tbl.src_dbi(...)
#>   7.     └─dbplyr::tbl_sql(c(subclass, "dbi"), src = src, from = from, ...)
#>   8.       ├─vars %||% dbplyr_query_fields(src$con, from_sql)
#>   9.       └─dbplyr:::dbplyr_query_fields(src$con, from_sql)
#>  10.         └─dbplyr:::dbplyr_fallback(con, "db_query_fields", ...)
#>  11.           ├─rlang::eval_bare(expr((!!fun)(con, ...)))
#>  12.           └─dbplyr:::db_query_fields.DBIConnection(con, ...)
#>  13.             └─base::tryCatch(...)
#>  14.               └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>  15.                 └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>  16.                   └─value[[3L]](cond)
#>  17.                     └─cli::cli_abort("Can't query fields.", parent = cnd)
#>  18.                       └─rlang::abort(...)
db %>% tbl(db_path_lite_rs) %>% tally() # Expected: n = 100; Actual: n = 100
#> # Source:   SQL [1 x 1]
#> # Database: postgres  [kunal_ha@airflow-production-redshift.data.crickethealth.com:5439/eff_swh]
#>         n
#>   <int64>
#> 1     100

# Both work in SQLite and Microsoft SQL though
DBI::dbWriteTable(conn = db2, name = db_path_full_sq, value = test_df_full, overwrite = T)
DBI::dbWriteTable(conn = db2, name = db_path_lite_sq, value = test_df_lite, overwrite = T)

db2 %>% tbl(db_path_full_sq) %>% tally() # Expected: n = 336,776; Actual: 336,776
#> # Source:   SQL [1 x 1]
#> # Database: sqlite 3.40.1 [:memory:]
#>        n
#>    <int>
#> 1 336776
db2 %>% tbl(db_path_lite_sq) %>% tally() # Expected: n = 100; Actual: n = 100
#> # Source:   SQL [1 x 1]
#> # Database: sqlite 3.40.1 [:memory:]
#>       n
#>   <int>
#> 1   100

# Writing "indirectly" from a file doesn't work at all
DBI::dbWriteTable(conn = db, name = db_path_full_rs, value = temp_path_full, overwrite = T)
#> Error in (function (classes, fdef, mtable) : unable to find an inherited method for function 'dbWriteTable' for signature '"RedshiftConnection", "SQL", "character"'
DBI::dbWriteTable(conn = db, name = db_path_lite_rs, value = temp_path_lite, overwrite = T)
#> Error in (function (classes, fdef, mtable) : unable to find an inherited method for function 'dbWriteTable' for signature '"RedshiftConnection", "SQL", "character"'

# Refresh SQLite DB
db2 <- DBI::dbConnect(RSQLite::SQLite(), dbname = ":memory:")

# But works in SQLite (though NOT with DBI::Id, which is likely a bug I've also gotta report)...
# DBI::dbWriteTable(conn = db2, name = db_path_full_sq, value = temp_path_full, overwrite = T)
# DBI::dbWriteTable(conn = db2, name = db_path_lite_sq , value = temp_path_lite, overwrite = T)
DBI::dbWriteTable(conn = db2, name = 'test_df_full', value = temp_path_full, overwrite = T)
DBI::dbWriteTable(conn = db2, name = 'test_df_lite', value = temp_path_lite, overwrite = T)

Created on 2023-03-30 with reprex v2.0.2

krlmlr commented 1 year ago

Thanks. This is a hard problem, also tracked in https://github.com/r-dbi/DBI/issues/252.

Loading large data works best if the data is near the server. Also, it helps to disable or delete things like indexes and constraints. To load large data efficiently, a little more than a single function call will be necessary. Procedures will vary vastly across databases.

For small data, these things don't matter that much. Reliability is what counts there, and dbWriteTable() should just work.

The current Redshift implementation creates a huge SQL query that inserts all rows. As you noticed, this collides with Redshift's limit on the query size. To work around this, we need a better version of DBI::sqlAppendTable() that returns chunks of SQL with a predefined maximum length.
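
To make the chunking idea concrete, here is a minimal sketch of what such a helper could look like. The name sql_append_table_chunked() and its max_bytes argument are hypothetical and not part of DBI; the sketch simply calls the existing DBI::sqlAppendTable() and bisects the rows until each statement fits under the limit. It regenerates SQL at every split, so it is meant as an illustration rather than an efficient implementation.

```r
library(DBI)

# Hypothetical helper (not part of DBI): returns a list of INSERT statements,
# each at most `max_bytes` long, by recursively halving the rows of `values`.
# The default limit mirrors the 16777216 bytes reported in the error above.
sql_append_table_chunked <- function(con, table, values,
                                     max_bytes = 16777216,
                                     row.names = FALSE) {
  n <- nrow(values)
  if (n == 0L) {
    return(list())
  }

  sql <- DBI::sqlAppendTable(con, table, values, row.names = row.names)
  if (nchar(as.character(sql), type = "bytes") <= max_bytes) {
    return(list(sql))
  }
  if (n == 1L) {
    stop("A single row already exceeds the maximum statement size.")
  }

  # Statement too large: split the rows in half and try each half again.
  mid <- n %/% 2L
  c(
    sql_append_table_chunked(con, table, values[seq_len(mid), , drop = FALSE],
                             max_bytes, row.names),
    sql_append_table_chunked(con, table, values[seq(mid + 1L, n), , drop = FALSE],
                             max_bytes, row.names)
  )
}
```

Assuming the target table already exists, the chunks could then be sent one at a time, for example with `for (stmt in sql_append_table_chunked(db, db_path_full_rs, test_df_full)) DBI::dbExecute(db, stmt)`.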

kmishra9 commented 1 year ago

It is "hard" to get data into Redshift in ways that probably shouldn't be hard.

I think I said this in a way that doesn't indicate how much I appreciate all the work you guys have put into making DBI and dbplyr awesome to use and with tremendous cross-db compatibility. I only meant it feels harder to get things into Redshift, relative to other DBs (seemingly, at least haha... probably because copy_to() doesn't work as well).

Agree w/ your assessment of the problem & the most reasonable solution!

Do you think a different function, or even a graceful fallback of dbWriteTable(), that relies on an upload to S3 first and then a native Redshift COPY command for large datasets would ever be in scope to implement? This would be more complex than the existing implementation from a "process" and "number of things that could go wrong" perspective... but it could also be more efficient than generating a massive SQL query with all of the data.

krlmlr commented 3 months ago

Missed the question, sorry.

I think the upload to S3 plus COPY should live elsewhere. It's too different from what this package does (basically, wrapping libpq).
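
For anyone landing here who wants to do the S3 plus COPY route outside of RPostgres, the database side can be sketched roughly as follows. build_copy_sql() is a hypothetical helper, and the bucket, key, and IAM role in the usage note are placeholders; the sketch assumes the CSV has a header row and has already been uploaded to S3 by some other tool.

```r
library(DBI)

# Hypothetical helper: builds a Redshift COPY statement for a CSV file that
# already sits in S3. `table` can be a DBI::Id(), `s3_uri` and `iam_role`
# are plain strings; identifiers and literals are quoted via DBI.
build_copy_sql <- function(con, table, s3_uri, iam_role) {
  DBI::SQL(paste0(
    "COPY ", DBI::dbQuoteIdentifier(con, table),
    " FROM ", DBI::dbQuoteString(con, s3_uri),
    " IAM_ROLE ", DBI::dbQuoteString(con, iam_role),
    " FORMAT AS CSV IGNOREHEADER 1" # skip the header row written by write_csv()
  ))
}
```

With the reprex's connection this could be run as `DBI::dbExecute(db, build_copy_sql(db, db_path_full_rs, "s3://my-bucket/flights.csv", "arn:aws:iam::123456789012:role/ExampleRole"))`, again assuming the target table exists and the role can read the bucket. Note that readr::write_csv() writes missing values as "NA" by default, so the file or the COPY options would likely need adjusting for columns with missing data.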