apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions #7226

Open adigerber opened 1 year ago

adigerber commented 1 year ago

Apache Iceberg version

1.2.0 (latest release)

Query engine

Spark

Please describe the bug 🐞

According to the documentation, when using Iceberg one should set spark.sql.extensions to org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions, but setting this property seems to cause an exception when writing to a partitioned Iceberg table with Spark Structured Streaming.

The exception that is thrown is:

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: days(ts) is not currently supported
=== Streaming Query ===
Identifier: [id = cfb83943-cd87-4c84-bf25-a290e8891e19, runId = ddf71690-7e5d-41f6-8a8e-84c425683a26]
Current Committed Offsets: {}
Current Available Offsets: {MemoryStream[ts#3,a#4,b#5]: 0}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource spark_catalog.default.test_iceberg_table, cfb83943-cd87-4c84-bf25-a290e8891e19, [path=test_iceberg_table, fanout-enabled=true, checkpointLocation=/tmp/spark-checkpoint-16659193840247202419], Append
+- StreamingDataSourceV2Relation [ts#3, a#4, b#5], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@4533d1cf, MemoryStream[ts#3,a#4,b#5]

Code to reproduce:

package com.example

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{Encoders, SparkSession}

import java.nio.file.Files
import java.sql.Timestamp

case class Bla(ts: Timestamp, a: String, b: Double)

object MinEx {
  def main(args: Array[String]): Unit = {
    val warehouseDir = Files.createTempDirectory("spark-warehouse-iceberg-").toString
    val checkpointDir = Files.createTempDirectory("spark-checkpoint-").toString
    val spark = SparkSession.builder()
      .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .config("spark.sql.catalog.spark_catalog.type", "hadoop")
      .config("spark.sql.catalog.spark_catalog.warehouse", warehouseDir)
      .config("spark.sql.warehouse.dir", warehouseDir)
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .appName("BugRepro")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("create table test_iceberg_table(ts timestamp, a string, b double) using iceberg partitioned by (days(ts))")

    implicit val sqlContext = spark.sqlContext
    implicit val encoder = Encoders.product[Bla]
    val memStream = MemoryStream[Bla]
    val now = System.currentTimeMillis()
    val day = 86400000
    memStream.addData(List(
      Bla(new Timestamp(now), "test", 12.34),
      Bla(new Timestamp(now - 1 * day), "test 1d", 33.34),
      Bla(new Timestamp(now - 3 * day), "test 3d", 44.34),
      Bla(new Timestamp(now - 2 * day), "test 2d", 55.34),
    ))

    memStream.toDF()
      .writeStream
      .format("iceberg")
      .outputMode("append")
      .option("path", "test_iceberg_table")
      .option("fanout-enabled", true)
      .option("checkpointLocation", checkpointDir)
      .trigger(Trigger.Once())
      .start()
      .awaitTermination()
  }
}

The code works as expected when the statement that configures spark.sql.extensions is commented out.

Fokko commented 1 year ago

I can reproduce this locally. The failure appears from Iceberg 1.0.0 onward.

Fokko commented 1 year ago

@adigerber I know it has been a while, but which version of Spark are you using?

Fokko commented 1 year ago

Able to reproduce this using:

rm -rf /tmp/warehouse
rm -rf /tmp/spark-checkpoint

mkdir -p /tmp/warehouse
mkdir -p /tmp/spark-checkpoint

./bin/spark-shell \
 --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0 \
 --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
 --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
 --conf spark.sql.catalog.demo.type=hadoop \
 --conf spark.sql.catalog.demo.warehouse=/tmp/warehouse \
 --conf spark.sql.warehouse.dir=/tmp/warehouse \
 --conf spark.sql.defaultCatalog=demo

and then, in the spark-shell:

spark.sql("create table test_iceberg_table(ts timestamp, a string, b double) using iceberg partitioned by (days(ts))")

import java.nio.file.Files
import java.sql.Timestamp
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{Encoders, SparkSession}

case class Bla(ts: Timestamp, a: String, b: Double)

implicit val sqlContext = spark.sqlContext
implicit val encoder = Encoders.product[Bla]
val memStream = MemoryStream[Bla]
val now = System.currentTimeMillis()
val day = 86400000
memStream.addData(List(
  Bla(new Timestamp(now), "test", 12.34),
  Bla(new Timestamp(now - 1 * day), "test 1d", 33.34),
  Bla(new Timestamp(now - 3 * day), "test 3d", 44.34),
  Bla(new Timestamp(now - 2 * day), "test 2d", 55.34),
))

memStream.toDF()
  .writeStream
  .format("iceberg")
  .outputMode("append")
  .option("path", "/tmp/warehouse/test_iceberg_table")
  .option("checkpointLocation", "/tmp/spark-checkpoint")
  .option("fanout-enabled", true)
  .trigger(Trigger.Once())
  .start()
  .awaitTermination()

Any idea @aokolnychyi? Not really looking forward to bisecting between 0.13.1 and 1.0.0 :3

Fokko commented 1 year ago

Spark 3.1 with Iceberg 1.3.0 works fine. It seems that something is off with Spark 3.2+

Fokko commented 1 year ago

The stack trace:

23/06/19 17:45:53 ERROR MicroBatchExecution: Query [id = 2507e68d-0fb2-4aa2-98e4-427bfa1326c5, runId = 308060c0-6572-4328-9588-ece2b00c054a] terminated with error
org.apache.spark.sql.AnalysisException: days(ts) is not currently supported
    at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:64)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:64)
    at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$1(DistributionAndOrderingUtils.scala:36)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:36)
    at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:93)
    at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
    at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:82)
    at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:656)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:646)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.SingleBatchExecutor.execute(TriggerExecutor.scala:39)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)

Spark 3.2 uses a V2 data source.

adigerber commented 1 year ago

@adigerber I know that it is a while ago, but which version of Spark are you using?

Either 3.3.1 or 3.3.2. IIRC it still happens in Spark 3.3.2 + Iceberg 1.2.1

Fokko commented 1 year ago

Was able to chase this down. Looks like the catalog is not passed down to the execution.

    // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
    sink match {
      case s: SupportsWrite =>
        val relationOpt = plan.catalogAndIdent.map {
          case (catalog, ident) => DataSourceV2Relation.create(s, Some(catalog), Some(ident))
        }
        WriteToMicroBatchDataSource(
          relationOpt,
          table = s,
          query = _logicalPlan,
          queryId = id.toString,
          extraOptions,
          outputMode)

The relationOpt here is None, so the catalog and identifier are never passed down to the write.

Fokko commented 1 year ago

Closing in. One workaround is to use toTable("test_iceberg_table") instead of start().
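
For reference, a minimal sketch of that workaround applied to the repro at the top of this issue (reusing memStream, checkpointDir, and the Trigger import from that example); the table identifier is passed to toTable instead of a path option:

memStream.toDF()
  .writeStream
  .format("iceberg")
  .outputMode("append")
  .option("fanout-enabled", true)
  .option("checkpointLocation", checkpointDir)
  .trigger(Trigger.Once())
  .toTable("test_iceberg_table")   // instead of .option("path", "test_iceberg_table").start()
  .awaitTermination()

As discussed further down, whether this actually resolves the partition transforms still depends on the Spark version.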

shuvaevv commented 1 year ago

Closing in. One workaround is to use toTable("test_iceberg_table") instead of start().

@Fokko For Spark 3.3.0 and Iceberg 1.2.1, calling the toTable method didn't solve the issue. Code:

df.writeStream
  .format("iceberg")
  .outputMode("append")
  .option("checkpointLocation", checkpointLocation)
  .toTable(tableName)
  .awaitTermination()

Error: org.apache.spark.sql.AnalysisException: hours(dt) is not currently supported

Could you suggest any other workarounds, please?

Fokko commented 1 year ago

@shuvaevv Sorry for the late reply, I missed this one. Do you have a stacktrace? I've cleaned up the PR, and that should be ready for the next release.

Marcus-Rosti commented 1 year ago

same issue for me

Marcus-Rosti commented 1 year ago

(quotes the stack trace and note from Fokko's comment above)

Same stack trace for me on Iceberg 1.3.1 and Spark 3.3.

Marcus-Rosti commented 1 year ago

toTable wasn't a fix

arun-dhingra commented 1 year ago

same issue for me

aokolnychyi commented 1 year ago

This is a Spark issue, not an Iceberg issue (at least in 3.4). We may consider fixing Spark 3.3 and older but I am not so sure about 3.4. In Spark 3.4, we are relying on the function catalog API to resolve transforms.

@Marcus-Rosti, could you confirm toTable works in 3.4? I believe you tried 3.3 before.

@Fokko is right that start(), unlike toTable(), does not populate the catalog, hence we can't resolve the transforms. I believe the right solution would be to fix Spark to use SupportsCatalogOptions when loading Table from TableProvider.

FabricioZGalvani commented 1 year ago

Hello, my friends.

I'm writing to a partitioned table with Structured Streaming, and I'm using Iceberg's transforms for partitioning.

Trying to write with .start(), I get the same error, but switching to toTable("iceberg_table") instead of start() worked.

I am using Iceberg 1.3.0 and Spark 3.4.0.

This way it didn't work

df.writeStream.format("iceberg").outputMode("append").trigger(
    once=True
).option("path", iceberg_table).option("fanout-enabled", "true").option(
    "checkpointLocation",
    checkpoint_location,
).start().awaitTermination()

This way it worked:

df.writeStream.format("iceberg").outputMode("append").trigger(
    once=True
).option("path", iceberg_table).option("fanout-enabled", "true").option(
    "checkpointLocation",
    checkpoint_location,
).toTable(
    iceberg_table
).awaitTermination()

Marcus-Rosti commented 1 year ago

(quotes FabricioZGalvani's workaround above)

Yeah, I got the error with Iceberg 1.3 and Spark 3.3; that's interesting.

arundh93 commented 1 year ago

(quotes FabricioZGalvani's workaround above)

For me as well, toTable works in Spark 3.4.

aokolnychyi commented 1 year ago

If I remember correctly, Spark 3.3 does not have the function catalog, so we can't resolve Iceberg transforms. I think the right approach is either to use toTable in Spark 3.4 and above, or to fix the old loading API in Spark (which probably won't be trivial).

kaijiezhang0319 commented 9 months ago

Hello @aokolnychyi. We are hitting a similar issue with Spark 3.3.1 and Iceberg 1.1.0. We previously used Spark 3.2.1 with Iceberg 1.1.0 and it worked fine; the issue appeared when we bumped up to Spark 3.3.1. Any suggestion is appreciated, thanks.

greg-roberts-bbc commented 8 months ago

We're encountering this issue using iceberg 1.4.3 on Spark 3.3.0 (AWS Glue 4.0) and using toTable hasn't fixed the issue.

greg-roberts-bbc commented 8 months ago

We've found a workaround in our use case. (Iceberg 1.4.3, Spark 3.3.0 on Glue 4.0).

Our previous flow was:

TABLE = "glue_catalog.<database>.<table>"

# set up readStream
read_stream = spark.readStream.format(
    <setup read stream>
).load()

# dataframe operations
df = read_stream.select(
<various dataframe operations>
)

# setup write stream
write_stream = df.writeStream.format("iceberg").outputMode("append").trigger(
    processingTime=job_args["TRIGGER_PROCESSING_TIME"]
).options(**{
    "fanout-enabled": job_args["FANOUT_ENABLED"],
    "checkpointLocation": job_args["CHECKPOINT_LOCATION"],
}).toTable(TABLE)

which always failed on the insert with the error described above.

Our new flow is to use foreachBatch with a process_batch function:

def process_batch(df, batch_id):
    df = df.select(
    <various dataframe operations>
    )

    df.writeTo(TABLE).append()

read_stream.writeStream.foreachBatch(process_batch).start()

The above is for completeness, as we're actually using Glue's inbuilt GlueContext.forEachBatch but it does exactly the same thing.

This is no longer failing: we're able to write to the table with partition transforms (we're using hour() to partition our data).

Interestingly, the data is now being written to S3 as you'd expect for the S3FileIO implementation (i.e. writes are prefixed with a random string), where previously this wasn't happening.

It would be nice to use the built-in write triggers as described in the docs, but we are happy with a working solution, and this approach allows us to add MERGE behaviour with SQL.

Hope someone else finds this useful!
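
For anyone not on Glue: a minimal self-contained sketch of the same pattern with plain PySpark, where the checkpoint location is set on the streaming writer. The table name, Kafka options, and checkpoint path below are placeholders, not values from the post above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("foreach-batch-workaround").getOrCreate()

# Catalog-qualified Iceberg table (placeholder).
TABLE = "glue_catalog.<database>.<table>"

# Source setup is elided, as in the post above; fill in your own format and options.
read_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<brokers>")
    .option("subscribe", "<topic>")
    .load()
)

def process_batch(df, batch_id):
    # Per-batch write through the DataFrameWriterV2 API; the table is resolved
    # via the catalog, so partition transforms such as hours()/days() apply.
    df.writeTo(TABLE).append()

(
    read_stream.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "s3://<bucket>/path/to/checkpoint")
    .trigger(processingTime="60 seconds")
    .start()
    .awaitTermination()
)

The checkpointLocation on the outer writeStream is what tracks the source offsets; the per-batch writeTo is an ordinary batch append.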

stevenlii commented 7 months ago

(quotes greg-roberts-bbc's foreachBatch workaround above)

But this way there is no checkpointLocation. How do you manage the offset?

greg-roberts-bbc commented 7 months ago

@stevenlii

But this way there is no checkpointLocation. How do you manage the offset?

As I said, we're using GlueContext.forEachBatch which allows you to specify the checkpoint location as follows:

glue_context.forEachBatch(
    frame=read_stream,
    batch_function=process_batch,
    options={
        "windowSize": job_args["TRIGGER_PROCESSING_TIME"],
        "fanout-enabled": job_args["FANOUT_ENABLED"],
        "checkpointLocation": job_args["CHECKPOINT_LOCATION"],
        "persistDataFrame": "false",
    }
)

The equivalent call with writeStream.foreachBatch would be something like:

read_stream.writeStream.foreachBatch(process_batch).trigger(processingTime=windowSize).option("checkpointLocation", checkpointLocation).start()

As can be seen in the Glue source code I linked above, here: https://github.com/awslabs/aws-glue-libs/blob/master/awsglue/context.py#L641

UtkarshSharma2612 commented 6 months ago

I am writing to Iceberg with Spark Structured Streaming; my Spark version is 3.4.2 and my Iceberg version is 2. I am facing this issue as well, and changing .option() to .toTable didn't help.

24/04/22 12:15:45 WARN V2ExpressionUtils: V2 function years with parameter types (timestamp) is used in partition transforms, but its definition couldn't be found in the function catalog provided
24/04/22 12:15:45 ERROR MicroBatchExecution: Query [id = 5222ef61-86e7-4f64-981c-6f330483730b, runId = c873f460-5c5b-43f3-a182-9bfa09d29b34] terminated with error
org.apache.spark.sql.AnalysisException: years(timestamp) is not currently supported
    at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:71)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:71)
    at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$2(DistributionAndOrderingUtils.scala:45)
    (remaining frames match the stack traces quoted earlier in this issue)
24/04/22 12:15:45 ERROR MicroBatchExecution: Query [id = 4cf32767-e385-43b7-8550-09125c6f638c, runId = 6de5e2e7-7638-4289-9a5c-aa45f2281889] terminated with error
org.apache.spark.sql.AnalysisException: years(timestamp) is not currently supported
    (same stack trace)
24/04/22 12:15:45 ERROR TestJobPipelineImpl: stream error: {}
org.apache.spark.sql.streaming.StreamingQueryException: years(timestamp) is not currently supported
=== Streaming Query ===
Identifier: [id = 4cf32767-e385-43b7-8550-09125c6f638c, runId = 6de5e2e7-7638-4289-9a5c-aa45f2281889]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[com.engati.write.user.journey.conversion.data.to.db]]: {"com.engati.write.user.journey.conversion.data.to.db":{"0":729547}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource local.user_journey.conversion_analytics, 4cf32767-e385-43b7-8550-09125c6f638c, [checkpointLocation=/tmp/checkpointOne, fanout-enabled=true], Append
+- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@48bd89fe, KafkaV2[Subscribe[com.engati.write.user.journey.conversion.data.to.db]]
(intermediate projection/deserialization nodes, the repeated "Caused by" trace, and the Kafka/Spark shutdown logs are omitted)

wfxxh commented 3 months ago

Spark version: 3.3.3, Iceberg version: 1.4.3, same stack trace for me. My table is:

CREATE TABLE iceberg_catalog.doi.doi_log (
  date_time TIMESTAMP,
  ip STRING,
  protocol STRING,
  operation STRING,
  response STRING,
  duration STRING,
  doi STRING)
USING iceberg
PARTITIONED BY (days(date_time))
TBLPROPERTIES (
  'format' = 'iceberg/parquet',
  'format-version' = '2',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'write.metadata.previous-versions-max' = '20',
  'write.parquet.bloom-filter-enabled.column.doi' = 'true',
  'write.parquet.compression-codec' = 'zstd',
  'write.target-file-size-bytes' = '1073741824')

The error is "org.apache.spark.sql.AnalysisException: days(date_time) is not currently supported". When I set 'write.distribution-mode=none', the error is "days(date_time) ASC NULLS FIRST is not currently supported". My code is like:

val kafkaDF = ... // spark read from kafka topic

kafkaDF.writeStream
  .outputMode("append")
  .queryName("doi")
  .option("checkpointLocation", "path/to/checkpoint")
  .option("fanout-enabled", "true")
  .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
  .format("iceberg")
  .toTable("iceberg_catalog.doi.doi_log")