elastic / elasticsearch-hadoop

:elephant: Elasticsearch real-time search and analytics natively integrated with Hadoop
https://www.elastic.co/products/hadoop
Apache License 2.0

ES-Hadoop and PySpark SQL Failing with IN Statement #1307

Open Cyb3rWard0g opened 5 years ago

Cyb3rWard0g commented 5 years ago

What kind of issue is this?

Issue description

I am trying to use a basic SQL IN statement to match the module_loaded field from security event logs against a list of values.

SECURITY EVENT SAMPLE (ONE)

When I run the following query in Kibana: event_id:7 AND process_name:"powershell.exe" AND module_loaded:*samlib.dll

I get the event I want.

{
  "_index": "logs-endpoint-winevent-sysmon-2019.05.18",
  "_type": "_doc",
  "_id": "7436fb5dc506bdd5fbc482a5c4a726402ab38e19",
  "_version": 1,
  "_score": null,
  "_source": {
    "module_loaded": "c:\\windows\\system32\\samlib.dll",
    "process_name": "powershell.exe",
    "process_guid": "03ba39f5-50b2-5ce0-0000-00109995c501",
    "process_path": "c:\\windows\\system32\\windowspowershell\\v1.0\\powershell.exe",
    "beat_hostname": "WECserver",
    "action": "moduleload",
    "event_id": 7,
    "beat_version": "6.7.0",
  }
}
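
For reference, the same Kibana search can also be expressed as a query_string request against the index directly. This is a minimal sketch only: the host and user mirror the Spark reader options further below, and the password is a placeholder.

import requests

# Illustration: the Kibana query above as an Elasticsearch query_string search.
# Host and user are taken from the Spark reader options below; the password is hypothetical.
query = {
    "query": {
        "query_string": {
            "query": 'event_id:7 AND process_name:"powershell.exe" AND module_loaded:*samlib.dll'
        }
    }
}
resp = requests.post(
    "http://helk-elasticsearch:9200/logs-endpoint-winevent-sysmon-*/_search",
    json=query,
    auth=("elastic", "<password>"),  # placeholder credentials
)
print(resp.json()["hits"]["total"])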

DATA SAMPLE (TWO)

When I run the following query in Kibana: event_id:7 AND process_name:"powershell.exe" AND module_loaded:*hid.dll

I get the event I want:

{
  "_index": "logs-endpoint-winevent-sysmon-2019.05.18",
  "_type": "_doc",
  "_id": "25f618931e3c411d648e9db3e416db0c0cf0e1e8",
  "_version": 1,
  "_score": null,
  "_source": {
    "module_loaded": "c:\\windows\\system32\\hid.dll",
    "process_name": "powershell.exe",
    "process_guid": "03ba39f5-50b2-5ce0-0000-00109995c501",
    "process_path": "c:\\windows\\system32\\windowspowershell\\v1.0\\powershell.exe",
    "beat_hostname": "WECserver",
    "action": "moduleload",
    "event_id": 7,
    "beat_version": "6.7.0",
  }
}

Now I want to reproduce something similar with Apache Spark SQL via PySpark. I start by initializing the SparkSession and registering a SQL table mapped to the index I am using to find the two records above.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("HELK JOIN") \
    .master("spark://helk-spark-master:7077") \
    .enableHiveSupport() \
    .getOrCreate()

es_reader = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("inferSchema", "true")
    .option("es.read.field.as.array.include", "tags")
    .option("es.nodes","helk-elasticsearch:9200")
    .option("es.net.http.auth.user","elastic")
)

security_df = es_reader.load("logs-endpoint-winevent-sysmon-*/")
security_df.createOrReplaceTempView("sysmon_events")
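
As a quick sanity check (illustration only, not part of the original steps), the inferred schema and the temp view can be inspected before moving on:

# Optional: confirm the schema ES-Hadoop inferred and that the view returns documents.
security_df.printSchema()
spark.sql("SELECT COUNT(*) FROM sysmon_events WHERE event_id = 7").show()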

Now I can run SQL queries on it. I want to use the SQL IN statement to replicate the first query I ran earlier.

module_loaded = spark.sql(
    '''
    SELECT event_id,
        host_name,
        process_name,
        module_loaded
    FROM sysmon_events
    WHERE event_id = 7
        AND module_loaded IN ("c:\\\windows\\\system32\\\samlib.dll")
    '''
)
module_loaded.show(5,False)
+--------+---------------+--------------+------------------------------+
|event_id|host_name      |process_name  |module_loaded                 |
+--------+---------------+--------------+------------------------------+
|7       |it001.shire.com|powershell.exe|c:\windows\system32\samlib.dll|
|7       |hr001.shire.com|powershell.exe|c:\windows\system32\samlib.dll|
|7       |hr001.shire.com|powershell.exe|c:\windows\system32\samlib.dll|
|7       |it001.shire.com|powershell.exe|c:\windows\system32\samlib.dll|
|7       |hr001.shire.com|svchost.exe   |c:\windows\system32\samlib.dll|
+--------+---------------+--------------+------------------------------+
only showing top 5 rows

That works. However, what if I want both values, c:\windows\system32\samlib.dll and c:\windows\system32\hid.dll, inside the IN statement? Unfortunately, it fails:

module_loaded = spark.sql(
    '''
    SELECT event_id,
        host_name,
        process_name,
        module_loaded
    FROM sysmon_events
    WHERE event_id = 7
        AND module_loaded IN ("c:\\\windows\\\system32\\\samlib.dll", "c:\\\windows\\\system32\\\hid.dll")
    '''
)
module_loaded.show(5,False)

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-120-45c477530d44> in <module>
----> 1 module_loaded.show(5,False)

/opt/helk/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    378             print(self._jdf.showString(n, 20, vertical))
    379         else:
--> 380             print(self._jdf.showString(n, int(truncate), vertical))
    381 
    382     def __repr__(self):

/opt/helk/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/opt/helk/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/helk/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o1384.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 68.0 failed 4 times, most recent failure: Lost task 0.3 in stage 68.0 (TID 175, 172.18.0.4, executor 24): org.apache.spark.util.TaskCompletionListenerException: Failed to parse filter: {"bool":{"should":[{"match":{"module_loaded":"c:\windows\system32\samlib.dll c:\windows\system32\hid.lab"}}]}}

Previous exception in task: Failed to parse filter: {"bool":{"should":[{"match":{"module_loaded":"c:\windows\system32\samlib.dll c:\windows\system32\hid.lab"}}]}}
    org.elasticsearch.hadoop.rest.query.QueryUtils.parseFilters(QueryUtils.java:74)
    org.elasticsearch.hadoop.rest.RestService.createReader(RestService.java:453)
    org.elasticsearch.spark.rdd.AbstractEsRDDIterator.reader$lzycompute(AbstractEsRDDIterator.scala:49)
    org.elasticsearch.spark.rdd.AbstractEsRDDIterator.reader(AbstractEsRDDIterator.scala:42)
    org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:61)
    scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    org.apache.spark.scheduler.Task.run(Task.scala:121)
    org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    java.lang.Thread.run(Thread.java:748)
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
    at sun.reflect.GeneratedMethodAccessor82.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Version Info

OS: Official Elasticsearch Docker Image 7.1.0 (CentOS Linux 7 (Core))
JVM: OpenJDK 1.8.0_191 (Spark)
Hadoop/Spark: 2.4.3
ES-Hadoop: 7.1.0
ES: 7.1.0

Cyb3rWard0g commented 5 years ago

I was able to access the Spark executor error logs and got more information:

19/06/21 02:51:26 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.IllegalArgumentException: Failed to parse filter: {"bool":{"should":[{"match":{"module_loaded":"c:\windows\system32\samlib.dll c:\windows\system32\hid.lab"}}]}}
    at org.elasticsearch.hadoop.rest.query.QueryUtils.parseFilters(QueryUtils.java:74)
    at org.elasticsearch.hadoop.rest.RestService.createReader(RestService.java:453)
    at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.reader$lzycompute(AbstractEsRDDIterator.scala:49)
    at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.reader(AbstractEsRDDIterator.scala:42)
    at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.close(AbstractEsRDDIterator.scala:81)
    at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.closeIfNeeded(AbstractEsRDDIterator.scala:74)
    at org.elasticsearch.spark.rdd.AbstractEsRDDIterator$$anonfun$1.apply$mcV$sp(AbstractEsRDDIterator.scala:54)
    at org.elasticsearch.spark.rdd.AbstractEsRDDIterator$$anonfun$1.apply(AbstractEsRDDIterator.scala:54)
    at org.elasticsearch.spark.rdd.AbstractEsRDDIterator$$anonfun$1.apply(AbstractEsRDDIterator.scala:54)
    at org.elasticsearch.spark.rdd.CompatUtils$1.onTaskCompletion(CompatUtils.java:112)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
    at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:130)
    at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:128)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:128)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.jackson.JsonParseException: Unrecognized character escape 'w' (code 119)
 at [Source: java.io.StringReader@63fe6467; line: 1, column: 51]

It seems to be more of a syntax issue.
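
The layers of escaping can be seen with plain Python, before Spark SQL applies its own round of string-literal parsing: "\\" collapses to a single backslash, while an unrecognized escape such as "\w" is kept as-is by Python. A small illustration, runnable outside Spark:

# Illustration: how Python alone interprets the literals used in the queries above.
two_backslashes = "c:\\windows\\system32\\samlib.dll"       # -> c:\windows\system32\samlib.dll
three_backslashes = "c:\\\windows\\\system32\\\samlib.dll"  # -> c:\\windows\\system32\\samlib.dll
print(two_backslashes)
print(three_backslashes)

Spark SQL then strips one more level from the string literal, and the resulting value appears in the pushed-down filter shown in the error; a surviving bare "\w" there is consistent with Jackson's "Unrecognized character escape 'w'" message above.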

Cyb3rWard0g commented 5 years ago

If I escape each backslash with just one additional backslash (two backslashes instead of the three in my initial queries above):

module_loaded = spark.sql(
    '''
    SELECT event_id,
        host_name,
        process_name,
        module_loaded
    FROM sysmon_events
    WHERE event_id = 7
        AND module_loaded IN ("c:\\windows\\system32\\samlib.dll","c:\\windows\\system32\\hid.lab")
    '''
)

I get the following logical plan. As you can see, the backslashes are dropped entirely and the module path collapses into one long string:

== Parsed Logical Plan ==
'Project ['event_id, 'host_name, 'process_name, 'module_loaded]
+- 'Filter (('event_id = 7) && 'module_loaded IN (c:windowssystem32samlib.dll,c:windowssystem32hid.lab))
   +- 'UnresolvedRelation `sysmon_events`
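
One way to sidestep the SQL string-literal escaping entirely is to express the same predicate with the DataFrame API and, if the connector's generated filter is still rejected, to disable push-down so Spark evaluates the IN itself. A sketch only, assuming the data source's pushdown option and reusing the connection settings from above:

from pyspark.sql import functions as F

# Sketch: filter via the DataFrame API (no SQL string literal to parse) and
# disable filter push-down so the predicate is evaluated by Spark rather than
# translated into an Elasticsearch query.
df = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes", "helk-elasticsearch:9200")
    .option("es.net.http.auth.user", "elastic")
    .option("pushdown", "false")   # assumption: connector option to turn off push-down
    .load("logs-endpoint-winevent-sysmon-*/"))

modules = ["c:\\windows\\system32\\samlib.dll", "c:\\windows\\system32\\hid.dll"]

(df.where((F.col("event_id") == 7) & F.col("module_loaded").isin(modules))
   .select("event_id", "host_name", "process_name", "module_loaded")
   .show(5, False))
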
aliasbadwolf commented 5 years ago

@Cyb3rWard0g it's tricky and painful, and not related to Elasticsearch.

Since Spark and Elasticsearch are written in Scala and Java respectively (both JVM languages), the PySpark code first gets converted to Scala code. Now, in Java (and Scala) "\" is an escape character, so if you were writing the last code directly in Scala or Java, i.e.

module_loaded = spark.sql(
    '''
    SELECT event_id,
        host_name,
        process_name,
        module_loaded
    FROM sysmon_events
    WHERE event_id = 7
        AND module_loaded IN ("c:\\windows\\system32\\samlib.dll","c:\\windows\\system32\\hid.lab")
    '''
)

then it would have resulted in

+- 'Filter (('event_id = 7) && 'module_loaded IN (c:\windows\system32\samlib.dll,c:\windows\system32\hid.lab))

as the first "\" would be consumed escaping the second one.

However, things get interesting when the code gets converted from Python to Scala. It works this way.

The input characters passed to Scala (taking just a partial snippet) would be: AND module_loaded IN ("c:\\windows\\system32\\samlib.dll","c:\\windows\\system32\\hid.lab")

which, when converted to Scala code (i.e. Scala code is generated), becomes (given the explanation above about the first "\" being an escape character) -> AND module_loaded IN ("c:\windows\system32\samlib.dll","c:\windows\system32\hid.lab")

which, when converted to Java bytecode, becomes -> AND module_loaded IN ("c:windowssystem32samlib.dll","c:windowssystem32hid.lab")

and that's what is happening.

So if the end result you want is a single "\", use 4 "\" in your code; and if you want the end result to have 2 "\" (i.e. "\\"), use 8 "\" in your Python code.

Hope it helps!!
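
Applied to the earlier PySpark query, the "4 backslashes for a single final backslash" advice would look roughly like the sketch below. It addresses the Python and Spark SQL string-literal layers; whether the multi-value push-down filter error from the stack trace is a separate issue is left open.

# Sketch: four backslashes in the Python source survive as two after Python's
# parsing and as one after Spark SQL's string-literal parsing.
module_loaded = spark.sql(
    '''
    SELECT event_id,
        host_name,
        process_name,
        module_loaded
    FROM sysmon_events
    WHERE event_id = 7
        AND module_loaded IN ("c:\\\\windows\\\\system32\\\\samlib.dll",
                              "c:\\\\windows\\\\system32\\\\hid.dll")
    '''
)
module_loaded.show(5, False)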

aliasbadwolf commented 5 years ago

@Cyb3rWard0g Also, if possible, rather than storing paths with "\" in Elasticsearch, store them with forward slashes, unless there is a strict requirement to keep the "\".

vernepenn commented 4 years ago

I have a similar problem with Scala code.

Elasticsearch version (bin/elasticsearch --version): 6.3.1

Plugins installed: []

JVM version (java -version): 1.8

OS version (uname -a if on a Unix-like system): Linux

Description of the problem including expected versus actual behavior:

val sparkConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.es.index.auto.create", "true")
  .set("spark.es.nodes", "nodes")
  .set("spark.es.port", "9200")
  .set("spark.es.resource", "index_name")
  .set("spark.es.nodes.wan.only", "true")
  .set("spark.master", "local[4]")

val essessionDataFrame = spark.sqlContext.read
  .format("org.elasticsearch.spark.sql")
  .option("inferSchema", "true")
  .load("index_name/type")
essessionDataFrame.createOrReplaceTempView(hiveTableName)
essessionDataFrame.show()
spark.catalog.refreshTable(hiveTableName)

val sql = s"SELECT DISTINCT memid FROM $hiveTableName WHERE town_no IN ('21400','23500')"
spark.sql(sql).rdd.count()

  1. index_name/type has 3 records where town_no = 21400 or town_no = 23500
  2. when I run sql = s"SELECT DISTINCT memid FROM $hiveTableName WHERE town_no IN ('21400')" or sql = s"SELECT DISTINCT memid FROM $hiveTableName WHERE town_no IN ('23500')", it works and returns 3 records
  3. but when I run sql = s"SELECT DISTINCT memid FROM $hiveTableName WHERE town_no IN ('21400','23500')", it returns 0 records