hortonworks-spark / spark-llap

Apache License 2.0
102 stars 68 forks

streaming to struct type column results in nulls #255

Open massoudm opened 5 years ago

massoudm commented 5 years ago

I'm trying to write to a table that has a struct type column, and all fields in that column come back null after saving. Environment: HDP 3.0.1.0-187, hive-warehouse-connector_2.11-1.0.0.3.0.1.0-187.jar.

Here is code to reproduce:

CREATE TABLE `test`(
    `messageid` string,
    `tag` struct<`name`:string,`value`:int>
)
STORED AS ORC 
LOCATION
  '/user/hive/hivedata/test'
TBLPROPERTIES ("transactional"="true")

Spark:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val sparkConf = new SparkConf()
  .set("spark.sql.streaming.checkpointLocation", "./checkpoint")

val sparkSession = SparkSession.builder()
  .appName("HiveStreamingExample")
  .config(sparkConf)
  .enableHiveSupport()
  .getOrCreate()

import com.hortonworks.hwc.HiveWarehouseSession
import sparkSession.implicits._

case class Tag(name: String, value: Int)
case class Message(messageid: String, tag: Tag)

val data = List(
   Message("id_1", Tag("tag_1", 1) ),
   Message("id_2", Tag("tag_2", 2) )
)

val messages = data.toDF().as[Message]

messages.printSchema

messages
    .write
    .format(HiveWarehouseSession.DATAFRAME_TO_STREAM)
    .option("table", "test")
    .save()

messages.show(10, false)

+---------+----------+
|messageid|tag       |
+---------+----------+
|id_1     |[tag_1, 1]|
|id_2     |[tag_2, 2]|
+---------+----------+

but when I query the table using hive:

in hive

SELECT * FROM test

I get:

"test.messageid","test.tag"
"id_1","{""name"":null,""value"":null}"
"id_2","{""name"":null,""value"":null}"

Since I don't see any code example doing streaming with struct columns, I believe this may be an undetected bug.

P.S.: Same code works fine if I use HIVE_WAREHOUSE_CONNECTOR instead of DATAFRAME_TO_STREAM.

massoudm commented 5 years ago

I have found what the problem is: HiveStreamingDataWriter uses StrictDelimitedInputWriter, which does not account for struct types. I'm trying to implement a JSON DataWriter, but cannot find a branch that builds. Can anyone point me to the right branch?
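To illustrate the failure mode described above, here is a minimal, self-contained sketch (hypothetical, not the actual HWC code) of why a flat delimited encoding loses nested struct fields while a JSON encoding preserves them:

```scala
// Hypothetical sketch: a strict delimited writer joins only top-level
// column values, so a nested struct degenerates into an opaque string
// that Hive's delimited SerDe cannot map back onto name/value (hence
// the NULLs). A JSON writer encodes the nesting explicitly.
case class Tag(name: String, value: Int)
case class Message(messageid: String, tag: Tag)

object EncodingSketch {
  // Flat delimited row: the struct falls back to its toString form,
  // which carries no field structure the reader can recover.
  def delimited(m: Message): String =
    Seq(m.messageid, m.tag.toString).mkString(",")

  // JSON row: nesting is explicit, so a JSON-aware writer such as
  // Hive's StrictJsonWriter can reconstruct the struct fields.
  def json(m: Message): String =
    s"""{"messageid":"${m.messageid}","tag":{"name":"${m.tag.name}","value":${m.tag.value}}}"""

  def main(args: Array[String]): Unit = {
    val m = Message("id_1", Tag("tag_1", 1))
    println(delimited(m)) // id_1,Tag(tag_1,1)
    println(json(m))      // {"messageid":"id_1","tag":{"name":"tag_1","value":1}}
  }
}
```

The delimited form above has no way to address `tag.name` or `tag.value` individually, which is consistent with every struct field reading back as null.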

massoudm commented 5 years ago

I managed to use master branch and get it to build. Now my implementation of HiveStreamingJsonDataWriter throws an exception when it finishes writing the first batch and tries to create a new transaction. The issue seems to be related to assembly shading:

java.lang.ClassCastException: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat cannot be cast to shadehive.org.apache.hadoop.hive.ql.io.AcidOutputFormat
at org.apache.hive.streaming.AbstractRecordWriter.init(AbstractRecordWriter.java:164)
at org.apache.hive.streaming.HiveStreamingConnection$TransactionBatch.<init>(HiveStreamingConnection.java:669)
at org.apache.hive.streaming.HiveStreamingConnection$TransactionBatch.<init>(HiveStreamingConnection.java:596)
at org.apache.hive.streaming.HiveStreamingConnection.createNewTransactionBatch(HiveStreamingConnection.java:485)
at org.apache.hive.streaming.HiveStreamingConnection.beginNextTransaction(HiveStreamingConnection.java:478)
at org.apache.hive.streaming.HiveStreamingConnection.beginTransaction(HiveStreamingConnection.java:507)
at com.hortonworks.spark.sql.hive.llap.HiveStreamingJsonDataWriter.write(HiveStreamingJsonDataWriter.java:110)
at com.hortonworks.spark.sql.hive.llap.HiveStreamingJsonDataWriter.write(HiveStreamingJsonDataWriter.java:21)

The fix for this issue is included in HIVE-20059, so I will try to apply it to Hive 3.1.1 and build it locally to see whether it resolves the problem.

mazar commented 5 years ago

created pull request #258

massoudm commented 5 years ago

The suggested PR above uses the following format to select StrictJsonWriter:

messages
    .write
    .format(HiveWarehouseSession.DATAFRAME_TO_STREAM)
    .option("writer", "json")
    .option("table", "test")
    .save()
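A minimal sketch of how such a "writer" option might be dispatched to pick a record writer (hypothetical: `WriterDispatchSketch`, `chooseWriter`, and the writer-kind names are illustrative, not the actual PR code):

```scala
// Hypothetical dispatch on the "writer" option, mirroring the API the
// PR suggests; the real spark-llap implementation may differ.
object WriterDispatchSketch {
  sealed trait WriterKind
  case object Delimited extends WriterKind // pre-PR default behavior
  case object Json extends WriterKind      // opts into JSON-based writing

  // Default to the delimited writer when no option is given, so existing
  // callers are unaffected; "json" selects the struct-safe path.
  def chooseWriter(options: Map[String, String]): WriterKind =
    options.getOrElse("writer", "delimited").toLowerCase match {
      case "json"      => Json
      case "delimited" => Delimited
      case other       => throw new IllegalArgumentException(s"unknown writer: $other")
    }
}
```

Defaulting to the delimited writer keeps backward compatibility, while tables with struct columns can opt into `"json"` explicitly.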

GrzesiuKo commented 4 years ago

I have the same problem, but in Structured Streaming, and I am using HiveWarehouseSession.STREAM_TO_STREAM. As @massoudm wrote, I am also trying to write to a struct type column and all fields are null.

GrzesiuKo commented 4 years ago

@massoudm's changes worked for me in Structured Streaming.

What I did:

  1. Cloned @massoudm's branch
  2. Ran sbt assembly in the project root directory
  3. Used the newly created HWC jar
  4. My code:
    data
      .writeStream
      .queryName(config("stream.name") + "_query")
      .options(hiveConfig)
      .option("writer", "json")
      .format(HiveWarehouseSession.STREAM_TO_STREAM)
      .outputMode("append")
      .start()
  5. The most important parts are:
    .option("writer", "json")
    .format(HiveWarehouseSession.STREAM_TO_STREAM)