apache / sedona

A cluster computing framework for processing large-scale geospatial data
https://sedona.apache.org/
Apache License 2.0

Spark Jobs aren't distributed across workers #413

Closed · dars1608 closed this issue 4 years ago

dars1608 commented 4 years ago

Expected behavior

Spark should be using all of the given resources, but only runs on 2 workers.

Actual behavior

Screenshot from History Server Web UI: http://deviantpics.com/image/problem.08E

Steps to reproduce the problem

Code snippet:

kafkaStreamIn.foreachRDD(
  rdd => {
    if (!rdd.isEmpty()) {
      // Build a SpatialRDD from the Kafka batch (each record value is a Geometry)
      val inputRDD = new SpatialRDD[Geometry]
      inputRDD.rawSpatialRDD = rdd.map(c => c.value()).toJavaRDD()

      // Analyze the boundary envelope, then partition by the configured grid type and partition count
      inputRDD.analyze()
      inputRDD.spatialPartitioning(
        GridType.getGridType(env.getProperty(SPATIAL_PARTITIONING)),
        env.getProperty(NUM_OF_SPATIAL_PARTITIONS).toInt
      )

      // Optionally build a spatial index on each partition
      val indexType = env.getProperty(SPATIAL_INDEX)
      if (indexType != null) {
        inputRDD.buildIndex(IndexType.getIndexType(indexType), true)
      }

      // Partition the subscriptions with the same grid so both join sides are aligned
      subscriptionsRDD.spatialPartitioning(inputRDD.getPartitioner)
      if (indexType != null) {
        subscriptionsRDD.buildIndex(IndexType.getIndexType(indexType), true)
      }

      // Join the point batch against the subscription polygons and publish the matched pairs
      val result = JoinQuery.SpatialJoinQuery(inputRDD, subscriptionsRDD, indexType != null, true)
      result.foreach((geometryPair: (Geometry, util.HashSet[Geometry])) => {
        geometryPair._2.asScala.foreach((dataGeometry: Geometry) => {
          val dataId = dataGeometry.getUserData.asInstanceOf[String].toLong
          val subscriptionId = geometryPair._1.getUserData.asInstanceOf[String].split("\t")(0).toLong

          kafkaProducer.value.send(SubscriptionPairModel(dataId, subscriptionId))
        })
      })
    }
  }
)

subscriptionsRDD represents a dataset consisting of 1000 polygons. inputRDD is a batch of point data, typically around 35000 points per batch. I tried KDBTree, QuadTree, RTree and Voronoi diagram partitioning (this particular case uses KDBTree partitioning with a QuadTree index). I tried to manually set the number of partitions, but it didn't help, and I also tried changing the dominant partitioning side for JoinQuery.SpatialJoinQuery (sketched below).
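Concretely, setting the partition count and swapping the dominant side looks roughly like this; the partition count is a placeholder, and the getNumPartitions prints are only there to check how many partitions each stage actually ends up with:

// Rough sketch, not the exact job code. `inputRDD` and `subscriptionsRDD` are the
// SpatialRDDs from the snippet above; 16 is a placeholder partition count.
val numSpatialPartitions = 16

inputRDD.analyze()
inputRDD.spatialPartitioning(GridType.KDBTREE, numSpatialPartitions)
subscriptionsRDD.spatialPartitioning(inputRDD.getPartitioner)

// Check how many partitions each stage really has
println(s"raw input partitions:     ${inputRDD.rawSpatialRDD.getNumPartitions}")
println(s"spatial input partitions: ${inputRDD.spatialPartitionedRDD.getNumPartitions}")

// Swapping the dominant side: build the grid from the subscriptions instead and
// apply its partitioner to the batch; the JoinQuery.SpatialJoinQuery call itself
// keeps the same argument order.
// subscriptionsRDD.spatialPartitioning(GridType.KDBTREE, numSpatialPartitions)
// inputRDD.spatitalPartitioning is then replaced by:
// inputRDD.spatialPartitioning(subscriptionsRDD.getPartitioner)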

Full code is available here: https://gitlab.com/dars1608/geospatial-index-distributed

Settings

GeoSpark version = 1.2.0

Apache Spark version = 2.4.0

JRE version = 1.8

API type = Scala

jiayuasu commented 4 years ago

@dars1608 Hello, most likely, your cluster has some issues. GeoSpark does not change the scheduling module in Spark.

dars1608 commented 4 years ago

@jiayuasu Isn't the job scheduling dependent on the partitioning of the data?

jiayuasu commented 4 years ago

@dars1608 Nope, they do not have a direct connection. I would suggest that you open the "stderr" logs on your history server to check what the actual error is. There might be some other errors that lead to the "dead" workers.
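A quick way to check is to list the executors the driver actually registered; if only two show up, the issue is on the resource-allocation side (spark-submit / YARN), not in how the data is partitioned. A rough diagnostic sketch, where `sc` is your active SparkContext:

// Diagnostic sketch: how many executors does the driver actually know about?
val executorInfos = sc.statusTracker.getExecutorInfos
println(s"known executors (the driver usually appears in this list too): ${executorInfos.length}")
executorInfos.foreach { info =>
  println(s"  host=${info.host()} runningTasks=${info.numRunningTasks()}")
}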

dars1608 commented 4 years ago

You were right, the problem (partially) occurred due to some of the spark-submit configurations. There may be some bad values in the YARN configuration as well.
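For reference, the relevant knobs are the standard executor-allocation settings; the property names below are regular Spark configuration keys, but the values are only placeholders, not the ones from our deployment:

import org.apache.spark.SparkConf

// Equivalent spark-submit flags: --num-executors, --executor-cores, --executor-memory.
// The numbers below are placeholders.
val conf = new SparkConf()
  .set("spark.executor.instances", "6")            // how many executors YARN should allocate
  .set("spark.executor.cores", "4")                // cores per executor
  .set("spark.executor.memory", "4g")              // heap per executor
  .set("spark.dynamicAllocation.enabled", "false") // or tune the min/max executor counts instead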

Thank you!