hortonworks-spark / spark-llap

Apache License 2.0
101 stars 68 forks source link

It always create table and why "tableExists" is always false in HiveWarehouseDataSourceWriter? #266

Closed rollingdeep closed 1 year ago

rollingdeep commented 5 years ago

My Goal

I want use external table and append data without drop because our hdp3 require us to specify the location of table.

Question

I describe the table. And it shows table structure correctly and exsist in hive. But, DefaultJDBCWrapper tried to create new table because "tableExists:false". I'm using hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar which was installed with ambari.

log shows like this:

19/04/18 15:37:03 INFO HiveWarehouseDataSourceWriter: Handling write: database:my_db, table:test_table, savemode: Append, tableExists:false, createTable:true, loadData:true
19/04/18 15:37:04 ERROR DefaultJDBCWrapper$: executeUpdate failed for query: CREATE TABLE test_table(dd string) STORED AS ORC
at shadehive.org.apache.hive.jdbc.HiveStatement.waitForOperationToComplete(HiveStatement.java:401)
    at shadehive.org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:266)
    at shadehive.org.apache.hive.jdbc.HivePreparedStatement.execute(HivePreparedStatement.java:101)
    at org.apache.commons.dbcp2.DelegatingPreparedStatement.execute(DelegatingPreparedStatement.java:198)
    at org.apache.commons.dbcp2.DelegatingPreparedStatement.execute(DelegatingPreparedStatement.java:198)
    at com.hortonworks.spark.sql.hive.llap.JDBCWrapper.executeUpdate(HS2JDBCWrapper.scala:257)
    at com.hortonworks.spark.sql.hive.llap.DefaultJDBCWrapper.executeUpdate(HS2JDBCWrapper.scala)
    at com.hortonworks.spark.sql.hive.llap.HiveWarehouseDataSourceWriter.handleWriteWithSaveMode(HiveWarehouseDataSourceWriter.java:164)
    at com.hortonworks.spark.sql.hive.llap.HiveWarehouseDataSourceWriter.commit(HiveWarehouseDataSourceWriter.java:111)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:91)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:256)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
    at com.netease.love.feature.TestHdp3$.main(TestHdp3.scala:285)

I think this is a bug.

my source code:

val spark = SparkSession.builder
      .enableHiveSupport()
      .appName(TestHdp3.getClass.getName)
      .getOrCreate()

val hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("my_db")
hive.session().conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
hive.session().conf.set("hive.exec.dynamic.partition", "true")

hive.describeTable("test_table").show(100, false)

    add_features
      .write
      .format(HIVE_WAREHOUSE_CONNECTOR)
      .mode("append")
      .option("database", "my_db")
      .option("table", "test_table")
      .partitionBy("dt")
      .save("/warehouse/xxx/my_db.db/test_table/")
dmueller1607 commented 4 years ago

I had a similar issue using the Hive Warehouse Connector (also version 1.0.0.3.1.0.0-78).

For me, the problem was solved when I used DB and table name without underscore characters. So I changed my db from test_db to testdb and test_table to testtable.

rollingdeep commented 4 years ago

I had a similar issue using the Hive Warehouse Connector (also version 1.0.0.3.1.0.0-78).

For me, the problem was solved when I used DB and table name without underscore characters. So I changed my db from test_db to testdb and test_table to testtable.

Maybe you are right. Now I am using hive external table and inserting data into the table from hdfs.

  def save2hive(hive: HiveWarehouseSession, df: DataFrame, database: String, tableName: String, pt: Map[String, String]): Unit = {
    var path = s"/warehouse/xxx/$database.db/$tableName"

    var pt_str = ""
    if (pt.nonEmpty) {
      var pt_info = List[String]()

      for((k,v) <- pt) { //TODO Ordered Map is need
        path = path + s"/$k=$v"
        pt_info = pt_info :+ s"$k='$v'"
      }
      pt_str = pt_info.mkString(",")
    }

    df.write.mode(SaveMode.Overwrite).format("orc").save(path)
    if (pt.nonEmpty) {
      hive.executeUpdate(s"ALTER TABLE $database.$tableName DROP IF EXISTS PARTITION ($pt_str)")
      hive.executeUpdate(s"ALTER TABLE $database.$tableName ADD IF NOT EXISTS PARTITION ($pt_str) LOCATION '$path'")
    } else {
      hive.executeUpdate(s"ALTER TABLE $database.$tableName SET LOCATION '$path'")
    }

    println("success!")
  }
zoltanctoth commented 4 years ago

I'm having the same issue and renaming the table doesn't fix it either. What I've found is that it only occurs if you try to use a partitionBy at writing, like:

df.write.partitionBy("part").mode(SaveMode.Overwrite).format(com.hortonworks.hwc.HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR).option("table", "`default`.`testout`").save;

On an other note, if you remove the partitionBy piece, partitioning works as expected (as partition info is already stored in the Hive table), but if you use overwrite mode (and not, for example, append), HWC will drop and recreate your table and it won't reapply partitioning info.

rollingdeep commented 4 years ago

I'm having the same issue and renaming the table doesn't fix it either. What I've found is that it only occurs if you try to use a partitionBy at writing, like:

df.write.partitionBy("part").mode(SaveMode.Overwrite).format(com.hortonworks.hwc.HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR).option("table", "`default`.`testout`").save;

On an other note, if you remove the partitionBy piece, partitioning works as expected (as partition info is already stored in the Hive table), but if you use overwrite mode (and not, for example, append), HWC will drop and recreate your table and it won't reapply partitioning info.

The problem was submitted about one year ago! But the maintainer never reply a word! you can try the external table mode which I answered above. If your table has only one partition, "save2hive" function will perfectly cover your demand.

zoltanctoth commented 4 years ago

Indeed, it seems there are many issues with this project. Our client's use-case requires us to use managed ACID tables, so we need to stick with that.

Finally my solution, and it seems to work, hope it can be helpful for some of you (it manages some of the bugs I found):