hortonworks-spark / spark-llap


Unable to catch Exception in save() of spark sql #276

gitgraghu opened this issue 4 years ago

gitgraghu commented 4 years ago

I am using the Hive Warehouse Connector to write a DataFrame to a Hive table. The code for the save looks like this:

```java
inputDS.write()
    .mode(SaveMode.Append)
    .format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
    .option("table", "tablename")
    .save();
```

However, I am unable to catch an exception whenever executeUpdate fails to insert records into the table. I would like to catch the exception and stop the Spark job as soon as a runtime exception happens. I can see in the code that the exception is logged but not rethrown.

Is there any way I can stop the Spark job when the save() method ends in an error?

https://github.com/hortonworks-spark/spark-llap/blob/26d164e62b45cfa1420d5d43cdef13d1d29bb877/src/main/scala/com/hortonworks/spark/sql/hive/llap/HS2JDBCWrapper.scala#L227
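
For context, the pattern at that line has roughly this shape (a paraphrased sketch, not the exact source; the statement and logger here are stand-ins):

```scala
import java.sql.Statement
import org.slf4j.LoggerFactory

object SwallowSketch {
  private val log = LoggerFactory.getLogger(getClass)

  // Paraphrased sketch of the log-and-swallow pattern in executeUpdate:
  // the failure is logged and converted into a return value instead of
  // being rethrown, so it never reaches the caller of save().
  def executeUpdate(stmt: Statement, sql: String): Boolean = {
    try {
      stmt.execute(sql) // JDBC call against HiveServer2
      true
    } catch {
      case e: Exception =>
        log.error(s"Failed to execute query: $sql", e)
        false // exception swallowed here; a try/catch around save() never fires
    }
  }
}
```

Because the exception stops inside executeUpdate, wrapping save() in a try/catch in my application code does nothing.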

rahulmod commented 4 years ago

It looks like the executeUpdate function does not rethrow the exception, so the client code cannot catch it. After logging the error, the function should throw the exception at line 228. You can do the following in Scala to check for success:

```scala
val res = inputDS.write()
  .mode(SaveMode.Append)
  .format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
  .option("table", "tablename")
  .save()

if (res) print("success") else ("failure")
```

gitgraghu commented 4 years ago

Hi rahulmod

Thanks for the reply!

The save() function does not return a boolean: it returns void in Java and Unit in Scala, so the code you gave won't work.

This is a problem in the connector code. The writer tries to catch and rethrow the exception around executeUpdate, as in the code below, but executeUpdate never throws an exception, so we cannot catch anything from save(). There is no way to handle an error from save() in Spark with the Hortonworks warehouse connector plugin.

https://github.com/hortonworks-spark/spark-llap/blob/26d164e62b45cfa1420d5d43cdef13d1d29bb877/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataSourceWriter.java#L71
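
The shape of that commit path is roughly the following (a paraphrased Scala sketch of the Java source, with illustrative names):

```scala
// Paraphrased sketch of the catch-and-rethrow in HiveWarehouseDataSourceWriter.
// executeUpdate is the connector method discussed above, which returns false
// on failure instead of throwing.
def commit(loadDataQuery: String, executeUpdate: String => Boolean): Unit = {
  try {
    executeUpdate(loadDataQuery) // returns false on failure, never throws
  } catch {
    case e: Exception =>
      // Effectively dead code: the exception was already swallowed inside
      // executeUpdate, so this rethrow never runs and save() never fails.
      throw new RuntimeException(e)
  }
}
```

The rethrow is unreachable in practice, which is why save() always returns normally.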

rahulmod commented 4 years ago

You have to use the Hive Warehouse Connector to connect to Hive and run an "insert into table ..." statement through executeUpdate. Store the data in a staging table first, then insert it into the final table. If you use executeUpdate, you can check its result and throw an exception yourself, as I mentioned in my previous comment. https://www.nitendratech.com/bigdata/spark/access-hive-in-hdp3-using-apache-spark/
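
A sketch of that staging-table approach (assuming inputDS and the spark session are in scope; the staging table name and the insert statement are illustrative, not from this thread):

```scala
import org.apache.spark.sql.SaveMode
import com.hortonworks.hwc.HiveWarehouseSession

val hive = HiveWarehouseSession.session(spark).build()

// 1. Stage the rows first. save() still returns Unit, so a failure here
//    can only be detected indirectly.
inputDS.write
  .mode(SaveMode.Append)
  .format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
  .option("table", "staging_tablename") // hypothetical staging table
  .save()

// 2. Move the rows into the final table with executeUpdate, whose boolean
//    result can be checked, unlike save().
val ok = hive.executeUpdate("insert into tablename select * from staging_tablename")
if (!ok) {
  // Fail loudly so the Spark job stops instead of silently continuing.
  throw new RuntimeException("insert into tablename failed")
}
```

This does not make save() itself throw; it just moves the failure-prone step to executeUpdate, where the boolean result is visible to the caller.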