elastic / elasticsearch-js

Official Elasticsearch client library for Node.js
https://ela.st/js-client
Apache License 2.0

Bulk helper: Support streaming use/infinite streams or generators #2266

Open spinscale opened 2 months ago

spinscale commented 2 months ago

šŸš€ Feature Proposal

A common use case for apps is an infinite consumer that passes data over to Elasticsearch via bulk requests. Having been a Java client user for many years, I assumed all clients operate the same way and support this. However, Martijn corrected my assumption in the forum: there is a distinction between push-based and pull-based bulk ingestion helpers in the various clients.

My basic idea would be to add support for endless ingestion via the bulk helper (or maybe it already works and just requires documentation updates). This way I could use something like queueable to keep adding data that then gets consumed by the bulk helper.
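For illustration, a minimal sketch (not part of the client's API) of bridging a push-based source to the pull-based bulk helper: a hypothetical in-memory queue, standing in for something like queueable, wrapped in an async generator that never returns.

```js
// Sketch only: a hypothetical in-memory queue standing in for queueable.
// Producers push documents; an infinite async generator pulls them out
// so the bulk helper can consume them like any other datasource.
class DocumentQueue {
  constructor () {
    this.items = []
    this.waiters = []
  }

  push (doc) {
    const resolve = this.waiters.shift()
    if (resolve) resolve(doc)
    else this.items.push(doc)
  }

  pull () {
    if (this.items.length > 0) return Promise.resolve(this.items.shift())
    return new Promise(resolve => this.waiters.push(resolve))
  }
}

// Never-ending generator: yields whenever a producer pushes a document.
async function * drain (queue) {
  while (true) {
    yield await queue.pull()
  }
}
```

The corner case mentioned next (a queue that stays empty for longer than the flush interval) would surface here as a `pull()` that does not resolve while the helper's flush timer keeps firing.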

As mentioned in the thread, there may be corner cases (like the queue being empty for longer than the flush interval) that need to be covered.

Also, in order to align with the other clients, adding a document count threshold to the bulk helper could make sense.

Motivation

This will make it easier to implement any kind of continuously polling/streaming service that needs to bulk index data into Elasticsearch.

Example

I'd assume there is no change to the bulk helper API itself (apart from maybe adding a number-of-documents threshold), but it would allow passing a generator that is infinite.
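A sketch of what that could look like with the existing `client.helpers.bulk` options (the index name, thresholds, and synthetic source below are illustrative):

```js
const { Client } = require('@elastic/elasticsearch')

const client = new Client({ node: 'http://localhost:9200' })

// Illustrative infinite source: emits a synthetic document every 100 ms.
// In practice this would wrap a queue, message broker, or polling loop.
async function * infiniteSource () {
  let id = 0
  while (true) {
    await new Promise(resolve => setTimeout(resolve, 100))
    yield { id: id++, '@timestamp': new Date().toISOString() }
  }
}

client.helpers.bulk({
  datasource: infiniteSource(),  // never completes
  onDocument (doc) {
    return { index: { _index: 'my-index' } }
  },
  flushBytes: 1_000_000,         // flush once ~1 MB is buffered...
  flushInterval: 30000,          // ...or every 30 seconds, whichever comes first
  onDrop (doc) {
    console.error('dropped document', doc)
  }
})
```

With a source that never ends, the promise returned by `client.helpers.bulk` would presumably stay pending until the helper's `abort()` is called, which is part of what needs confirming or documenting here.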

P.S. If this already works as expected, please close - there is still the possibility that I missed this in the docs and just asked around for nothing because everything works as expected šŸ˜€

JoshMock commented 2 months ago

I've not used queueable before, but if it uses ReadableStream correctly, the bulk helper should already support it. As the docs note, datasource can be an array, async generator, or ReadableStream. (It also works with Buffers, and I'm not sure why that's not documented.) Here is where the code asserts what types it supports, and here is where it begins looping over datasource.
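For reference, the documented stream-based usage looks roughly like the following (the file name is illustrative; split2 is used to turn an NDJSON stream into one document per chunk). An unbounded Readable should, in principle, plug into the same `datasource` slot.

```js
const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')

const client = new Client({ node: 'http://localhost:9200' })

async function run () {
  const result = await client.helpers.bulk({
    // each line of the NDJSON file is parsed into one document
    datasource: createReadStream('./dataset.ndjson').pipe(split(JSON.parse)),
    onDocument (doc) {
      return { index: { _index: 'my-index' } }
    }
  })
  console.log(result)
}

run().catch(console.log)
```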

Have you already tried using an infinite stream or generator to see if they work? If so, I'd love to know what problems you ran into, because it should work!