elastic / elasticsearch-hadoop

:elephant: Elasticsearch real-time search and analytics natively integrated with Hadoop
https://www.elastic.co/products/hadoop
Apache License 2.0

Getting a Null Pointer error when connecting to an ES cluster via an https proxy #1250

Open brett--anderson opened 5 years ago

brett--anderson commented 5 years ago

Issue description

Receiving a NullPointerException from org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute while executing a query against a cluster behind an SSL proxy (which load balances across the client nodes).

Steps to reproduce

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

# create our Spark Context  
sc_conf = SparkConf().setAll((
    ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
))
sc_conf.setAppName("PythonSparkStreaming")

sc = SparkContext(conf=sc_conf)

sqlContext = SQLContext(sc)

q ="""{
  "query": {
    "match_all": {}
  }  
}"""

es_live_conf = {
    # the node we connect to (here, the HTTPS proxy endpoint in front of the cluster)
    "es.nodes": '<proxy_address>.com',
    # specify the port in case it is not the default port
    "es.port": ES_PORT,
    "es.net.http.auth.user": ES_USERNAME,
    "es.net.http.auth.pass": ES_PASSWORD,
    "es.net.proxy.https.host": '<proxy_address>.com',
    "es.net.proxy.https.port": ES_PORT,
    "es.net.proxy.https.user": ES_USERNAME,
    "es.net.proxy.https.pass": ES_PASSWORD,
    # specify a resource in the form 'index/doc-type'
    "es.resource": '<index_name>/document',
    "es.net.ssl": "true",
    "es.net.ssl.cert.allow.self.signed": "true",
    "es.nodes.wan.only": "true",
    "es.index.read.missing.as.empty": "true",
    # Enabling this setting conflicts with the WAN-only setting, which is necessary
    # when dealing with a cluster remotely through a single SSL endpoint
    # "es.nodes.client.only": "true"
}

es_live_conf["es.query"] = q

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_live_conf)

sqlContext.createDataFrame(es_rdd).limit(1).collect()

Stack trace:

notebook_1        | 19/02/13 20:48:42 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
notebook_1        | 19/02/13 20:48:42 DEBUG CommonsHttpTransport: SSL Connection enabled
notebook_1        | 19/02/13 20:48:42 DEBUG CommonsHttpTransport: Using authenticated HTTPS proxy [<proxy_address>.com:21203]
notebook_1        | 19/02/13 20:48:42 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
notebook_1        | 19/02/13 20:48:42 TRACE CommonsHttpTransport: Opening HTTP transport to <proxy_address>.com:21203
notebook_1        | 19/02/13 20:48:42 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
notebook_1        | , Accept: application/json
notebook_1        | ]
notebook_1        | 19/02/13 20:48:42 TRACE CommonsHttpTransport: Tx [HTTPS proxy <proxy_address>:21203][HEAD]@[<proxy_address>:21203][<index_name>]?[null] w/ payload [null]
notebook_1        | 19/02/13 20:48:42 TRACE NetworkClient: Caught exception while performing request [<proxy_address>:21203][documents_v5] - falling back to the next node in line...
notebook_1        | java.lang.NullPointerException
notebook_1        |     at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:497)
notebook_1        |     at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:115)
notebook_1        |     at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:398)
notebook_1        |     at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:406)
notebook_1        |     at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:503)
notebook_1        |     at org.elasticsearch.hadoop.rest.RestClient.indexExists(RestClient.java:498)
notebook_1        |     at org.elasticsearch.hadoop.rest.InitializationUtils.checkIndexStatus(InitializationUtils.java:74)
notebook_1        |     at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettingsForReading(InitializationUtils.java:272)
notebook_1        |     at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:218)
notebook_1        |     at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:412)
notebook_1        |     at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:393)
notebook_1        |     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:130)
notebook_1        |     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
notebook_1        |     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
notebook_1        |     at scala.Option.getOrElse(Option.scala:121)
notebook_1        |     at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
notebook_1        |     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
notebook_1        |     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
notebook_1        |     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
notebook_1        |     at scala.Option.getOrElse(Option.scala:121)
notebook_1        |     at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
notebook_1        |     at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1343)
notebook_1        |     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
notebook_1        |     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
notebook_1        |     at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
notebook_1        |     at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
notebook_1        |     at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:239)
notebook_1        |     at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:302)
notebook_1        |     at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
notebook_1        |     at sun.reflect.GeneratedMethodAccessor53.invoke(Unknown Source)
notebook_1        |     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
notebook_1        |     at java.lang.reflect.Method.invoke(Method.java:498)
notebook_1        |     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
notebook_1        |     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
notebook_1        |     at py4j.Gateway.invoke(Gateway.java:282)
notebook_1        |     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
notebook_1        |     at py4j.commands.CallCommand.execute(CallCommand.java:79)
notebook_1        |     at py4j.GatewayConnection.run(GatewayConnection.java:238)

Version Info

OS: jupyter/datascience-notebook Docker image (Ubuntu 18.04)
JVM: 1.8.0_191
Hadoop/Spark: 2.7 / 2.4
ES-Hadoop: 6.6.0
ES: 5.6.5

Update

I've tried numerous configuration changes, but in every scenario where the HTTPS proxy is configured I get the NullPointerException. Looking at the 6.6 code, it appears to be thrown from this line:

        if (log.isTraceEnabled()) {
            Socket sk = ReflectionUtils.invoke(GET_SOCKET, conn, (Object[]) null);
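            // NPE reported above surfaces on the next line: either the reflected Socket (sk)
            // or sk.getLocalAddress() is evidently null when the request goes through the HTTPS proxy.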
>>          String addr = sk.getLocalAddress().getHostAddress();
            log.trace(String.format("Rx %s@[%s] [%s-%s] [%s]", proxyInfo, addr, http.getStatusCode(), HttpStatus.getStatusText(http.getStatusCode()), http.getResponseBodyAsString()));
        }

I'm also getting the same error with elasticsearch-hadoop-7.0.0-beta1. The line number is different (669) but it's still pointing at the same block of code.

jbaiera commented 5 years ago

@brett--anderson thanks for opening a bug report on this. Just as a debugging step/workaround idea: does the NPE occur when TRACE logging is disabled for your run?
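A minimal sketch of one way to try that from the PySpark driver without editing the image's log4j.properties, assuming Spark 2.4's bundled log4j 1.x and that the TRACE output comes from a logger override there; sc is the SparkContext from the reproduction above, and the logger name is taken from the stack trace (the split calculation that throws the NPE runs in the driver JVM):

# Hypothetical workaround sketch: lower just the es-hadoop transport logger below TRACE.
# Assumes log4j 1.x on the driver (the Spark 2.4 default).
log4j = sc._jvm.org.apache.log4j
log4j.LogManager.getLogger(
    "org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport"
).setLevel(log4j.Level.DEBUG)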