JackYumCha / QA

Question and Answer
0 stars 0 forks source link

Geo Intersection Analysis #27

Open JackYumCha opened 6 years ago

JackYumCha commented 6 years ago
package org.example.ds

import org.example.ds.SparkBuilder

import collection.JavaConversions._
import java.io.File
import java.sql.Timestamp
import java.util.ArrayList
import java.util.regex.Pattern

import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator
import org.datasyslab.geosparkviz.core.Serde.GeoSparkVizKryoRegistrator

import scala.util.control._

/** Spark/GeoSpark batch task that relates county shapes to congressional-district shapes. */
object GeoCsvTasks {

  // Original hard-coded locations, kept as defaults so `AnalyzeGeoCsv()` behaves as before.
  private val DefaultCongressionalCsvPath =
    "C:\\Users\\erris\\Desktop\\Lecture\\Chapter 9\\congressional.csv"
  private val DefaultCountiesCsvPath =
    "C:\\Users\\erris\\Desktop\\Lecture\\Chapter 9\\counties.csv"
  private val DefaultSummaryOutputPath =
    "C:\\Users\\erris\\Desktop\\Lecture\\Chapter 9\\county_summary"

  /**
    * Loads two CSV files, converts their `wkt` column into geometries, joins the
    * two sets on `ST_INTERSECTS`, counts the matching congressional rows per
    * county, prints intermediate results and writes the per-county summary as a
    * single CSV part file (overwriting any previous output).
    *
    * Both inputs must have a header row with at least the columns `id` and `wkt`,
    * because the SQL below selects exactly those two columns.
    *
    * Side effects: registers the temp views `congressionalsCsv`, `countiesCsv`,
    * `congressionalsDf`, `countiesDf` and `intersectsDf` on the session (they stay
    * registered after the call), prints to stdout, and writes to `summaryOutputPath`.
    * Every DataFrame cached here is unpersisted before returning, including when
    * a step fails.
    *
    * @param congressionalCsvPath path of the congressional-district CSV
    * @param countiesCsvPath      path of the counties CSV
    * @param summaryOutputPath    directory the summary CSV is written to
    */
  def AnalyzeGeoCsv(
      congressionalCsvPath: String = DefaultCongressionalCsvPath,
      countiesCsvPath: String = DefaultCountiesCsvPath,
      summaryOutputPath: String = DefaultSummaryOutputPath): Unit = {
    // NOTE(review): presumably returns a SparkSession with the GeoSpark SQL
    // functions (ST_GeomFromWKT, ST_INTERSECTS) registered - confirm in SparkBuilder.
    val sparkSession = SparkBuilder.CreateGeoSpark()

    // Local, append-only bookkeeping of what was cached so it can be released in `finally`.
    val cached = scala.collection.mutable.ArrayBuffer.empty[DataFrame]

    // Registers `df` under `viewName`, caches it in memory and remembers it for cleanup.
    def registerCached(df: DataFrame, viewName: String): DataFrame = {
      df.createOrReplaceTempView(viewName)
      df.persist(StorageLevel.MEMORY_ONLY)
      cached += df
      df
    }

    // Reads a comma-delimited CSV with a header row and exposes it as a cached temp view.
    def loadCsvView(path: String, viewName: String): DataFrame =
      registerCached(
        sparkSession.read.format("csv")
          .option("header", "true").option("delimiter", ",")
          .load(path),
        viewName)

    // Builds a cached view holding `id` plus the geometry produced from the `wkt` column.
    def geometryView(sourceView: String, shapeColumn: String, viewName: String): DataFrame =
      registerCached(
        sparkSession.sql(
          s"""
            |SElECT id,ST_GeomFromWKT(wkt) as $shapeColumn
            |FROM $sourceView
          """.stripMargin),
        viewName)

    try {
      loadCsvView(congressionalCsvPath, "congressionalsCsv")
      loadCsvView(countiesCsvPath, "countiesCsv")

      geometryView("congressionalsCsv", "congressional_shape", "congressionalsDf").show()
      geometryView("countiesCsv", "county_shape", "countiesDf").show()

      // One row per (county, congressional) pair whose shapes intersect.
      val intersectsDf = registerCached(
        sparkSession.sql(
          """
            |SELECT countiesDf.id as county_id,congressionalsDf.id as congresional_id
            |FROM countiesDf
            |JOIN congressionalsDf ON
            |ST_INTERSECTS(county_shape,congressional_shape)
          """.stripMargin),
        "intersectsDf")
      intersectsDf.show(50)

      // Column names (including their spelling) are kept as-is: they form the
      // header of the written CSV and downstream readers may depend on them.
      val countySummaryDf = sparkSession.sql(
        """
          |SELECT distinct county_id, count(congresional_id) as number_of_congression
          |FROM intersectsDf
          |Group By county_id
        """.stripMargin)

      println("county summary: ")
      countySummaryDf.show(50)

      // coalesce(1) collapses the result to one partition so a single part file is produced.
      countySummaryDf.coalesce(1).write.format("csv")
        .option("header", "true").option("delimiter", ",")
        .mode(SaveMode.Overwrite)
        .save(summaryOutputPath)
    } finally {
      // Release the cached blocks; the temp views themselves remain queryable.
      cached.foreach(_.unpersist())
    }
  }
}