Open nknize opened 2 years ago
I'd like to restate the problem a little bit to help me understand better:
The current
_bulk
indexing API guarantees that all provided documents are durability persisted before sending the response to the user. For various reasons, it is generally not possible to fully utilize server-side resources from a single-threaded client sending serialized bulk requests. This forces users to build architectures that send concurrent bulk requests that are difficult to get right. Furthermore, users often have a stream of documents as the source to send to the search engine and converting that source to concurrent bulk requests is a major impedance mismatch. A streaming index API will solve this mismatch and allow users to fully utilizing indexing capabilities in a much simpler way.
Is that right? I think a clearer description of the use cases being targeted here would help us define the semantics of the API.
I think it's crazy that ES (and now OS) does not offer a streaming indexing API.
The bulk indexing API is horrible -- clients must figure out 1) how many docs to include in every bulk request (incurring possibly high latency if docs/sec is not so high), 2) how many concurrent bulk requests to send to maximize indexing throughput, 3) handle resulting heap/OOME issues, and 4) then handle the horrific RejectedExecutionException
if too many requests are sent concurrently. This API is exposing implementation details of the cluster that should NOT be a concern of the client.
A streaming indexing API would empower the server to pull docs off the wire into the "right" sized chunks/concurrency based on available indexing and heap resources, would allow simple back-pressure if the requested indexing rate is too high for the cluster resources, and finally would enable a very simple durability model (when client is done sending the streaming docs, when they see the ACK at the end, the changes are durable).
We should deprecate the poorly designed bulk indexing API and switch to a simpler/less risky streaming indexing API!
Here is a proposal for the above mentioned RFC,
The OpenSearch bulk
API Provides a way to perform multiple index
, create
, delete
, and update
actions in a single request. The node that receives the request is referred to as the coordinating node as it manages the life of the request and assembles the response. This can be a dedicated node or one of the data nodes in the cluster. The coordinating node uses the write threadpool, which has number of threads equals to the available processors and a queue size of 10000, to handle the bulk requests.
A bulk request contains operations intended for multiple indices and shards. Once a bulk request is received by the coordinating node, it
429 (Too Many Requests)
for plain HTTP client or RejectedExecutionException
in case of a Java client.The bulk API assumes data is bounded and available in batches. In reality, a lot of data is unbounded because it arrives gradually over time. With the bulk API, the user must artificially divide the data into chunks of fixed size. However, there is no clear way to know what is the right size of the chunk without experimenting with it.
With streaming mode, we can send a stream of data records using the bulk API in in a single HTTP connection. To enable streaming mode, we will leverage upon the Chunked Transfer Encoding to establish a persistent connection and OpenSearch will accept the records in order as long as the connection is open. For each record, OpenSearch will asynchronously send a response to the client in the same order.
In the streaming mode, the API can be called with an additional header "Transfer-Encoding: chunked” on the initial request. For example to establish a connection,
curl -X POST -H "Transfer-Encoding: chunked" -H "Content-Type: application/json"
"<host:port>/_bulk"
Once the stream has been established, we can start sending multiple requests on the same stream. To existing bulk API uses the newline delimited JSON (NDJSON) structure:
{ "index": { "_index": "<index>", "_id": "<id>" } }
{ "A JSON": "document" }
...
In the streaming mode, we would want to combine the action, metadata and the optional source into a single structure to stream it atomically. For example,
{ "action": { "index" : { "_index" : "<index>", "_id" : "<id>" } }, "source": { "A JSON": "document" }}
However, it would add a breaking change to the bulk API and the alternative would be to introduce a new API.
In OpenSearch, a refresh operation makes the changes to shards available for search. OpenSearch provides the dynamic setting index.refresh_interval
, which defaults to 1 second, to set the time interval when it performs a refresh.
With the bulk API, we can pass an URL parameter refresh
to control the refresh. Possible values are true
, wait_for
and false
.
In the streaming mode, we may want to reconsider whether to provide this control to the client.
Currently, OpenSearch uses a transaction log to store index and delete operations until they are committed to the Lucene index. OpenSearch, through the index.translog.durability
dynamic setting, provides two kinds durability levels - request
and async
fsync
ed and committed to the disk. (default)fsync
ed and committed in the background every sync_interval (default to 5s).With streaming mode, we may want to keep the same durability mechanism and this can be taken care of as part of the pluggable translog feature proposal.
The Chunk Transfer Encoding is only available in HTTP/1.1
. While, OpenSearch currently supports both HTTP/1.0
and HTTP/1.1
versions, for the streaming API we can only support the HTTP/1.1
.
HTTP/2
has its own streaming mechanism and we need to update the streaming API when we start supporting HTTP/2
in OpenSearch.
If the client doesn’t close the connection when all the data is streamed, resources on the server is wasted and we may need to consider closing the connection after a certain client idle time out.
Currently, the requests to OpenSearch and responses to the client can be gzip-compressed for optimization. With the streaming mode enabled, the individual requests can not be compressed.
Based on the feedback from this RFC, we will work on the design (link to be added) for the above solution.
@adnapibar thanks a lot for formalizing the proposal, I was thinking about possible streaming mode implementation and I think the Chunked Transfer Encoding may not be what we should be looking at. To be clear - chunking transfer does work, even the back pressure should be feasible to support, but what does not for bulk ingestion is error propagation, let me elaborate.
Let us assume that client establishes the stream and starts sending bulk requests. At some point, the server may need to shed the load and stop ingestion, including the bulk streams. How to communicate that back to the client?
HTTP/2 or/and WebSockets could be the superior options in this case to explore. The client streams bulk requests but the servers communicates back the status of the stream processing.
To enable streaming mode, we will leverage upon the Chunked Transfer Encoding to establish a persistent connection and OpenSearch will accept the records in order as long as the connection is open. For each record, OpenSearch will asynchronously send a response to the client in the same order.
I am curious how asynchronous response per each record could be implemented specifically for chunked transfer encoding: AFAIK most of the clients and servers would not deal with response before finishing with the request, even if both request and response are chunked.
Thank you.
AFAIK most of the clients and servers would not deal with response before finishing with the request, even if both request and response are chunked.
@reta Thanks for the feedback. I was suggesting that each chunk to be an individual operation that can be handled atomically by the server and a corresponding response to be sent as chunked encoding. It doesn't have the same semantics as the bulk request which handles batches of operations. While there are lots of implementations details that I'm unaware at this point, I would appreciate to hear drawbacks and alternative mechanisms.
I agree with @reta about http2 and/or websockets. When I had originally read this issue that is what I immediately thought of. With 2-way communication it would be easier for OpenSearch to notify the client of its state, ie. low heap, full queue, etc.
+1, the persistence of a connection is not really the problem (can be achieved with keep-alive today), it's the need to have to reason about the size of bulk requests to avoid sending too many requests
Also, while it's not possible to follow the progress of each separate request in the bulk API (that would be streaming), I think we should also consider possible ways to improve upon that, e.g. https://github.com/opensearch-project/OpenSearch/issues/3625
The Kafka protocol is perhaps a good example to look at. It is a fully pipeline-able 2-way streaming API which is what I was thinking of here. The key points are that clients can (and should) use non-blocking I/O to implement request pipelining and that a single client should rarely if ever need to implement connection pooling to send requests across multiple connections.
However, Kafka defines a completely hand-rolled binary format on a plain TCP connection which puts a lot of implementation burden on clients, which is probably not the right thing to do given that there are now streaming options available like HTTP/2 and WebSockets.
We have to provide an API over HTTP and not only with clients for high level programming languages. I agree that HTTP/2 is a superior option but we don't have support for it yet.
@andrross Kafka also provides a streaming REST API over HTTP using the same mechanism that I proposed earlier.
Opened a separate issue to add support for HTTP/2 #3651
In the streaming mode, we would want to combine the action, metadata and the optional source into a single structure...However, it would add a breaking change to the bulk API and the alternative would be to introduce a new API.
I think the streaming index API should be a new API. Like segment replication it should start as experimental behind a feature flag so we can benchmark default parameters and API semantics before promoting it as a first class bulk ingest mechanism. As you touch on in the durability levels
, we're exploring durability under different configurations and looking at introducing new durability controls. For example, segrep w/o remote store needs the local translog to provide durability. Once operations are durable in the translog we can ack to the client; segrep w/ remote store will ack after a commit. But like UDP a user may not be so concerned about durability and won't care if an operation is lost in which case no ack is necessary.
Refresh policy: In the streaming mode, we may want to reconsider whether to provide this control to the client.
This defaults to false
in the current bulk API, effectively decoupling refresh from bulk indexing. The high penalty true
value was originally introduced for those cases where users wanted documents available for search immediately after each operation (e.g., some security use cases) and wait_for
was intended to strike a balance. I think we'll want to retain this control but introducing streaming index as a separate API allows us to explore the necessity as we evolve segment replication.
I agree that it should be a new API as it allows you to pack more optimizations within this API that are natively closer to the streaming paradigm and would be difficult to implement in the existing synchronous bulk API, especially with all its durability options.
I would also think about how we could directly stream to the data node instead of coordinator node splitting the requests in between. This would probably require us to vend clients with some intelligence about routing?
I have been thinking about whether we should explore a more performant x content parser for high throughput writes than JSON and streaming API fits pretty well there. Instead of just using existing bulk formats, we could implement a more performant wire protocol along with this API.
In the streaming mode, we would want to combine the action, metadata and the optional source into a single structure...However, it would add a breaking change to the bulk API and the alternative would be to introduce a new API.
You need to be more careful about this change as it introduces much more duplicate processing with every document parsed.
I concur the thought of having a separate API to revisit our freshness and durability semantics and pack optimizations as needed.
I guess the network infrastructure/firewall would potentially limit how long the connection can stay open, this should also factor in inevitable cases where the connections have to be forcibly closed like server maintenance
Do we also plan on supporting a client library for ensuring persistent consistent(keep-alive), connection close on end of stream, backup buffering mechanism if the server isn't able to process as fast and close connection if the buffer hits a certain limit, reconnection on connection drops. The server could apply back-pressure if it isn't able to process the stream as fast or sees resources being too close to being exhausted.
I would also think about how we could directly stream to the data node instead of coordinator node splitting the requests in between. This would probably require us to vend clients with some intelligence about routing?
@itiyama I think we could have the coordinator split the streams for parallel processing and fan it out to respective shards as needed or even consider having a single stream always write to a single shard if there are overheads with splitting them
@reta do we want to create a meta issue and start documenting the client breaking changes to help drive a bwc / 3.0 release discussion for supporting this feature?
@reta do we want to create a meta issue and start documenting the client breaking changes to help drive a bwc / 3.0 release discussion for supporting this feature?
@nknize very likely so, I will work on POCs over the next week to outline the implementation choices, than we could bundle that into meta issue weighting the breaking changes, thanks!
@reta @nknize, I have created a github project for better tracking and visibility. https://github.com/orgs/opensearch-project/projects/116/views/1. Feel free to add all related issues to this board and we can track it from there
I hope we do not expose cluster details like "heap is low", "too many concurrent requests", etc., through this new API. Rather, the cluster should pull from the stream at the rate it is able to given the load and free resources in the cluster.
Probably the first POC should build on _bulk
internally? I.e. the coordinator accepts this stream, but breaks it into N concurrent bulk requests internally of size roughly M bytes each, out to the separate shards/indices that the streaming update requests target. N and M might be tunable configuration on the cluster, or, could be dynamically determined, or maybe eventually, both.
If the client is sending more docs per second than the cluster can handle, just let the standard TCP level networking back-pressure stall the incoming bytes. The client in turn will see that the socket is blocking on trying to write too many bytes/second. In this new streaming indexing API, clients should never see rejections.
I don't think we should enable fancy durability options to begin. Start simple: nothing is durable until the client closed the connection and gets the final ACK / TCP close success back from the server. Likewise, let's not tie refresh to this new API either. If security use-cases really need this, then they should call refresh
themselves after closing the indexing stream.
Similarly, let's not introduce transactional semantics for the first go. I.e. the ongoing periodic refreshes (1s by default) will expose any recently indexed documents via this new streaming API for searching.
Thanks a lot @mikemccand , your comments are very much aligned with what we have discussed as the way forward, I intend to capture these details in the implementation proposal (next week or so). Thank you again.
Why does this feature entail a breaking change? It seems capable of existing in the 2.x line.
Why does this feature entail a breaking change?
There will need to be HTTP2 streaming OpenSearch APIs added the clients in order to leverage the streaming index API. This will require an upgrade to both clients and servers. @reta I think that can be done in a backward compatible way but it would require supporting both HTTP1 and HTTP2 and I'm not sure there's a good understanding yet of that blast radius and what tech debt will continue to accrue. We already have so much as it is. That along with server and client upgrades I wonder if we just box this to 3.0 only and release when we have a date schedule for 3.0?
I also want to point out that we intend for this to be the transport replacement for both _bulk
as well as the standard Put
document DSL API and Update. I mention this because some folks are minimizing the importance of this feature as it pertains to the bigger durability mechanisms. We do not plan to fragment the implementation with HTTP1 for Put/Update
calls when we can achieve all of the above (e.g., "stream" one document) w/ the same HTTP2 mechanisms.
@reta I think that can be done in a backward compatible way but it would require supporting both HTTP1 and HTTP2 and I'm not sure there's a good understanding yet of that blast radius and what tech debt will continue to accrue.
@nknize "yes" for server side but "no" for clients - the existing 2.x clients (rest / RHLC) are build on top of Apache Http Client 4.x and it does not support HTTP/2.
Is your feature request related to a problem? Please describe. Current
_bulk
indexing API places a high configuration burden on users today to avoidRejectedExecutionException
due toTOO_MANY_REQUESTS
. This forces the user to "experiment" with bulk block sizes, multi-threading, refresh intervals, etc.Describe the solution you'd like The _bulk configuration burden and workflow should be relocated from the user and handled by the server. The user experience should switch to an anxiety free API that enables users to send a "stream" of index requests that is load balanced by the server in a Streaming Index mechansim.
This Streaming Index API mechanism should also handle the "durability" responsibility based on a user defined Durability Policy to determine the following:
Describe alternatives you've considered Continue w/ durability as it is today w/ a document replication model.