apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0
6.44k stars 2.23k forks source link

How to read/write iceberg in Spark Structed Streaming #1230

Closed zhangdove closed 4 years ago

zhangdove commented 4 years ago

I did some test consume kafka message, write to iceberg table by Spark structed streaming. I'm having some trouble.

1.My environment

Spark version:3.0.0
Iceberg version:0.9.0

2.Create Iceberg table

  def createPartitionTable(catalog: HadoopCatalog, tableIdentifier: TableIdentifier): Unit = {
    val columns: List[Types.NestedField] = new ArrayList[Types.NestedField]
    columns.add(Types.NestedField.of(1, true, "id", Types.IntegerType.get, "id doc"))
    columns.add(Types.NestedField.of(2, true, "name", Types.StringType.get, "name doc"))

    val schema: Schema = new Schema(columns)
    val table = catalog.createTable(tableIdentifier, schema, PartitionSpec.unpartitioned())
  }

3.The pseudocode is as follows

    val (dbName, tbName, kafkatopic, bootstrapServers) = ("testNs", "doveTb", "topic", "ip:9092....")

    val spark = SparkSession.builder()
      .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://nameservice1/iceberg/warehouse")
      .getOrCreate()

    // 1. read kafka data
    val streamingDF = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", kafkatopic)
      .load()

    // 2. consume message
    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      executorBatchDf(spark, batchDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").toDF("key", "value"), batchId)
    }

    def executorBatchDf(spark: SparkSession, batchDF: DataFrame, batchId: Long): Unit = {
      batchDF.persist()
      val icebergHadoopWarehouse = spark.sparkContext.getConf.get("spark.sql.catalog.hadoop_prod.warehouse")

      val selectArray = Array("database", "table", "type", "data")
      val kafkaSourceDF = batchDF.filter(_.get(1) != null)
        .select(json_tuple(batchDF("value"), selectArray: _*))
        .toDF(selectArray: _*)

      println(s"kafkaSourceDF println(batchId:${batchId})")
      kafkaSourceDF.show(false)

      // case one : read table by spark.table("prod.db.table")
      // val icebergTableDF = spark.table(s"hadoop_prod.${schemaName}.${tableName}")
      // case two : read table by spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table")
      val icebergTableDF = spark.read.format("iceberg").load(s"${icebergHadoopWarehouse}/${dbName}/${tbName}")

      println(s"icebergTableDF println(batchId:${batchId})")
      icebergTableDF.show(false)

      val insertDf = kafkaSourceDF
        .filter(kafkaSourceDF("type") === "insert")
        .select(from_json(kafkaSourceDF("data"), icebergTableDF.schema))
        .toDF("struct")
        .selectExpr(icebergTableDF.schema.fieldNames.map(row => "struct." + row): _*)

      val df = insertDf.union(icebergTableDF)

      df.writeTo(s"hadoop_prod.${dbName}.${tbName}")
        .overwrite(lit(true))
      batchDF.unpersist()
    }

4.Kafka Message

{"database": "testNs","table": "doveTb","type": "insert","data": {"id": 1,"name": "dove1"}}
{"database": "testNs","table": "doveTb","type": "insert","data": {"id": 2,"name": "dove2"}}

5.Result case one : read table by spark.table("prod.db.table"). Result : Read iceberg table is error(table is empty) when the second batch .

kafkaSourceDF println(batchId:1)
+--------+------+------+-----------------------+
|database|table |type  |data                   |
+--------+------+------+-----------------------+
|testNs  |doveTb|insert|{"id":1,"name":"dove1"}|
+--------+------+------+-----------------------+

icebergTableDF println(batchId:1)
+---+----+
|id |name|
+---+----+
+---+----+

kafkaSourceDF println(batchId:2)
+--------+------+------+-----------------------+
|database|table |type  |data                   |
+--------+------+------+-----------------------+
|testNs  |doveTb|insert|{"id":2,"name":"dove2"}|
+--------+------+------+-----------------------+

icebergTableDF println(batchId:2)
+---+----+
|id |name|
+---+----+
+---+----+

case two : read table by spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table"). Result : normal.

kafkaSourceDF println(batchId:1)
+--------+------+------+-----------------------+
|database|table |type  |data                   |
+--------+------+------+-----------------------+
|testNs  |doveTb|insert|{"id":1,"name":"dove1"}|
+--------+------+------+-----------------------+

icebergTableDF println(batchId:1)
+---+----+
|id |name|
+---+----+
+---+----+

kafkaSourceDF println(batchId:2)
+--------+------+------+-----------------------+
|database|table |type  |data                   |
+--------+------+------+-----------------------+
|testNs  |doveTb|insert|{"id":2,"name":"dove2"}|
+--------+------+------+-----------------------+

icebergTableDF println(batchId:2)
+---+-----+
|id |name |
+---+-----+
|1  |dove1|
+---+-----+

6.Question The phenomenon is spark.table("prod.db.table") is not refreshed iceberg table when the next batch. However, spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table") does the opposite by automatically refreshing. Is there a difference between spark.table("prod.db.table") and spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table")?

I'm not sure if I'm using it the wrong way.

Link: https://iceberg.apache.org/spark/#querying-with-dataframes

HeartSaVioR commented 4 years ago

Document is missing, but micro-batch sink is available for Spark structured streaming so you can just write directly without overwriting table (which means you're rewriting all records per batch).

This is the python code I'm experimenting with Iceberg. I've just written it to python to avoid long compilation - there's nothing specific to python/pyspark, so you can simply do the same with Scala as well.

https://github.com/HeartSaVioR/structured_streaming_experiments/blob/master/src/rate_data_source_to_iceberg.py

zhangdove commented 4 years ago

@HeartSaVioR Thansks for your reply.Maybe there's something wrong with the description of my issue name.

I have tested some differences between spark.table("prod.db.table") and spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table")(Spark Structed Streaming), I wonder why

HeartSaVioR commented 4 years ago

Sorry for that case I have no idea. I'm also starting to explore the project.

zhangdove commented 4 years ago

When Analyzing Iceberg's Catalog, I find that There is still an issue left here, and I have made some new discoveries:

spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table") By this way, Iceberg table loading does not use the Iceberg Catalog. Of course, Iceberg's metadata information will not be cached. Instead, Iceberg Table will be obtained directly by using IcebergSource.findTable(options,conf).

However, when Iceberg table is loaded using spark.table("prod.db.table"), CachingCatalog(cache-enableddefault value is true) automatically looks for Iceberg table from the cache(Caffeine Cache).

Finally, whether it is incorrect that I find that the description of the document in this place?

The correct description should not be this ? Using spark.table("prod.db.table") loads an isolated table reference that is not refreshed when other queries update the table.

@rdblue How do you think this description? Should we update this place?

zhangdove commented 4 years ago

At least I can close the current issue now.