
Optimizing Parquet reads in Spark SQL #47

Open cjuexuan opened 6 years ago

cjuexuan commented 6 years ago

Background

I recently ran into the following case:

 load parquet.`/basePath` as tb;
 select * from tb where dt='20170102' limit 10;

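This is effectively equivalent to sparkSession.read.parquet(basePath).createOrReplaceTempView("tb");

With a large number of partition files, this step was noticeably slow. Looking at the execution plan, though, the dt='20170102' predicate had already been pushed down as a filter, so the scan itself was not the obvious culprit.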
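One way to narrow this down is to time the load step separately from the query and print the plan. The following is only a rough sketch: the `LoadTiming` object and its `timed` helper are hypothetical names introduced here, and `basePath` is whatever partitioned Parquet directory is being investigated.

```scala
import org.apache.spark.sql.SparkSession

object LoadTiming {
  // Hypothetical helper: runs `body`, prints the elapsed wall-clock time, returns the result.
  def timed[A](label: String)(body: => A): A = {
    val start = System.nanoTime()
    val result = body
    println(f"$label took ${(System.nanoTime() - start) / 1e6}%.1f ms")
    result
  }

  def inspect(spark: SparkSession, basePath: String): Unit = {
    // Building the DataFrame already triggers file listing and schema inference.
    val df = timed("read.parquet") { spark.read.parquet(basePath) }
    df.createOrReplaceTempView("tb")

    val query = spark.sql("select * from tb where dt = '20170102' limit 10")
    // Prints the parsed, analyzed, optimized and physical plans, including the filters.
    query.explain(true)
    timed("collect") { query.collect() }
  }
}
```

If the time is dominated by the first step rather than by `collect`, the cost sits in relation resolution, not in the filtered scan.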

Root cause analysis

So I traced through the relevant source code. The call chain is:

  1. DataFrameReader.load
  2. sparkSession.baseRelationToDataFrame
  3. DataSource.resolveRelation
  4. DataSource.getOrInferFileFormatSchema
  5. lazy tempFileIndex
  6. InMemoryFileIndex

Creating the InMemoryFileIndex is the expensive part here, but it turns out this stage can be skipped.

The source looks roughly like this:

  private def getOrInferFileFormatSchema(
      format: FileFormat,
      fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
    // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
    // in streaming mode, we have already inferred and registered partition columns, we will
    // never have to materialize the lazy val below
    lazy val tempFileIndex = {
      val allPaths = caseInsensitiveOptions.get("path") ++ paths
      val hadoopConf = sparkSession.sessionState.newHadoopConf()
      val globbedPaths = allPaths.toSeq.flatMap { path =>
        val hdfsPath = new Path(path)
        val fs = hdfsPath.getFileSystem(hadoopConf)
        val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
        SparkHadoopUtil.get.globPathIfNecessary(qualified)
      }.toArray
      new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
    }
    val partitionSchema = if (partitionColumns.isEmpty) {
      // Try to infer partitioning, because no DataSource in the read path provides the partitioning
      // columns properly unless it is a Hive DataSource
      val resolved = tempFileIndex.partitionSchema.map { partitionField =>
        val equality = sparkSession.sessionState.conf.resolver
        // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
        userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
          partitionField)
      }
      StructType(resolved)
    } else {
      // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
      // partitioning
      if (userSpecifiedSchema.isEmpty) {
        val inferredPartitions = tempFileIndex.partitionSchema
        inferredPartitions
      } else {
        val partitionFields = partitionColumns.map { partitionColumn =>
          val equality = sparkSession.sessionState.conf.resolver
          userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
            val inferredPartitions = tempFileIndex.partitionSchema
            val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
            if (inferredOpt.isDefined) {
              logDebug(
                s"""Type of partition column: $partitionColumn not found in specified schema
                   |for $format.
                   |User Specified Schema
                   |=====================
                   |${userSpecifiedSchema.orNull}
                   |
                   |Falling back to inferred dataType if it exists.
                 """.stripMargin)
            }
            inferredOpt
          }.getOrElse {
            throw new AnalysisException(s"Failed to resolve the schema for $format for " +
              s"the partition column: $partitionColumn. It must be specified manually.")
          }
        }
        StructType(partitionFields)
      }
    }
    // ... (the rest of the method is omitted in this excerpt)

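tempFileIndex is a lazy val, so it is only built when something actually reads it. The boundary is whether userSpecifiedSchema and the partition columns (partitionColumns in the code) are both supplied: if either one is missing, the code touches tempFileIndex.partitionSchema and the index gets created. If both are present and every partition column can be found in the user-specified schema, the index is never materialized.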
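The boundary can be illustrated with a small stand-alone model. This is a simplified sketch, not Spark code: `FileIndexProbe` and `resolvePartitionFields` are hypothetical stand-ins that use plain column names instead of StructType, and the probe just counts how many times the "expensive" lazy val is forced.

```scala
object TempFileIndexBoundary {
  // Stand-in for the lazy tempFileIndex: counts how often it is materialized.
  final class FileIndexProbe {
    var listings = 0
    lazy val partitionFields: Seq[String] = { listings += 1; Seq("dt") }
  }

  // Mirrors the three branches of the excerpt, with column names only.
  def resolvePartitionFields(
      userSpecifiedSchema: Option[Seq[String]],
      partitionColumns: Seq[String],
      probe: FileIndexProbe): Seq[String] = {
    if (partitionColumns.isEmpty) {
      // No partition columns given: partitioning is always inferred.
      probe.partitionFields
    } else if (userSpecifiedSchema.isEmpty) {
      // Partition columns given, but no schema: still inferred.
      probe.partitionFields
    } else {
      // Both given: the index is only touched for columns missing from the schema,
      // because orElse takes its argument by name.
      partitionColumns.map { col =>
        userSpecifiedSchema.get.find(_.equalsIgnoreCase(col))
          .orElse(probe.partitionFields.find(_.equalsIgnoreCase(col)))
          .getOrElse(sys.error(s"Failed to resolve partition column: $col"))
      }
    }
  }
}
```

Only the call that passes both a schema containing `dt` and `Seq("dt")` as partition columns leaves `listings` at zero.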

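Solution approach

The fix is to use our existing metadata system and pass all of this information in up front. DataSource itself is accessible, so we can call its apply method ourselves. Our company happens to have a metadata system, and we can fetch the schema and partition information from it via RPC, so we simply used that.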
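A minimal sketch of what calling DataSource directly could look like, assuming the Spark 2.x `DataSource` case class in `org.apache.spark.sql.execution.datasources` (an internal API whose shape may differ between versions). `KnownSchemaLoader` is a hypothetical name, and `fullSchema` and `partitionColumns` stand for whatever the metadata system returns; this is not the author's actual code.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.types.StructType

object KnownSchemaLoader {
  // fullSchema: data columns plus partition columns (assumed to come from external metadata).
  // partitionColumns: names of the partition columns, e.g. Seq("dt").
  def load(
      spark: SparkSession,
      basePath: String,
      fullSchema: StructType,
      partitionColumns: Seq[String]): DataFrame = {
    val dataSource = DataSource(
      sparkSession = spark,
      className = "parquet",
      paths = Seq(basePath),
      userSpecifiedSchema = Some(fullSchema),
      partitionColumns = partitionColumns)
    // With both schema and partition columns supplied, schema inference should not
    // need to materialize the temporary file index.
    spark.baseRelationToDataFrame(dataSource.resolveRelation())
  }
}
```

The result can then be registered as before, for example `KnownSchemaLoader.load(spark, basePath, schema, Seq("dt")).createOrReplaceTempView("tb")`. Depending on the Spark version, resolveRelation may still build a file index for the relation itself, so this sketch only targets the inference step discussed above.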

For reference, here is also a Databricks blog post on the topic.