elastic / elasticsearch-hadoop

:elephant: Elasticsearch real-time search and analytics natively integrated with Hadoop
https://www.elastic.co/products/hadoop
Apache License 2.0

Option 'es.read.field.include' with Spark SQL fails to pushdown field or _source filtering #2244

Open cpeterp opened 4 months ago

cpeterp commented 4 months ago

What kind of issue is this?

Issue description

According to the documentation, the preferred method for subsetting fields in a query through Spark SQL is the 'es.read.field.include' option (see Reading DataFrames - Controlling the DataFrame schema). The docs also state that field-filtering options should be pushed down into the Elasticsearch query. However, when using this option alone, the actual queries sent to Elasticsearch DO NOT include a "_source" filtering option. Instead, all fields are queried and the specified fields are only subsetted after the data is returned.
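For illustration, this is roughly the scroll request body I would have expected the connector to send when pushdown works (my assumption of the target shape, not anything observed from the connector):

# Hypothetical pushed-down request body: "_source" restricts what
# Elasticsearch returns per hit, which is exactly what never shows up
# in the observed requests (see the TRACE logs below).
expected_payload = {
    "_source": ["field.a", "field.b"],
    "query": {"match_all": {}},
}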

Using built-in DataFrame methods like DataFrame.select(<field>) does not modify the underlying query sent to Elasticsearch either.
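One way to double-check what Spark itself believes is pushed down (independent of the connector's TRACE logs further below) is to print the extended query plan; a minimal sketch, reusing the same boilerplate_options placeholder as the reproduction code:

sdf = spark.read.format('org.elasticsearch.spark.sql') \
  .options(boilerplate_options) \
  .option('es.read.field.include', "field.a,field.b") \
  .load()
# explain(True) prints the parsed/analyzed/optimized/physical plans; the scan
# node shows which columns and filters actually reach the data source.
sdf.select("field.a", "field.b").explain(True)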

Adding a "_source" parameter to the query passed via DataFrameReader.option('es.query', <query>) does not make it into the underlying query either, although the mappings under the "query" key are passed through.

Finally, using the 'es.read.source.filter' option does modify the query sent to Elasticsearch (by adding a "_source" parameter), but using it results in an error as soon as the DataFrame is operated on:

User specified source filters were found [name,timestamp], but the connector is executing in a state where it has provided its own source filtering [name,timestamp,location.address]. Please clear the user specified source fields under the [es.read.source.filter] property to continue. Bailing out...

which is addressed in the docs here.
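For completeness, a minimal sketch of the kind of code that triggers that error (field names taken from the message above; the exact reproduction may differ):

sdf_4 = spark.read.format('org.elasticsearch.spark.sql') \
  .options(boilerplate_options) \
  .option('es.read.source.filter', "name,timestamp") \
  .load()
# Operating on the DataFrame raises the "User specified source filters
# were found..." error quoted above.
sdf_4.select("name", "timestamp", "location.address").count()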

Steps to reproduce

Code:

import json
from pyspark.sql.session import SparkSession

# boilerplate_options (not shown) holds the connection settings
# (es.nodes, es.port, es.resource, credentials, ...)
spark = SparkSession.builder.getOrCreate()

# Method 1
sdf_1 = spark.read.format('org.elasticsearch.spark.sql') \
  .options(boilerplate_options) \
  .option('es.read.field.include', "field.a,field.b") \
  .load()
sdf_1.count()

# Method 2
sdf_2 = spark.read.format('org.elasticsearch.spark.sql') \
  .options(boilerplate_options) \
  .option('es.read.field.include', "field.a,field.b") \
  .load()
sdf_2 = sdf_2.select("field.a", "field.b")
sdf_2.count()

# Method 3
query = {
  "_source": ["field.a", "field.b"],
  "query":{"match_all":{}}
}
sdf_3 = spark.read.format('org.elasticsearch.spark.sql') \
  .options(boilerplate_options) \
  .option('es.read.field.include', "field.a,field.b") \
  .option('es.query', json.dumps(query)) \
  .load()
sdf_3.count()

Stack trace: No errors were raised. However, after setting the log level for org.elasticsearch.hadoop.rest to TRACE, I saw the following in the logs for each method:

...
24/07/08 20:27:01 TRACE CommonsHttpTransport: Tx [POST]@[NODE_URL:PORT][sample_indx/_search]?[sort=_doc&scroll=5m&size=1000&preference=_shards_ABC&track_total_hits=true] w/ payload [{"slice":{"id":111,"max":700},"query":{"match_all":{}}}]
...
24/07/08 20:28:01 TRACE CommonsHttpTransport: Rx @[EXECUTER_IP] [200-OK] [{"_scroll_id":"ABCDEFGH","took":8248,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":305,"relation":"eq"},"max_score":null,"hits":[{"_index":"sample_indx","_id":"0001","_score":null,"_source":{"field.a":val, "field.b":val, "field.c":val ....}
...
24/07/08 20:29:01 TRACE CommonsHttpTransport: Tx [DELETE]@[NODE_URL:PORT][_search/scroll]?[null] w/ payload [{"scroll_id":["ABCDEFGH"]}]

Given these logs, and the time required to return the data, it appears that no field/_source filter is being pushed down.
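For reference, one way to enable that TRACE logging from a notebook (a sketch assuming the log4j 1.x bundled with Spark 3.2.1 on this runtime; any equivalent log4j configuration would do):

log4j = spark.sparkContext._jvm.org.apache.log4j
log4j.LogManager.getLogger("org.elasticsearch.hadoop.rest") \
    .setLevel(log4j.Level.TRACE)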

Version Info

OS: Databricks Runtime 10.4 LTS ML (runs on Ubuntu)
JVM: 1.8.0_382
Hadoop/Spark: 3.2.1
ES-Hadoop: 8.10.0
ES: 8.12