[Open] costin opened this issue 10 years ago
+1; was hoping to pull an aggregation result back to Spark.
Hello Costin!
We started using ES and Spark at my company, and we are really struggling because we can't use aggregations to shrink the data that gets loaded into Spark. I am wondering if this feature will be implemented in the near future. Also, maybe you need help with it? I'm curious how much time you think it would take to implement.
Kind regards, Alex
This feature is the main one planned for 2.2. In terms of ETA, I'm reserved about giving an estimate, since aggregation works differently than scan and scroll. The current plan is to release m2 in the next two weeks and focus entirely on aggregations after that. A prototype should be available in the following weeks; when that happens, this ticket will be updated.
Help is always appreciated; however, do note that complex/critical pieces of functionality, like this one, are carefully examined due to their impact. In other words, any PR raised might or might not be accepted for technical reasons.
+1 (and good luck @Costin)
+1
Hi, I am also stuck on this. Could you let me know the expected release date for this feature?
+1 Any clues when we can expect this feature?
+1 Would love to know this as well
+1 this is indeed significantly reducing the added value of ES as a source for Spark jobs.
+1
+1
+1 We really need this functionality
I would like to detail the use case a bit. In most cases, aggregated results are relatively small and do not need result pagination. However, this limits the usage of ES with Spark to controlled queries. In our case, we let our users create arbitrary queries, and this prevents proper parallelization of computations on the query results. We cannot, of course, open these features to our customers, because we cannot control the amount of memory used on the Spark side. We understand that by asking ES to scroll the result, we are shifting the burden of buffering the aggregation results onto ES. However, ES is in a unique position to optimize the aggregation, and might use tricks like sorting to stream results from nodes little by little in certain cases, which would be much more efficient cluster-wise.
:+1: We also really need this functionality :(
Folks ... Is there a released version which supports an Elasticsearch query with aggregation that can be called using the "newAPIHadoopRDD" API? If not, are there any workarounds?
This 2.5-year-old issue keeps getting moved back; the lack of this functionality is a huge gap for ES/Spark. Please prioritize this!
@dickeyre It seems like this was stuck in one of the release version tags and kept getting pushed back. For the time being, the major limitation is how different the mechanisms for information retrieval are for scroll versus aggregations. The other problem that we run up against is the lack of suitable API hooks, mixed with conflicting adherence to API standards (e.g. rdd.count() requirements to materialize the data). I'll remove the version tag to avoid any confusion about the issue's prioritization.
any word? 2.5 years is a long time for some core functionality like this to not exist...
@vijaykramesh As mentioned above when this ticket was set to stalled, there are currently significant roadblocks to this: one is the lack of API hooks, and another is the complexity of mixing scan and scroll with aggregation-based requests.
+1
+1
+1
+1
+1
I guess it'll never be done :(
I suggest looking into an intermediate layer between Spark and Elasticsearch, such as Dremio; it can push down the aggregation to ES and return the results to Spark over JDBC.
It doesn't seem to be finished yet, but we really need it. XD
I found another bottleneck that affects transmission speed: set "es.scroll.size" to 10000 (its default value is 50). This can increase the transmission speed from ES to Spark.
@stupidsky You have to be careful with that number (scroll size). If the documents are large, it becomes an I/O issue: reading from disk and sending the documents over the network will become a bottleneck itself.
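For reference, the scroll size discussed above is passed through the connector configuration. A minimal PySpark-flavored sketch, where the index name and query are placeholders and the exact value should be tuned to your document sizes:

```python
# Configuration sketch: passing es.scroll.size to the elasticsearch-hadoop
# connector. "my-index" and the query string are placeholder assumptions;
# very large batch sizes can shift the bottleneck to disk/network I/O.
es_conf = {
    "es.resource": "my-index",
    "es.query": "?q=*",
    "es.scroll.size": "1000",  # hits fetched per scroll page
}

# With a live SparkContext you would then read through the connector, e.g.:
# rdd = sc.newAPIHadoopRDD(
#     inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
#     keyClass="org.apache.hadoop.io.NullWritable",
#     valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
#     conf=es_conf)
```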
I have an idea. I use elasticsearch-spark to send a query to Elasticsearch and want to get the aggregated results back. But the returned data includes only the hits portion of the response, not the aggregations portion.
@stupidsky The connector doesn't have any facilities for reading the aggregations portion of a result; it just skips over it and reads the hits. If you are looking to get aggregate data into Spark with that kind of query, it is entirely possible to run the aggregation using the regular REST client and plug the results into Spark with the makeRDD method.
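A hedged sketch of that workaround: run the aggregation with a plain client, flatten the buckets, and hand the rows to Spark. The response dict below only mimics the typical shape of a terms-aggregation reply (the aggregation name and fields are illustrative, not from this thread), and the Spark call is shown commented out since it needs a live context:

```python
# Workaround sketch: query aggregations outside the connector, then
# parallelize the flattened buckets into an RDD yourself.
# sample_response imitates a typical terms-aggregation reply; in real
# code it would come from a REST/Java/Python Elasticsearch client.
sample_response = {
    "hits": {"total": {"value": 123}},
    "aggregations": {
        "by_status": {
            "buckets": [
                {"key": "ok", "doc_count": 100},
                {"key": "error", "doc_count": 23},
            ]
        }
    },
}

def flatten_buckets(response, agg_name):
    """Turn one bucket aggregation into (key, count) rows for Spark."""
    buckets = response["aggregations"][agg_name]["buckets"]
    return [(b["key"], b["doc_count"]) for b in buckets]

rows = flatten_buckets(sample_response, "by_status")
# With a live SparkContext:
# rdd = sc.parallelize(rows)   # Python analogue of Scala's makeRDD
```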
As it stands, the plan for aggregation pushdown in spark is to make use of the Composite Aggregation facility in Elasticsearch to stream the aggregation results to the workers in Hadoop/Spark.
This is what leads us to the stalled status at the moment: Spark does not provide data sources with aggregate information at the start of the job, which means we either have to break into the Spark query-planning code and intercept it ourselves (which would be a maintenance nightmare), or wait until Spark provides a facility to do those aggregate pushdowns automatically.
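The composite-aggregation streaming mentioned above works by paging: each response carries an `after_key` that is passed back as `after` to fetch the next page of buckets. A minimal sketch of that loop, where `fetch_page` is a stand-in for a real `_search` call (the field names and page contents are invented for illustration):

```python
# Sketch of composite-aggregation paging. Elasticsearch returns an
# "after_key" with each page of buckets; the client passes it back as
# "after" until no after_key is returned (last page).
PAGES = [  # simulated responses: two pages of composite buckets
    {"buckets": [{"key": {"user": "a"}, "doc_count": 2},
                 {"key": {"user": "b"}, "doc_count": 5}],
     "after_key": {"user": "b"}},
    {"buckets": [{"key": {"user": "c"}, "doc_count": 1}]},  # no after_key
]

def fetch_page(after=None):
    """Stand-in for POST /index/_search with a composite agg + 'after'."""
    return PAGES[0] if after is None else PAGES[1]

def stream_composite():
    """Yield (bucket_key, doc_count) pairs across all pages."""
    after = None
    while True:
        page = fetch_page(after)
        for bucket in page["buckets"]:
            yield bucket["key"], bucket["doc_count"]
        after = page.get("after_key")
        if after is None:
            break

results = list(stream_composite())
```

Because each page is resumable from its `after_key`, this is the facility that would let the connector hand different slices of an aggregation to different Spark workers.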
This is a feature that users have wanted for a long time, and I would really like to provide it in some way. The Spark integration points may not be required for the feature to work at the RDD level. I want to let everyone in the community know that this is a major item we would love to deliver, and we are thankful for your patience while we figure out the best path forward.
any updates on this?
Is this likely something we will never see happen now?
This is related to #1801. Spark has added the ability to push aggregations down into the data source (Elasticsearch in this case). We have experimented with it a little bit -- enough to know that it will be a significant amount of work to implement. But it is not currently prioritized. Until we can get this done, my less-than-ideal advice to anyone coming across this ticket would be to use a Java client directly in your Spark code whenever you need to perform an aggregation.
Currently, due to the use of the scan/scroll API, aggregation results are not properly returned. To cope with this, a mixture of count and scroll would have to be used.