DyfanJones / noctua

Connect R to Athena using paws SDK (DBI Interface)
https://dyfanjones.github.io/noctua/

Support AWS Athena UNLOAD #160

Closed: DyfanJones closed this issue 2 years ago

DyfanJones commented 2 years ago

AWS Athena supports UNLOAD, which allows Athena to write out different file types, e.g. Parquet.

This will allow noctua/RAthena to utilise AWS Athena UNLOAD queries and read the Parquet output, similar to how AWS Data Wrangler currently does (note: AWS Data Wrangler wraps queries with CTAS, https://docs.aws.amazon.com/athena/latest/ug/ctas.html).

AWS Data Wrangler example, with the pros/cons of their current CTAS method: https://aws-data-wrangler.readthedocs.io/en/stable/tutorials/006%20-%20Amazon%20Athena.html
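
For reference, this is roughly the rewrite UNLOAD implies: rather than Athena writing its result set out as CSV, the SELECT is wrapped in an UNLOAD statement that writes Parquet to S3. A minimal hand-rolled sketch via DBI (the table name and S3 prefix are placeholders, not taken from this issue):

library(DBI)

con <- dbConnect(noctua::athena())

# Wrap the SELECT in UNLOAD so Athena writes Parquet instead of CSV.
# 's3://my-staging-bucket/unload/' is a placeholder output location.
dbExecute(con, "
  UNLOAD (SELECT * FROM my_db.my_table)
  TO 's3://my-staging-bucket/unload/'
  WITH (format = 'PARQUET')
")

The Parquet files would then be read back from S3; the unload argument used in the benchmarks below handles both steps internally.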

DyfanJones commented 2 years ago

Set up the awswrangler example data for the noctua benchmarks:

import awswrangler as wr

import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"

if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")

cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]

df = wr.s3.read_csv(
    path="s3://noaa-ghcn-pds/csv/189",
    names=cols,
    parse_dates=["dt", "obs_time"])  # Read 10 files from the 1890 decade (~1GB)

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="noaa"
);

wr.catalog.table(database="awswrangler_test", table="noaa")
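
As a quick sanity check from the R side before benchmarking, the new table can be confirmed with plain SQL (a sketch; any way of inspecting the Glue catalog would do):

library(DBI)

con <- dbConnect(noctua::athena())

# Confirm the benchmark table created above is visible to Athena
dbGetQuery(con, "SHOW TABLES IN awswrangler_test")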

Initial benchmark testing:

remotes::install_github("dyfanjones/noctua", ref="parquet_unload")

library(DBI)

con <- dbConnect(noctua::athena())

# AWS Athena result is output as CSV and then read into R.
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa")
})
# Info: (Data scanned: 80.88 MB)
# user  system elapsed 
# 81.819  38.409 667.637 

# AWS Athena result is output as Parquet and then read into R.
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = TRUE)
})
# Info: (Data scanned: 80.88 MB)
# user  system elapsed 
# 17.899   2.992  57.022 
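
These timings are single runs; to smooth out network variance the measurement can be repeated with base R alone (a sketch; it assumes caching is disabled so every run actually hits Athena):

# Average elapsed time over three fresh runs of the unload query
timings <- replicate(3, system.time(
  dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = TRUE)
)["elapsed"])
mean(timings)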

DyfanJones commented 2 years ago

When caching is enabled, extra performance is gained:

noctua::noctua_options(cache_size = 1)

# AWS Athena result is output as Parquet and then read into R.
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = TRUE)
})
# Info: (Data scanned: 80.88 MB)
# user  system elapsed 
# 17.899   2.992  57.022 

system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = TRUE)
})

# Info: (Data scanned: 80.88 MB)
# user  system elapsed 
# 14.094   3.117  34.663 
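
For context on what cache_size = 1 does: noctua remembers the execution IDs of recent queries, so an identical query re-reads the existing result from S3 instead of re-running on Athena. To force a fresh execution, the cache can be cleared (clear_cache is a documented noctua_options() parameter; its use here is my addition, not part of the benchmark):

# Flush the query cache so the next identical query runs against Athena again
noctua::noctua_options(clear_cache = TRUE)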

DyfanJones commented 2 years ago

The unload method does come with some downsides. Possibly create a vignette documenting the pros and cons of both methods, so that users are fully aware of what to expect.
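
Until such a vignette exists, a minimal sketch of the two ways to opt in; the session-wide switch shown second is an assumption based on the noctua_options() documentation rather than anything in this issue:

library(DBI)

con <- dbConnect(noctua::athena())

# Opt in per query, as in the benchmarks above
df <- dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = TRUE)

# Assumed session-wide default (see ?noctua::noctua_options)
noctua::noctua_options(unload = TRUE)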

DyfanJones commented 2 years ago

Benchmark run on a SageMaker ml.t3.xlarge instance:

library(DBI)

con <- dbConnect(noctua::athena())

# Query ran using CSV output
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa")
})
# Info: (Data scanned: 80.88 MB)
#    user  system elapsed
#  57.004   8.430 160.567 

noctua::noctua_options(cache_size = 1)

# Query ran using UNLOAD Parquet output
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = TRUE)
})
# Info: (Data scanned: 80.88 MB)
#    user  system elapsed 
#  21.622   2.350  39.232 

# Query ran using cache
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = TRUE)
})
# Info: (Data scanned: 80.88 MB)
#    user  system elapsed 
#  17.514   2.430  14.249 

# Query ran using cache with unload parameter not specified
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa")
})
# Info: (Data scanned: 80.88 MB)
#    user  system elapsed 
#  15.319   2.445  11.924 

Results:

csv: 160.567 seconds

unload: 39.232 seconds (4x faster)

cached unload: 11.924 seconds (13.4x faster)

noctua: 2.2.0.9000, R: 4.1.1

DyfanJones commented 2 years ago

Comparison to awswrangler:

Note: benchmarks run on a SageMaker ml.t3.xlarge instance:

import awswrangler as wr

# Query with AWS Athena CSV output
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False)
# CPU times: user 1min 30s, sys: 5.78 s, total: 1min 36s
# Wall time: 3min 9s
# seconds: 189

# Query with AWS Athena Parquet output
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")
# CPU times: user 1min 3s, sys: 6.53 s, total: 1min 10s
# Wall time: 1min 32s
# seconds: 92

# Query with AWS Athena Parquet output, reading columns as categoricals
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", categories=["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"])
# CPU times: user 18.1 s, sys: 2.21 s, total: 20.3 s
# Wall time: 35.9 s
# seconds: 35.9

Results:

csv: 189 seconds

ctas: 92 seconds (2.05x faster)

ctas with categories: 35.9 seconds (5.26x faster)

awswrangler: 2.11.0, Python: 3.6.13

DyfanJones commented 2 years ago

Closing issue as AWS Athena UNLOAD has been added to RAthena and noctua.

OssiLehtinen commented 2 years ago

Great stuff!