elastic / elasticsearch-hadoop

:elephant: Elasticsearch real-time search and analytics natively integrated with Hadoop
https://www.elastic.co/products/hadoop
Apache License 2.0

Latency Spike during Spark Structured Streaming #2165

Open sinban04 opened 10 months ago

sinban04 commented 10 months ago

What kind of issue is this?

Issue description

I'm using elasticsearch-hadoop to stream data into an Elasticsearch server with Spark Structured Streaming. During streaming, Spark shows repetitive, periodic latency spikes in Operation Duration.

[screenshot: periodic Operation Duration spikes in Spark]

Spark

When I tried Structured Streaming w/ a console sink (printing to the console), it showed stable, low latency.

[screenshot: stable Operation Duration with the console sink]

ES

I dug into ES monitoring w/ Grafana, but it showed nothing unusual on the ES side. Indexing time stays below 50 ms, which does not account for the high latency seen in streaming (about 50 s).
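Index-level timing can also be cross-checked outside Grafana: Elasticsearch's `_stats` API reports cumulative `indexing.index_time_in_millis` and `indexing.index_total` per index, from which the mean per-document indexing time follows. A hypothetical helper over the `indexing` section of a stats response (the field names are Elasticsearch's; the helper itself is not from this thread):

```python
def mean_index_ms(indexing_stats):
    """Mean per-document indexing time from an _stats 'indexing' section."""
    total = indexing_stats["index_total"]
    if total == 0:
        return 0.0  # nothing indexed yet; avoid division by zero
    return indexing_stats["index_time_in_millis"] / total
```

If this stays in the tens of milliseconds while the streaming Operation Duration hits tens of seconds, the time is being spent somewhere other than indexing.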

ES-Hadoop connector

I read the elasticsearch-hadoop code and found that it uses the Bulk API to send the DataFrame to ES via its HTTP client. At first I believed that ES was not responding fast enough for Spark to commit the operation even after indexing had finished, but for now I have no idea what causes this latency spike.
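For context, the connector buffers documents and flushes them with the Bulk API once a batch fills up, governed by the `es.batch.size.entries` (default 1000) and `es.batch.size.bytes` (default 1mb) settings. A minimal Python sketch of the flush-by-count part of that logic (a hypothetical helper, not the connector's actual code):

```python
def bulk_batches(docs, max_entries=1000):
    """Group docs into flush-sized batches, mimicking es.batch.size.entries."""
    batch = []
    for doc in docs:
        batch.append(doc)
        if len(batch) >= max_entries:
            yield batch  # flush a full batch
            batch = []
    if batch:
        yield batch  # flush the trailing partial batch
```

Tuning these settings changes how many bulk requests a micro-batch produces, which is one knob to try when chasing write latency.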

Has anyone seen a case like this?

Steps to reproduce

    // ES configs
    val spark =
      SparkSession.builder
        .appName(s"Mobile Click Streaming => $datetime")
        ...
        .config("es.port", "10200")
        .config("es.nodes.wan.only", "true")
        .config("es.index.auto.create", "true")
        ...
        .getOrCreate

    val esQuery = df
      .writeStream
      .outputMode("append")
      .queryName("writing_to_es")
      .format("org.elasticsearch.spark.sql")
      .trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("60 seconds"))
      .start("feedback_latency_debug")
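To pin down where each trigger spends its time, Spark's streaming progress JSON (`query.lastProgress`) carries a per-trigger `durationMs` breakdown (phases such as `addBatch`, `getBatch`, `walCommit`, `triggerExecution`). A hypothetical helper that flags the slow phases in one progress dict, slowest first:

```python
def slow_phases(progress, threshold_ms=1000):
    """Return (phase, ms) pairs from durationMs above threshold, slowest first."""
    durations = progress.get("durationMs", {})
    slow = {k: v for k, v in durations.items() if v >= threshold_ms}
    return sorted(slow.items(), key=lambda kv: -kv[1])
```

If `addBatch` dominates during the spikes, the time is going into the sink's write path rather than into source reads or offset commits.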

Version Info

OS: CentOS 7

$ lsb_release -d
Description:    CentOS Linux release 7.9.2009 (Core)

JVM : Java 1.8.0_112

$ java -version
java version "1.8.0_112"
Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)

Hadoop/Spark: Spark 3.1.2-2, Hadoop 3.1.1.3.1.2-39

ES-Hadoop: 7.12.1 (Scala Spark, https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30)

ES: 7.10.1 (Elasticsearch server)


sinban04 commented 10 months ago

PySpark w/ Python Bulk API

I tested the same task w/ the Elasticsearch Python API: I ported the Structured Streaming job from Scala to Python so I could use that client. I used PySpark with the same Spark version (3.1.2-2) and foreachBatch instead of the Elasticsearch sink. To feed the Python Bulk API, I used toJSON() and collect() to build the data list.

    import json
    import time

    from elasticsearch import Elasticsearch, helpers

    def updateToESPythonAPI(df, epoch_id):
        es = Elasticsearch([
            'http://esfarm-cluster.~~~~~~~.com:10200',
        ], http_auth=('id', 'pw!'))
        index_name = "structured_python01"
        data = []

        for row in df.toJSON().collect():
            json_ = json.loads(row)
            data.append(
                {
                    "_index": index_name,
                    "_id": json_['doc_id'],
                    "_source": row,
                }
            )
        count = len(data)
        start = time.time()
        helpers.bulk(es, data)
        end = time.time()
        print("ES bulk sent!")
        print(f"It took {end - start:.5f} sec sending {count} items. \n")

and it showed no latency spike.

[screenshot: no spikes with the Python Bulk API]
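The loop above builds one action dict per row; that mapping can be factored into a small generator so it works with `helpers.bulk` without materializing the intermediate list (a hypothetical helper using the same `_index`/`_id`/`_source` layout as the code above):

```python
import json

def to_actions(rows, index_name, id_field="doc_id"):
    """Turn JSON-string rows into bulk action dicts for helpers.bulk."""
    for row in rows:
        doc = json.loads(row)
        yield {
            "_index": index_name,
            "_id": doc[id_field],
            "_source": row,  # keep the raw JSON string as the document body
        }
```

`helpers.bulk(es, to_actions(rows, "structured_python01"))` would then send the same actions while chunking them internally (500 per request by default).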

I suspect there is a bug in the elasticsearch-spark-30 connector.

Or could just the version discrepancy cause this unexpected behavior?

PySpark w/ Elasticsearch sink

And w/ the Elasticsearch connector, PySpark shows the same results as Scala Spark:

[screenshot: latency spikes return with the connector sink in PySpark]

sinban04 commented 10 months ago

Scala Spark + foreachBatch + Batch API

Hi guys, I tested again with Scala Spark (3.1.2-2), using the normal batch API (df.write.format("org.elasticsearch.spark.sql")) inside foreachBatch.

It is the same as the streaming sink, except that it writes with the batch API inside the foreachBatch function.

        val esQuery = df
          .writeStream
          .queryName("ES Bulk Update")
          .option("truncate", "false")
          .format("console")
          .trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("5 seconds"))
          .outputMode("append")
          .foreachBatch { 
            (batchDF: DataFrame, batchId: Long) =>
              ...
              batchDF.write
                  .format("org.elasticsearch.spark.sql")
                  .option("es.nodes", endpoint)
                  .option("es.resource", index)
                  .option("es.mapping.id", "doc_id") 
                  .option("es.net.http.auth.user", username)
                  .option("es.net.http.auth.pass", password)
                  .option("es.nodes.wan.only", "true")
                  .option("es.write.operation", "upsert")
                  .mode("append")
                  .save()
          }
          .start()

And it displayed no latency spike

[screenshot: no spikes with foreachBatch + batch API]
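The per-batch write time in the foreachBatch variant can be measured the same way in PySpark with a plain timing wrapper around the batch function (a hypothetical helper, not part of the connector or of the code above):

```python
import time

def timed_batch(batch_fn):
    """Wrap a foreachBatch function and report how long each batch took."""
    def wrapper(batch_df, batch_id):
        start = time.time()
        batch_fn(batch_df, batch_id)  # run the actual batch write
        elapsed = time.time() - start
        print(f"batch {batch_id} took {elapsed:.3f}s")
        return elapsed
    return wrapper
```

Passing `timed_batch(write_to_es)` to `foreachBatch` would make any spike show up directly in the driver log, batch by batch.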

I believe there is a problem in the connector's writeStream sink.