AydinChavez opened this issue 5 years ago
ES does not really serve as a good queue for these sorts of operations. At the moment, the only way you might be able to use it as an input would be to read all the data from the index and then stop producing input once you have exhausted it. At that point it's probably a better idea to just use the index as input to a batch job instead of a streaming job.
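For illustration, a minimal PySpark sketch of that batch alternative, assuming the elasticsearch-hadoop (elasticsearch-spark) connector is on the Spark classpath; the node address and index name are placeholders:

```python
# Minimal sketch of reading an ES index as a bounded batch DataFrame.
# Assumes the elasticsearch-hadoop connector jar is on the Spark classpath;
# "localhost:9200" and "my-index" are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("es-batch-read").getOrCreate()

# One-shot batch read: the whole index is consumed as a bounded DataFrame,
# which is the "batch job instead of a streaming job" idea described above.
df = (
    spark.read.format("org.elasticsearch.spark.sql")
    .option("es.nodes", "localhost:9200")
    .load("my-index")
)
df.show()
```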
There are some things we are looking at that would make this doable in the future. For instance, if Elasticsearch gained a changes API, we could automatically pull in any new data or updates that occurred since the last batch of data was processed. That would be much better than a repeated-query approach, where the data is queried continuously against an advancing watermark.
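For contrast, here is a rough sketch of the repeated-query approach being dismissed above. It assumes every document carries a `@timestamp` field; the index name, starting watermark, output path, and poll interval are all illustrative, and only `es.query` is an actual connector option:

```python
# Rough sketch of the repeated-query approach: periodically re-run a bounded
# range query against an advancing watermark. Late updates can be missed,
# which is why a real changes API would be preferable.
import json
import time

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("es-repeated-query").getOrCreate()

watermark = "2020-01-01T00:00:00Z"  # illustrative starting point
while True:
    query = json.dumps({"query": {"range": {"@timestamp": {"gt": watermark}}}})
    batch = (
        spark.read.format("org.elasticsearch.spark.sql")
        .option("es.query", query)       # bounded query for this pass
        .load("my-index")
    )
    if batch.count() > 0:
        new_mark = batch.agg(F.max("@timestamp")).first()[0]
        batch.write.mode("append").parquet("/tmp/es-out")  # placeholder sink
        # NOTE: depending on the index mapping, new_mark may need converting
        # back to a date string Elasticsearch can parse.
        watermark = str(new_mark)
    time.sleep(60)  # poll interval; purely illustrative
```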
Has any new feature or approach been added for doing streaming with ES as a source?
Here is the change that we would need to make this happen: https://github.com/elastic/elasticsearch/issues/1242. And here is a fleet-specific change that provides some of this (but I don't think it's enough to do this right): https://github.com/elastic/elasticsearch/pull/71093.
Hi, is there any way to read a stream directly from Elasticsearch using PySpark?
Hi,
I'd like to consume/read data from ES via Spark Structured Streaming. The only solution I know of today is a workaround that combines Logstash and Kafka:

ES -> logstash -> kafka -> spark structured streaming
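For completeness, a minimal PySpark sketch of the consuming end of that workaround, assuming the spark-sql-kafka package is available; the broker address and topic name are placeholders for whatever Logstash is configured to publish to:

```python
# Structured Streaming consumer for the ES -> logstash -> kafka workaround.
# Assumes the spark-sql-kafka-0-10 package is on the classpath; broker and
# topic are placeholders for whatever logstash publishes.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-es-stream").getOrCreate()

stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "es-changes")       # topic fed by logstash
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka delivers key/value as binary; each ES document arrives as a JSON
# string in the value column and would be parsed with from_json + a schema.
query = (
    stream.selectExpr("CAST(value AS STRING) AS json_doc")
    .writeStream.format("console")
    .start()
)
query.awaitTermination()
```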
It would be very helpful to have a direct solution via Spark Structured Streaming, so that you could simply invoke readStream against the ES backend. Technically, the solution should accept an ES query as a parameter when connecting, making it possible to define a filter and/or a timeframe for each interval.
I don't think this should be that hard to implement, as the Spark batch variant of reading data from ES already exists and most of that code could presumably be reused for the Structured Streaming version.