JackYumCha / QA

SparkBuilder #23

Open JackYumCha opened 6 years ago

JackYumCha commented 6 years ago
package org.example.ds

import scala.collection.JavaConversions._ // lets Scala for-comprehensions iterate over java.util.ArrayList
import java.io.File
import java.sql.Timestamp
import java.util.ArrayList

import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator
import org.datasyslab.geosparkviz.core.Serde.GeoSparkVizKryoRegistrator
import scala.util.control._

object SparkBuilder {
  def CreateGeoSpark(): SparkSession = {
    val sparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("geoSparkTest")
      // allow large collect() results on the driver
      .config("spark.driver.maxResultSize", "32g")
      .config("spark.ui.enabled", "true")
      .config("spark.serializer", classOf[KryoSerializer].getName)
      // either GeoSparkKryoRegistrator or GeoSparkVizKryoRegistrator works here; both register
      // Kryo serializers for GeoSpark's geometries and spatial indexes
      .config("spark.kryo.registrator", classOf[GeoSparkVizKryoRegistrator].getName)
      .config("geospark.global.index","true")
      .config("geospark.global.indextype","rtree")
      .config("geospark.join.gridtype","quadtree")
      .getOrCreate()
    sparkSession.sparkContext.setLogLevel("ERROR")
    GeoSparkSQLRegistrator.registerAll(sparkSession)
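    // once registered, GeoSparkSQL's spatial functions (e.g. ST_GeomFromWKT, ST_Point, ST_Contains)
    // are available in Spark SQL; a hypothetical query against a table "cities" with a WKT column "geom_wkt":
    //   sparkSession.sql("SELECT * FROM cities WHERE ST_Contains(ST_GeomFromWKT(geom_wkt), ST_Point(151.2, -33.8))")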
    RegisterTimeFunctions(sparkSession)
    sparkSession
  }

  def RegisterTimeFunctions(sparkSession: SparkSession): Unit = {
    // timestamp_add(startTime, interval, unit): shift a timestamp by interval seconds/minutes/hours/days
    val timestamp_add = (startTime: Timestamp, interval: Long, unit: String) => {
      unit.toLowerCase() match {
        case "s" => new Timestamp(startTime.getTime() + interval * 1000L)
        case "m" => new Timestamp(startTime.getTime() + interval * 60000L)
        case "h" => new Timestamp(startTime.getTime() + interval * 3600000L)
        case "d" => new Timestamp(startTime.getTime() + interval * 86400000L)
        case other => throw new IllegalArgumentException(s"Unsupported unit: $other (use s, m, h or d)")
      }
    }
    sparkSession.sqlContext.udf.register("timestamp_add", timestamp_add)
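    // example call from Spark SQL (hypothetical table/column names):
    //   sparkSession.sql("SELECT timestamp_add(event_time, 30, 'm') AS shifted FROM events")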
  }

  // recursively collect the absolute paths of all files under folderRoot whose names end with endPattern
  def GetAllFiles(folderRoot: String, endPattern: String): ArrayList[String] = {
    println(s"Search Folder: ${folderRoot}")
    val results:ArrayList[String] = new ArrayList[String]()

    val dir = new File(folderRoot)
    if(dir.exists() && dir.isDirectory()){
      val fileList = dir.listFiles.filter(_.isFile()).toList
      for( inputFile <- fileList){
        //println(s"File: ${inputFile.getName()}, match: ${inputFile.getName().endsWith(".csv")}")
        if(inputFile.getName().endsWith(endPattern)){
          results.add(inputFile.getAbsolutePath())
        }
      }
      val dirList = dir.listFiles.filter(_.isDirectory()).toList
      for(inputDir <- dirList){
        results.addAll(GetAllFiles(inputDir.getAbsolutePath(), endPattern))
      }
    }
    results
  }

  // read every .csv under folderRoot (recursively) and union them into one DataFrame;
  // returns null if no .csv file is found
  def ReadAllCsv(sparkSession: SparkSession, folderRoot: String, headerOption: String, delimiter: String): DataFrame = {
    var unionDf: DataFrame = null
    for (csvFile: String <- GetAllFiles(folderRoot, ".csv")) {
      println(s"File: ${csvFile}")
      // apply the same header/delimiter options to every file
      val df = sparkSession.read.option("header", headerOption).option("delimiter", delimiter).csv(csvFile)
      if (unionDf == null) {
        unionDf = df
      }
      else {
        unionDf = unionDf.union(df)
      }
    }
    unionDf
  }

  // read only the first .csv found under folderRoot (handy for checking a schema without loading everything)
  def ReadOneCsv(sparkSession: SparkSession, folderRoot: String, headerOption: String, delimiter: String): DataFrame = {
    var unionDf: DataFrame = null

    val loop = new Breaks

    loop.breakable {
      for (csvFile: String <- GetAllFiles(folderRoot, ".csv")) {
        println(s"File: ${csvFile}")
        unionDf = sparkSession.read.option("header", headerOption).option("delimiter", delimiter).csv(csvFile)
        // stop after the first file so only one CSV is loaded
        loop.break
      }
    }

    unionDf
  }

}
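
For context, a minimal usage sketch of the helpers above (the folder path, table and column names are placeholders, not part of the original post):

object SparkBuilderExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkBuilder.CreateGeoSpark()
    // read and union every .csv under a (hypothetical) input folder
    val df = SparkBuilder.ReadAllCsv(spark, "/data/input", "true", ",")
    df.createOrReplaceTempView("records")
    // shift a timestamp column by 30 minutes with the registered UDF
    // (assumes the CSVs contain a column named "event_time")
    spark.sql("SELECT timestamp_add(CAST(event_time AS timestamp), 30, 'm') AS shifted FROM records").show(5)
    spark.stop()
  }
}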