opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0
9.46k stars 1.74k forks source link

[Streaming Indexing] Introduce bulk HTTP API streaming flavour #9070

Closed reta closed 2 weeks ago

reta commented 1 year ago

Is your feature request related to a problem? Please describe. The bulk HTTP API does not support streaming (neither HTTP/2 nor chunked transfer)

Describe the solution you'd like Introduce bulk HTTP API streaming flavor based on new experimental transport (https://github.com/opensearch-project/OpenSearch/issues/9067)

Describe alternatives you've considered N/A

Additional context See please https://github.com/opensearch-project/OpenSearch/issues/9067

https://github.com/opensearch-project/opensearch-api-specification/issues/537 https://github.com/opensearch-project/documentation-website/issues/8111

reta commented 3 weeks ago

There is new experimental API introduced in 2.15.0 which is not yet documented nor finalized:

    POST  /_bulk/stream
    PUT    /_bulk/stream
    POST  /{index}/_bulk/stream
    PUT    /{index}/_bulk/stream

The new API is using LD-JSON and at the moment, picks a single chunk from the stream (bulk operation), forwards it to the node of the cluster and returns the result of the operation as a chunk in the response, for example the request with three chunks:

<---- chunk 1 -----> 
{ "index": { "_index": "test-streaming", "_id": "1" } }
{ "name": "josh" }
<---- chunk 1 -----> 

<---- chunk 2 -----> 
{ "index": { "_index": "test-streaming", "_id": "2" } }
{ "name": "tom" }
<---- chunk 2 -----> 

<---- chunk 3 -----> 
{ "index": { "_index": "test-streaming", "_id": "3" } }
{ "name": "john" }
<---- chunk 3 -----> 

would receive a response with three chunks as well (1:1):

