apache/hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Trying to load a parquet file. org.apache.avro.AvroRuntimeException: Not a valid schema field: ts #10696

Closed: alberttwong closed this issue 5 months ago

alberttwong commented 5 months ago

Using the NYC taxi dataset, the write below fails with org.apache.avro.AvroRuntimeException: Not a valid schema field: ts. I have no idea why it's asking about a ts column, since the parquet file doesn't have that field.

atwong@Albert-CelerData Downloads % parquet-tools inspect green_tripdata_2023-01.parquet

############ file meta data ############
created_by: parquet-cpp-arrow version 8.0.0
num_columns: 20
num_rows: 68211
num_row_groups: 1
format_version: 1.0
serialized_size: 10705

############ Columns ############
VendorID
lpep_pickup_datetime
lpep_dropoff_datetime
store_and_fwd_flag
RatecodeID
PULocationID
DOLocationID
passenger_count
trip_distance
fare_amount
extra
mta_tax
tip_amount
tolls_amount
ehail_fee
improvement_surcharge
total_amount
payment_type
trip_type
congestion_surcharge
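
For reference, the same check can be made from spark-shell; a minimal sketch, assuming the file is already at the S3 path used below (rawDF is just an illustrative name):

// Hypothetical check: read the raw parquet and print its schema
val rawDF = spark.read.parquet("s3://huditest/green_tripdata_2023-01.parquet")
rawDF.printSchema()  // shows the 20 columns listed above; no "ts" field anywhere

Here's the write that triggers the exception: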
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import scala.collection.JavaConversions._

val df = spark.read.parquet("s3://huditest/green_tripdata_2023-01.parquet")

val databaseName = "hudi_sample"
val tableName = "hudi_coders_hive"
val basePath = "s3a://huditest/hudi_coders"

df.write.format("hudi").
  option(org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME, tableName).
  option(RECORDKEY_FIELD_OPT_KEY, "lpep_pickup_datetime").
  // note: no precombine field is set here, so Hudi falls back to its default, "ts"
  option("hoodie.datasource.hive_sync.enable", "true").
  option("hoodie.datasource.hive_sync.mode", "hms").
  option("hoodie.datasource.hive_sync.database", databaseName).
  option("hoodie.datasource.hive_sync.table", tableName).
  option("hoodie.datasource.hive_sync.metastore.uris", "thrift://hive-metastore:9083").
  option("fs.defaultFS", "s3://huditest/").
  mode(Overwrite).
  save(basePath)
alberttwong commented 5 months ago

It looks like ts is the default for the precombine field (hoodie.datasource.write.precombine.field defaults to ts). You have to set PRECOMBINE_FIELD_OPT_KEY to a field that actually exists in your schema.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import scala.collection.JavaConversions._

val df = spark.read.parquet("s3://huditest/green_tripdata_2023-01.parquet")

val databaseName = "hudi_sample"
val tableName = "hudi_coders_hive"
val basePath = "s3a://huditest/hudi_coders"

df.write.format("hudi").
  option(org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME, tableName).
  option(RECORDKEY_FIELD_OPT_KEY, "lpep_pickup_datetime").
  option(PRECOMBINE_FIELD_OPT_KEY, "lpep_pickup_datetime"). // the fix: point precombine at a real column
  option("hoodie.datasource.hive_sync.enable", "true").
  option("hoodie.datasource.hive_sync.mode", "hms").
  option("hoodie.datasource.hive_sync.database", databaseName).
  option("hoodie.datasource.hive_sync.table", tableName).
  option("hoodie.datasource.hive_sync.metastore.uris", "thrift://hive-metastore:9083").
  option("fs.defaultFS", "s3://huditest/").
  mode(Overwrite).
  save(basePath)
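
Once the precombine field points at a real column, the write goes through. A quick read-back to verify (a minimal sketch; snapshot query against the same basePath, hudiDF is an illustrative name):

// Hypothetical verification: load the Hudi table and peek at a few rows
val hudiDF = spark.read.format("hudi").load(basePath)
hudiDF.select("_hoodie_commit_time", "lpep_pickup_datetime", "fare_amount").show(5)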