
spark partition discovery #17

Open cjuexuan opened 8 years ago

cjuexuan commented 8 years ago

spark partition discovery

Like Hive's partitioned tables, a partitioned table stores its data in separate folders, split by the partition columns; the Parquet data source can now discover this partition information automatically.

Writing with a specified partition column:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{SQLContext, SaveMode}

    val conf = new SparkConf().setMaster("local[*]").setAppName("partition")
    val sparkContext = new SparkContext(conf)
    val sqlContext = new SQLContext(sparkContext)
    import sqlContext.implicits._

    val data = Seq(
      ("2014", 1),
      ("2014", 2),
      ("2015", 1)
    )
    sparkContext.setLogLevel("ERROR")
    sparkContext.parallelize(data).toDF("year", "num")
      .write.mode(SaveMode.Overwrite)
      .partitionBy(Array("year"): _*)
      .parquet("/Users/cjuexuan/data/partition")

The key function: partitionBy

  /**
   * Partitions the output by the given columns on the file system. If specified, the output is
   * laid out on the file system similar to Hive's partitioning scheme.
   *
   * This was initially applicable for Parquet but in 1.5+ covers JSON, text, ORC and avro as well.
   *
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def partitionBy(colNames: String*): DataFrameWriter = {
    this.partitioningColumns = Option(colNames)
    this
  }

From this function we can see that the output is partitioned by the columns we pass in: it accepts the column names as a varargs parameter.
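Since it is a varargs parameter, it can be called with literal names or by expanding a collection, as the snippet above does. A small illustration reusing data and sparkContext from above; all three calls are equivalent:

    val df = sparkContext.parallelize(data).toDF("year", "num")
    df.write.partitionBy("year")                  // plain varargs
    df.write.partitionBy(Array("year"): _*)       // expanding an Array, as above
    df.write.partitionBy(Seq("year"): _*)         // expanding a Seq works the same way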

Inspecting the output path:

_SUCCESS         _common_metadata _metadata        year=2014        year=2015

Following our partitioning rule, two folders were created under the output path: year=2014 and year=2015.

➜  year=2015 ls
part-r-00003-ede03c1f-2bc2-4a1d-ac29-34ee9c777239.gz.parquet
➜  year=2015 ls  ../year=2014
part-r-00001-ede03c1f-2bc2-4a1d-ac29-34ee9c777239.gz.parquet part-r-00002-ede03c1f-2bc2-4a1d-ac29-34ee9c777239.gz.parquet

Looking more closely, the year=2014 folder contains two Parquet files while year=2015 contains only one: each task wrote its rows into files under the folder matching their partition value. The automatic discovery feature presumably keys off this directory layout as well.
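Conceptually, partition discovery walks the leaf directories and parses the column=value segments out of each path. A simplified sketch of the idea (Spark's actual implementation lives in SQL's PartitioningUtils and also infers column types instead of keeping everything as strings):

    // simplified sketch of Hive-style partition-path parsing (not Spark's real code)
    def parsePartition(path: String): Map[String, String] =
      path.split("/")
        .filter(_.contains("="))
        .map { seg =>
          val Array(k, v) = seg.split("=", 2)
          k -> v
        }.toMap

    parsePartition("/Users/cjuexuan/data/partition/year=2014")
    // -> Map(year -> 2014)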

Reading

sqlContext.read.parquet("/Users/cjuexuan/data/partition/year=2015").show()
println(s"${"- " * 20}")
sqlContext.read.parquet("/Users/cjuexuan/data/partition/year=2014").show()

The output now shows only each partition's own data:

16/04/15 17:34:24 INFO ParquetRelation: Listing file:/Users/cjuexuan/data/partition/year=2015 on driver
+---+
|num|
+---+
|  1|
+---+

16/04/15 17:34:28 INFO ParquetRelation: Listing file:/Users/cjuexuan/data/partition/year=2014 on driver
+---+
|num|
+---+
|  1|
|  2|
+---+

Note: starting with 1.6, if you read a partition path directly, the column encoded in that path is dropped from the schema; it can only be recovered by reading from the base dir, which turns the partition attribute back into a partition column:

    sqlContext.read.parquet("/Users/cjuexuan/data/partition").filter($"year" === "2015").show()

Output:

16/04/15 17:42:04 INFO ParquetRelation: Listing file:/Users/cjuexuan/data/partition on driver
16/04/15 17:42:04 INFO ParquetRelation: Listing file:/Users/cjuexuan/data/partition/year=2014 on driver
16/04/15 17:42:04 INFO ParquetRelation: Listing file:/Users/cjuexuan/data/partition/year=2015 on driver

16/04/15 17:42:08 INFO ParquetRelation: Reading Parquet file(s) from file:/Users/cjuexuan/data/partition/year=2015/part-r-00003-ede03c1f-2bc2-4a1d-ac29-34ee9c777239.gz.parquet

16/04/15 17:42:08 INFO ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Input split: ParquetInputSplit{part: file:/Users/cjuexuan/data/partition/year=2015/part-r-00003-ede03c1f-2bc2-4a1d-ac29-34ee9c777239.gz.parquet start: 0 end: 359 length: 359 hosts: []}
+---+----+
|num|year|
+---+----+
|  1|2015|
+---+----+

The key log lines show that ParquetRelation only actually reads the matching partition.
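If the partition column is needed while still pointing the reader at a single partition directory, 1.6 also added a basePath option that tells the reader where the partitioned layout starts; a sketch of that variant:

    // read one partition but keep year in the schema,
    // by telling Spark where partition discovery should be rooted
    sqlContext.read.option("basePath", "/Users/cjuexuan/data/partition")
      .parquet("/Users/cjuexuan/data/partition/year=2015")
      .show()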

Multiple partition columns:

Writing:


    val data = Seq(
      ("2014", 1, "man"),
      ("2014", 2, "woman"),
      ("2015", 1, "man")
    )
    sparkContext.setLogLevel("ERROR")
    sparkContext.parallelize(data).toDF("year", "num", "gender")
      .write.mode(SaveMode.Overwrite)
      .partitionBy(Array("year", "gender"): _*)
      .parquet("/Users/cjuexuan/data/partition")

Output:

➜  partition tree .
.
├── _SUCCESS
├── _common_metadata
├── _metadata
├── year=2014
│   ├── gender=man
│   │   └── part-r-00001-f577334f-9fa9-4a48-8170-6174b73f7f1e.gz.parquet
│   └── gender=woman
│       └── part-r-00002-f577334f-9fa9-4a48-8170-6174b73f7f1e.gz.parquet
└── year=2015
    └── gender=man
        └── part-r-00003-f577334f-9fa9-4a48-8170-6174b73f7f1e.gz.parquet

5 directories, 6 files

Reading:


 sqlContext.read.parquet("/Users/cjuexuan/data/partition").filter($"year" === "2014" and $"gender" === "woman").show()

Output:

+---+----+------+
|num|year|gender|
+---+----+------+
|  2|2014| woman|
+---+----+------+

Another way, reading sub-paths directly:

sqlContext.read.parquet("/Users/cjuexuan/data/partition/year=2014").show()
+---+------+
|num|gender|
+---+------+
|  2| woman|
|  1|   man|
+---+------+

sqlContext.read.parquet("/Users/cjuexuan/data/partition/year=2014/gender=man").show()
+---+
|num|
+---+
|  1|
+---+
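The basePath option mentioned above should likewise recover both partition columns when reading a leaf directory of the nested layout; a sketch:

    // expected columns: num, year, gender
    sqlContext.read.option("basePath", "/Users/cjuexuan/data/partition")
      .parquet("/Users/cjuexuan/data/partition/year=2014/gender=man")
      .show()
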
weisong82 commented 7 years ago

After writing into Hive, you have to run ALTER TABLE ... ADD PARTITION manually before the data shows up. I don't know why Hive doesn't detect new partitions automatically.
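Presumably this is because the files bypass the metastore, and Hive only sees partitions registered there. A sketch of the usual workarounds, assuming sqlContext is a HiveContext and using a hypothetical table name mytable:

    // register one partition explicitly in the metastore
    sqlContext.sql("ALTER TABLE mytable ADD IF NOT EXISTS PARTITION (year='2015')")
    // or have Hive scan the table location and register every partition it finds
    sqlContext.sql("MSCK REPAIR TABLE mytable")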