tidy-finance / website

This repository hosts the source code for the website tidy-finance.org
https://tidy-finance.org

Have WRDS server do work for TRACE data #36

Closed iangow closed 9 months ago

iangow commented 1 year ago

Why not have the WRDS PostgreSQL server do all the work?

Steps:

- Based on the output in the comment below, the code seems to work fine. (I'm not sure it delivers much of a performance benefit, but it saves on readers' electricity bills.)
- Note that the code in the comment below does the data aggregation in the database and is about 10 times faster than code using collect().

clean_enhanced_trace <- function(cusips, 
                                 connection, 
                                 start_date = as.Date("2002-01-01"), 
                                 end_date = today()) {

  # Helper: count rows of a remote table without pulling the data into R
  nrow <- function(df) {
    df %>% count() %>% pull(n)
  }

  # Packages (required)
  library(tidyverse)
  library(lubridate)
  library(dbplyr)
  library(RPostgres)
  library(DBI)

  # Function checks ---------------------------------------------------------
  # Input parameters
  ## Cusips
  if(length(cusips) == 0 | any(is.na(cusips))) stop("Check cusips.")

  ## Dates
  if(!is.Date(start_date) | !is.Date(end_date)) stop("Dates needed")
  if(start_date < as.Date("2002-01-01")) stop("TRACE starts later.")
  if(end_date > today()) stop("TRACE does not predict the future.")
  if(start_date >= end_date) stop("Date conflict.")

  ## Connection
  if(!dbIsValid(connection)) stop("Connection issue.")

  # Enhanced Trace ----------------------------------------------------------
  # Main file
  trace_all <- tbl(connection, 
                   in_schema("trace", "trace_enhanced")) |> 
    filter(cusip_id %in% cusips) |>
    filter(trd_exctn_dt >= start_date & trd_exctn_dt <= end_date) |> 
    select(cusip_id, msg_seq_nb, orig_msg_seq_nb,
           entrd_vol_qt, rptd_pr, yld_pt, rpt_side_cd, cntra_mp_id,
           trd_exctn_dt, trd_exctn_tm, trd_rpt_dt, trd_rpt_tm, 
           pr_trd_dt, trc_st, asof_cd, wis_fl, 
           days_to_sttl_ct, stlmnt_dt, spcl_trd_fl)

  # Enhanced Trace: Post 06-02-2012 -----------------------------------------
  # Trades (trc_st = T) and correction (trc_st = R)
  trace_post_TR <- trace_all |> 
    filter((trc_st == "T" | trc_st == "R"),
           trd_rpt_dt >= as.Date("2012-02-06"))

  # Cancelations (trc_st = X) and correction cancelations (trc_st = C)
  trace_post_XC <- trace_all |>
    filter((trc_st == "X" | trc_st == "C"),
           trd_rpt_dt >= as.Date("2012-02-06"))

  # Cleaning corrected and cancelled trades
  trace_post_TR <- trace_post_TR |>
    anti_join(trace_post_XC,
              by = c("cusip_id", "msg_seq_nb", "entrd_vol_qt", 
                     "rptd_pr", "rpt_side_cd", "cntra_mp_id", 
                     "trd_exctn_dt", "trd_exctn_tm"))

  # Reversals (trc_st = Y)
  trace_post_Y <- trace_all |>
    filter(trc_st == "Y",
           trd_rpt_dt >= as.Date("2012-02-06"))

  # Clean reversals
  ## match the orig_msg_seq_nb of the Y-message to 
  ## the msg_seq_nb of the main message
  trace_post <- trace_post_TR |>
    anti_join(trace_post_Y,
              by = c("cusip_id", "msg_seq_nb" = "orig_msg_seq_nb", 
                     "entrd_vol_qt", "rptd_pr", "rpt_side_cd", 
                     "cntra_mp_id", "trd_exctn_dt", "trd_exctn_tm"))

  # Enhanced TRACE: Pre 06-02-2012 ------------------------------------------
  # Cancelations (trc_st = C)
  trace_pre_C <- trace_all |>
    filter(trc_st == "C",
           trd_rpt_dt < as.Date("2012-02-06"))

  # Trades w/o cancelations
  ## match the orig_msg_seq_nb of the C-message 
  ## to the msg_seq_nb of the main message
  trace_pre_T <- trace_all |>
    filter(trc_st == "T",
           trd_rpt_dt < as.Date("2012-02-06")) |>
    anti_join(trace_pre_C, 
              by = c("cusip_id", "msg_seq_nb" = "orig_msg_seq_nb", 
                     "entrd_vol_qt", "rptd_pr", "rpt_side_cd", 
                     "cntra_mp_id", "trd_exctn_dt", "trd_exctn_tm"))

  # Corrections (trc_st = W) - W can also correct a previous W
  trace_pre_W <- trace_all |>
    filter(trc_st == "W",
           trd_rpt_dt < as.Date("2012-02-06"))

  # Implement corrections in a loop
  ## Correction control
  correction_control <- nrow(trace_pre_W)
  correction_control_last <- nrow(trace_pre_W)

  ## Correction loop
  while(correction_control > 0) {
    # Corrections that correct some msg
    trace_pre_W_correcting <- trace_pre_W |>
      semi_join(trace_pre_T, 
                by = c("cusip_id", "trd_exctn_dt",
                       "orig_msg_seq_nb" = "msg_seq_nb"))

    # Corrections that do not correct some msg
    trace_pre_W <- trace_pre_W |>
      anti_join(trace_pre_T, 
                by = c("cusip_id", "trd_exctn_dt",
                       "orig_msg_seq_nb" = "msg_seq_nb"))

    # Delete msgs that are corrected and add correction msgs
    trace_pre_T <- trace_pre_T |>
      anti_join(trace_pre_W_correcting, 
                by = c("cusip_id", "trd_exctn_dt",
                       "msg_seq_nb" = "orig_msg_seq_nb")) |>
      union_all(trace_pre_W_correcting) 

    # Escape if no corrections remain or they cannot be matched
    correction_control <- nrow(trace_pre_W)
    if(correction_control == correction_control_last) {
      correction_control <- 0 
    }
    correction_control_last <- nrow(trace_pre_W)
  }

  # Clean reversals
  ## Record reversals
  trace_pre_R <- trace_pre_T |>
    filter(asof_cd == 'R') |>
    group_by(cusip_id, trd_exctn_dt, entrd_vol_qt, 
             rptd_pr, rpt_side_cd, cntra_mp_id) |>
    window_order(trd_exctn_tm, trd_rpt_dt, trd_rpt_tm) |>
    mutate(seq = row_number()) |>
    ungroup()

  ## Remove reversals and the reversed trade
  trace_pre <- trace_pre_T |> 
    filter(is.na(asof_cd) | !(asof_cd %in% c('R', 'X', 'D'))) |> 
    group_by(cusip_id, trd_exctn_dt, entrd_vol_qt, 
             rptd_pr, rpt_side_cd, cntra_mp_id) |> 
    window_order(trd_exctn_tm, trd_rpt_dt, trd_rpt_tm) |> 
    mutate(seq = row_number()) |> 
    ungroup() |> 
    anti_join(trace_pre_R,
              by = c("cusip_id", "trd_exctn_dt", "entrd_vol_qt", 
                     "rptd_pr", "rpt_side_cd", "cntra_mp_id", "seq")) |> 
    select(-seq)

  # Agency trades -----------------------------------------------------------
  # Combine pre and post trades
  trace_clean <- trace_post |> 
    union_all(trace_pre)

  # Keep agency sells and unmatched agency buys
  ## Agency sells
  trace_agency_sells <- trace_clean |> 
    filter(cntra_mp_id == "D",
           rpt_side_cd == "S")

  # Agency buys that are unmatched
  trace_agency_buys_filtered <- trace_clean |> 
    filter(cntra_mp_id == "D",
           rpt_side_cd == "B") |> 
    anti_join(trace_agency_sells, 
              by = c("cusip_id", "trd_exctn_dt", 
                     "entrd_vol_qt", "rptd_pr"))

  # Agency clean
  trace_clean <- trace_clean |> 
    filter(cntra_mp_id == "C")  |> 
    union_all(trace_agency_sells) |> 
    union_all(trace_agency_buys_filtered) 

  # Additional Filters ------------------------------------------------------
  trace_add_filters <- trace_clean |> 
    mutate(days_to_sttl_ct2 = stlmnt_dt - trd_exctn_dt) |> 
    filter(is.na(days_to_sttl_ct) | as.numeric(days_to_sttl_ct) <= 7,
           is.na(days_to_sttl_ct2) | as.numeric(days_to_sttl_ct2) <= 7,
           wis_fl == "N",
           is.na(spcl_trd_fl) | spcl_trd_fl == "",
           is.na(asof_cd) | asof_cd == "")

  # Output ------------------------------------------------------------------
  # Only keep necessary columns
  trace_final <- 
    trace_add_filters |> 
    select(cusip_id, trd_exctn_dt, trd_exctn_tm, 
           rptd_pr, entrd_vol_qt, yld_pt, rpt_side_cd, cntra_mp_id) 

  # Return
  return(trace_final |> 
           arrange(cusip_id, trd_exctn_dt, trd_exctn_tm) %>%
           collect())
}

wrds <- dbConnect(
  Postgres(),
  host = "wrds-pgdata.wharton.upenn.edu",
  dbname = "wrds",
  port = 9737,
  sslmode = "require",
  bigint = "integer")

tidy_finance <- dbConnect(
  duckdb::duckdb(),
  "data/tidy_finance.duckdb",
  read_only = FALSE
)

mergent_cusips <- 
  tbl(tidy_finance, "mergent") |>
  pull(complete_cusip)

mergent_parts <- split(
  mergent_cusips,
  rep(1:100, 
      length.out = length(mergent_cusips))
)

for (j in seq_along(mergent_parts)) {
  print(paste("Starting", j))
  trace_enhanced <- clean_enhanced_trace(
    cusips = mergent_parts[[j]],
    connection = wrds,
    start_date = ymd("2014-01-01"),
    end_date = ymd("2016-11-30")
  )
  dbWriteTable(
    conn = tidy_finance,
    name = "trace_enhanced",
    value = trace_enhanced,
    overwrite = (j == 1),
    append = (j != 1))
  print(paste("Finishing", j))
}

dbDisconnect(wrds)
dbDisconnect(tidy_finance, shutdown = TRUE)
iangow commented 1 year ago

Tweaks to get the code to work with the database:

library(tidyverse)
library(DBI)

tidy_finance <- dbConnect(
  duckdb::duckdb(),
  "data/tidy_finance.duckdb",
  read_only = TRUE)

mergent <- tbl(tidy_finance, "mergent")

cusips <- mergent %>% pull(complete_cusip)

bonds_outstanding <- 
  expand_grid(date = seq(ymd("2014-01-01"),
                           ymd("2016-11-30"), 
                           by = "quarter"), 
              complete_cusip = cusips) |> 
  copy_to(tidy_finance, df = _, name = "cusips") |>
  left_join(mergent |> select(complete_cusip, 
                              offering_date,
                              maturity), 
            by = "complete_cusip") |> 
  mutate(offering_date = floor_date(offering_date),
         maturity = floor_date(maturity)) |> 
  filter(date >= offering_date & date <= maturity) |> 
  count(date) |> 
  mutate(type = "Outstanding")

trace_enhanced <- tbl(tidy_finance, "trace_enhanced") 

bonds_traded <- 
  trace_enhanced |> 
  mutate(date = floor_date(trd_exctn_dt, "quarters")) |> 
  group_by(date) |> 
  summarize(n = n_distinct(cusip_id),
            type = "Traded",
            .groups = "drop") 

bonds_outstanding |> 
  union_all(bonds_traded) |> 
  ggplot(aes(
    x = date, 
    y = n, 
    color = type, 
    linetype = type
  )) +
  geom_line() +
  labs(
    x = NULL, y = NULL, color = NULL, linetype = NULL,
    title = "Number of bonds outstanding and traded each quarter"
  )


mergent |>
  mutate(maturity = as.numeric(maturity - offering_date) / 365,
         offering_amt = offering_amt / 10^3) |> 
  pivot_longer(cols = c(maturity, coupon, offering_amt),
               names_to = "measure") |>
  group_by(measure) |>
  summarize(
    mean = mean(value, na.rm = TRUE),
    sd = sd(value, na.rm = TRUE),
    min = min(value, na.rm = TRUE),
    q05 = quantile(value, 0.05, na.rm = TRUE),
    q50 = quantile(value, 0.50, na.rm = TRUE),
    q95 = quantile(value, 0.95, na.rm = TRUE),
    max = max(value, na.rm = TRUE)
  )
#> # Source:   SQL [3 x 8]
#> # Database: DuckDB 0.7.1 [root@Darwin 22.5.0:R 4.3.0/data/tidy_finance.duckdb]
#>   measure        mean     sd     min   q05   q50     q95    max
#>   <chr>         <dbl>  <dbl>   <dbl> <dbl> <dbl>   <dbl>  <dbl>
#> 1 coupon         4.15   3.72 0       0      4.88    9.62    39 
#> 2 offering_amt 190.   419.   0       0.349 12.2  1000    15000 
#> 3 maturity       7.29   8.20 0.00822 1.03   5.01   30.0    100.

trace_enhanced |> 
  group_by(trd_exctn_dt) |> 
  summarize(trade_size = sum(entrd_vol_qt * rptd_pr / 100, na.rm = TRUE) / 10^6,
            trade_number = n(),
            .groups = "drop") |> 
  pivot_longer(cols = c(trade_size, trade_number),
               names_to = "measure") |> 
  group_by(measure) |>
  summarize(
    mean = mean(value, na.rm = TRUE),
    sd = sd(value, na.rm = TRUE),
    min = min(value, na.rm = TRUE),
    q05 = quantile(value, 0.05, na.rm = TRUE),
    q50 = quantile(value, 0.50, na.rm = TRUE),
    q95 = quantile(value, 0.95, na.rm = TRUE),
    max = max(value, na.rm = TRUE)
  )
#> # Source:   SQL [2 x 8]
#> # Database: DuckDB 0.7.1 [root@Darwin 22.5.0:R 4.3.0/data/tidy_finance.duckdb]
#>   measure        mean    sd   min    q05    q50    q95    max
#>   <chr>         <dbl> <dbl> <dbl>  <dbl>  <dbl>  <dbl>  <dbl>
#> 1 trade_number 25921. 5460. 438   17851. 26025  34458. 40889 
#> 2 trade_size   12968. 3574.  17.2  6138. 13408. 17851. 20905.

dbDisconnect(tidy_finance, shutdown = TRUE)

Created on 2023-05-10 with reprex v2.0.2

christophscheuch commented 9 months ago

Thank you for the extensive proposal and sorry for the delayed response. We decided to keep the code as it is, namely downloading the raw TRACE data and cleaning it in memory. When we coded the cleaning procedure, we did not find a consistent performance improvement from moving the pipelines to the WRDS server; on the contrary, the current in-memory solution was actually faster. And optimizing the electricity bill of users is currently not on our agenda ;)
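
To make the comparison concrete, here is a minimal sketch of the two patterns under discussion, assuming the wrds connection and cusips vector from the code above (the cleaning steps are abbreviated, not the book's actual procedure):

library(dplyr)
library(dbplyr)

trace_remote <- tbl(wrds, in_schema("trace", "trace_enhanced")) |>
  filter(cusip_id %in% cusips)

# In-memory approach kept in the book: collect() early, then clean locally
trace_raw <- trace_remote |> collect()
trace_clean_local <- trace_raw |> filter(trc_st %in% c("T", "R"))  # cleaning abbreviated

# Server-side approach proposed in this issue: keep the pipeline lazy, collect() last
trace_clean <- trace_remote |>
  filter(trc_st %in% c("T", "R")) |>  # cleaning abbreviated
  collect()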

We also decided against using DuckDB for aggregation because (i) not everybody might have access to DuckDB as opposed to SQLite (e.g., at companies), and (ii) one big advantage of DuckDB, the Postgres scanner extension, is not consistently available for Windows users.
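
For completeness, a sketch of the SQLite alternative mentioned above, assuming the trace_enhanced data frame produced by the loop in the first comment; the file name is illustrative:

library(DBI)
library(RSQLite)

# Store the cleaned data in a SQLite file instead of DuckDB (path is illustrative)
tidy_finance <- dbConnect(SQLite(), "data/tidy_finance.sqlite")
dbWriteTable(tidy_finance, "trace_enhanced", trace_enhanced, overwrite = TRUE)
dbDisconnect(tidy_finance)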

iangow commented 9 months ago

Makes sense. A couple of notes:

  1. Your code already has the WRDS server do some of "the work," even if it's just filtering rows and selecting variables, so the server-side approach has little room to deliver further gains.
  2. The inability to use compute() (in SQL, CREATE TEMPORARY TABLE) makes it very hard to build a good WRDS server-side solution: the query gets very big and unwieldy fast (see the sketch below).
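
To illustrate the second point, a hedged sketch assuming the wrds connection and cusips vector from the code above: show_query() reveals the SQL that dbplyr sends to the server, and compute() is the dbplyr call that would materialize an intermediate result as a temporary table if WRDS permitted it.

library(dplyr)
library(dbplyr)

trace_subset <- tbl(wrds, in_schema("trace", "trace_enhanced")) |>
  filter(cusip_id %in% cusips) |>
  select(cusip_id, trd_exctn_dt, trc_st, msg_seq_nb)

# Inspect the SQL that dbplyr pushes to the server
trace_subset |> show_query()

# With CREATE TEMPORARY TABLE rights, this would materialize the intermediate
# result server-side; without them, every later join and filter is appended
# to one ever-growing query.
# trace_subset <- trace_subset |> compute(temporary = TRUE)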

Regarding DuckDB, I wonder how restrictive this is in practice. Both SQLite (install.packages("RSQLite")) and DuckDB (install.packages("duckdb")) are user-installed R packages (i.e., not part of base R). Perhaps more IT departments have whitelisted SQLite for various reasons. I do note that DuckDB is 81 MB (very big by CRAN standards) because of the way they put everything in one package.

I think a more robust approach than what I proposed earlier might put the data in separate parquet files (see the sketch below). However, this adds a little overhead for users in terms of understanding how to load the files, and it seems like a poor fit for the broader readership of Tidy Finance.
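
As a rough illustration of the parquet idea, a sketch assuming the arrow package and the trace_enhanced data frame from the loop above; the paths are illustrative:

library(arrow)

# Write one parquet file per table (path is illustrative)
write_parquet(trace_enhanced, "data/trace_enhanced.parquet")

# Readers then load it back with
trace_enhanced <- read_parquet("data/trace_enhanced.parquet")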

For my own book, I made a different design choice. To keep each chapter independent, I do not have any local data storage and use the WRDS server a lot more (i.e., collect() later). This has its own downsides, so I offer two alternative paths forward for users looking to store data. Both of these are in appendices.

  1. Local PostgreSQL server
  2. Local parquet data repository

The latter option yields a data repository of about 9 GB that could be put in Dropbox and shared with co-authors. I suspect that anything using a SQLite (or DuckDB) database file would make it difficult to share data with co-authors if there is any chance of reading and writing at the same time.
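
A minimal sketch of querying such a shared parquet repository lazily, assuming the arrow package and an illustrative Dropbox directory layout:

library(arrow)
library(dplyr)

# Open the shared parquet directory lazily (path is illustrative)
trace <- open_dataset("~/Dropbox/tidy_finance_data/trace_enhanced")

# Filtering and aggregation run on the parquet files; only the result is collected
trace |>
  filter(trd_exctn_dt >= as.Date("2016-01-01")) |>
  group_by(cusip_id) |>
  summarize(n_trades = n()) |>
  collect()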