AbsaOSS / spline-spark-agent

Spline agent for Apache Spark
https://absaoss.github.io/spline/
Apache License 2.0

Duplicated attribute IDs #272

Closed wajda closed 3 years ago

wajda commented 3 years ago

UPDATE: A temporary workaround - https://github.com/absaoss/spline-spark-agent/issues/272#issuecomment-895947366

The issue was found in, and is the cause of, AbsaOSS/spline#925.

See JSON sample in https://github.com/AbsaOSS/spline/issues/925#issuecomment-874263960

Minimal code to replicate the issue:

package za.co.absa.spline

import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object SplineDuplicatedIds {

  val extraConf: Iterable[(String, String)] = List(
    ("spark.sql.queryExecutionListeners", "za.co.absa.spline.harvester.listener.SplineQueryExecutionListener"),
    ("spark.spline.lineageDispatcher", "console"),
    ("spark.spline.mode", "REQUIRED")
  )

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
      .setAll(extraConf)
      .setMaster("local[*]")

    val ss = SparkSession.builder().config(sparkConf).getOrCreate()
    import ss.implicits._

    val df = Seq(("adcde1_12938597", 162))
      .toDF("unique_id", "total_commission")

    val result: DataFrame = df
      .withColumn("commission", col("total_commission"))
      .drop("total_commission")

    val firstValidTransactions = result.withColumnRenamed("commission", "foo")

    val joined = result.join(firstValidTransactions, usingColumns = Seq("unique_id"))

    joined
      .write
      .mode(SaveMode.Overwrite)
      .option("path", "tmp/spline_test_bi_duplicates")
      .saveAsTable("spline_test_bi_duplicated")
  }
}

Happens on Spark 3.1 and 2.4, and probably on all other versions as well.

wajda commented 3 years ago

@kuhnen, can you share the Spark job source code that reproduces the issue, please? We could reconstruct it from the execution plan JSON, but with the source code it would be much easier. Thank you!

wajda commented 3 years ago

Observation:

The sample JSON above contains 3 Read operations that read from the same data source. Each of them emits its own set of attributes (same name and type, but different IDs). Then there are 3 Project operations, each following one of those reads and outputting an attribute cast(total_commission) AS commission. For some reason, all those commission attributes are given the same ID (in this example, 123)!

kuhnen commented 3 years ago

@wajda I can, but the problem is that it might be hard to follow. We created a framework for our business case, so it might not be that easy to understand the flow of the job :(.

class CustomerTransactionWebgainsCn(val sparkSessionFilesHelper: SparkSessionFilesHelper,
                                    override val dataLakeUpdater: Option[DataLakeUpdaterAthena] = None)
  extends WorkFlow with CustomerTransactionsTransformation {

  val uniqueTransactionsName: CompleteNameConventionBuilder = NameConventions.DataLake.dumps.affiliate_networks.webgains
  val historyName: CompleteNameConventionBuilder = NameConventions.DataLake.dumps.affiliate_networks.webgains_history

  override val colsAddedAfter: Set[String] = Set(
    commissionGroupListColName,
    totalOriginalColName,
    commissionOriginalColName,
    transactionTypeColName,
    orderIdColName,
    vpFlagColName,
  )

  override val dataLakeTransactions: DataFrame = sparkSessionFilesHelper.readDataFrame(buckets.dataLakeRead, uniqueTransactionsName)
    .withColumn(saleValueColName, emtpyOrNullToZeroLogic(saleValueColName))
    .drop(programIdColName)
  override val historicTransactions: DataFrame = sparkSessionFilesHelper.readDataFrame(bucket = buckets.dataLakeRead, historyName)
    .withColumn(saleValueColName, emtpyOrNullToZeroLogic(saleValueColName))
    .select(uniqueIdColName, createdColName, statusColName, paymentStatusColName, saleValueColName, commissionColName)

  override def transformDataLakeTransactions(initialDataLakeTransactions: DataFrame): DataFrame = initialDataLakeTransactions

  override def transformHistoricTransactions(initialHistoricTransactions: DataFrame): DataFrame = initialHistoricTransactions

  override def selectCommissionAndTotalOriginal(historicTransactions: DataFrame): DataFrame = historicTransactions
    .drop(statusColName).drop(paymentStatusColName)
    .withColumnRenamed(saleValueColName, totalOriginalColName)
    .withColumnRenamed(commissionColName, commissionOriginalColName)
    .transform(updateCommissionAndTotalOriginalValues)

  override def selectBiValidated(historicTransactions: DataFrame): DataFrame = historicTransactions
    .drop(saleValueColName, commissionColName)
    .withColumn(transactionStatusColName, transactionStatusLogic)
    .transform(updateBiValidateDate)
    .drop(statusColName, paymentStatusColName, transactionStatusColName)

  override def selectData(dataLakeTransactions: DataFrame, colsAddedAfter: Set[String]): DataFrame = dataLakeTransactions
    .withColumn(clickOutIdColName, clickoutIdLogic(clickRefColName))
    .withColumnRenamed(programNameColName, programIdColName)
    .withColumn(totalColName, emtpyOrNullToZeroLogic(saleValueColName))
    .withColumn(totalColName, col(totalColName).cast(decimalType))
    .withColumn(commissionColName, col(commissionColName).cast(decimalType))
    .withColumnRenamed(transactionDateTimeColName, timeOfEventColName)
    .withColumn(affiliateNetworkColName, typedLit(AffiliateNetworkNames.webgains.toString))
    .withColumn(transactionStatusColName, transactionStatusLogic)
    .selectExpr(colsToBeSelected.filterNot(colsAddedAfter.contains(_)): _*)

  override def prepareDataFrameForUpdatingCommissionAndTotal(historicTransactions: DataFrame): DataFrame = historicTransactions
    .withColumn(totalColName, emtpyOrNullToZeroLogic(saleValueColName))
    .withColumn(totalColName, col(totalColName).cast(decimalType))
    .withColumn(commissionColName, col(commissionColName).cast(decimalType))
    .withColumn(transactionStatusColName, transactionStatusLogic)
    .drop(statusColName, saleValueColName, paymentStatusColName, transactionStatusColName)

  override def joinWithHistoric(selectedData: DataFrame, commissionAndTotalOriginal: DataFrame, biValidated: DataFrame, preparedDataFrameForUpdatingCommissionAndTotal: DataFrame): DataFrame = selectedData
    .join(commissionAndTotalOriginal, usingColumn = uniqueIdColName)
    .withColumn(transactionTypeColName, transactionTypeTotalLogic)
    .withColumn(vpFlagColName, vpFlagLogic)
    .join(biValidated, usingColumn = uniqueIdColName)
    .transform(mainDataFrame => updateTransactionCommissionAndTotal(preparedDataFrameForUpdatingCommissionAndTotal, mainDataFrame))

  override def run(args: Array[String]): Unit = {
    this.transformAndSave(buckets.dataLakeWrite, sparkSessionFilesHelper, dataLakeUpdater)
  }
}

kuhnen commented 3 years ago

@wajda @cerveada thanks for looking into it. Is there any way I can help with this? Maybe you could point me to the class I should start looking at, so that I can try to help?
Thanks a lot.

cerveada commented 3 years ago

The key to fixing this is to create Spark code that I can run to reproduce the issue. So far I have not been successful in reproducing it myself.

From the JSON I was able to find some information:

The duplicated attribute is commission. It is read from the data source as total_commission, then cast to data type Decimal(19, 3), and then renamed to commission. This happens three times in the lineage, and the problem is that each time it happens the resulting attribute after the rename is the same, so three duplicates are created.

So if you could recreate the code that is used to produce the commission attribute, that would help us; even better would be a runnable example that reproduces the issue. From the code you provided, it seems there are some relevant operations, but it's not clear how it all connects together...
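
For illustration, a minimal sketch of the pattern described above (the parquet path, the session setup, and the three separate reads are placeholders inferred from the JSON, not the actual job):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DecimalType

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// The same source is read three times; each read emits its own set of attribute IDs.
val reads = (1 to 3).map(_ => spark.read.parquet("path/to/transactions"))

// Each branch casts total_commission to Decimal(19, 3) and renames the result to
// commission; this is the shape in which the duplication shows up in the reported lineage.
val branches = reads.map(df => df
  .withColumn("commission", col("total_commission").cast(DecimalType(19, 3)))
  .drop("total_commission"))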

kuhnen commented 3 years ago

@cerveada let me try. I hope to give you the code this week. Thanks a lot.

cerveada commented 3 years ago

@kuhnen thank you, it may be the only way. I made a few more attempts to replicate the issue, but even though I am able to produce a very similar lineage, the duplicated attribute just doesn't occur for me.

I found another duplicate attribute and there is a pattern:

commission [duplicated] <- (rename) <- (cast) <- total_commission [input]
total [duplicated] <- (rename) <- (cast) <- total_shopping_cart [input]

The input attributes are read several times from what seems to be the same parquet file. Each read creates its own set of input attributes with unique IDs. That is correct, but then the cast and rename happen, and the result is the same attribute appearing multiple times.

It almost seems like some optimization, but Spline captures the logical plan, where no optimizations should be present.
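
One way to check whether the duplication already exists in Spark's own plan is to inspect the analyzed logical plan directly (a sketch; finalDf is a placeholder for the job's final DataFrame):

// Print the analyzed logical plan; treeString shows attribute references
// together with their IDs, e.g. commission#123.
val plan = finalDf.queryExecution.analyzed
println(plan.treeString)

// List every output attribute of every plan node together with its expression ID.
plan.foreach(node => node.output.foreach(a => println(s"${a.name}#${a.exprId.id}")))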

Some ideas about what could cause the issue:

kuhnen commented 3 years ago

@cerveada thanks a lot :). Since I was not able to get the code ready, let me give you answers to the three points above.

I can add that we are reading S3 files instead of using the tables approach. Could it be the renaming? We use a lot of renaming to build the data marts for the analytics people. I hope this already gives you some extra help.

Which version should I use? 0.6.1 ? Thanks a lot.

cerveada commented 3 years ago

I can add that we are reading S3 files instead of using the tables approach. Could it be the renaming? We use a lot of renaming to build the data marts for the analytics people.

Maybe. If I can debug a code example that causes the issue, I am sure I will be able to find the cause, but without it, it's just guessing.

Which version should I use? 0.6.1 ? Thanks a lot.

Yes, 0.6.1

wajda commented 3 years ago

Another code snippet that potentially reproduces it (see https://github.com/AbsaOSS/spline-spark-agent/issues/265#issuecomment-894070863):

SELECT 
    concat_ws(',', col1,col2) AS row_info,
    b.file_row_count AS file_row_count,
    a.filename AS filename,
    run_timestamp,
    load_date,
    batch_id
FROM (
    SELECT *, row_number() OVER (PARTITION BY filename ORDER BY batch_id) AS batch_id_rank 
    FROM schema_validated_failed
    WHERE schema_validation_result = False
) a
JOIN (
    SELECT count(1) AS file_row_count, filename
    FROM schema_validated_failed
    GROUP BY filename
) b 
ON a.filename = b.filename
WHERE batch_id_rank <= 100

kuhnen commented 3 years ago

@wajda thanks a lot :). @cerveada please let me know if you still need my code. Sorry, so many meetings this week.

kuhnen commented 3 years ago

@wajda @cerveada, I created a new Spark project to help with this task. But now I am getting this error:

java.lang.NoClassDefFoundError: scala/tools/reflect/package$
test_aws_catalog[ERROR]     at za.co.absa.commons.reflect.ReflectionUtils$.compile(ReflectionUtils.scala:46)

Any idea how to solve it or why it started happening?

cerveada commented 3 years ago

It could be a Scala version mismatch. Do you use the same Scala version for everything?
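
For example, in an sbt build the Scala, Spark, and agent versions all need to line up, and the missing scala.tools.reflect package ships with scala-compiler. A sketch (coordinates and version numbers below are illustrative assumptions, not taken from this project):

// build.sbt sketch; versions and coordinates are illustrative assumptions
scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.1.2" % Provided,
  // Spline agent bundle built for the same Spark and Scala versions
  "za.co.absa.spline.agent.spark" %% "spark-3.1-spline-agent-bundle" % "0.6.1",
  // scala.tools.reflect (used by ReflectionUtils.compile) lives in scala-compiler
  "org.scala-lang" % "scala-compiler" % scalaVersion.value
)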

kuhnen commented 3 years ago

@cerveada thanks. I found the issue; I mean, I could solve it, but I do not understand why 😂. At least I can work and hopefully give you guys code that recreates the issue 🙌.

kuhnen commented 3 years ago

@cerveada @wajda I have a piece of code, without any external dependencies, that shows the problem. Please let me know if you can use it. I will try to find the exact line in the meantime:

import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, lower, typedLit, when}
import org.apache.spark.sql.types.DecimalType
import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}

object SplineDuplicatedIds {

  val extraConf: Iterable[(String, String)] = List(
    ("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    ("hive.imetastoreclient.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"),
    ("spark.sql.queryExecutionListeners", "za.co.absa.spline.harvester.listener.SplineQueryExecutionListener"),
    ("spark.spline.producer.url", "http://localhost:8080/producer"),
    ("spark.spline.mode", "REQUIRED")
  )

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
      .setAll(extraConf)
      .setMaster("local[*]")

    val ss = SparkSession.builder().config(sparkConf).getOrCreate()
    import ss.implicits._
    val uniqueIdColName = "unique_id"
    val createdColName = "created"
    val totalCommissionColName = "total_commission"
    val totalShoppingCartColName = "total_shopping_cart"
    val statusColName = "status"
    val commissionColName = "commission"

    val adcellTransactionStatusLogic: Column = {

      when(lower(col(statusColName)) === "accepted", "Validated")
        .otherwise(
          when(lower(col(statusColName)) === "open", "Pending")
            .otherwise(
              when(lower(col(statusColName)) === "cancelled", "Refused")
                .otherwise(typedLit[String](null))))

    }

    val decimalType = new DecimalType(19, 3)
    val transactionStatusColName = "transaction_status"
    val df = Seq(("adcde1_12938597", 1.162, 7.260, "2021-06-28 21:43:43", "accepted"))
      .toDF("unique_id", "total_commission", "total_shopping_cart", "created", "status")

    val historicTransactions: DataFrame = df
      .withColumn("commission", col(totalCommissionColName).cast(decimalType))

    def updateBiValidateDate(df: DataFrame): DataFrame = {

      val firstValidStatusDateColName = "first_valid_status_date"
      val firstValidTransactions = df
        .select(uniqueIdColName, transactionStatusColName, createdColName)
        .where(col(transactionStatusColName).isInCollection(Set("Paid", "Validated")))
        .withColumnRenamed(createdColName, firstValidStatusDateColName)
        .drop(transactionStatusColName)

      val withFirstValidDate = df.join(firstValidTransactions, usingColumns = Seq(uniqueIdColName), joinType = "left")
      withFirstValidDate

    }

    val result = historicTransactions
      .drop(totalCommissionColName, totalShoppingCartColName, commissionColName)
      .withColumn(transactionStatusColName, adcellTransactionStatusLogic)
      .transform(updateBiValidateDate)
      .drop(transactionStatusColName, statusColName)

    result
      .write
      .mode(SaveMode.Overwrite)
      .option("path", "tmp/spline_test_bi_duplicates")
      .saveAsTable("spline_test_bi_duplicated")

  }
}

cerveada commented 3 years ago

I will try it out, thanks a lot!

kuhnen commented 3 years ago

Sorry about this line: ("hive.imetastoreclient.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"). You definitely do not need it.

cerveada commented 3 years ago

The duplicates are there; now I can finally try to fix this.

kuhnen commented 3 years ago

@cerveada great :). I was trying to simplify the code even more. Is there a way I can log the names of the attributes? I can only see this: Cannot send lineage data to http://localhost:8080/producer/execution-plans. HTTP Response: 400 Duplicated attribute IDs: 33, 26

cerveada commented 3 years ago

You can use this config:

val extraConf: Iterable[(String, String)] = List(
    ("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    ("spark.sql.queryExecutionListeners", "za.co.absa.spline.harvester.listener.SplineQueryExecutionListener"),
    ("spark.spline.lineageDispatcher", "console"),
    ("spark.spline.mode", "REQUIRED")
  )

The console dispatcher will log the JSON in the console instead of trying to send it.

Or I just use a debug breakpoint in LineageHarvester.scala, line 150, where I can see the plan object directly in the IDE.

kuhnen commented 3 years ago

@cerveada quick update from my side :) If I add a localCheckpoint here:

val firstValidTransactions = df
  .select(uniqueIdColName, transactionStatusColName, createdColName)
  .where(col(transactionStatusColName).isInCollection(Set("Paid", "Validated")))
  .withColumnRenamed(createdColName, firstValidStatusDateColName)
  .drop(transactionStatusColName)
  .localCheckpoint()

The duplicated IDs are gone. Maybe this helps with your debugging 👍
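
For reference, a minimal sketch of the same trick applied to the repro from the issue description (assuming this is the temporary workaround linked in the UPDATE above):

// Materializing the branch with localCheckpoint() truncates its logical plan;
// with that, the duplicated attribute IDs no longer appear in the captured lineage.
val firstValidTransactions = result
  .withColumnRenamed("commission", "foo")
  .localCheckpoint()

val joined = result.join(firstValidTransactions, usingColumns = Seq("unique_id"))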