elastic / elasticsearch-hadoop

:elephant: Elasticsearch real-time search and analytics natively integrated with Hadoop
https://www.elastic.co/products/hadoop
Apache License 2.0

Option es.read.field.*.include unable to take field names containing a colon #2221

Open SebGay opened 2 months ago

SebGay commented 2 months ago

What kind of an issue is this?

Issue description

Hello,

When using either the es.read.field.as.array.include or the es.read.field.include configuration, providing a value that contains a colon causes an org.elasticsearch.hadoop.EsHadoopIllegalArgumentException.

Steps to reproduce

Code:

```python
# Setup for testing
from json import dumps

def boilerplate(additional_options, index, good_field):
    constant_options = {'es.nodes': 'xxx.xxx.xxx.xxx', 'es.port': '9200',
                        'es.nodes.resolve.hostname': 'false', 'es.nodes.wan.only': 'true'}

    query = dumps({"query": {"bool": {"filter": {"exists": {"field": good_field}}}}})

    (
        spark.read
        .format("org.elasticsearch.spark.sql")
        .options(**constant_options)
        .options(**additional_options)
        .option("es.query", query)
        .load(index)
        .select(good_field)
        .display()
    )
```
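For reference, the exists query that `boilerplate` serializes is plain JSON, where a colon in the field name needs no escaping (placeholder names as above), so the failure is specific to the include options rather than to the query itself:

```python
from json import dumps

good_field = "PREFIX:SUFFIX1"  # placeholder, as above
query = dumps({"query": {"bool": {"filter": {"exists": {"field": good_field}}}}})
print(query)
# {"query": {"bool": {"filter": {"exists": {"field": "PREFIX:SUFFIX1"}}}}}
```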

Note that the following are placeholders in the code for real names: STRUCTNAME, NESTEDFIELDn, PREFIX, SUFFIXn, INDEXn. INDEX1 does not contain field names with `:`, while INDEX2 only contains fields with `:`.

Working on a nested field in INDEX1:

```python
boilerplate({'es.read.field.as.array.include': 'STRUCTNAME.NESTEDFIELD1'}, "INDEX1", "STRUCTNAME.NESTEDFIELD2")
```

Working without the setting in INDEX2:

```python
boilerplate({}, "INDEX2", "PREFIX:SUFFIX1")
```

Error (1) with the include setting:

```python
boilerplate({'es.read.field.as.array.include': 'PREFIX:SUFFIX2'}, "INDEX2", "PREFIX:SUFFIX1")
```

results in:

```
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Failed to parse [es.read.field.as.array.include] option with value of [PREFIX:SUFFIX2]
```

as does including the suggestion from this thread:

```python
boilerplate({'es.mapping.date.rich': 'false', 'es.read.field.as.array.include': 'PREFIX:SUFFIX2'}, "INDEX2", "PREFIX:SUFFIX1")
```
Error (2) with the include setting:

```python
boilerplate({'es.read.field.include': 'PREFIX:SUFFIX2'}, "INDEX2", "PREFIX:SUFFIX1")
```

results in:

```
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Invalid parameter [PREFIX:SUFFIX2] specified in inclusion configuration
```
Working with exclude:

```python
boilerplate({'es.read.field.exclude': 'PREFIX:SUFFIX2'}, "INDEX2", "PREFIX:SUFFIX1")
```
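Since reading INDEX2 with no include option works, and exclude parses fine, one untested idea (my assumption, not a confirmed workaround) is to try `es.read.source.filter`, which applies Elasticsearch `_source` filtering on read and may not go through the same include parser:

```python
# Hypothetical alternative to es.read.field.include (untested against a
# cluster): server-side _source filtering via es.read.source.filter.
additional_options = {'es.read.source.filter': 'PREFIX:SUFFIX2'}
print(additional_options)
```

Whether this option tolerates colons in field names would need to be verified against a live index.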

Trace

Error 1

```
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Failed to parse [es.read.field.as.array.include] option with value of [PREFIX:SUFFIX2]
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File , line 1
----> 1 boilerplate({'es.mapping.date.rich':'false', 'es.read.field.as.array.include': 'PREFIX:SUFFIX2'}, "INDEX2", "PREFIX:SUFFIX1")

File , line 13, in boilerplate(additional_options, index, good_field)
      3 constant_options = {'es.nodes': '[REDACTED]', 'es.port': '9200', 'es.nodes.resolve.hostname': 'false', 'es.nodes.wan.only': 'true'}
      5 query = dumps({"query": {"bool":{"filter": {"exists": {"field" : good_field}}}}})
      7 (
      8 spark.read
      9 .format( "org.elasticsearch.spark.sql" )
     10 .options(**constant_options)
     11 .options(**additional_options)
     12 .option( "es.query", query)
---> 13 .load(index)
     14 .select(good_field)
     15 .display()
     16 )

File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function..wrapper(*args, **kwargs)
     45 start = time.perf_counter()
     46 try:
---> 47     res = func(*args, **kwargs)
     48 logger.log_success(
     49     module_name, class_name, function_name, time.perf_counter() - start, signature
     50 )
     51 return res

File /databricks/spark/python/pyspark/sql/readwriter.py:312, in DataFrameReader.load(self, path, format, schema, **options)
    310 self.options(**options)
    311 if isinstance(path, str):
--> 312     return self._df(self._jreader.load(path))
    313 elif path is not None:
    314     if type(path) != list:

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
   1349 command = proto.CALL_COMMAND_NAME +\
   1350     self.command_header +\
   1351     args_command +\
   1352     proto.END_COMMAND_PART
   1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
   1356     answer, self.gateway_client, self.target_id, self.name)
   1358 for temp_arg in temp_args:
   1359     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception..deco(*a, **kw)
    186 def deco(*a: Any, **kw: Any) -> Any:
    187     try:
--> 188         return f(*a, **kw)
    189     except Py4JJavaError as e:
    190         converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o923.load.
: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Failed to parse [es.read.field.as.array.include] option with value of [PREFIX:SUFFIX2]
	at org.elasticsearch.hadoop.util.SettingsUtils.getFieldArrayFilterInclude(SettingsUtils.java:228)
	at org.elasticsearch.spark.sql.SchemaUtils$.convertToStruct(SchemaUtils.scala:129)
	at org.elasticsearch.spark.sql.SchemaUtils$.discoverMapping(SchemaUtils.scala:93)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:229)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:229)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.$anonfun$schema$1(DefaultSource.scala:233)
	at scala.Option.getOrElse(Option.scala:189)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.schema(DefaultSource.scala:233)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:503)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:384)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:340)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:340)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:244)
	at sun.reflect.GeneratedMethodAccessor430.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
	at py4j.Gateway.invoke(Gateway.java:306)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Invalid parameter [PREFIX:SUFFIX2] specified in inclusion configuration
	at org.elasticsearch.hadoop.serialization.field.FieldFilter.toNumberedFilter(FieldFilter.java:189)
	at org.elasticsearch.hadoop.util.SettingsUtils.getFieldArrayFilterInclude(SettingsUtils.java:226)
	... 24 more
Caused by: java.lang.NumberFormatException: For input string: "SUFFIX2"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at org.elasticsearch.hadoop.serialization.field.FieldFilter.toNumberedFilter(FieldFilter.java:184)
	... 25 more
```
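The `Caused by: java.lang.NumberFormatException: For input string: "SUFFIX2"` at the bottom of this trace suggests why the option chokes: `es.read.field.as.array.include` documents a `field:depth` form (e.g. `a:3` for array depth), so `FieldFilter.toNumberedFilter` seems to split each pattern on the last colon and parse what follows as an integer depth. A rough Python sketch of that suspected parse (the helper name and the default depth of 1 are my assumptions, not es-hadoop's actual code):

```python
def to_numbered_filter(pattern):
    """Sketch of the suspected field:depth parse: a trailing ':<n>' is
    read as an array depth, so a field whose name itself contains ':'
    is misinterpreted as '<field>:<depth>'."""
    if ":" in pattern:
        field, _, depth = pattern.rpartition(":")
        # int() fails for a field literally named 'PREFIX:SUFFIX2',
        # mirroring the NumberFormatException in the trace above.
        return field, int(depth)
    return pattern, 1  # assumed default depth

print(to_numbered_filter("STRUCTNAME.NESTEDFIELD1:3"))  # ('STRUCTNAME.NESTEDFIELD1', 3)
try:
    to_numbered_filter("PREFIX:SUFFIX2")
except ValueError as e:
    print(e)  # invalid literal for int() with base 10: 'SUFFIX2'
```

This would explain why exclude works while both include options fail: only the inclusion path appears to run patterns through the numbered-filter parse.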
Error 2

```
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Invalid parameter [PREFIX:SUFFIX1] specified in inclusion configuration
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File , line 1
----> 1 boilerplate({'es.read.field.include': 'PREFIX:SUFFIX1'}, "INDEX2", "PREFIX:SUFFIX2")

File , line 13, in boilerplate(additional_options, index, good_field)
      3 constant_options = {'es.nodes': '[REDACTED]', 'es.port': '9200', 'es.nodes.resolve.hostname': 'false', 'es.nodes.wan.only': 'true'}
      5 query = dumps({"query": {"bool":{"filter": {"exists": {"field" : good_field}}}}})
      7 (
      8 spark.read
      9 .format( "org.elasticsearch.spark.sql" )
     10 .options(**constant_options)
     11 .options(**additional_options)
     12 .option( "es.query", query)
---> 13 .load(index)
     14 .select(good_field)
     15 .display()
     16 )

File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function..wrapper(*args, **kwargs)
     45 start = time.perf_counter()
     46 try:
---> 47     res = func(*args, **kwargs)
     48 logger.log_success(
     49     module_name, class_name, function_name, time.perf_counter() - start, signature
     50 )
     51 return res

File /databricks/spark/python/pyspark/sql/readwriter.py:312, in DataFrameReader.load(self, path, format, schema, **options)
    310 self.options(**options)
    311 if isinstance(path, str):
--> 312     return self._df(self._jreader.load(path))
    313 elif path is not None:
    314     if type(path) != list:

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
   1349 command = proto.CALL_COMMAND_NAME +\
   1350     self.command_header +\
   1351     args_command +\
   1352     proto.END_COMMAND_PART
   1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
   1356     answer, self.gateway_client, self.target_id, self.name)
   1358 for temp_arg in temp_args:
   1359     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception..deco(*a, **kw)
    186 def deco(*a: Any, **kw: Any) -> Any:
    187     try:
--> 188         return f(*a, **kw)
    189     except Py4JJavaError as e:
    190         converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o898.load.
: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Invalid parameter [PREFIX:SUFFIX1] specified in inclusion configuration
	at org.elasticsearch.hadoop.serialization.field.FieldFilter.toNumberedFilter(FieldFilter.java:189)
	at org.elasticsearch.hadoop.serialization.dto.mapping.Mapping.filter(Mapping.java:86)
	at org.elasticsearch.hadoop.serialization.dto.mapping.MappingUtils.filterMapping(MappingUtils.java:138)
	at org.elasticsearch.spark.sql.SchemaUtils$.discoverMappingAndGeoFields(SchemaUtils.scala:109)
	at org.elasticsearch.spark.sql.SchemaUtils$.discoverMapping(SchemaUtils.scala:92)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:229)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:229)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.$anonfun$schema$1(DefaultSource.scala:233)
	at scala.Option.getOrElse(Option.scala:189)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.schema(DefaultSource.scala:233)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:503)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:384)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:340)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:340)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:244)
	at sun.reflect.GeneratedMethodAccessor430.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
	at py4j.Gateway.invoke(Gateway.java:306)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NumberFormatException: For input string: "SUFFIX1"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at org.elasticsearch.hadoop.serialization.field.FieldFilter.toNumberedFilter(FieldFilter.java:184)
	... 26 more
```

Version Info

OS: Databricks Runtime 14.2 on Azure
JVM: Unknown
Hadoop/Spark: Spark 3.5.0
ES-Hadoop: org.elasticsearch:elasticsearch-spark-30_2.12:7.17.1
ES: 7.17.1
