ropensci / elastic

R client for the Elasticsearch HTTP API
https://docs.ropensci.org/elastic

Question: Bulk upsert - adding new fields rather than replacing entire document #169

Closed: iainmwallace closed this issue 6 years ago

iainmwallace commented 7 years ago

Hi,

Wonderful package! I'm new to Elasticsearch, but I was wondering whether it is possible to do a bulk upsert. I want to add extra fields to documents that are already present.

For example, if I store the following:

x <- tibble(id = letters[1:3], my_letter = LETTERS[1:3])
f <- docs_bulk_prep(x, "test", path = tempfile(fileext = ".json"), doc_ids = x$id)
docs_bulk(f)

I get this as a document:

{"_index":"test","_type":"test","_id":"a","_version":1,"found":true,"_source":{"id":"a","my_letter":"A"}}

I want to append a new field, "my_number", so I naively repeated the process with a different column in the data frame:

x <- tibble(id = letters[1:3], my_number = 1:3)

but my new document replaced the existing one:

{"_index":"test","_type":"test","_id":"a","_version":2,"found":true,"_source":{"id":"a","my_number":1}}
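
Ideally I'd end up with a single merged document per id, along these lines:

{"_index":"test","_type":"test","_id":"a","_source":{"id":"a","my_letter":"A","my_number":1}}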

Is there an efficient way I could approach this?

thanks

Iain

sckott commented 7 years ago

thanks for your question @iainmwallace and glad you like the pkg

so there is a way to do updates using the bulk API https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-bulk.html#bulk-update

there are 3 possible inputs to docs_bulk: a data.frame, a list, or a file path

for the 3rd option (a file path), no problem - the user just has to create the file manually and say what operation they want to do with each row (caveat: if the files are created with docs_bulk_prep there's currently no way to do anything other than the index operation) - so you can do updates right now if you create your files manually
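
For reference, a manually written bulk update file is newline-delimited JSON, with an action line followed by a partial document for each row - roughly like this (the index/type/id values here are just placeholders; doc_as_upsert tells Elasticsearch to insert the document if it doesn't already exist):

```
{"update": {"_index": "test", "_type": "test", "_id": "a"}}
{"doc": {"my_number": 1}, "doc_as_upsert": true}
{"update": {"_index": "test", "_type": "test", "_id": "b"}}
{"doc": {"my_number": 2}, "doc_as_upsert": true}
```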

for data.frame or list input - we don't currently support anything other than the index operation in docs_bulk. we could potentially add a way to either 1) pass in a vector of length NROW(data.frame) or length(list) giving the operation (index/create/update/delete) for each row or list chunk, or 2) allow a specific field in the data.frame or list to hold the operation (index/create/update/delete). but as you can see on the docs page it's not always just a single string - some of those operations take additional parameters, which makes it complicated

anyway - I can try to add support for data.frames - but I can't promise that it will be incorporated if it doesn't fit

iainmwallace commented 7 years ago

Thanks! I will try to write my own JSON files and use the docs_bulk option to upload them.

Adding support to add new fields from a data.frame if they don't exist, and update them if they do, would enable what I imagine is a pretty common workflow:

1. Store info from database query A about object type 1 in Elasticsearch
2. Store other info from database query B about object type 1 in Elasticsearch
3. Users then query the Elasticsearch instance

sckott commented 7 years ago

> Adding support to add new fields from a data.frame if they don't exist, and update them if they do

will look into it

sckott commented 7 years ago

haven't forgotten about this, still getting around to it

iainmwallace commented 7 years ago

Great!

In case it is useful to others, this is how I created custom JSON files for upserting into Elasticsearch to do this workflow.

library(dplyr)
library(jsonlite)
library(tidyr)
library(purrr)
library(pbapply)
library(elastic)

map_header <- function(x) {
  # bulk action line: update the document with this _id
  header <- list(update = list(
    "_index" = "my_index",
    "_type" = "my_id",
    "_id" = x
  ))

  jsonlite::toJSON(header, auto_unbox = TRUE)
}

map_body <- function(x) {
  # wrap the row's data in a property "my_dataset";
  # doc_as_upsert = TRUE inserts the document if it doesn't exist yet
  my_doc <- list(my_dataset = x)

  jsonlite::toJSON(list(doc = my_doc, doc_as_upsert = TRUE), auto_unbox = TRUE)
}

create_json_body <- function(my_id_subset, my_dataset, tmp_file = "tmp_elastic_files_") {
  # Write one bulk file with a header + body line pair for each row of
  # my_dataset (the dataset to load into Elasticsearch) whose id_column
  # is in my_id_subset (a vector of ids)
  my_small_dataset <- my_dataset %>% filter(id_column %in% my_id_subset)
  my_tmp_file <- tempfile(pattern = tmp_file, fileext = ".json")

  tmp_table <- my_small_dataset %>%
    nest(-id_column) %>%
    mutate(body = map(data, map_body)) %>%
    mutate(header = map(id_column, map_header)) %>%
    mutate(combined = paste0(header, "\n", body))

  write(tmp_table$combined, file = my_tmp_file)
  print(my_tmp_file)
}

Example

my_dataset <- tibble(id_column = letters[1:26], value1 = runif(26), value2 = runif(26))

my_ids <- unique(my_dataset$id_column)
# change the chunk size depending on how many documents you want per json file
x <- split(my_ids, ceiling(seq_along(my_ids) / 10))
pblapply(x, create_json_body, my_dataset)

# bulk load each of the generated files
files <- list.files(tempdir(), pattern = "tmp_elastic_files_")
for (i in seq_along(files)) {
  cat(i, "\n")
  invisible(
    docs_bulk(file.path(tempdir(), files[i]))
  )
}

sckott commented 7 years ago

thanks for that

sckott commented 7 years ago

@iainmwallace putting this off to the next milestone, but there's some work on a different branch. install it like devtools::install_github("ropensci/elastic@bulk-update") and let me know what you think

sckott commented 6 years ago

any thoughts @iainmwallace ?

iainmwallace commented 6 years ago

Hi Scott,

I just tried it, but had an issue finding the docs_bulk_update function discussed in this commit: https://github.com/ropensci/elastic/commit/12dcb92696d23bfc396cdddf184df73e4a5c195d

I have a brief write-up of what I did here: http://www.iainmwallace.com/2018/01/21/elasticsearch-and-r/

Is there something else I should be doing?

Cheers,

Iain


sckott commented 6 years ago

@iainmwallace sorry about that, just updated the branch https://github.com/ropensci/elastic/tree/bulk-update - i had forgotten to update NAMESPACE and make the man file for it.

try again after reinstalling from that branch

iainmwallace commented 6 years ago

Thanks - I can now see the function.

When I try to run the following code, I am not able to update the data because the index is read-only. Is there a setting somewhere that I need to change when creating the index?

library(elastic)
connect(es_port = 9200)

df <- data.frame(name = letters[1:3], size = 1:3, id = 100:102)
index_create('test')
docs_bulk(df, 'test', 'foobar', es_ids = FALSE)

df2 <- data.frame(size = c(45, 56), id = 100:101)
docs_bulk_update(df2, index = 'foobar', type = 'foobar')

Updating results in an error:

[[1]]$items[[2]]$update$error
[[1]]$items[[2]]$update$error$type
[1] "cluster_block_exception"

[[1]]$items[[2]]$update$error$reason
[1] "blocked by: [FORBIDDEN/12/index read-only / allow delete (api)];"


sckott commented 6 years ago

possibly this: https://stackoverflow.com/questions/34911181/how-to-undo-setting-elasticsearch-index-to-readonly/34911897#34911897 - same issue here: https://discuss.elastic.co/t/forbidden-12-index-read-only-allow-delete-api/110282/4
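
For anyone hitting the same thing, those answers boil down to clearing the read_only_allow_delete block on the index settings once disk space is freed up. A minimal sketch hitting the settings endpoint directly with httr (the host and index name are placeholders, and I'm not sure elastic itself exposes a settings-update helper for this):

```r
library(httr)

# clear the flood-stage read-only block on the index
PUT(
  "http://localhost:9200/foobar/_settings",
  body = '{"index.blocks.read_only_allow_delete": null}',
  content_type_json()
)
```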

iainmwallace commented 6 years ago

Thanks - the issue was that my disk was nearly full, which caused Elasticsearch to force all indices to be read-only (the flood stage watermark; more details are available here: https://www.elastic.co/guide/en/elasticsearch/reference/6.x/disk-allocator.html)

The update looks great! Works as expected. My only small suggestion is that the warning shown when the id column is missing incorrectly states that '_id' must be present, when it should be just 'id':

Error in docs_bulk_update.data.frame(df3, index = "test2", type = "foobar") :
  data.frame must have a column "_id" or pass doc_ids

An equally small suggestion: it would be useful to have examples of how to pass additional parameters through the functions. For example, I wasn't able to figure out how to pass the read_only parameter through the index_create function.

hope that helps :)


sckott commented 6 years ago

Glad you sorted out the problem, and that the fxn works.

Thanks for the suggestions

sckott commented 6 years ago

@iainmwallace i think it's done now, merged into master, so you can devtools::install_github("ropensci/elastic") to get the latest

sckott commented 6 years ago

any feedback is good. it only supports data.frames for now - added another example, so there's one for adding new rows and one for adding new columns, both the same operation really.
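
For anyone landing here later, a minimal sketch of the new-columns case, based on the usage shown earlier in this thread (the index and type names are placeholders; docs_bulk_update matches rows to existing documents via the "id" column or doc_ids):

```r
library(elastic)
connect(es_port = 9200)

# index some documents, using the "id" column as document ids
df <- data.frame(name = letters[1:3], size = 1:3, id = 100:102)
index_create("test_upsert")
docs_bulk(df, "test_upsert", "doc", es_ids = FALSE)

# upsert a new field ("color") onto the existing documents by id;
# existing fields such as "name" and "size" are left in place
df2 <- data.frame(id = 100:102, color = c("red", "green", "blue"))
docs_bulk_update(df2, index = "test_upsert", type = "doc")
```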