Open adevore3 opened 7 months ago
I forgot to mention that I did try registering classes with Kryo, following this Stack Overflow question: https://stackoverflow.com/questions/52562671/registering-classes-with-kryo-via-sparksession-in-spark-2 doing something like:
val sparkConf = new SparkConf()
  .setAppName(appName)
  .registerKryoClasses(Array(classOf[java.nio.ByteBuffer], classOf[java.lang.Integer]))
var builder = SparkSession.builder
  .config(sparkConf)
But that didn't work either
@adevore3 without knowing your schema and the exact commit you're doing it's difficult to tell what's going wrong. Can you please share your full catalog configuration and more details so that it's possible to reproduce this?
Also, by the way, you don't need compile "org.apache.iceberg:iceberg-aws:1.3.1" because that's already in the iceberg-spark-runtime.
Hi, I was trying to create a code sample but got distracted. I've tried to copy all the relevant code pieces here; the one thing I couldn't include is the protobuf file. Based on the stack trace, it seems to be failing on the insertIntoTable function call.
import com.indeed.osiris.iceberg.exporter.config.OutputTableDefinition
import com.indeed.osiris.iceberg.exporter.datasources.Datasource
import com.indeed.osiris.iceberg.exporter.datasources.JobArchiveOsirisDatasource.getClass
import com.indeed.spark.hivesupport.SessionBuilder
import org.apache.hadoop.fs.Path
import org.apache.iceberg.Table
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
import org.springframework.boot.builder.SpringApplicationBuilder
import org.springframework.boot.{CommandLineRunner, WebApplicationType}
import org.springframework.context.annotation.Bean
class OsirisIcebergExporter(
spark: SparkSession,
datasource: Datasource,
inputCheckpointBucket: Option[String],
inputCheckpointMillis: Option[Long],
outputTableDefinition: OutputTableDefinition,
) extends CommandLineRunner {
import OsirisIcebergExporter._
private val newHighWaterMark = inputCheckpointMillis.getOrElse(System.currentTimeMillis())
private val inputCheckpointPath = inputCheckpointBucket.map(b => new Path(s"s3a://$b/tmp/osiris_iceberg/jatt3/$newHighWaterMark"))
private val icebergCatalog = spark.sessionState.catalogManager.catalog("outbox").asInstanceOf[SparkCatalog].icebergCatalog()
override def run(rawArgs: String*): Unit = {
try {
log.info(s"running OsirisIcebergExporter. new high water mark will be $newHighWaterMark")
val df = inputDf(datasource.dataframe(spark))
df.createOrReplaceTempView("input")
val icebergTable = populateTable(df)
} finally {
// close connections
}
}
def populateTable(df: DataFrame): Table = {
// df not used in this example since we'll read from the temp view
val t = icebergCatalog.loadTable(outputTableDefinition.icebergIdentifier)
mergeIntoTable(spark, "input", t)
t
}
def mergeIntoTable(spark: SparkSession, view: String, icebergTable: Table): Unit = {
val currentSnapshot = icebergTable.currentSnapshot()
log.info(s"mergeIntoTable >>> SET spark.wap.id = ${newHighWaterMark}_delete")
spark.sql(s"SET spark.wap.id = ${newHighWaterMark}_delete")
deleteFromTable(spark, view)
log.info(s"mergeIntoTable >>> SET spark.wap.id = ${newHighWaterMark}_insert")
spark.sql(s"SET spark.wap.id = ${newHighWaterMark}_insert")
insertIntoTable(spark, view)
log.info("mergeIntoTable >>> RESET spark.wap.id")
spark.sql(s"RESET spark.wap.id")
log.info("mergeIntoTable >>> icebergTable.refresh()")
icebergTable.refresh()
}
def deleteFromTable(spark: SparkSession, view: String): Unit = {
val identifierFieldTuple = "(jobId)"
spark.sql(
s"""
|DELETE FROM outbox.osiris_iceberg.jatt3
|WHERE $identifierFieldTuple in (select $identifierFieldTuple from $view)
|""".stripMargin)
}
def insertIntoTable(spark: SparkSession, view: String): Unit = {
spark.sql(
s"""
|INSERT INTO outbox.osiris_iceberg.jatt3
|SELECT * FROM $view ORDER BY outbox.bucket(512, jobId), jobId
|""".stripMargin)
}
def inputDf(sourceDf: DataFrame): DataFrame = {
inputCheckpointPath.map(b => {
val fileSystem = b.getFileSystem(spark.sparkContext.hadoopConfiguration)
if (!fileSystem.exists(b)) {
//write the output, and then read it back
//which seems crazy, but it makes recovery faster if downstream stages fail
//and an osiris partition needs to be reread (which is super slow)
//dont just checkpoint, cus then wed still only have 256 input partitions
sourceDf.write
.option("maxRecordsPerFile", 500000) //this should probably actually be configurable. assume 1k rows, goal is 512-1gb files
.orc(b.toString)
}
spark.createDataFrame(spark.read.orc(b.toString).rdd, sourceDf.schema)
})
.getOrElse({
//if we didnt checkpoint in s3, we should cache
//not just optimization, needed for correctness
// (in case new records come into osiris between the delete and insert)
sourceDf.cache()
sourceDf
})
}
}
object OsirisIcebergExporter {
private val log = LoggerFactory.getLogger(getClass)
def main(args: Array[String]): Unit = {
java.security.Security.setProperty("networkaddress.cache.ttl", "60")
new SpringApplicationBuilder(classOf[OsirisIcebergExporter])
.web(WebApplicationType.NONE)
.run(args: _*)
.close()
}
}
@Bean
def spark(): SparkSession = {
val icebergCatalogConfig = Map(
s"spark.sql.catalog.outbox" -> "org.apache.iceberg.spark.SparkCatalog",
s"spark.sql.catalog.outbox.io-impl" -> "org.apache.iceberg.aws.s3.S3FileIO",
s"spark.sql.catalog.outbox.catalog-impl" -> "org.apache.iceberg.aws.glue.GlueCatalog",
s"spark.sql.catalog.outbox.glue.id" -> "936411429724",
//"spark.sql.catalog.iceberg.s3.staging-dir" -> "/tmp/" //should this be configurable? is it even needed?
)
SessionBuilder(s"OsirisIcebergExporter-jatt3")
.withMoreProperties(icebergCatalogConfig)
.build
}
import org.apache.log4j.Level
case class SessionBuilder(
appName: String,
logLevel: Level = Level.INFO,
properties: Map[String, String] = Map.empty,
) {
def withMoreProperties(properties: Map[String, String]): SessionBuilder = copy(properties = this.properties ++ properties)
def builder: SparkSession.Builder = {
SparkSession.builder
.appName(appName)
.enableHiveSupport
.config("spark.hadoop.hive.exec.dynamic.partition", "true")
.config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.hadoop.hive.exec.max.dynamic.partitions", "2048")
.config("spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
.config("spark.kryoserializer.buffer.max", "1g")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.hive.metastorePartitionPruning", "true")
.config("spark.sql.orc.cache.stripe.details.size", "10000")
.config("spark.sql.orc.enabled", "true")
.config("spark.sql.orc.enableVectorizedReader", "true")
.config("spark.sql.orc.filterPushdown", "true")
.config("spark.sql.orc.impl", "native")
.config("spark.sql.orc.splits.include.file.footer", "true")
.config("spark.sql.parquet.filterPushdown", "true")
.config("spark.sql.parquet.mergeSchema", "false")
.config("spark.sql.session.timeZone", "-06:00")
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.config("spark.ui.view.acls", "*")
}
def build: SparkSession = {
val sparkSession = builder.getOrCreate
sparkSession.sparkContext.setLogLevel(logLevel.toString)
sparkSession
}
}
@Bean
def datasource(): Datasource = {
val startTime: Long = DateTime.now().getMillis
val inputPartitions = 4
val useDfc = false
JobArchiveOsirisDatasource(startTime, Option(inputPartitions).map(_.toInt), useDfc)
}
import com.example.proto.JobArchiveEntry
import org.apache.spark.sql.types.{StructField, StructType}
trait Datasource {
def dataframe(spark: SparkSession): DataFrame
def setNullable(df: DataFrame, fieldName: String, nullable: Boolean) : DataFrame = {
val schema = df.schema
val newSchema = StructType(schema.map {
case StructField( c, t, _, m) if c.equals(fieldName) => StructField( c, t, nullable = nullable, m)
case y: StructField => y
})
df.sqlContext.createDataFrame( df.rdd, newSchema )
}
}
case class OsirisRowv2(value: Array[Byte])
case class JobArchiveOsirisDatasource(minTs: Long, inputPartitions: Option[Int], useDfc: Boolean) extends Datasource {
def dataframe(spark: SparkSession): DataFrame = {
import spark.implicits._
val dfcOptions: Map[String, String] =
if (useDfc)
Map(
"osiris.dfc"-> "true",
"osiris.dfc.datadir"-> "/osiris/",
"osiris.dfc.readahead"-> "4",
"osiris.s3.bucket" -> "cmhprod3-cdcosiris"
)
else
Map(
"servers" -> "osirisserver:26238"
)
val rawDf = spark.read
.format("osirisv2")
.option("keys", "jobId")
.option("keysplitter", "vlong-1000")
.option("table", "jobarchive_new")
.option("osiris.s3.region", "us-east-2")
.option("osiris.request.timeout", "180000")
.option("osiris.retry.timeout", "360000")
.option("minTs", minTs)
.options(dfcOptions)
.load()
val df = rawDf
.as[OsirisRowv2]
.map(r => RowTransformer.parseRow(r))
.toDF()
inputPartitions
.map(ps => df.repartition(ps))
.getOrElse(df)
}
}
object RowTransformer extends Serializable {
val transcoder = new ByteArrayTranscoder()
def parseRow(row: OsirisRowv2): JobInfo = {
val bytes = transcoder.toBytes(row.value)
val jobArchiveEntry = JobArchiveEntry.parseFrom(bytes)
JobInfo(
jobArchiveEntry.getJobId,
jobArchiveEntry.getTitleId,
)
}
}
case class JobInfo(
jobId: Long,
titleId: Int,
)
class ByteArrayTranscoder {
def fromBytes(bytes: Array[Byte]) = bytes
def toBytes(bytes: Array[Byte]) = bytes
def equals(o: AnyRef): Boolean = {
if (this eq o) return true
o != null && (getClass eq o.getClass)
}
override def toString = "byte"
}
Sorry for the complex code sample. I verified that I can reproduce the error with a simpler datasource:
case class JobArchiveOsirisDatasource(minTs: Long, inputPartitions: Option[Int], useDfc: Boolean) extends Datasource {
  def dataframe(spark: SparkSession): DataFrame = {
    spark.sql("select * from db.table")
  }
}
After further testing I figured out that if I set the serializer to JavaSerializer instead of KryoSerializer, the error goes away. I set this in my SessionBuilder's properties:
"spark.serializer" -> "org.apache.spark.serializer.JavaSerializer"
Hope this helps with reproducing the error.
+1, I see the same stack trace. With Spark 3.5 and Iceberg 1.5.0, I ran into a strange situation where spark-sql and spark-shell worked normally but spark-submit reported the error.
@adevore3 Kryo has better serialization performance than JavaSerializer. Setting global serialization to JavaSerializer is not a good solution.
From the stack trace, while deserializing a SerializableByteBufferMap value, Kryo received an int value that cannot be resolved to a ByteBuffer. Looking at the org.apache.iceberg.SerializableByteBufferMap#writeReplace method, Java serialization writes this map in the form int[], byte[][]. So this exception may be caused by the serializing and deserializing sides using inconsistent methods.
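To illustrate the writeReplace mechanism described above, here is a minimal, self-contained sketch using only the JDK. The class names (WriteReplaceDemo, BufferMap, Proxy) are hypothetical stand-ins and this is not Iceberg's actual code; it only shows how Java serialization swaps a wrapper for an int[] / byte[][] proxy on write and rebuilds it via readResolve on read. A serializer that does not honor these hooks on both sides would see a different layout than it expects.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; an assumption for illustration, not Iceberg's implementation.
class WriteReplaceDemo {

    // Wrapper around a map whose ByteBuffer values are not Serializable themselves.
    static class BufferMap implements Serializable {
        private final transient Map<Integer, ByteBuffer> wrapped;

        BufferMap(Map<Integer, ByteBuffer> wrapped) {
            this.wrapped = wrapped;
        }

        Map<Integer, ByteBuffer> unwrap() {
            return wrapped;
        }

        // Java serialization invokes this hook and writes the returned proxy instead.
        private Object writeReplace() {
            int[] keys = new int[wrapped.size()];
            byte[][] values = new byte[wrapped.size()][];
            int i = 0;
            for (Map.Entry<Integer, ByteBuffer> entry : wrapped.entrySet()) {
                // duplicate() so reading the bytes does not move the original position
                ByteBuffer copy = entry.getValue().duplicate();
                keys[i] = entry.getKey();
                values[i] = new byte[copy.remaining()];
                copy.get(values[i]);
                i++;
            }
            return new Proxy(keys, values);
        }
    }

    // On-the-wire form: parallel arrays of keys and raw bytes.
    static class Proxy implements Serializable {
        private final int[] keys;
        private final byte[][] values;

        Proxy(int[] keys, byte[][] values) {
            this.keys = keys;
            this.values = values;
        }

        // Java deserialization invokes this hook to rebuild the wrapper.
        private Object readResolve() {
            Map<Integer, ByteBuffer> map = new HashMap<>();
            for (int i = 0; i < keys.length; i++) {
                map.put(keys[i], ByteBuffer.wrap(values[i]));
            }
            return new BufferMap(map);
        }
    }

    // Serializes and deserializes with plain Java serialization.
    static BufferMap roundTrip(BufferMap input) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(input);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (BufferMap) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<Integer, ByteBuffer> bounds = new HashMap<>();
        bounds.put(1, ByteBuffer.wrap(new byte[] {42, 0, 0, 0}));
        BufferMap copy = roundTrip(new BufferMap(bounds));
        System.out.println("entries after round trip: " + copy.unwrap().size());
    }
}
```

The round trip only works because both the writer and the reader go through Java serialization, which is the point of delegating such classes to a Java-serialization-based Kryo serializer.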
I have found a solution. See https://github.com/apache/iceberg/issues/446
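import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.spark.serializer.KryoRegistrator;

public class IcebergRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        try {
            // Have Kryo delegate GenericDataFile to Java serialization
            kryo.register(Class.forName("org.apache.iceberg.GenericDataFile"), new JavaSerializer());
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
SparkSession.builder()
    .config("spark.kryo.registrator", "org.apache.spark.sql.serializer.IcebergRegistrator")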
For anyone still running into this issue, you can turn on Kryo trace logging:
com.esotericsoftware.minlog.Log.TRACE();
In my case, it was due to java.nio.HeapByteBuffer being serialized with JDK 17 on the cluster and deserialized with JDK 8 on the client.
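One way to spot a JDK mismatch like this is to log the JVM version on both the client and the cluster side. The sketch below is an assumption for illustration (the JvmVersionCheck class and its methods are hypothetical, not part of Spark or Iceberg); it reads the standard java.specification.version system property, which is "1.8" on Java 8 and "17" on Java 17, and normalizes it to a major version number.

```java
// Hypothetical helper; names are assumptions for illustration only.
class JvmVersionCheck {

    // Maps "1.8" to 8 and "17" to 17, the two formats of java.specification.version.
    static int majorVersion(String specVersion) {
        String[] parts = specVersion.split("\\.");
        if (parts[0].equals("1") && parts.length > 1) {
            return Integer.parseInt(parts[1]);
        }
        return Integer.parseInt(parts[0]);
    }

    // Major version of the JVM this code is currently running on.
    static int currentMajorVersion() {
        return majorVersion(System.getProperty("java.specification.version"));
    }

    public static void main(String[] args) {
        System.out.println("JVM major version: " + currentMajorVersion());
    }
}
```

Running the same check in the driver and inside a task on the executors, then comparing the two numbers, would show whether the two sides are on different JDKs; how you run it on the executors depends on your job.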
Apache Iceberg version
None
Query engine
None
Please describe the bug 🐞
We were upgrading to Spark 3.4.1 when we ran into this issue. We're currently running on Spark 3.2.1, which works. We're using the following dependencies:
We didn't need iceberg-spark-extensions when running on Spark 3.2.1 but added it in just in case. Before that we attempted to use version 1.4.3 for iceberg-spark-runtime & iceberg-aws.
Below is the full error & stacktrace.