apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Duplicate records in COW table within same partition path #4318

Closed: stym06 closed this issue 2 years ago

stym06 commented 2 years ago

Describe the problem you faced

We have written IoT data from Kafka to Azure Blob Storage using the DeltaStreamer utility in continuous mode and are querying the table through Presto. We are seeing duplicate records with the same _hoodie_record_key but different commit times and different parquet files within the same partition path.

 _hoodie_commit_time |   _hoodie_commit_seqno   |                           _hoodie_record_key                            | _hoodie_partition_path |                             _hoodie_file_name                             | master_timestamp |   timest
---------------------+--------------------------+-------------------------------------------------------------------------+------------------------+---------------------------------------------------------------------------+------------------+---------
 20211206002458      | 20211206002458_1_4116796 | vehicle_identification_number:P53ACDCB2AKA00081,timestamp:1638708846929 | dt=2021-12-05          | 5885895a-78d1-468b-9e7b-045d77644d1c-0_1-1706-2959_20211206002458.parquet |    1638708851906 | 16387088
 20211206120116      | 20211206120116_1_1745292 | vehicle_identification_number:P53ACDCB2AKA00081,timestamp:1638708846929 | dt=2021-12-05          | df619ce7-cd21-41fc-9e6b-68386748bde4-0_1-470-1174_20211206120116.parquet  |    1638708851906 | 1638708
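
For reference, duplicates like the above can be surfaced with a query along these lines (a sketch; the table name telemetry_lpe_v2 is a placeholder for the actual Presto table):

-- list record keys that appear in more than one parquet file within the same partition
SELECT _hoodie_record_key,
       _hoodie_partition_path,
       count(DISTINCT _hoodie_file_name) AS file_count
FROM telemetry_lpe_v2
GROUP BY _hoodie_record_key, _hoodie_partition_path
HAVING count(DISTINCT _hoodie_file_name) > 1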

Deltastreamer configs:

#base properties
hoodie.upsert.shuffle.parallelism=500
hoodie.insert.shuffle.parallelism=500
hoodie.delete.shuffle.parallelism=50
hoodie.bulkinsert.shuffle.parallelism=10
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false

#cleaning
hoodie.cleaner.policy=KEEP_LATEST_COMMITS
hoodie.cleaner.commits.retained=1
hoodie.clean.async=true

#archival
hoodie.keep.min.commits=2
hoodie.keep.max.commits=10

#datasource properties
hoodie.deltastreamer.schemaprovider.registry.url=http://10.xx.yy.zz:8081/subjects/dp.hmi.quectel.event.lpe.packet.v2/versions/latest
hoodie.datasource.write.recordkey.field=vehicle_identification_number,timestamp
hoodie.deltastreamer.source.kafka.topic=dp.hmi.quectel.event.lpe.packet.v2
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp:TIMESTAMP

hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.input.timezone=UTC
hoodie.deltastreamer.keygen.timebased.output.timezone=UTC
hoodie.deltastreamer.keygen.timebased.output.dateformat='dt='yyyy-MM-dd

#kafka props
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
schema.registry.url=http://10.xx.yy.zz:8081

#prometheus
hoodie.metrics.pushgateway.host=k8s-prometheus-pushgateway.observability.svc.cluster.local
hoodie.metrics.pushgateway.port=9091
hoodie.metrics.on=true
hoodie.deltastreamer.kafka.source.maxEvents=10000000
hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY

Environment Description

nsivabalan commented 2 years ago

@stym06: Did you try querying via the Spark datasource? Do you see the same duplicates there? That would help rule out whether there is an issue with the underlying storage or something query-engine specific in play. Also, can you post the contents of the .hoodie folder? I want to check whether the two commit times are two separate writes or whether one is from compaction. @bvaradar @bhasudha @satishkotha: have you folks encountered duplicate records at any time? Any idea why this could happen?
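
(For a quick cross-check via the Spark datasource, something along these lines should work in spark-shell; a sketch, where the base path placeholder is the table's --target-base-path:)

// read the table via the Hudi datasource and count record keys that occur more than once within a partition
val basePath = "<target-base-path>"
val df = spark.read.format("hudi").load(basePath)
df.groupBy("_hoodie_record_key", "_hoodie_partition_path")
  .count()
  .filter("count > 1")
  .show(false)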

nsivabalan commented 2 years ago

Also, can you give us the spark-submit command (masking any info as necessary) that you used to trigger the DeltaStreamer? I assume there is no clustering; can you confirm, please?

stym06 commented 2 years ago

@nsivabalan I tried querying through both Presto and Hive and got duplicate records; I am yet to query through the Spark datasource. I will post the .hoodie folder contents in some time. Posting the spark-submit spec below:

#
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: hudi-lpe-ds-{{ ti.job_id }} 
  namespace: dataplatform
  annotations:
    spark.platform/type: streaming
  labels:
    spark_name: hudi-lpe-ds-{{ ti.job_id }}
    dag_name: hudi-lpe
    task_name: ds
    environment: "prod"
    cloud: "azure"
    tier: "t1"
    team: "dataplatform"
    service_type: "airflow"
    k8s_cluster_name: "kai"

spec:
  type: Java
  mode: cluster
  image: "hudi-ds-azure-0.2"
  imagePullPolicy: Always
  mainClass: org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
  mainApplicationFile: "local:///opt/spark/hudi/hudi-utilities-bundle_2.11-0.9.0-SNAPSHOT.jar"
  sparkConf:
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
  arguments:
    - "--table-type"
    - "COPY_ON_WRITE"
    - "--props"
    - "/opt/spark/hudi/config/source.properties"
    - "--schemaprovider-class"
    - "org.apache.hudi.utilities.schema.SchemaRegistryProvider"
    - "--source-class"
    - "org.apache.hudi.utilities.sources.JsonKafkaSource"
    - "--target-base-path"
    - "wasb://container-v1@account.blob.core.windows.net/data/pipelines/hudi/kafka/telemetrics_v2/dp.hmi.quectel.event.lpe.packet.v2"
    - "--target-table"
    - "dp_hmi_quectel_event_lpe_packet_v2"
    - "--op"
    - "INSERT"
    - "--source-ordering-field"
    - "timestamp"
    - "--continuous"
    - "--min-sync-interval-seconds"
    - "60"
  sparkVersion: "2.4.4"
  restartPolicy:
    type: Always
    onFailureRetries: 100000
    onFailureRetryInterval: 60
    onSubmissionFailureRetries: 100000
    onSubmissionFailureRetryInterval: 60
  timeToLiveSeconds: 3600
  volumes:
    - name: hudi-lpe-ds
      configMap:
        name: hudi-lpe-ds
  driver:
    env:
      - name: HOODIE_ENV_fs_DOT_azure_DOT_wasb_DOT_account_DOT_name
        value: {{ var.value.HOODIE_ENV_fs_DOT_azure_DOT_wasb_DOT_account_DOT_name }}
      - name: HOODIE_ENV_fs_DOT_azure_DOT_account_DOT_key_DOT_{{ var.value.DP_DPV3_BLOB_STORAGE }}_DOT_blob_DOT_core_DOT_windows_DOT_net
        value: {{ var.value.HOODIE_ENV_fs_DOT_azure_DOT_account_DOT_key_DOT_account_DOT_blob_DOT_core_DOT_windows_DOT_net }}
    cores: 1
    coreLimit: "1200m"
    memory: "4G"
    serviceAccount: "dataplatform"
    volumeMounts:
      - name: hudi-lpe-ds
        mountPath: /opt/spark/hudi/config
        subpath: config.yaml
    memoryOverhead: "1024"
    javaOptions: "-Dnetworkaddress.cache.ttl=60 -Duser.timezone=IST -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/varadarb_ds_driver.hprof"
    # affinity:
    #  nodeAffinity:
    #    requiredDuringSchedulingIgnoredDuringExecution:
    #      nodeSelectorTerms:
    #      - matchExpressions:
    #        - key: service
    #          operator: In
    #          values:
    #          - airflow-spark
    #        - key: "node-lifecycle"
    #          operator: In
    #          values:
    #          - "ondemand"
  executor:
    env:
      - name: HOODIE_ENV_fs_DOT_azure_DOT_wasb_DOT_account_DOT_name
        value: {{ var.value.HOODIE_ENV_fs_DOT_azure_DOT_wasb_DOT_account_DOT_name }}
      - name: HOODIE_ENV_fs_DOT_azure_DOT_account_DOT_key_DOT_{{ var.value.DP_DPV3_BLOB_STORAGE }}_DOT_blob_DOT_core_DOT_windows_DOT_net
        value: {{ var.value.HOODIE_ENV_fs_DOT_azure_DOT_account_DOT_key_DOT_account_DOT_blob_DOT_core_DOT_windows_DOT_net }}
    cores: 1
    instances: 3
    memory: "6G"
    volumeMounts:
      - name: hudi-lpe-ds
        mountPath: /opt/spark/hudi/config
        subpath: config.yaml
    memoryOverhead: "3072"
    javaOptions: "-Dnetworkaddress.cache.ttl=60 -Duser.timezone=IST -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/varadarb_ds_driver.hprof"
    # affinity:
    #  nodeAffinity:
    #    requiredDuringSchedulingIgnoredDuringExecution:
    #      nodeSelectorTerms:
    #      - matchExpressions:
    #        - key: service
    #          operator: In
    #          values:
    #          - airflow-spark
    #        - key: "node-lifecycle"
    #          operator: In
    #          values:
    #          - "ondemand"
  sparkUIOptions:
    ingressAnnotations:
      kubernetes.io/ingress.class: nginx
  # Prometheus Monitoring
  # Comment out if not supported
  # monitoring:
  #   exposeDriverMetrics: true
  #   exposeExecutorMetrics: true
  #   prometheus:
  #     jmxExporterJar: "/prometheus/jmx_prometheus_javaagent-0.11.0.jar"
  #     port: 8090
nsivabalan commented 2 years ago

Thanks. Can you post the contents of /opt/spark/hudi/config/source.properties as well?

stym06 commented 2 years ago

@nsivabalan I have pasted the contents of the source.properties file above. By the way, any idea how I can start spark-shell with the Azure-related dependencies? I'm getting the error below:

spark-shell \
  --jars hadoop-azure-3.2.0.jar, azure-storage-7.0.0.jar \
  --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.10.0,org.apache.spark:spark-avro_2.12:3.1.2 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

org.apache.hudi.exception.HoodieIOException: Failed to get instance of org.apache.hadoop.fs.FileSystem
  at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:104)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:87)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:69)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:308)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:308)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240)
  ... 59 elided
Caused by: java.io.IOException: No FileSystem for scheme: wasb
  at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
  at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
  at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:102)
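
(For reference: "No FileSystem for scheme: wasb" usually means hadoop-azure is not actually on the classpath; note also that the space after the comma in --jars likely prevents azure-storage-7.0.0.jar from being picked up as intended. A sketch of an invocation that would typically resolve the scheme, with the storage account name and key as placeholders:)

spark-shell \
  --jars hadoop-azure-3.2.0.jar,azure-storage-7.0.0.jar \
  --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.10.0,org.apache.spark:spark-avro_2.12:3.1.2 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.hadoop.fs.azure.account.key.<account>.blob.core.windows.net=<key>'

If the scheme still does not resolve, adding --conf 'spark.hadoop.fs.wasb.impl=org.apache.hadoop.fs.azure.NativeAzureFileSystem' may also be needed.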
nsivabalan commented 2 years ago

I see you are using "INSERT" as the operation type. If your incoming records have duplicates, those can show up as duplicates in Hudi as well; we only de-dup explicitly with "UPSERT". For "INSERT", you need to set https://hudi.apache.org/docs/configurations/#hoodiecombinebeforeinsert to true if you want Hudi to de-dup before ingesting.

Can you try setting this and let us know how it goes.
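
In DeltaStreamer property terms, this would look roughly like the following (a sketch; the precombine field shown matches the --source-ordering-field already in use):

# de-dup the incoming batch before INSERT, keeping the record with the latest ordering value
hoodie.combine.before.insert=true
hoodie.datasource.write.precombine.field=timestamp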

nsivabalan commented 2 years ago

Or, if you do not want duplicates at all, you can use the "UPSERT" operation.
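
In the SparkApplication spec above, that is just a change to the operation argument (a sketch):

    - "--op"
    - "UPSERT"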

nsivabalan commented 2 years ago

@kywe665 Do you know anyone from the community who has used Azure Blob Storage with Hudi? This user is having difficulty using spark-shell with Azure, and I don't have any experience with it.

stym06 commented 2 years ago

Does anyone know how to get spark-shell or hudi-cli working with S3 or Azure?

nsivabalan commented 2 years ago

For S3, I came across this: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-cli.html

stym06 commented 2 years ago

That is set up on EMR. Is there a doc for setting it up in a local env?

nsivabalan commented 2 years ago

You mean to access an S3 dataset from your local laptop, or to access a local dataset on your laptop?

stym06 commented 2 years ago

Yes, to access S3 data from the local machine.
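
(For the spark-shell half of the question, reading an S3 path from a local machine typically just needs the S3A connector and credentials on top of the Hudi bundle; a sketch, where the bucket, keys, and the hadoop-aws version, which should match the local Spark's Hadoop build, are placeholders and assumptions:)

spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.9.0,org.apache.hadoop:hadoop-aws:2.7.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.hadoop.fs.s3a.access.key=<access-key>' \
  --conf 'spark.hadoop.fs.s3a.secret.key=<secret-key>'

and then spark.read.format("hudi").load("s3a://<bucket>/<table-base-path>").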

nsivabalan commented 2 years ago

@stym06: Can you give https://github.com/apache/hudi/pull/3222 a try? It would help to certify the patch too.

nsivabalan commented 2 years ago

I have updated the instructions for accessing S3 via hudi-cli here.

nsivabalan commented 2 years ago

Regarding duplicates: in general, the pair of partition path and record key is unique in Hudi. If you want record keys to be unique globally, you need to use a global index or a non-partitioned dataset. In addition, the preCombine configs have to be set appropriately.

Feel free to close out the issue if this was a misconfiguration on your end. If not, we can keep it open and discuss further.
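
As a sketch of the configs referred to here (choosing GLOBAL_BLOOM is an assumption; it is one of the global index options):

# make record keys unique across partitions instead of per partition
hoodie.index.type=GLOBAL_BLOOM
# field used to pick the winning record when two writes share the same key
hoodie.datasource.write.precombine.field=timestamp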

stym06 commented 2 years ago

@nsivabalan #3222 worked for me. Thanks for the help. We can close this out: the operation mode was INSERT and there were duplicate records coming in on the Kafka topic as well, which led to the duplicates in the table.