apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Delete not functioning with deltastreamer #3336

Closed: mithalee closed this issue 3 years ago

mithalee commented 3 years ago


Describe the problem you faced

Deletes are not applied when ingesting a parquet file with "_hoodie_is_deleted" set to True through the HoodieDeltaStreamer into a Hudi table on S3 (Spark on Kubernetes); the write fails and rollback files appear under .hoodie.

To Reproduce

Steps to reproduce the behavior:

1. Perform a spark-submit job to insert data into a Hudi table stored in S3. I am using the Hudi DeltaStreamer utility with the "_hoodie_is_deleted" column set to False for all records. This works successfully. The source data is a parquet file in S3, generated from a pandas DataFrame by a separate process (not part of the spark-submit).
2. Perform a spark-submit job to delete data from the existing Hudi table in S3. I am again using the Hudi DeltaStreamer utility, with the "_hoodie_is_deleted" column set to True for all the records that need to be deleted. The delete file is also a parquet file in S3, generated from a pandas DataFrame by a separate process (not part of the spark-submit). This run throws an error, and I see rollback files in the .hoodie folder.
3. I am running this with Spark on Kubernetes: https://spark.apache.org/docs/3.1.1/configuration.html#kubernetes

Expected behavior: I expect the rows with "_hoodie_is_deleted" = True to be deleted from the existing Hudi table in S3.



Additional context

SPARK SUBMIT COMMAND FOR INITIAL LOAD TO HUDI TABLE (THIS ONE WORKS SUCCESSFULLY):
./spark-submit --master k8s://https://..sk1.us-west-1.eks.amazonaws.com --deploy-mode cluster --name spark-hudi --jars https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.executor.instances=1 --conf spark.kubernetes.container.image=../spark:spark-hudi-0.2 --conf spark.kubernetes.namespace=spark-k8 --conf spark.kubernetes.container.image.pullSecrets=dockercloud-secret --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem --conf spark.hadoop.fs.s3a.endpoint=s3.us-west-1.amazonaws.com --conf spark.hadoop.fs.s3a.access.key='A...R' --conf spark.hadoop.fs.s3a.secret.key='L..gG' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer s3a://lightbox-sandbox-dev/hudi-root/spark-submit-jars/hudi-utilities-bundle_2.12-0.8.0.jar --table-type COPY_ON_WRITE --source-ordering-field two --source-class org.apache.hudi.utilities.sources.ParquetDFSSource --target-base-path s3a://.../hudi-root/transformed-tables/hudi_writer_mm4/ --target-table test_table --base-file-format PARQUET --hoodie-conf hoodie.datasource.write.recordkey.field=two --hoodie-conf hoodie.datasource.write.partitionpath.field=two --hoodie-conf hoodie.datasource.write.precombine.field=two --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3a://../jen/example_2.parquet

SPARK SUBMIT COMMAND FOR DELETE FROM THE HUDI TABLE (THIS ONE THROWS AN ERROR):
./spark-submit --master k8s://https://..sk1.us-west-1.eks.amazonaws.com --deploy-mode cluster --name spark-hudi --jars https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.executor.instances=1 --conf spark.kubernetes.container.image=../spark:spark-hudi-0.2 --conf spark.kubernetes.namespace=spark-k8 --conf spark.kubernetes.container.image.pullSecrets=dockercloud-secret --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem --conf spark.hadoop.fs.s3a.endpoint=s3.us-west-1.amazonaws.com --conf spark.hadoop.fs.s3a.access.key='A...R' --conf spark.hadoop.fs.s3a.secret.key='L..gG' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer s3a://lightbox-sandbox-dev/hudi-root/spark-submit-jars/hudi-utilities-bundle_2.12-0.8.0.jar --table-type COPY_ON_WRITE --source-ordering-field two --source-class org.apache.hudi.utilities.sources.ParquetDFSSource --target-base-path s3a://.../hudi-root/transformed-tables/hudi_writer_mm4/ --target-table test_table --base-file-format PARQUET --hoodie-conf hoodie.datasource.write.recordkey.field=two --hoodie-conf hoodie.datasource.write.partitionpath.field=two --hoodie-conf hoodie.datasource.write.precombine.field=two --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3a://../jen/example_upsert2.parquet

Code to generate the parquet files that I am using.

New insert parquet file (example_2.parquet):

import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({'one': [-1, 3, 2.5],
                   'two': [100, 101, 103],
                   'three': [True, True, True],
                   '_hoodie_is_deleted': [False, False, False]},
                  index=list('abc'))
table = pa.Table.from_pandas(df)
print(table)
pq.write_table(table, 'example_2.parquet')

Upsert/delete parquet file (example_upsert2.parquet):

import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({'one': [-1, 3, 2.5],
                   'two': [100, 101, 103],
                   'three': [True, True, True],
                   '_hoodie_is_deleted': [False, False, True]},
                  index=list('abc'))
table = pa.Table.from_pandas(df)
print(table)
pq.write_table(table, 'example_upsert2.parquet')
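For reference, a minimal PySpark sketch (not part of the original report) of how the result of the delete could be verified, assuming a session launched with the same hudi-utilities-bundle and spark-avro jars as the spark-submit commands above; the base path below is a placeholder for the table's --target-base-path:

from pyspark.sql import SparkSession

# Assumes the hudi-utilities-bundle and spark-avro jars are on the classpath.
spark = (
    SparkSession.builder
    .appName("hudi-delete-check")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

base_path = "s3a://<bucket>/hudi-root/transformed-tables/hudi_writer_mm4/"  # placeholder

# Snapshot read of the COPY_ON_WRITE table; rows ingested with
# _hoodie_is_deleted = True should no longer appear here.
# (Depending on the Hudi version and partition depth, additional "/*" globs
# may be needed on the load path.)
df = spark.read.format("hudi").load(base_path + "/*")
df.select("_hoodie_record_key", "two", "_hoodie_is_deleted").show(truncate=False)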

Stacktrace: the stack trace from the Kubernetes dashboard is attached (K8_hudi_error, S3_hudi_rollback).


nsivabalan commented 3 years ago

Have you configured your record key and partition path correctly? From your configs, you have given "two" for all 3 configs (record key, partition path, and preCombine). Ideally these should refer to 3 different columns in the incoming dataframe.

nsivabalan commented 3 years ago

Also, have you tried doing an update ("upsert") operation? If that succeeds, then we know the delete itself has some issue; but if the update is also failing, it could be a config issue or we might have to investigate further.

mithalee commented 3 years ago

I changed the input data set a bit so that I can provide 3 different fields for the 3 configs mentioned:

./spark-submit --master k8s://https://..sk1.us-west-1.eks.amazonaws.com --deploy-mode cluster --name spark-hudi --jars https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.executor.instances=1 --conf spark.kubernetes.container.image=../spark:spark-hudi-0.2 --conf spark.kubernetes.namespace=spark-k8 --conf spark.kubernetes.container.image.pullSecrets=dockercloud-secret --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem --conf spark.hadoop.fs.s3a.endpoint=s3.us-west-1.amazonaws.com --conf spark.hadoop.fs.s3a.access.key='A...R' --conf spark.hadoop.fs.s3a.secret.key='L..gG' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer s3a://lightbox-sandbox-dev/hudi-root/spark-submit-jars/hudi-utilities-bundle_2.12-0.8.0.jar --table-type COPY_ON_WRITE --source-ordering-field ts --source-class org.apache.hudi.utilities.sources.ParquetDFSSource --target-base-path s3a://lightbox-sandbox-dev/hudi-root/transformed-tables/hudi_writer_mm4/ --target-table test_table --base-file-format PARQUET --hoodie-conf hoodie.datasource.write.recordkey.field=uuid --hoodie-conf hoodie.datasource.write.partitionpath.field=two --hoodie-conf hoodie.datasource.write.precombine.field=ts --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3a://../jen/example_upsert2.parquet

My input parquet file has the below columns:

import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import pyarrow as pa
import uuid

df = pd.DataFrame({'uuid': [str(uuid.uuid4()), str(uuid.uuid4()), str(uuid.uuid4())],
                   'ts': [1, 2, 3],
                   'two': [100, 101, 103],
                   'three': [True, True, True],
                   '_hoodie_is_deleted': [False, False, False]},
                  index=list('abc'))
table = pa.Table.from_pandas(df)
print(table)
pq.write_table(table, 'example_5.parquet')

The above initial insert into the Hudi table works successfully. I then tried to perform an update this time, as you suggested:

import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import pyarrow as pa
import uuid

df = pd.DataFrame({'uuid': [str(uuid.uuid4()), str(uuid.uuid4()), str(uuid.uuid4())],
                   'ts': [1, 2, 3],
                   'two': [100, 101, 103],
                   'three': [True, True, False],
                   '_hoodie_is_deleted': [False, False, False]},
                  index=list('abc'))
table = pa.Table.from_pandas(df)
print(table)
pq.write_table(table, 'example_upsert5.parquet')

SPARK SUBMIT FOR UPSERT:
./spark-submit --master k8s://https://..sk1.us-west-1.eks.amazonaws.com --deploy-mode cluster --name spark-hudi --jars https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.executor.instances=1 --conf spark.kubernetes.container.image=../spark:spark-hudi-0.2 --conf spark.kubernetes.namespace=spark-k8 --conf spark.kubernetes.container.image.pullSecrets=dockercloud-secret --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem --conf spark.hadoop.fs.s3a.endpoint=s3.us-west-1.amazonaws.com --conf spark.hadoop.fs.s3a.access.key='A...R' --conf spark.hadoop.fs.s3a.secret.key='L..gG' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer s3a://lightbox-sandbox-dev/hudi-root/spark-submit-jars/hudi-utilities-bundle_2.12-0.8.0.jar --table-type COPY_ON_WRITE --source-ordering-field ts --source-class org.apache.hudi.utilities.sources.ParquetDFSSource --target-base-path s3a://lightbox-sandbox-dev/hudi-root/transformed-tables/hudi_writer_mm4/ --target-table test_table --base-file-format PARQUET --hoodie-conf hoodie.datasource.write.recordkey.field=uuid --hoodie-conf hoodie.datasource.write.partitionpath.field=two --hoodie-conf hoodie.datasource.write.precombine.field=ts --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3a://../jen/example_upsert5.parquet

This one fails with an error. The stack trace from the Kubernetes dashboard is attached (logs-from-spark-kubernetes-driver-in-spark-hudi-7c40487ae63865bc-driver.txt, K8_hudi_update_error).

mithalee commented 3 years ago

Do the source ordering config and the precombine field have to be a timestamp field? (screenshot attached)

danny0405 commented 3 years ago

Do the source ordering config and the precombine field have to be a timestamp field?

Not necessarily; any Comparable type is enough.
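To illustrate that point, a hedged sketch (not from this thread, and using the Spark datasource writer rather than DeltaStreamer) where a plain integer column serves as the precombine/ordering field; the option keys are the standard hoodie.datasource.write.* configs, and df and base_path are assumed to already exist:

# Sketch only: any Comparable column (here the integer 'ts') can be the
# precombine field; it does not have to be a timestamp type.
hudi_options = {
    "hoodie.table.name": "test_table",
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.partitionpath.field": "two",
    "hoodie.datasource.write.precombine.field": "ts",  # integer, not a timestamp
    "hoodie.datasource.write.operation": "upsert",
}
(df.write.format("hudi")
   .options(**hudi_options)
   .mode("append")
   .save(base_path))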

codope commented 3 years ago

@mithalee Can you try with the latest master branch? I built the master code and tried to reproduce the scenario in a local docker environment. It runs fine. For example, after the first ingest you can see _hoodie_is_deleted is false for both timestamps, and after the second ingest (in which I set _hoodie_is_deleted to true for one timestamp), only one timestamp remains.

// after first ingest
scala> spark.sql("select symbol, ts, _hoodie_is_deleted from stock_ticks_cow WHERE symbol = 'MSFT'").show(100, false)
+------+-------------------+------------------+
|symbol|ts                 |_hoodie_is_deleted|
+------+-------------------+------------------+
|MSFT  |2018-08-31 09:59:00|false             |
|MSFT  |2018-08-31 10:29:00|false             |
+------+-------------------+------------------+

// after second ingest
scala> spark.sql("select symbol, ts, _hoodie_is_deleted from stock_ticks_cow WHERE symbol = 'MSFT'").show(100, false)
+------+-------------------+------------------+
|symbol|ts                 |_hoodie_is_deleted|
+------+-------------------+------------------+
|MSFT  |2018-08-31 09:59:00|false             |
+------+-------------------+------------------+

My schema is similar to this, except that I added the _hoodie_is_deleted field with default false.

FYI, my spark-submit command is the same as mentioned here.

nsivabalan commented 3 years ago

It would be a regression if it's failing in 0.8.0, because I don't think we touched this path in 0.8.0. @codope: can you please test it out with 0.8.0?

mithalee commented 3 years ago

@mithalee Can you try with the latest master branch? I built the master code and tried to reproduce the scenario in a local docker environment. It runs fine.

Sure, I will try and get back to you.

mithalee commented 3 years ago

@codope Hi, I did try the HoodieDeltaStreamer on the below version of EMR:

Release label: emr-6.3.0
Hadoop distribution: Amazon 3.2.1
Applications: Tez 0.9.2, Spark 3.1.1, Hive 3.1.2, JupyterHub 1.2.0, Sqoop 1.4.7, Zeppelin 0.9.0, Hue 4.9.0, Presto 0.245.1
Hudi utility: https://mvnrepository.com/artifact/org.apache.hudi/hudi-utilities-bundle_2.12/0.8.0

There the HoodieDeltaStreamer works as expected (upserts/deletes). But the same data set and the same spark-submit configuration does not work with the Spark on Kubernetes binaries (https://spark.apache.org/docs/3.1.1/submitting-applications.html). I can perform the initial insert into the Hudi table through the below spark-submit, but the upserts/deletes throw an error. Can you confirm whether upserts/deletes work using the DeltaStreamer in Spark on K8s?

SPARK SUBMIT:
spark-submit --master k8s://https://...sk1.us-west-1.eks.amazonaws.com --deploy-mode cluster --name spark-hudi --jars https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.executor.instances=1 --conf spark.kubernetes.container.image=d.../spark:spark-hudi-0.2 --conf spark.kubernetes.namespace=spark-k8 --conf spark.kubernetes.container.image.pullSecrets=dockercloud-secret --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem --conf spark.hadoop.fs.s3a.endpoint=s3.us-west-1.amazonaws.com --conf spark.hadoop.fs.s3a.access.key='A........' --conf spark.hadoop.fs.s3a.secret.key='L.........' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer s3a://l...v/hudi-root/spark-submit-jars/hudi-utilities-bundle_2.12-0.8.0.jar --table-type COPY_ON_WRITE --source-ordering-field one --source-class org.apache.hudi.utilities.sources.ParquetDFSSource --target-base-path s3a://......./hudi-root/transformed-tables/hudi_writer_mm7/ --target-table test_table --base-file-format PARQUET --hoodie-conf hoodie.datasource.write.recordkey.field=one --hoodie-conf hoodie.datasource.write.partitionpath.field=two --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3a://....../jen/example_4.parquet

Parquet file example_4.parquet:

import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import pyarrow as pa
import uuid

df = pd.DataFrame({'one': [-1, 3, 2.5],
                   'two': [100, 200, 300],
                   'three': [True, True, True],
                   '_hoodie_is_deleted': [False, False, False]},
                  index=list('abc'))
table = pa.Table.from_pandas(df)
print(table)
pq.write_table(table, 'example_4.parquet')

Delete file (example_delete4.parquet):

import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import pyarrow as pa
import uuid

df = pd.DataFrame({'one': [-1, 3, 2.5],
                   'two': [100, 200, 300],
                   'three': [True, True, True],
                   '_hoodie_is_deleted': [True, False, False]},
                  index=list('abc'))
table = pa.Table.from_pandas(df)
print(table)
pq.write_table(table, 'example_delete4.parquet')
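One sanity check worth running before ingesting the second batch (a suggestion, not something from the original report): confirm that the insert and delete parquet files have identical schemas, since a type change on _hoodie_is_deleted or any other column between batches can itself cause the DeltaStreamer write to fail.

import pyarrow.parquet as pq

# Compare the schemas of the two source files written above.
insert_schema = pq.read_schema("example_4.parquet")
delete_schema = pq.read_schema("example_delete4.parquet")
print(insert_schema)
print(delete_schema)
assert insert_schema.equals(delete_schema), "source schemas differ between batches"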

mithalee commented 3 years ago

@nsivabalan @codope I came across this issue: https://issues.apache.org/jira/browse/HADOOP-17338. This may be the root cause of the error I am running into. I am hitting it with Spark on Kubernetes 3.1.1 / Hadoop 3.2. It looks like they fixed the issue in Hadoop 3.3.1, but Hadoop 3.3.1 is not available in the Spark on Kubernetes builds, so I am not able to confirm.
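Since HADOOP-17338 is only fixed in Hadoop 3.3.1, one quick way to confirm which Hadoop version a given Spark image actually bundles is to query it from a PySpark session; a small sketch (an assumption, not something run in this thread):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hadoop-version-check").getOrCreate()

# Print the Hadoop version bundled with this Spark build via the JVM gateway.
print(spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())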

nsivabalan commented 3 years ago

Hmm, could be. I vaguely remember a few people in the community trying to use Hudi on K8s. I haven't heard of any issues from them, but I am not sure whether that's because there genuinely weren't any issues and hence no one complained. Sorry I couldn't be of much help here.

mithalee commented 3 years ago

@nsivabalan @codope Hi, I found the issue. Spark on K8s 3.1.1 has some bugs and throws the "Premature Content" error. I tried Spark on K8s 3.1.2, which is the latest release, and the Hudi DeltaStreamer upserts work perfectly fine. We can close this ticket. Thank you for your help. https://spark.apache.org/docs/latest/running-on-kubernetes.html