apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

[Spark] Cannot append to Glue table - StorageDescriptor#InputFormat cannot be null for table #5565

Closed · t0ma-sz closed this issue 2 years ago

t0ma-sz commented 2 years ago

Apache Iceberg version

0.14.0 (latest release)

Query engine

Spark

Please describe the bug 🐞

Hello,

I'm trying to test Iceberg on AWS Glue (Glue version 3.0, Spark 3.1).

I was able to create a new table; however, when I try to append a dataframe to it, I receive the following error: "Exception in User Class: org.apache.spark.sql.AnalysisException : org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table mytable. StorageDescriptor#InputFormat cannot be null for table: mytable (Service: null; Status Code: 0; Error Code: null; Request ID: null; Proxy: null)"

Do I need to specify InputFormat when creating the table?

Here is the code of my Glue job:

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.log.GlueLogger
import com.amazonaws.services.glue.util.{GlueArgParser, Job}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, map, when}

import scala.collection.JavaConverters._

object Main {

  def main(sysArgs: Array[String]): Unit = {
    val sparkConf = new SparkConf

    val catalog = "glue_catalog"
    val tableName = "mytable"
    val dbName = "mydb"
    val s3Bucket = "mybucket"
    val nRows = 100

    implicit val sparkSession: SparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .config(s"spark.sql.catalog.$catalog", "org.apache.iceberg.spark.SparkCatalog")
      .config(s"spark.sql.catalog.$catalog.warehouse", s"s3:/$s3Bucket/iceberg/")
      .config(s"spark.sql.catalog.$catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
      .config(s"spark.sql.catalog.$catalog}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
      .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .appName(s"iceberg_poc")
      .enableHiveSupport()
      .getOrCreate()

    val glueContext: GlueContext = new GlueContext(sparkSession.sparkContext)
    val logger                   = new GlueLogger

    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)

    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val partitionDate = "2022-08-17"
    val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
    val utilDate = dateFormat.parse(partitionDate)
    val sqlDate = new java.sql.Date(utilDate.getTime())

    import scala.util.Random
    import sparkSession.implicits._

    val df = (1 to nRows)
        .map(_ => (Random.nextLong,Random.nextString(10),Random.nextDouble, Random.nextBoolean, sqlDate))
        .toDF("test_integer","test_string","test_double", "test_boolean", "partition_date")

    val tableExists = sparkSession.catalog.tableExists(s"$dbName.$tableName")

    println(s"Table $tableName exists=$tableExists")

    if (tableExists){
        println("Appending table")

        df.writeTo(s"$catalog.dbName.$tableName")
        .append

    }

    else {
        println("Creating table")

        df.writeTo(s"$catalog.$dbName.$tableName")
        .partitionedBy(col("partition_date"))
        .tableProperty("format-version", "2")
        .create
    }

    Job.commit()
  }
}

Glue table metadata:

{
    "Table": {
        "Name": "mytable",
        "DatabaseName": "mydb",
        "CreateTime": "2022-08-17T15:40:22+02:00",
        "UpdateTime": "2022-08-17T15:40:22+02:00",
        "Retention": 0,
        "StorageDescriptor": {
            "Columns": [
                {
                    "Name": "test_integer",
                    "Type": "bigint",
                    "Parameters": {
                        "iceberg.field.current": "true",
                        "iceberg.field.id": "1",
                        "iceberg.field.optional": "true"
                    }
                },
                {
                    "Name": "test_string",
                    "Type": "string",
                    "Parameters": {
                        "iceberg.field.current": "true",
                        "iceberg.field.id": "2",
                        "iceberg.field.optional": "true"
                    }
                },
                {
                    "Name": "test_double",
                    "Type": "double",
                    "Parameters": {
                        "iceberg.field.current": "true",
                        "iceberg.field.id": "3",
                        "iceberg.field.optional": "true"
                    }
                },
                {
                    "Name": "test_boolean",
                    "Type": "boolean",
                    "Parameters": {
                        "iceberg.field.current": "true",
                        "iceberg.field.id": "4",
                        "iceberg.field.optional": "true"
                    }
                },
                {
                    "Name": "partition_date",
                    "Type": "date",
                    "Parameters": {
                        "iceberg.field.current": "true",
                        "iceberg.field.id": "5",
                        "iceberg.field.optional": "true"
                    }
                }
            ],
            "Location": "s3://mybucket/mytable",
            "Compressed": false,
            "NumberOfBuckets": 0,
            "SortColumns": [],
            "StoredAsSubDirectories": false
        },
        "TableType": "EXTERNAL_TABLE",
        "Parameters": {
            "metadata_location": "s3://mybucket/metadata.json",
            "table_type": "ICEBERG"
        },
        "CreatedBy": "arn:aws:sts::00000000:assumed-role/xxx/GlueJobRunnerSession",
        "IsRegisteredWithLakeFormation": false,
        "CatalogId": "0000000",
        "VersionId": "0"
    }
}

Error stacktrace:

2022-08-17 13:49:28,852 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(94)): Exception in User Class
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table mytable. StorageDescriptor#InputFormat cannot be null for table: mytable (Service: null; Status Code: 0; Error Code: null; Request ID: null; Proxy: null)
    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:135)
    at org.apache.spark.sql.hive.HiveExternalCatalog.tableExists(HiveExternalCatalog.scala:879)
    at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.tableExists(ExternalCatalogWithListener.scala:146)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:462)
    at org.apache.spark.sql.internal.CatalogImpl.tableExists(CatalogImpl.scala:260)
    at org.apache.spark.sql.internal.CatalogImpl.tableExists(CatalogImpl.scala:252)
    at Main$.main(Main.scala:61)
    at Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.amazonaws.services.glue.SparkProcessLauncherPlugin.invoke(ProcessLauncher.scala:48)
    at com.amazonaws.services.glue.SparkProcessLauncherPlugin.invoke$(ProcessLauncher.scala:48)
    at com.amazonaws.services.glue.ProcessLauncher$$anon$1.invoke(ProcessLauncher.scala:78)
    at com.amazonaws.services.glue.ProcessLauncher.launch(ProcessLauncher.scala:143)
    at com.amazonaws.services.glue.ProcessLauncher$.main(ProcessLauncher.scala:30)
    at com.amazonaws.services.glue.ProcessLauncher.main(ProcessLauncher.scala)
singhpk234 commented 2 years ago
val tableExists = sparkSession.catalog.tableExists(s"$dbName.$tableName")

I think this is happening because you haven't specified the catalog name in the identifier you pass to tableExists, and you haven't made glue_catalog the default catalog.

Can you please try the three-part identifier here, i.e. sparkSession.catalog.tableExists(s"$catalog.$dbName.$tableName")?

Ref: a ticket describing a similar issue: https://github.com/apache/iceberg/issues/2202#issuecomment-786301938
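
For context, the distinction behind the suggestion (a sketch reusing the job's variable names): the two-part name is resolved through the default session (Hive) catalog, which is why HiveExternalCatalog shows up in the stack trace, while the three-part name qualifies the lookup with the configured glue_catalog.

    // Two-part identifier: resolved by the session (Hive) catalog, which
    // trips over the Iceberg table's Glue entry (no InputFormat set):
    sparkSession.catalog.tableExists(s"$dbName.$tableName")

    // Suggested three-part identifier, qualified with the configured catalog:
    sparkSession.catalog.tableExists(s"$catalog.$dbName.$tableName")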

t0ma-sz commented 2 years ago
val tableExists = sparkSession.catalog.tableExists(s"$dbName.$tableName")

Thanks for your input. That line is not what creates the issue; it is caused by the DataFrameWriterV2.append() method: df.writeTo(s"$catalog.dbName.$tableName").append

Anyway, I changed the line you asked about to include the catalog name in the tableExists call, but I then received a different error. The error does not occur when I downgrade to Iceberg 0.13.0.

Error trace with Iceberg 0.14.0 and the catalog name included in:

sparkSession.catalog.tableExists(s"$catalog.$dbName.$tableName")

2022-08-17 12:56:03,376 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(94)): Exception in User Class
org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input '.' expecting {<EOF>, '-'}(line 1, pos 43)

== SQL ==
glue_catalog.mydb.mytable
-----------------^^^

    at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:255)
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:124)
    at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49)
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseTableIdentifier(ParseDriver.scala:48)
    at org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser.parseTableIdentifier(IcebergSparkSqlExtensionsParser.scala:73)
    at org.apache.spark.sql.internal.CatalogImpl.tableExists(CatalogImpl.scala:251)
    at Main$.main(Main.scala:78)
    at Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.amazonaws.services.glue.SparkProcessLauncherPlugin.invoke(ProcessLauncher.scala:48)
    at com.amazonaws.services.glue.SparkProcessLauncherPlugin.invoke$(ProcessLauncher.scala:48)
    at com.amazonaws.services.glue.ProcessLauncher$$anon$1.invoke(ProcessLauncher.scala:78)
    at com.amazonaws.services.glue.ProcessLauncher.launch(ProcessLauncher.scala:143)
    at com.amazonaws.services.glue.ProcessLauncher$.main(ProcessLauncher.scala:30)
    at com.amazonaws.services.glue.ProcessLauncher.main(ProcessLauncher.scala)
singhpk234 commented 2 years ago

Thanks for getting back.

I think I found the issue: shouldn't it be $dbName rather than dbName in the tableExists branch?

    if (tableExists){
        println("Appending table")

        df.writeTo(s"$catalog.dbName.$tableName")
        .append

    }

It should be this, right?

        df.writeTo(s"$catalog.$dbName.$tableName")
        .append

Since you haven't added the $ before dbName, it is treated as a literal string, so your fully qualified name is "glue_catalog.dbName.mytable", but I think you created "glue_catalog.mydb.mytable".

Can you please try it with the above?

I tried your script in my local setup with Spark 3.1.2 and Iceberg 0.14, and I can confirm it worked for me after correcting this.

Also, on a side note: there seems to be an extra } in the FileIO config key, s"spark.sql.catalog.$catalog}.io-impl".
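
Putting both corrections together, the affected lines would read (a sketch with the variable names from your job):

    // io-impl config key without the stray '}':
    .config(s"spark.sql.catalog.$catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")

    // append with '$' before dbName, so it interpolates to "glue_catalog.mydb.mytable":
    df.writeTo(s"$catalog.$dbName.$tableName").append()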

t0ma-sz commented 2 years ago

Hello, thanks for your help. I fixed the typos (they crept in when I masked the script to hide the actual values).

Unfortunately, the append operation is still failing in the Glue environment. I also tried the V1 DataFrameWriter, but the error is the same: "Exception in User Class: org.apache.spark.sql.AnalysisException : org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table mytable. StorageDescriptor#InputFormat cannot be null for table: mytable (Service: null; Status Code: 0; Error Code: null; Request ID: null; Proxy: null)"

t0ma-sz commented 2 years ago

Hello again @singhpk234,

I was able to fix the issue by rewriting the code based on the official AWS docs, and the append started working again. I was not able to fix sparkSession.catalog.tableExists(s"$catalog.$dbName.$tableName"), so I used a workaround that works as expected:

  import scala.util.{Failure, Success, Try}

  def doesTableExistIceberg(spark: SparkSession, table: String): Boolean = {
    println(s"Checking if table $table exists")
    Try(spark.read.table(table)) match {
      case Success(_) => true
      case Failure(e) => println(s"Failed. Reason: $e"); false
    }
  }
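
With this helper, the original check becomes, for example:

    val tableExists = doesTableExistIceberg(sparkSession, s"$catalog.$dbName.$tableName")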

Thanks for your help

jhchee commented 1 year ago

@t0ma-sz Would reading the table trigger a full table scan, or is only the table metadata scanned?

jhchee commented 1 year ago

I hope someone from AWS or Iceberg fixes this; the workaround is not widely known.

t0ma-sz commented 1 year ago

@t0ma-sz Would reading the table trigger a full table scan, or is only the table metadata scanned?

Sorry, I haven't worked with Spark for some time, but I believe it will not do a full table scan, as no Spark action is called and Spark is lazily evaluated.
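
To illustrate (a sketch with a made-up table name): everything below only touches the catalog and table metadata; a scan of the data files would start only once an action is called.

    // Resolving the table performs catalog/metadata lookups only:
    val df = spark.read.table("glue_catalog.mydb.mytable")
    df.printSchema() // schema comes from the analyzed plan; still no data scan

    // Data files would be read only by an action, e.g.:
    // df.count()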

AlbertoSoto25 commented 11 months ago

I was able to fix the issue by rewriting the code based on the official AWS docs, and the append started working again.

Hi, could you share the doc you used to achieve this? I need the same implementation and I cannot find it.

Thanks

t0ma-sz commented 11 months ago

@AlbertoSoto25 I believe it was originally here: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-iceberg.html. That was some time ago, so the docs have most probably changed, but you could still try to reuse the code snippets that are there.
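
For reference, that page configures Iceberg through Glue job parameters rather than in code; roughly like this (a sketch, the exact keys and placeholders may have changed since):

    --datalake-formats  iceberg
    --conf  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
            --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
            --conf spark.sql.catalog.glue_catalog.warehouse=s3://<bucket>/<prefix>/
            --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
            --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO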