elastic / elasticsearch-rs

Official Elasticsearch Rust Client
https://www.elastic.co/guide/en/elasticsearch/client/rust-api/current/index.html
Apache License 2.0

How to create a request that overwrites older values? #77

Closed Jasperav closed 4 years ago

Jasperav commented 4 years ago

The only method that uses Method::Put is 'PutScript', but I don't think that is what I am looking for. I have a Rust struct that looks something like this:

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct User {
    pub user_id: Uuid,
    pub username: String
}

I want to insert it into my 'user' index in Elasticsearch (so that I can eventually search users by username). This is what I tried:

pub fn update(user: User) -> BResult<()> {
    let elastics = // Some elastic search connection
    let user_id = user.user_id.to_string(); // I want to use the user id as id in ElasticSearch
    let mut update = elastics.create(CreateParts::IndexId("user", &user_id)).body(user);
    // At this point, somehow update.body is None, although I did set it. Strange...

    // I don't use Tokio in my project so I use the Tokio runtime to block
    let response = log_if_err_retry(db_session, Runtime::new().unwrap().block_on(update.send()))?;

    assert_ok(db_session, response.status_code())?;

    Ok(())
}

When I manually provide JSON to the body of 'update' (by doing json!(...)), I get a 409 error when executing the same request twice. That is indeed what the Rust docs say will happen, but I just want the values to be overwritten. I think I want to mimic this request (which I execute in Kibana):

PUT /user/_doc/USERID
{ 
    USER JSON
}

But I don't know how to do it in Rust. So:

  1. Why is my update.body None?
  2. How can I insert/update json for a given ID?

I couldn't find a put request in the tests folder.

russcam commented 4 years ago

I'm not sure why update.body is None. All request struct builders use a consuming builder pattern (each method takes mut self and returns the builder), so when calls are not chained in a single expression, the result of each call needs to be assigned back to the let binding of the struct you're building. The example given above looks fine in that respect as far as I can see, but I'm not sure whether it is a simplified example.
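The consuming builder pattern mentioned above can be sketched with a small stand-in type. This is only an illustration: IndexRequest, build_fluent, and build_stepwise are hypothetical names, not part of the elasticsearch crate, and the real builders carry more state.

```rust
/// Hypothetical request builder, loosely modelled on a consuming builder.
/// Not the real client type.
#[derive(Debug)]
pub struct IndexRequest {
    id: String,
    body: Option<String>,
}

impl IndexRequest {
    pub fn new(id: &str) -> Self {
        IndexRequest { id: id.to_string(), body: None }
    }

    /// Takes the builder by value (`mut self`) and returns it with the body set.
    pub fn body(mut self, body: &str) -> Self {
        self.body = Some(body.to_string());
        self
    }

    pub fn id(&self) -> &str {
        &self.id
    }

    pub fn has_body(&self) -> bool {
        self.body.is_some()
    }
}

/// Fluent style: each call consumes the previous value and returns the next.
pub fn build_fluent() -> IndexRequest {
    IndexRequest::new("1").body("{}")
}

/// Step-by-step style: the returned builder must be bound again.
pub fn build_stepwise(body: Option<&str>) -> IndexRequest {
    let mut request = IndexRequest::new("1");
    if let Some(b) = body {
        // Writing `request.body(b);` as a bare statement would move `request`
        // into the call and drop the returned builder, so later uses of
        // `request` would not compile.
        request = request.body(b);
    }
    request
}
```

With this shape, build_fluent() and build_stepwise(Some("{}")) both end up with a body set, while build_stepwise(None) does not.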

The index API can be used to overwrite a complete document, and the update API can be used to make a partial update to a document (internally, Elasticsearch still overwrites the whole document, after merging the existing document with what is supplied in the request).
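The difference between the two APIs can be pictured with a toy in-memory model. This is a sketch under loud assumptions: the store is a plain HashMap, documents are flat string maps, and the functions index, update, and doc are hypothetical helpers that only mimic the semantics described above, not the client or the server.

```rust
use std::collections::HashMap;

/// Toy document: a flat map of field name to value (an assumption for brevity).
pub type Doc = HashMap<String, String>;

/// Builds a document from field/value pairs.
pub fn doc(fields: &[(&str, &str)]) -> Doc {
    fields
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}

/// Index semantics: the stored document is replaced as a whole.
pub fn index(store: &mut HashMap<String, Doc>, id: &str, document: Doc) {
    store.insert(id.to_string(), document);
}

/// Update semantics: the partial document is merged into the existing one.
/// Returns false when there is no document with that id to update.
pub fn update(store: &mut HashMap<String, Doc>, id: &str, partial: Doc) -> bool {
    match store.get_mut(id) {
        Some(existing) => {
            // Fields in `partial` overwrite matching fields; others are kept.
            existing.extend(partial);
            true
        }
        None => false,
    }
}
```

In this model, updating only username leaves user_id in place, whereas indexing a document that contains only username drops user_id.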

An example of the index API:

use elasticsearch::{
    indices::{IndicesCreateParts, IndicesExistsParts},
    Elasticsearch, GetParts, IndexParts,
};
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct User {
    pub user_id: Uuid,
    pub username: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Elasticsearch::default();

    let index_exists_response = client
        .indices()
        .exists(IndicesExistsParts::Index(&["user"]))
        .send()
        .await?;

    // create index if it doesn't exist
    if index_exists_response.status_code() == StatusCode::NOT_FOUND {
        let create_index_response = client
            .indices()
            .create(IndicesCreateParts::Index("user"))
            .send()
            .await?;

        if !create_index_response.status_code().is_success() {
            panic!("create index failed");
        }
    }

    let mut user = User {
        user_id: Uuid::new_v4(),
        username: "foo".into(),
    };

    // index document
    let index_response = client
        .index(IndexParts::IndexId("user", &user.user_id.to_string()))
        .body(&user)
        .send()
        .await?;

    if !index_response.status_code().is_success() {
        panic!("indexing document with username {} failed", &user.username)
    }

    // get the document by id
    let get_response = client
        .get(GetParts::IndexId("user", &user.user_id.to_string()))
        .send()
        .await?;

    let get: Value = get_response.read_body().await?;
    let seq_no = get["_seq_no"].as_i64().unwrap();
    let primary_term = get["_primary_term"].as_i64().unwrap();
    user = serde_json::from_value(get["_source"].clone())?;
    println!("username: {}, seq_no: {}, primary_term: {}", &user.username, seq_no, primary_term);

    // change the username
    user.username = "bar".into();

    // index the document again
    let index_response = client
        .index(IndexParts::IndexId("user", &user.user_id.to_string()))
        .body(&user)
        .send()
        .await?;

    if !index_response.status_code().is_success() {
        panic!("indexing document with username {} failed", &user.username)
    }

    // get the document again
    let get_response = client
        .get(GetParts::IndexId("user", &user.user_id.to_string()))
        .send()
        .await?;

    let get: Value = get_response.read_body().await?;
    let seq_no = get["_seq_no"].as_i64().unwrap();
    let primary_term = get["_primary_term"].as_i64().unwrap();
    user = serde_json::from_value(get["_source"].clone())?;
    println!("username: {}, seq_no: {}, primary_term: {}", &user.username, seq_no, primary_term);

    Ok(())
}

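The seq_no and primary_term can be used for optimistic concurrency control when indexing the document again: the request succeeds only if the document has not changed since those values were read, and otherwise Elasticsearch responds with a 409 version conflict.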

let index_response = client
    .index(IndexParts::IndexId("user", &user.user_id.to_string()))
    .if_primary_term(primary_term)
    .if_seq_no(seq_no)
    .body(&user)
    .send()
    .await?;
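The check that if_seq_no and if_primary_term ask for can be modelled in a few lines. This is a deliberately simplified sketch: Stored, IndexError, and index_if are hypothetical names, and the sequence number here is bumped per document, whereas Elasticsearch assigns sequence numbers per shard.

```rust
/// Toy stored document with the two concurrency-control fields.
#[derive(Debug, Clone, PartialEq)]
pub struct Stored {
    pub username: String,
    pub seq_no: i64,
    pub primary_term: i64,
}

/// Stand-in for the 409 version conflict response.
#[derive(Debug, PartialEq)]
pub enum IndexError {
    VersionConflict {
        current_seq_no: i64,
        current_primary_term: i64,
    },
}

/// Overwrites the document only if the caller's view of it is still current.
/// Returns the new sequence number on success.
pub fn index_if(
    stored: &mut Stored,
    username: &str,
    if_seq_no: i64,
    if_primary_term: i64,
) -> Result<i64, IndexError> {
    if stored.seq_no != if_seq_no || stored.primary_term != if_primary_term {
        // Someone else wrote the document after the caller read it.
        return Err(IndexError::VersionConflict {
            current_seq_no: stored.seq_no,
            current_primary_term: stored.primary_term,
        });
    }
    stored.username = username.to_string();
    stored.seq_no += 1; // simplification: real sequence numbers are per shard
    Ok(stored.seq_no)
}
```

A second write that reuses the stale seq_no from the first read is rejected, which is the situation where a caller would typically re-read the document and retry.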

An example of the update API: replace the second index API call with the following (this additionally needs elasticsearch::UpdateParts and serde_json::json in scope):

user.username = "bar".into();

let update_response = client
    .update(UpdateParts::IndexId("user", &user.user_id.to_string()))
    .body(json!({
        "doc": {
            "username": &user.username
        }
    }))
    .send()
    .await?;

russcam commented 4 years ago

I'm going to close this issue as I believe I've answered your question. @Jasperav 👍

Jasperav commented 4 years ago

Oh darn, I thought I had already left a message! Your example worked, thank you very much :). This issue can indeed be closed.