mikejohnson51 / opendap.catalog

Flexible backend for getting data from Web and local NetCDF resources into R
https://mikejohnson51.github.io/opendap.catalog

Elastic Search of JSON and RDS #10

Open mikejohnson51 opened 2 years ago

mikejohnson51 commented 2 years ago

With a data catalog of 14,000+ resources, I invited my colleague @program-- to look into methods for providing more fluid, fuzzy, or elastic search over the JSON resource (and ideally the package RDS).

Key searches to start include:

I would also appreciate @program--'s eyes on the parallel capabilities added with `future`. See here:

https://github.com/mikejohnson51/opendap.catalog/blob/9bf933ee807788543d6054e4b8ed3c760ce161bb/R/get_dap_utils.R#L375

The `foreach` implementation in climateR now raises occasional (but not infrequent) issues with FORK processes and crashes everything.

This `future` approach seems nice; however, it can give very different results from run to run.

See the last build of the vignette here, where the multisession approach took 30 seconds while the sequential one took 2. (I have seen the opposite result too, haha.)
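For reference, here is a minimal sketch of how the two plans can be compared on the same workload; the slow_task() helper and the worker count are placeholders, not package code:

library(future)
library(future.apply)

# stand-in for any of the parallelised catalog steps
slow_task <- function(i) { Sys.sleep(0.1); i }

# sequential plan
plan(sequential)
system.time(future_lapply(1:20, slow_task))

# multisession (PSOCK) plan: avoids the FORK crashes seen with foreach,
# but the worker start-up cost can dominate short jobs
plan(multisession, workers = 4)
system.time(future_lapply(1:20, slow_task))

plan(sequential)  # reset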

I hope to document work towards these issues here.

mikejohnson51 commented 2 years ago

If you want to make changes to the JSON file please do so in: https://github.com/mikejohnson51/opendap.catalog/blob/master/data-raw/06_merge.R

program-- commented 2 years ago

In reference to the function you created, here's mine if you want to compare, as well as additional benchmarks to show timings:

#' Search through catalog metadata
#' @param query A general, key/value, and/or regex query string.
#' @param ... Unused.
#' @param .dist Distance number for fuzzy matching, 0 meaning exact match.
#' @param .pagesize Number of rows per iteration to search.
#' @param .future If `TRUE`, uses `future` as the execution engine.
search <- function(query, ..., .dist = 0.25, .pagesize = 1000L, .future = FALSE) {
    # choose the assignment operator: future's promise assignment when
    # running asynchronously, otherwise plain `<<-`
    `%<-%` <- if (.future) future::`%<-%` else `<<-`

    # internal function for fuzzy-ish searching/matching
    .fuzzy <- function(chars, query) {
        # variable search is found in
        # the query if `varsearch` is TRUE
        varsearch      <- grepl("=", query)
        qsplits        <- tolower(unlist(strsplit(query, " "), TRUE, FALSE))
        .matches       <- list()
        varfn          <- function(x, tbl) !is.na(match(x, tbl))
        .matches$fuzzy <- agrepl(query, chars, .dist, ignore.case = TRUE)
        .matches$regex <- grepl(query, chars, ignore.case = TRUE)
        .matches$vars  <- unlist(
            lapply(qsplits, varfn, tolower(chars)),
            recursive = FALSE,
            use.names = FALSE
        )

        if (varsearch) .matches$vars else do.call(any, .matches)
    }

    # internal function for paging the json stream
    .query <- function(x) {
        .count <<- .count + 1
        .envir[[as.character(.count)]] %<-% {
            nms     <- names(x)
            splits  <- split(x, seq(nrow(x)))
            chars   <- lapply(splits, function(y) paste0(nms, "=", y))
            indices <- which(unlist(
                lapply(chars, .fuzzy, query),
                recursive = FALSE,
                use.names = FALSE
            ))

            if (length(indices) > 0L) x[indices, ]
        }
    }

    .count    <- 0
    .envir    <- new.env()
    .lapply   <- if (.future) future.apply::future_lapply else lapply

    # Set up page intervals
    .rows     <- nrow(params)
    .interval <- seq(from = 1, to = .rows)
    .init     <- floor(.rows / .pagesize)
    .groups   <- rep(seq_len(.init), each = .pagesize)
    if (length(.groups) < .rows) {
        .diff   <- .rows - length(.groups)
        .groups <- c(.groups, rep.int(.init + 1, times = .diff))
    }
    .interval <- split(.interval, .groups)

    .lapply(.interval, function(i) .query(params[i, ]))
    .envir <- as.list(.envir)
    # restore numeric page order (as.list() on an environment is unordered)
    .envir <- .envir[order(as.numeric(names(.envir)))]
    do.call(rbind, .envir)
}

Benchmarks:

Unit: seconds
    expr      min       lq     mean   median       uq      max neval
   G0250 2.784287 2.856395 2.897190 2.928502 2.953641 2.978779     3
   G0500 2.789368 2.793407 2.807125 2.797445 2.816004 2.834563     3
   G1000 2.626979 2.634372 2.675058 2.641765 2.699097 2.756430     3
   V0250 2.461048 2.491321 2.527163 2.521593 2.560220 2.598847     3
   V0500 2.493942 2.499361 2.505370 2.504780 2.511084 2.517388     3
   V1000 2.383385 2.422683 2.451187 2.461981 2.485089 2.508196     3
   R0250 2.181788 2.195992 2.206203 2.210196 2.218411 2.226626     3
   R0500 2.157164 2.159326 2.178047 2.161488 2.188489 2.215490     3
   R1000 2.009861 2.080992 2.142733 2.152123 2.209169 2.266215     3
 F_G0250 3.652841 3.673079 3.803278 3.693316 3.878496 4.063676     3
 F_G0500 3.175254 3.250813 3.283014 3.326371 3.336894 3.347418     3
 F_G1000 2.938425 2.978484 3.011635 3.018542 3.048240 3.077938     3
 F_V0250 3.373412 3.506352 3.575785 3.639293 3.676971 3.714650     3
 F_V0500 2.881009 3.020036 3.116401 3.159063 3.234096 3.309130     3
 F_V1000 2.647028 2.814814 2.892532 2.982600 3.015285 3.047969     3
 F_R0250 3.136308 3.139510 3.147659 3.142711 3.153335 3.163959     3
 F_R0500 2.510540 2.590953 2.658578 2.671366 2.732598 2.793830     3
 F_R1000 2.496520 2.499216 2.544016 2.501912 2.567763 2.633615     3

Params:

# general search  : G
# variable search : V
# regex search    : R
# page sizes = [250, 500, 1000]
microbenchmark::microbenchmark(
    G0250 = opendap.catalog::search("Specific Humidity", .pagesize = 250L),
    G0500 = opendap.catalog::search("Specific Humidity", .pagesize = 500L),
    G1000 = opendap.catalog::search("Specific Humidity", .pagesize = 1000L),
    V0250 = opendap.catalog::search("variable=huss", .pagesize = 250L),
    V0500 = opendap.catalog::search("variable=huss", .pagesize = 500L),
    V1000 = opendap.catalog::search("variable=huss", .pagesize = 1000L),
    R0250 = opendap.catalog::search(".*humidity", .pagesize = 250L),
    R0500 = opendap.catalog::search(".*humidity", .pagesize = 500L),
    R1000 = opendap.catalog::search(".*humidity", .pagesize = 1000L),
    F_G0250 = opendap.catalog::search("Specific Humidity", .pagesize = 250L, .future = TRUE),
    F_G0500 = opendap.catalog::search("Specific Humidity", .pagesize = 500L, .future = TRUE),
    F_G1000 = opendap.catalog::search("Specific Humidity", .pagesize = 1000L, .future = TRUE),
    F_V0250 = opendap.catalog::search("variable=huss", .pagesize = 250L, .future = TRUE),
    F_V0500 = opendap.catalog::search("variable=huss", .pagesize = 500L, .future = TRUE),
    F_V1000 = opendap.catalog::search("variable=huss", .pagesize = 1000L, .future = TRUE),
    F_R0250 = opendap.catalog::search(".*humidity", .pagesize = 250L, .future = TRUE),
    F_R0500 = opendap.catalog::search(".*humidity", .pagesize = 500L, .future = TRUE),
    F_R1000 = opendap.catalog::search(".*humidity", .pagesize = 1000L, .future = TRUE),
    times = 3L
)

I think that yours (based on code alone) is probably faster and more robust :smile:

mikejohnson51 commented 2 years ago

👋 @program-- thanks for this!! I tested a bit and they are pretty equal (existing search is a hair faster).

Where there is a difference is in more open-ended requests like "monthly NLDAS VIC SWE". That's where the current implementation is "better". BUT, that's because I split the words at spaces, so it might be transferable to yours?
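Roughly, the idea is just to split the query at spaces and require every token to fuzzy-match somewhere in the row. A sketch; the row_text argument stands in for a row's pasted-together "name=value" metadata and is not an existing helper:

# keep a row only if every query token approximately matches its metadata
tokens_match <- function(row_text, query, max.distance = 0.25) {
  tokens <- unlist(strsplit(tolower(query), " ", fixed = TRUE))
  all(vapply(
    tokens,
    function(tok) any(agrepl(tok, row_text,
                             max.distance = max.distance,
                             ignore.case = TRUE)),
    logical(1)
  ))
}

# e.g. inside .query(): keep <- vapply(chars, tokens_match, logical(1), query = query)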

mikejohnson51 commented 2 years ago

So, here's the first glitch with the new search :) I am not sure why the first doesn't catch both variables as equally ranked:

library(opendap.catalog)

search("maca daily huss pr bnu-esm")
#> # A tibble: 3 × 16
#>   id       grid.id URL     tiled variable varname long_name units model ensemble
#>   <chr>    <chr>   <chr>   <chr> <chr>    <chr>   <chr>     <chr> <chr> <chr>   
#> 1 maca_day 167     http:/… T     huss     specif… Daily Me… kg k… BNU-… r1i1p1  
#> 2 maca_day 167     http:/… T     huss     specif… Daily Me… kg k… BNU-… r1i1p1  
#> 3 maca_day 167     http:/… T     huss     specif… Daily Me… kg k… BNU-… r1i1p1  
#> # … with 6 more variables: scenario <chr>, T_name <chr>, duration <chr>,
#> #   interval <chr>, nT <int>, rank <dbl>

rbind(search("maca daily huss bnu-esm"),
      search("maca daily pr bnu-esm"))
#> # A tibble: 6 × 16
#>   id       grid.id URL     tiled variable varname long_name units model ensemble
#>   <chr>    <chr>   <chr>   <chr> <chr>    <chr>   <chr>     <chr> <chr> <chr>   
#> 1 maca_day 167     http:/… T     huss     specif… Daily Me… kg k… BNU-… r1i1p1  
#> 2 maca_day 167     http:/… T     huss     specif… Daily Me… kg k… BNU-… r1i1p1  
#> 3 maca_day 167     http:/… T     huss     specif… Daily Me… kg k… BNU-… r1i1p1  
#> 4 maca_day 167     http:/… T     pr       precip… Precipit… mm    BNU-… r1i1p1  
#> 5 maca_day 167     http:/… T     pr       precip… Precipit… mm    BNU-… r1i1p1  
#> 6 maca_day 167     http:/… T     pr       precip… Precipit… mm    BNU-… r1i1p1  
#> # … with 6 more variables: scenario <chr>, T_name <chr>, duration <chr>,
#> #   interval <chr>, nT <int>, rank <dbl>

Created on 2022-03-03 by the reprex package (v2.0.1)

mikejohnson51 commented 2 years ago

I think what we want here is a weighting grid. For example, exact matches on id, variable, or varname should be worth more than partial matches in long_name ...
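Something like the following sketch could express that; the column names come from the catalog, but the weights themselves are purely illustrative:

# illustrative weights: hits on id/variable/varname count more than hits
# in the free-text long_name
weights <- c(id = 3, variable = 3, varname = 3, model = 2, long_name = 1)

score_row <- function(row, query, weights, max.distance = 0.1) {
  tokens <- unlist(strsplit(tolower(query), " ", fixed = TRUE))
  sum(vapply(names(weights), function(col) {
    # does any query token (approximately) match this column's value?
    hit <- isTRUE(any(vapply(tokens, agrepl, logical(1),
                             x = tolower(row[[col]]),
                             max.distance = max.distance)))
    if (hit) weights[[col]] else 0
  }, numeric(1)))
}

# rows would then be ranked by decreasing score rather than raw edit distance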

program-- commented 2 years ago

@mikejohnson51 This may be useful -- I did some digging into your implementation and modified it a bit to support the variable search syntax I had above. I was able to speed up the implementation:

query = "maca daily huss pr bnu-esm"
microbenchmark(search(query), search2(query), times = 10L)
#> Unit: milliseconds
#>            expr       min        lq      mean    median        uq      max neval
#>   search(query) 2242.6958 2263.4006 2314.7720 2308.5124 2375.0440 2387.668    10
#>  search2(query)  887.1193  895.4158  926.0967  907.3915  929.0791 1044.221    10

But this modified implementation might be a bit too coarse, since:

search2(query)
#>  # A tibble: 10,749 x 16
#>    id     grid.id URL      tiled variable varname long_name  units model ensemble
#>    <chr>  <chr>   <chr>    <chr> <chr>    <chr>   <chr>      <chr> <chr> <chr>   
#>  1 hawai… 71      https:/… ""    nudp     nudp    number of… NA    NA    NA      
#>  2 hawai… 71      https:/… ""    nusf     nusf    number of… NA    NA    NA      
#>  3 hawai… 71      https:/… ""    nuvdp    nuvdp   number of… NA    NA    NA      
#>  4 hawai… 71      https:/… ""    nuvsf    nuvsf   number of… NA    NA    NA      
#>  5 hawai… 71      https:/… ""    nvdp     nvdp    number of… NA    NA    NA      
#>  6 hawai… 71      https:/… ""    nvsf     nvsf    number of… NA    NA    NA      
#>  7 hawai… 71      https:/… ""    sudp     sudp    standard … NA    NA    NA      
#>  8 hawai… 71      https:/… ""    susf     susf    standard … NA    NA    NA      
#>  9 hawai… 71      https:/… ""    suvdp    suvdp   standard … NA    NA    NA      
#> 10 hawai… 71      https:/… ""    suvsf    suvsf   standard … NA    NA    NA      
#> # … with 10,739 more rows, and 6 more variables: scenario <chr>, T_name <chr>,
#> #   duration <chr>, interval <chr>, nT <int>, rank <dbl>

Anyway, this might help in handling the weighted grid issue (or just add more to your workload 😅). Here's the modified implementation, of which I only modified `.query()`:

.query <- function(x, query, subs = NULL) {
  query <- gsub("daily", "day", query)
  query <- gsub("monthly", "month", query)
  query <- gsub("hourly", "hour", query)

  if (is.null(subs)) {
    subs <- x
  }

  q <- strsplit(query, " ")[[1]]

  # `varsearch` checks if any query tokens match the
  # variable search syntax, with the correct column
  # names, and gets its index in `q`.
  varsearch <- grep(
      paste0("[", paste(names(subs), collapse = "|"), "]=(.*)"),
      q
  )

  # `dists` will be a ncol(subs) length list of length(q) x nrow(subs) matrices
  dists <- lapply(
    subs,
    adist,
    x = q,
    ignore.case = TRUE,
    partial = TRUE
  )

  # `framer()` converts `dists` into nrow(subs) x length(q) data tables
  # `ranks` will be a single nrow(subs) x length(q)*ncol(subs) data table
  # `indices` will be a nrow(subs) length vector of minimum row distances
  framer  <- function(x) data.table::as.data.table(t(x))
  ranks   <- do.call(cbind, lapply(dists, framer))
  indices <- apply(ranks, 1, min, na.rm = TRUE)

  if (length(varsearch) > 0) {
    # if there are any variable search syntax queries,
    # this will find them, and set the corresponding
    # element in `indices` to 0 (indicating a match)
    tokens  <- strsplit(q, "=")[varsearch]
    vsearch <- function(x) grep(x[2], subs[[x[1]]])
    matches <- unique(unlist(lapply(tokens, vsearch)))
    indices[matches] <- 0

    # to make this more strict, you can set the other
    # elements not match to a bigger number, like 10
    # so that those rows do not show up in the return
    #> indices[-matches] <- 10
  }

  x$rank <- indices

  if (min(indices) > length(q) * 3) {
    warning("No likely matches found.")
  }

  x <- x[x$rank == min(indices), ]

  x[order(x$rank), ]
}