apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] streamRead from s3 hudi table sometimes returns incorrect column values #8974

Closed Tyler-Rendina closed 1 year ago

Tyler-Rendina commented 1 year ago

Describe the problem you faced

readStream pulls column values back into a different column when a preceding column is not present in the records of a batch.

To Reproduce

Steps to reproduce the behavior:

  1. Upstream Glue 4.0 job upserts parquet CoW data from a Kinesis stream successfully, using a near-identical method to this demo (a rough sketch of the write step appears after step 3).
  2. Launch Spark in a notebook on ipykernel running pyspark:
    
    import os
    import sys
    import re
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import *
    from pyspark.sql.types import StringType, IntegerType
    from pyspark.ml import PipelineModel
    import sparknlp
    from sparknlp.annotator import *
    from sparknlp.base import *
    from sparknlp.functions import *

```python
packages = [
    "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1",
    "org.apache.hadoop:hadoop-aws:3.3.2",
    "com.amazonaws:aws-java-sdk-bundle:1.12.477",
    "com.johnsnowlabs.nlp:spark-nlp-aarch64_2.12:4.4.3",
]
SUBMIT_ARGS = f"--packages={','.join(packages)} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
os.environ["AWS_ACCESS_KEY_ID"] = os.getenv("AWS_ACCESS_KEY_ID")
os.environ["AWS_SECRET_ACCESS_KEY"] = os.getenv("AWS_SECRET_ACCESS_KEY")
os.environ["AWS_REGION"] = "us-east-1"

spark = (
    SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .config("spark.driver.memory", "10g")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID"))
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY"))
    .config("spark.hadoop.fs.s3a.endpoint", "s3.us-east-1.amazonaws.com")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    .getOrCreate()
)
```

  3. Launch the stream (note: all debug output modes have been tested, with the same result)
```python
def foreach_batch_function(df, epoch_id):
    df.show()

df_bw = (
    spark
    .readStream
    .format("hudi")
    .load("s3a://path/to/table")
    .writeStream
    .foreachBatch(foreach_batch_function)
    .start()
)
```
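For reference, here is a minimal sketch of the kind of Hudi CoW upsert performed in step 1. It is not the actual Glue job; the record key (resourceid), precombine field (published), and table name are assumptions inferred from the output further down.

```python
# Minimal sketch of a Hudi CoW upsert like the one in step 1; the field names
# and table name are assumptions, not the real job's configuration.
hudi_options = {
    "hoodie.table.name": "bw_table",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "resourceid",   # assumed key
    "hoodie.datasource.write.precombine.field": "published",   # assumed precombine
}

(
    micro_batch_df.write.format("hudi")   # micro_batch_df: hypothetical Kinesis micro-batch
    .options(**hudi_options)
    .mode("append")
    .save("s3a://path/to/table")
)
```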

Expected behavior

Batches should have consistent data in each named column (a selectExpr on a few columns was added to highlight the issue), i.e.:
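For reference, the batch function that produced the output below was along these lines; a sketch, not the exact code. The column names are taken from the tables shown, and the printed epoch id accounts for the leading 4/5/6.

```python
def foreach_batch_function(df, epoch_id):
    # Print the epoch id and a few named columns to make the shift visible.
    print(epoch_id)
    df.selectExpr("published", "resourceid", "fulltext").show()
```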

```
4
+----------+--------------------+--------------------+
| published|          resourceid|            fulltext|
+----------+--------------------+--------------------+
|2023-06-14|c6e820d0640887665...|...audiences thro...|
+----------+--------------------+--------------------+
```

```
5
+----------+--------------------+--------------------+
| published|          resourceid|            fulltext|
+----------+--------------------+--------------------+
|2023-06-14|747473179dc94626d...|Indusladies Forum...|
|2023-06-14|05a0a824663209bd5...|Indusladies Forum...|
|2023-06-14|8984d7d17788562ad...|these, projected ...|
+----------+--------------------+--------------------+
```

But the next batch is returned as follows (note that fulltext is one of the initial columns, so it wasn't distorted):

```
6
+--------------------+--------------------+--------------------+
|           published|          resourceid|            fulltext|
+--------------------+--------------------+--------------------+
|1c2fc53b259cb0b52...|[https://news.dove...|...Pharmaceutica...|
|5c7aeb1014894b03f...|[https://going-pos...|...Biontech lore...|
|91ed3fc50812240b7...|[http://www.allusa...|...whos lorepsum...|
+--------------------+--------------------+--------------------+
```

Environment Description

Additional context

This issue doesn't seem to be reported anywhere else and never occurs when I read the whole table. I suspect it has something to do with the schema of the upserted parquet files when columns are missing.
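One way to check that suspicion (a diagnostic sketch, not part of the original report) is to compare each micro-batch's schema against the full-table schema, since the full-table read never shows the problem:

```python
# Hypothetical diagnostic: use the full-table schema as the reference and
# report any columns missing from each streaming micro-batch.
full_schema = spark.read.format("hudi").load("s3a://path/to/table").schema

def inspect_batch(df, epoch_id):
    missing = set(full_schema.fieldNames()) - set(df.schema.fieldNames())
    print(f"epoch {epoch_id}: missing columns {missing}")
    df.printSchema()
```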

Stacktrace

When using the memory sink I get type mismatch errors, for obvious reasons.
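For what it's worth, a memory-sink variant of the same stream looks roughly like this (a sketch; the query name is made up). With the shifted values, the in-memory table's declared column types no longer match the data, hence the mismatch errors:

```python
query = (
    spark.readStream.format("hudi")
    .load("s3a://path/to/table")
    .writeStream.format("memory")
    .queryName("bw")          # hypothetical query name
    .start()
)
# Querying the in-memory table then surfaces the type mismatches.
spark.sql("SELECT published, resourceid, fulltext FROM bw").show()
```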

Tyler-Rendina commented 1 year ago

This can be closed, but would love an expert opinion.

I figured out a patch for my use case; I also could be misunderstanding schema de-evolution.

The data source was randomly rearranging structs so I had already made a whitelist of fields to keep.

I added another function to my upstream listener that adds any missing whitelisted fields with null values, and that worked.

ad1happy2go commented 1 year ago

@Tyler-Rendina Would you mind sharing the patch with the community?

Tyler-Rendina commented 1 year ago

@ad1happy2go Of course. Let me add some more context, since the fix was a Python cleaning step.

The overall AWS-based ETL system, as developed up to the point of hitting the unexpected behavior:

  1. ECS Fargate container listening for API data and producing to a Kinesis stream.
  2. Kinesis stream consumer registered as a Lake Formation table.
  3. Consumer Glue 4.0 script with the Hudi connector and 0.13.1 jars, creating and then upserting to a Lake Formation Hudi table.
  4. Local/EMR pyspark notebook leveraging readStream, to eventually be turned into an EMR Serverless script.

(Credit to @soumilshah1995 for the demo on step 2 and 3)

Problem (unexpected behavior)

The readStream function returned the first batch of data correctly; subsequent batches would return values shifted over from other columns. This was due to column(s) being missing from all of the new records in the batch.

Solution

In the ECS Fargate container I had already created a whitelist of columns, since the API would return one-off extra fields, some containing structs inconsistent with preceding schemas. This had created issues upserting data from Glue while initially developing the pipeline.

I added one more step to the whitelist filter that adds a null value for any field missing from the whitelist:

```python
def _whitelisted_fields(x):
    # Keep only whitelisted keys, then backfill any missing ones with None so
    # every record carries the full set of columns.
    whitelist = ["fields", "to_keep", "or_add"]
    x = {k.lower(): v for k, v in x.items() if k.lower() in whitelist}
    for key in whitelist:
        if key not in x.keys():
            x[key] = None
    return x
```
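For illustration, in the Fargate producer the cleaned record would be used along these lines (a hypothetical sketch; the boto3 client, stream name, and partition key are assumptions, not the actual listener code):

```python
import json
import uuid
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # hypothetical client

def publish(record: dict):
    # Filter to the whitelist and backfill missing fields with None before
    # the record ever reaches the Hudi upsert downstream.
    clean = _whitelisted_fields(record)
    kinesis.put_record(
        StreamName="my-api-stream",             # assumed stream name
        Data=json.dumps(clean).encode("utf-8"),
        PartitionKey=str(uuid.uuid4()),
    )
```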

Thoughts

I spent a ton of time looking for a solution to this, since non-incremental queries did not have the issue, which led me to believe I had installed 0.13 incorrectly or had a Hudi config issue. I'm not sure whether this actually happens because the parquet file is picked up by readStream before some cleaning can happen, but I thought the behavior was interesting enough to mention to the community.

ad1happy2go commented 1 year ago

Thanks @Tyler-Rendina . Closing the issue. Let us know in case you need any more help.