ropensci / elastic

R client for the Elasticsearch HTTP API
https://docs.ropensci.org/elastic
Other
245 stars 58 forks source link

'docs_bulk'-functionality for ingest attachment-plugin ('pipeline_attachment') ? #253

Closed Aeilert closed 4 years ago

Aeilert commented 5 years ago

I have a question about pushing documents in bulk with the ingest attachment-plugin. This used to work by setting an additional parameter, query = 'pipeline=attachment', in docs_bulk (tested with version 0.8.4), but no longer seems to work with the current version of the package.

When using docs_bulk with a pipeline like the one below, the data is pushed through to Elasticsearch, but not using the plugin. The result is a an index containing a base64-encoded data-field, and not a list of fulltext-fields like you would expect.

This does not work ```r # Create ingest attachment pipeline body.pipeline <- '{ "description" : "Extract attachment information", "processors" : [ { "attachment" : { "field" : "data", "target_field": "fulltext", "indexed_chars" : -1, "on_failure" : [ { "set" : { "field" : "error", "value" : "{{ _ingest.on_failure_message }}" } } ] }, "remove": { "field": "data" } } ] }' pipeline_create(es.con, id = "attachment", body = body.pipeline) ``` ```r # Create test-index index_create(es.con, index = "myindex") # List of base64-encoded documents w/ some metadata docs <- list( list(data = "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=", category = "lorem ipsum"), list(data = "aGVsbG8gd29ybGQgaGVsbG8gd29ybGQ=", category = "hello world") ) # Push documents to Elastic docs_bulk(conn = es.con, x = docs, index = "myindex", type = '_doc', doc_ids = 1:2, es_ids = FALSE, query = 'pipeline=attachment') ``` ```r # Data was not pushed correctly Search(es.con,"myindex") ... $hits$hits $hits$hits[[1]] $hits$hits[[1]]$`_index` [1] "myindex" $hits$hits[[1]]$`_type` [1] "_doc" $hits$hits[[1]]$`_id` [1] "1" $hits$hits[[1]]$`_score` [1] 1 $hits$hits[[1]]$`_source` $hits$hits[[1]]$`_source`$data [1] "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=" $hits$hits[[1]]$`_source`$category [1] "lorem ipsum" ```

I could of course use pipeline_attachment but I have several thousands files and want to take advantage of the bulk API. Maybe this could be solved with a docs_bulk wrapper for pipeline_attachment? Or just a parameter to add 'pipeline=attachment' to the POST-statement (not sure why passing query-option to cruldoesn't work)?

As an example of the functionality I'm looking for I created a simple wrapper-function for pipeline_attachment. I'm not saying this should be the solution. It's just to illustrate the functionality.

This does work ```r DocsBulkAttachment <- function (conn, x, index = NULL, type = NULL, chunk_size = 1000, doc_ids = NULL, es_ids = TRUE, raw = FALSE, quiet = FALSE, pipeline = 'attachment', sleep = 1, ...) { elastic:::is_conn(conn) elastic:::assert(quiet, "logical") if (is.null(index)) { stop("index can't be NULL when passing a list", call. = FALSE) } if (is.null(type)) type <- "_doc" #index elastic:::check_doc_ids(x, doc_ids) if (is.factor(doc_ids)) doc_ids <- as.character(doc_ids) x <- unname(x) x <- elastic:::check_named_vectors(x) rws <- seq_len(length(x)) data_chks <- split(rws, ceiling(seq_along(rws)/chunk_size)) if (!is.null(doc_ids)) { id_chks <- split(doc_ids, ceiling(seq_along(doc_ids)/chunk_size)) } resl <- vector(mode = "list", length = length(data_chks)) for (i in seq_along(data_chks)) { if (!quiet) { pb <- txtProgressBar(min = 0, max = length(data_chks[[i]]), initial = 0, style = 3) on.exit(close(pb)) } resl2 <- vector(mode = "list", length = length(data_chks[[i]])) for(y in seq_along(data_chks[[i]])){ resl2[[y]] <- pipeline_attachment(conn, index = index, type = type, pipeline = pipeline, body = x[data_chks[[i]]][[y]], id = id_chks[[i]][y]) if (!quiet) setTxtProgressBar(pb, y) } resl[[i]] <- resl2 Sys.sleep(sleep) } return(resl) } ``` ```r index_create(es.con, index = "myindex2") DocsBulkAttachment(es.con, index = "myindex2", x = docs, type = '_doc', doc_ids = 1:2, pipeline = "attachment") ``` ```r # Data was pushed correctly Search(es.con,"myindex2") ... $hits$hits $hits$hits[[1]] $hits$hits[[1]]$`_index` [1] "myindex2" $hits$hits[[1]]$`_type` [1] "_doc" $hits$hits[[1]]$`_id` [1] "1" $hits$hits[[1]]$`_score` [1] 1 $hits$hits[[1]]$`_source` $hits$hits[[1]]$`_source`$fulltext $hits$hits[[1]]$`_source`$fulltext$content_type [1] "application/rtf" $hits$hits[[1]]$`_source`$fulltext$language [1] "ro" $hits$hits[[1]]$`_source`$fulltext$content [1] "Lorem ipsum dolor sit amet" $hits$hits[[1]]$`_source`$fulltext$content_length [1] 28 $hits$hits[[1]]$`_source`$category [1] "lorem ipsum" ```

I'm using R 3.5.3. and Elasticsearch 7.0.0 w/ Docker. I have installed the ingest-attachment plugin. See below for other session info.

Session Info ```r R version 3.5.3 (2019-03-11) Platform: x86_64-apple-darwin15.6.0 (64-bit) Running under: macOS Mojave 10.14.3 Matrix products: default BLAS: /System/Library/Frameworks/Accelerate.framework/Versions/A/Frameworks/vecLib.framework/Versions/A/libBLAS.dylib LAPACK: /Library/Frameworks/R.framework/Versions/3.5/Resources/lib/libRlapack.dylib locale: [1] en_US.UTF-8/en_US.UTF-8/en_US.UTF-8/C/en_US.UTF-8/en_US.UTF-8 attached base packages: [1] stats graphics grDevices utils datasets methods base other attached packages: [1] elastic_1.0.0.9100 loaded via a namespace (and not attached): [1] compiler_3.5.3 R6_2.4.0 tools_3.5.3 httpcode_0.2.0 curl_3.3 Rcpp_1.0.1 urltools_1.7.3 triebeard_0.3.0 crul_0.7.4 [10] jsonlite_1.6 ```
Dockerfile ```r FROM docker.elastic.co/elasticsearch/elasticsearch:7.0.0 RUN bin/elasticsearch-plugin install --batch ingest-attachment COPY config/. ./config/ ```
sckott commented 5 years ago

thanks for the detailed report, i'll take a look soon

Aeilert commented 5 years ago

Great.

sckott commented 5 years ago

So I make sure I understand:

Aeilert commented 5 years ago

To answer your questions:

sckott commented 4 years ago

Was it the httr-package back then?

Yes, it was the httr pkg back then

Okay, just pushed a change, I think this should work for you now, if you're still interested. See the query param in docs_bulk

sckott commented 4 years ago

@Aeilert 👆