<---- chunk 1 -----> 
{"took":547,"errors":false,"items":[{"index":{"_index":"test-streaming","_id":"1","_version":1,"result":"created","forced_refresh":true,"_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}}]}

<---- chunk 2 -----> 
{"took":20,"errors":false,"items":[{"index":{"_index":"test-streaming","_id":"2","_version":1,"result":"created","forced_refresh":true,"_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1,"status":201}}]}
<---- chunk 2 -----> 

<---- chunk 3 -----> 
{"took":22,"errors":false,"items":[{"index":{"_index":"test-streaming","_id":"3","_version":1,"result":"created","forced_refresh":true,"_shards":{"total":2,"successful":1,"failed":0},"_seq_no":2,"_primary_term":1,"status":201}}]}
<---- chunk 3 -----> 

This is simple communication model but very inefficient. The goal of this issue is to formalize API in such a way that it could be used as efficiently as possible, aiming to overtake existing /_bulk APIs in the future.

reta commented 3 weeks ago

Option #1: Keep using NDJSON for streaming ingestion

The current _bulk APIs use NDJSON (Newline delimited JSON, https://github.com/ndjson/ndjson-spec) which fits perfectly to the streaming use cases: each chunk sent along the request represents distinct bulk operation/action (index, create, delete, and update). The suggestion is to not change it and use NDJSON for streaming /_bulk/stream APIs as well.

To efficiently batch chunks, the streaming bulk request could support following parameters:

The combination of both should also be possible (in this regard, whichever condition triggers first, it would shape the batch). Although the request format stays unchanged, the semantic of response would change to reflect the internal batching (so it won't be 1:1 anymore), for example, with batch_size=3 and three request chunks:

<---- chunk 1 -----> 
{ "index": { "_index": "test-streaming", "_id": "1" } }
{ "name": "josh" }
<---- chunk 1 -----> 

<---- chunk 2 -----> 
{ "index": { "_index": "test-streaming", "_id": "2" } }
{ "name": "tom" }
<---- chunk 2 -----> 

<---- chunk 3 -----> 
{ "index": { "_index": "test-streaming", "_id": "3" } }
{ "name": "john" }
<---- chunk 3 -----> 

the response would only contain one chunk (per 3 request chunks):

<---- chunk 1 -----> 
{"took":547,"errors":false,"items":[
    {"index":{"_index":"test-streaming","_id":"1","_version":1,"result":"created","forced_refresh":true,"_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}}
    {"index":{"_index":"test-streaming","_id":"2","_version":1,"result":"created","forced_refresh":true,"_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1,"status":201}},
    {"index":{"_index":"test-streaming","_id":"3","_version":1,"result":"created","forced_refresh":true,"_shards":{"total":2,"successful":1,"failed":0},"_seq_no":2,"_primary_term":1,"status":201}}
]}
<---- chunk 1 -----> 

The downside of this model: if the bulk action has documents without _id, it would be difficult to match the outcome of N request chunks to corresponding M response chunks (but this is a general problem with existing _bulk APIs as well) to precisely pinpoint the failures fe.

On the benefits side, keeping NDJSON would significantly simplify the migration from the existing _bulk APIs since the only changes would be related to slicing the bulk operations (and response processing if needed) but not data formats.

Risks to consider:

Option #2: Introduce efficient (binary?) format for streaming ingestion

Alternative option is to introduce new efficient (binary?) format for streaming ingestion (for example, based on Protocol Buffers).

Protocol Buffers are great for handling individual messages within a large data set. Usually, large data sets are a collection of small pieces, where each small piece is structured data. - https://protobuf.dev/programming-guides/techniques/

The example message schema may look like this:

syntax = "proto3";
import "google/protobuf/any.proto";

message Index {
  optional string index = 1;
  optional string _id = 2;
  optional bool require_alias = 3;
  map<string,  google.protobuf.Any> fields = 4;
}

message Create {
  optional string index = 1;
  optional string _id = 2;
  optional bool require_alias = 3;  
  map<string,  google.protobuf.Any> fields = 4;
}

message Delete {
  optional string index = 1;
  string _id = 2;
  optional bool require_alias = 3;      
}

message Update {
  optional string index = 1;
  string _id = 2;
  optional bool require_alias = 3;    
  optional google.protobuf.Any doc = 4;
}

message Action {
  oneof action {
      Index index = 1;
      Create create = 2;
      Delete delete= 3;
      Update update = 4;
  }
}

The schema actively relies on google.protobuf.Any to pass freestyle JSON-like structures around (for example, documents or scripts):

The Any message type lets you use messages as embedded types without having their .proto definition. An Any contains an arbitrary serialized message as bytes, along with a URL that acts as a globally unique identifier for and resolves to that message’s type. - https://protobuf.dev/programming-guides/proto3/#any

Risks to consider:

reta commented 3 weeks ago

@andrross @dblock @msfroh would appreciate early feedback if possible, working on some POCs at the moment to capture the operational metrics.

andrross commented 3 weeks ago

@reta My inclination here is that since this is a new API that is specifically targeting high-throughput use cases then we should explore option 2. I think there is quite a lot of overhead with parsing JSON and a binary protocol could improve upon that. For simple use cases where ease of use is paramount, the existing request-response style newline delimited JSON API will remain. I also think we could make adoption of a new binary API easier for the end user by keeping the same public API (as much as possible) of the various language clients like opensearch-java by continuing to use the current types exposed by those APIs even if it means an extra conversion on the client side to say a protobuf object type in the process of doing the binary serialization.

Tagging @amberzsy here as she is working on some protobuf experiments with the client API on the search side. We could potentially make the protobuf implementation for the client API much easier by generating the protobuf schemas from the opensearch-api specification, as that should the source-of-truth for OpenSearch client API.

reta commented 3 weeks ago

@reta My inclination here is that since this is a new API that is specifically targeting high-throughput use cases then we should explore option 2.

Thanks @andrross , I will try this route as well and publish some metrics so we could compare the wins (if any), thanks!

andrross commented 3 weeks ago

@reta I'm really curious if the protobuf Any type is feasible here. Have you actually used it in practice at all? I was thinking we might have to fallback to binary blobs of UTF-8 encoded JSON for the actual document source in some cases.

reta commented 3 weeks ago

@reta I'm really curious if the protobuf Any type is feasible here. Have you actually used it in practice at all? I was thinking we might have to fallback to binary blobs of UTF-8 encoded JSON for the actual document source in some cases.

No, I haven't tried this schema yet in practice (only made sure it is processable by protoc successfully), will let you know how it goes, thanks @andrross !

reta commented 2 weeks ago

To have change delivered incrementally, focusing this issue on HTTP/NDJSON API flavour (Option #1) and spinning off Protobuf API flavour into https://github.com/opensearch-project/OpenSearch/issues/15447 for future release (Option #2).