bartosz25 / spark-scala-playground

Sample processing code using Spark 2.1+ and Scala

Insert overwrite only one partition and keep other partitions unchanged using DataFrame/Dataset API. #9

Closed bithw1 closed 5 years ago

bithw1 commented 5 years ago

Hi @bartosz25

I would like to ask you a question.

I have the following test code that is only meant to insert-overwrite one partition (a=1). I am following https://stackoverflow.com/questions/50006526/overwrite-only-some-partitions-in-a-partitioned-spark-dataset

test("SparkTest0234") {
    val conf = new SparkConf()
    conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    val spark = SparkSession.builder().master("local[4]").appName("SparkTest 0234").config(conf).enableHiveSupport().getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.SaveMode
    spark.range(0, 10).map(i => (i % 3, i * i)).toDF("a", "b").write.partitionBy("a").mode(SaveMode.Overwrite).saveAsTable("SparkTest0234")

//only want to overwrite partition a=1, and leave a=0, a=2 not changed.  
spark.range(0, 20).map(i => (1, i * i)).toDF("a", "b").write.mode(SaveMode.Overwrite).insertInto("SparkTest0234")
  }

but I got the following directories on disk. It looks like the write used b's values to create the partition directories. Could you please help take a look? Thank you very much

d:\spark-warehouse\sparktest0234
a=0
a=1
...
a=324
a=361
bartosz25 commented 5 years ago

Hello @bithw1,

I'm not a big expert in Hive integration, but after playing a little with your code I observed something strange:

    val df = Seq((1, 1111)).toDF("a", "b")
    df.printSchema()
    df.write.mode(SaveMode.Overwrite).partitionBy("a").saveAsTable("SparkTest0234")
    Seq((2, 2222)).toDF("a", "b").write.mode(SaveMode.Overwrite).insertInto("SparkTest0234")
    Seq((3, 3333)).toDF("a", "b").write.mode(SaveMode.Overwrite).insertInto("SparkTest0234")
    Seq((3, 333333)).toDF("a", "b").write.mode(SaveMode.Overwrite).insertInto("SparkTest0234")

    spark.sql("DESCRIBE EXTENDED SparkTest0234").show()
    spark.sql("SELECT * FROM SparkTest0234").show()

It shows:

root
 |-- a: integer (nullable = false)
 |-- b: integer (nullable = false)

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|                   b|                 int|   null|
|                   a|                 int|   null|
|# Partition Infor...|                    |       |
|          # col_name|           data_type|comment|
|                   a|                 int|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|             default|       |
|               Table|       sparktest0234|       |
|               Owner|             bartosz|       |
|        Created Time|Sat Oct 13 18:39:...|       |
|         Last Access|Thu Jan 01 01:00:...|       |
|          Created By|         Spark 2.3.0|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|    Table Properties|[transient_lastDd...|       |
|            Location|file:/home/bartos...|       |
|       Serde Library|org.apache.hadoop...|       |
|         InputFormat|org.apache.hadoop...|       |
|        OutputFormat|org.apache.hadoop...|       |
+--------------------+--------------------+-------+
only showing top 20 rows

+----+------+
|   b|     a|
+----+------+
|1111|     1|
|   2|  2222|
|   3|  3333|
|   3|333333|
+----+------+

As you can see, the engine reversed the columns for the DataFrames written without an explicit partitioning definition. This explains why you see more than 3 directories in spark-warehouse, although I don't know why it happens. It looks like Hive's integration for insertInto ignores the schema provided with .toDF. Maybe it's somehow related to https://stackoverflow.com/questions/43249907/does-the-column-order-matter-in-insert-overwrite-statement-in-hive and https://gist.github.com/cloud-fan/14ada3f2b3225b5db52ccaa12aacfbd4. So I played a little bit more and tried to insert new rows into a table with columns of 3 different types: string, int and boolean:

val df = Seq(("a", 1, true)).toDF("letter", "nr", "bool")
df.printSchema()
df.write.mode(SaveMode.Overwrite).partitionBy("letter").saveAsTable("SparkTest0234")
Seq(("b", 2, true)).toDF("letter", "nr", "bool").write.mode(SaveMode.Overwrite).insertInto("SparkTest0234")
Seq(("c", 3, true)).toDF("letter", "nr", "bool").write.mode(SaveMode.Overwrite).insertInto("SparkTest0234")
Seq(("c", 33, true)).toDF("letter", "nr", "bool").write.mode(SaveMode.Overwrite).insertInto("SparkTest0234")

spark.sql("DESCRIBE EXTENDED SparkTest0234").show()
spark.sql("SELECT * FROM SparkTest0234").show()

And the debug result was:

root
 |-- letter: string (nullable = true)
 |-- nr: integer (nullable = false)
 |-- bool: boolean (nullable = false)

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|                  nr|                 int|   null|
|                bool|             boolean|   null|
|              letter|              string|   null|
|# Partition Infor...|                    |       |
|          # col_name|           data_type|comment|
|              letter|              string|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|             default|       |
|               Table|       sparktest0234|       |
|               Owner|             bartosz|       |
|        Created Time|Sun Oct 14 07:06:...|       |
|         Last Access|Thu Jan 01 01:00:...|       |
|          Created By|         Spark 2.3.0|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|    Table Properties|[transient_lastDd...|       |
|            Location|file:/home/bartos...|       |
|       Serde Library|org.apache.hadoop...|       |
|         InputFormat|org.apache.hadoop...|       |
+--------------------+--------------------+-------+
only showing top 20 rows

+----+----+------+
|  nr|bool|letter|
+----+----+------+
|   1|true|     a|
|null|true|  true|
+----+----+------+
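
One possible reading of that last row, still to be verified: with position-based resolution, the string letter value lands in the int column nr (the cast yields null), the int lands in bool (a non-zero int casts to true), and the boolean ends up as the partition value letter=true. A minimal sketch of those casts in isolation:

    import org.apache.spark.sql.functions.lit
    import org.apache.spark.sql.types.{BooleanType, IntegerType, StringType}

    // Reproducing the implicit casts a positional insert of ("c", 33, true)
    // would apply against the table layout (nr int, bool boolean, letter string):
    spark.range(1).select(
      lit("c").cast(IntegerType).as("nr"),     // string "c" -> int: null
      lit(33).cast(BooleanType).as("bool"),    // int 33 -> boolean: true
      lit(true).cast(StringType).as("letter")  // boolean true -> string: "true"
    ).show()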

I can't investigate further right now to see what happens in the code, but I can take a look next week. Or I can let you do so, and you keep me informed in this issue?

bithw1 commented 5 years ago

Thanks @bartosz25 very much for the detailed information and investigation. Sure, I will try to dig into the code to see what causes this strange behavior, and I will let you know if I find something.

bithw1 commented 5 years ago

Hi @bartosz25

I tried this in our testing environment.

In spark-shell, I created a Hive table: scala> spark.sql("create table db1.t1(a int, b int) using parquet partitioned by (a)")

In the Hive command line, I ran show create table; it reports the table as create table db1.t1(b int) partitioned by (a int)

Then I used the Hive command line directly to create a table with create table db1.t2(a int, b int) using parquet partitioned by (a)

That is an invalid create table command for Hive; I have to write it as create table db1.t2(b int) partitioned by (a int) stored as parquet

That is, when using Spark SQL to create the table, even if we place the partition column a before b, Hive will in the end place the partition column last, that is, b comes before a.
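
A quick way to confirm this from the Spark side (a small sketch; the exact nullability flags may vary by version):

    // The table was declared as (a int, b int) partitioned by (a), but Spark
    // stores the partition column last, so the physical layout is (b, a):
    spark.table("db1.t1").printSchema()
    // root
    //  |-- b: integer (nullable = true)
    //  |-- a: integer (nullable = true)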

When we write out a DataFrame (a before b) to Hive, Hive doesn't respect the column names or schema information, so the DataFrame's a column will be treated as b's value in the Hive table.

After exchanging a's and b's positions, it works as expected:

  test("SparkTest 0234_2") {
    val conf = new SparkConf()
    conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    val spark = SparkSession.builder().master("local[1]").appName("SparkTest 0234_2").config(conf).enableHiveSupport().getOrCreate()
    import spark.implicits._

    import org.apache.spark.sql.SaveMode
    val df = spark.range(0, 10).map(i => (i % 3, i * i)).toDF("a", "b")

    df.write.format("json").partitionBy("a").mode(SaveMode.Overwrite).saveAsTable("SparkTest0234_2")
    spark.sql("show create table SparkTest0234_2 ").map(r => r.getString(0)).collect().foreach(println)
    spark.sql("select * from SparkTest0234_2 ").show(truncate = false)
    if (true) {
      val df = spark.range(0, 20).map(i => (i * i, 1)).toDF("b", "a") //b first, a last
      df.show(truncate = false)
      df.write.mode(SaveMode.Overwrite).insertInto("SparkTest0234_2")
    }

  }
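
As an alternative sketch I haven't run against this exact table (it assumes SparkTest0234_2 exists as created above): a SQL INSERT OVERWRITE with a static partition spec pins a=1 explicitly, so only that partition is rewritten and the column order no longer matters for the partition column:

    // Static partition spec: only a=1 is overwritten; a=0 and a=2 stay untouched.
    // Assumes import spark.implicits._ as in the test above.
    spark.range(0, 20).map(i => i * i).toDF("b").createOrReplaceTempView("new_b")
    spark.sql("INSERT OVERWRITE TABLE SparkTest0234_2 PARTITION (a = 1) SELECT b FROM new_b")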
bartosz25 commented 5 years ago

Thanks, that's the behavior I observed too. It's pretty strange, but I don't know the internal details that would explain that logic. It's worth investigating, though; I've put the topic in my backlog :)

Could you close the issue if you're satisfied with our exchange?

bithw1 commented 5 years ago

Thanks @bartosz25. I think what we have found so far explains the behavior. The key point is that Hive doesn't honor the schema at runtime: when we write DF(a, b) to Hive while Hive's schema is (b, a), Hive will place the b column of the DF into the a column of the Hive table.

I am closing this issue, thank you @bartosz25

bithw1 commented 5 years ago

Hi @bartosz25, following is the javadoc from the DataFrameWriter#insertInto method; it says that insertInto uses position-based resolution.

So when we try to insert df(a, b) into a Hive table whose last column is the partition column, b's values will be used for dynamic partitioning. (A small name-based guard is sketched after the javadoc below.)

/**
   * Inserts the content of the `DataFrame` to the specified table. It requires that
   * the schema of the `DataFrame` is the same as the schema of the table.
   *
   * @note Unlike `saveAsTable`, `insertInto` ignores the column names and just uses position-based
   * resolution. For example:
   *
   * {{{
   *    scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
   *    scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
   *    scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1")
   *    scala> sql("select * from t1").show
   *    +---+---+
   *    |  i|  j|
   *    +---+---+
   *    |  5|  6|
   *    |  3|  4|
   *    |  1|  2|
   *    +---+---+
   * }}}
   *
   * Because it inserts data to an existing table, format or options will be ignored.
   *
   * @since 1.4.0
   */
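
Based on that, here is a small guard one could wrap around insertInto (my own sketch; insertIntoByName is a hypothetical helper, not a Spark API). Aligning the DataFrame to the table's physical column order first makes the positional resolution effectively name-based:

    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    import org.apache.spark.sql.functions.col

    // Hypothetical helper: reorder df's columns to the target table's physical
    // order (data columns first, partition columns last), then insert positionally.
    def insertIntoByName(spark: SparkSession, df: DataFrame, table: String): Unit = {
      val orderedColumns = spark.table(table).columns.map(col)
      df.select(orderedColumns: _*)
        .write.mode(SaveMode.Overwrite)
        .insertInto(table)
    }

    // Usage, e.g.: insertIntoByName(spark, Seq((1, 2)).toDF("a", "b"), "SparkTest0234")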
bartosz25 commented 5 years ago

Hi @bithw1

I've just published the post about Hive's insertInto integration on top of Apache Spark SQL: https://www.waitingforcode.com/apache-spark-sql/apache-spark-sql-hive-insertinto-command/read

Best regards, Bartosz.

bithw1 commented 5 years ago

Thank you very much, @bartosz25. That's amazing, I am reading it now. :+1: