apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Error using Structured Streaming to consume Kafka and write to Hudi #3280

Closed: liujinhui1994 closed this issue 3 years ago

liujinhui1994 commented 3 years ago

Describe the problem you faced

With the latest master build, a Structured Streaming job that consumes Kafka and writes to Hudi runs the first micro-batch successfully, but the second micro-batch fails with ClassNotFoundException: Failed to find data source: org.apache.hudi (see Additional context and Stacktrace below).

To Reproduce

Steps to reproduce the behavior:

  1. Use Structured Streaming to consume Kafka and write each micro-batch to Hudi (via foreachBatch).

Expected behavior

The streaming job keeps running and writes every micro-batch to Hudi, as it does with Hudi 0.7 and 0.8.

Additional context

We use Structured Streaming to consume Kafka and write the stream to Hudi. With earlier Hudi versions such as 0.7 and 0.8, the program runs continuously and stably.

  1. With the latest master build, writing a Hudi table into an empty folder succeeds for the first batch, but the second batch fails.
  2. Next, I restarted the YARN job and tried again.
  3. The program again consumed one batch successfully, but the second batch still failed.
  4. With older Hudi versions (0.8, etc.) I have written more than 3,000 tables in production, which suggests the program itself is fine; something that changed in Hudi recently may be the cause.

I tried to locate the problem but could not find the cause.
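
For context, here is a minimal sketch of the write path (all names, the topic, and the paths are illustrative placeholders, not my actual values): the job reads Kafka with Structured Streaming and writes each micro-batch to Hudi through foreachBatch, which is the path visible in the stack trace below (ForeachBatchSink -> DataFrameWriter.save).

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

val spark = SparkSession.builder().appName("kafka-to-hudi").getOrCreate()

// Streaming Kafka source; broker and topic are placeholders.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "some_topic")
  .load()

// Each micro-batch is written with the ordinary batch DataFrame writer.
df.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write.format("org.apache.hudi")
      .option("hoodie.table.name", "some_table")                // placeholder
      .option("hoodie.datasource.write.recordkey.field", "id")  // placeholder
      .option("hoodie.datasource.write.precombine.field", "ts") // placeholder
      .mode(SaveMode.Append)
      .save("/path/to/hudi/table")                              // placeholder
  }
  .option("checkpointLocation", "/path/to/checkpoint")          // placeholder
  .start()
  .awaitTermination()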

Stacktrace

2021-07-15 18:03:30,437 | INFO  | [block-manager-slave-async-thread-pool-39] | Removing RDD 31 | org.apache.spark.storage.BlockManager.logInfo(Logging.scala:54)
2021-07-15 18:03:30,445 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | Removing RDD 21 from persistence list | org.apache.spark.rdd.MapPartitionsRDD.logInfo(Logging.scala:54)
2021-07-15 18:03:30,446 | INFO  | [block-manager-slave-async-thread-pool-42] | Removing RDD 21 | org.apache.spark.storage.BlockManager.logInfo(Logging.scala:54)
2021-07-15 18:03:30,460 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum | org.apache.hadoop.conf.Configuration.deprecation.logDeprecation(Configuration.java:1409)
2021-07-15 18:03:30,482 | WARN  | [MetricsSender] | Unable to load JDK7 types (java.nio.file.Path): no Java7 type support added | mrs.shaded.provider.com.fasterxml.jackson.databind.ext.Java7Handlers.<clinit>(Java7Handlers.java:29)
2021-07-15 18:03:30,497 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | Writing atomically to obs://t3-data-lake-std-prod/datalake/checkpoint/dls/kafka/t3_ts_android_device/day/commits/2 using temp file obs://t3-data-lake-std-prod/datalake/checkpoint/dls/kafka/t3_ts_android_device/day/commits/.2.5817bc89-610b-43d0-be15-c16daffbda17.tmp | org.apache.spark.sql.execution.streaming.CheckpointFileManager.logInfo(Logging.scala:54)
2021-07-15 18:03:30,513 | INFO  | [block-manager-slave-async-thread-pool-48] | Removing RDD 31 | org.apache.spark.storage.BlockManager.logInfo(Logging.scala:54)
2021-07-15 18:03:30,602 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | Renamed temp file obs://t3-data-lake-std-prod/datalake/checkpoint/dls/kafka/t3_ts_android_device/day/commits/.2.5817bc89-610b-43d0-be15-c16daffbda17.tmp to obs://t3-data-lake-std-prod/datalake/checkpoint/dls/kafka/t3_ts_android_device/day/commits/2 | org.apache.spark.sql.execution.streaming.CheckpointFileManager.logInfo(Logging.scala:54)
2021-07-15 18:03:30,626 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | Streaming query made progress: {
  "id" : "6c7bb949-bcb6-494a-a576-e64b05643c31",
  "runId" : "d6c5cb5d-7e6c-45b7-935e-0308327a0bab",
  "name" : null,
  "timestamp" : "2021-07-15T10:03:00.705Z",
  "batchId" : 2,
  "numInputRows" : 58847,
  "processedRowsPerSecond" : 1968.2587464044418,
  "durationMs" : {
    "addBatch" : 28029,
    "getBatch" : 7,
    "getEndOffset" : 0,
    "queryPlanning" : 302,
    "setOffsetRange" : 646,
    "triggerExecution" : 29898,
    "walCommit" : 511
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[t3_ts_android_device]]",
    "startOffset" : {
      "t3_ts_android_device" : {
        "2" : 524155,
        "5" : 524155,
        "4" : 524156,
        "7" : 524155,
        "1" : 524155,
        "3" : 524154,
        "6" : 524155,
        "0" : 524154
      }
    },
    "endOffset" : {
      "t3_ts_android_device" : {
        "2" : 531507,
        "5" : 531508,
        "4" : 531509,
        "7" : 531507,
        "1" : 531509,
        "3" : 531506,
        "6" : 531509,
        "0" : 531506
      }
    },
    "numInputRows" : 58847,
    "processedRowsPerSecond" : 1968.2587464044418
  } ],
  "sink" : {
    "description" : "ForeachBatchSink"
  }
} | org.apache.spark.sql.execution.streaming.MicroBatchExecution.logInfo(Logging.scala:54)
2021-07-15 18:03:30,627 | WARN  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 29922 milliseconds | org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.logWarning(Logging.scala:66)
2021-07-15 18:03:30,630 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | [Consumer clientId=consumer-1, groupId=spark-kafka-source-6c362da9-8740-4aab-92fc-5eb97a666e02-679833050-driver-0] Resetting offset for partition t3_ts_android_device-4 to offset 531556. | org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:583)
2021-07-15 18:03:30,630 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | [Consumer clientId=consumer-1, groupId=spark-kafka-source-6c362da9-8740-4aab-92fc-5eb97a666e02-679833050-driver-0] Resetting offset for partition t3_ts_android_device-3 to offset 531554. | org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:583)
2021-07-15 18:03:30,630 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | [Consumer clientId=consumer-1, groupId=spark-kafka-source-6c362da9-8740-4aab-92fc-5eb97a666e02-679833050-driver-0] Resetting offset for partition t3_ts_android_device-5 to offset 531556. | org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:583)
2021-07-15 18:03:30,631 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | [Consumer clientId=consumer-1, groupId=spark-kafka-source-6c362da9-8740-4aab-92fc-5eb97a666e02-679833050-driver-0] Resetting offset for partition t3_ts_android_device-2 to offset 531555. | org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:583)
2021-07-15 18:03:30,631 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | [Consumer clientId=consumer-1, groupId=spark-kafka-source-6c362da9-8740-4aab-92fc-5eb97a666e02-679833050-driver-0] Resetting offset for partition t3_ts_android_device-6 to offset 531556. | org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:583)
2021-07-15 18:03:30,631 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | [Consumer clientId=consumer-1, groupId=spark-kafka-source-6c362da9-8740-4aab-92fc-5eb97a666e02-679833050-driver-0] Resetting offset for partition t3_ts_android_device-0 to offset 531554. | org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:583)
2021-07-15 18:03:30,633 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | [Consumer clientId=consumer-1, groupId=spark-kafka-source-6c362da9-8740-4aab-92fc-5eb97a666e02-679833050-driver-0] Resetting offset for partition t3_ts_android_device-7 to offset 531555. | org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:583)
2021-07-15 18:03:30,633 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | [Consumer clientId=consumer-1, groupId=spark-kafka-source-6c362da9-8740-4aab-92fc-5eb97a666e02-679833050-driver-0] Resetting offset for partition t3_ts_android_device-1 to offset 531556. | org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:583)
2021-07-15 18:03:30,678 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | Writing atomically to obs://t3-data-lake-std-prod/datalake/checkpoint/dls/kafka/t3_ts_android_device/day/offsets/3 using temp file obs://t3-data-lake-std-prod/datalake/checkpoint/dls/kafka/t3_ts_android_device/day/offsets/.3.9893a7ac-d49f-4cff-a2db-e7f4d1dfd266.tmp | org.apache.spark.sql.execution.streaming.CheckpointFileManager.logInfo(Logging.scala:54)
2021-07-15 18:03:30,824 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | Renamed temp file obs://t3-data-lake-std-prod/datalake/checkpoint/dls/kafka/t3_ts_android_device/day/offsets/.3.9893a7ac-d49f-4cff-a2db-e7f4d1dfd266.tmp to obs://t3-data-lake-std-prod/datalake/checkpoint/dls/kafka/t3_ts_android_device/day/offsets/3 | org.apache.spark.sql.execution.streaming.CheckpointFileManager.logInfo(Logging.scala:54)
2021-07-15 18:03:30,824 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | Committed offsets for batch 3. Metadata OffsetSeqMetadata(0,1626343410634,Map(spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion -> 2, spark.sql.streaming.multipleWatermarkPolicy -> min, spark.sql.streaming.aggregation.stateFormatVersion -> 2, spark.sql.shuffle.partitions -> 200)) | org.apache.spark.sql.execution.streaming.MicroBatchExecution.logInfo(Logging.scala:54)
2021-07-15 18:03:30,883 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | The key carbon.enable.mv with value true added in the session param | org.apache.carbondata.core.util.SessionParams.addProperty(SessionParams.java:101)
2021-07-15 18:03:30,885 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | Trying to connect to metastore with URI thrift://node-master1AxuE.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com:9083 | org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:582)
2021-07-15 18:03:30,886 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | Opened a connection to metastore, current connections: 1 | org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:671)
2021-07-15 18:03:30,893 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | Connected to metastore. | org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:765)
2021-07-15 18:03:30,893 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient ugi=skylake (auth:SIMPLE) retries=1 delay=1 lifetime=0 | org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:99)
2021-07-15 18:03:32,870 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | The key carbon.enable.mv with value true added in the session param | org.apache.carbondata.core.util.SessionParams.addProperty(SessionParams.java:101)
2021-07-15 18:03:34,651 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | Partitions added: Map() | org.apache.spark.sql.kafka010.KafkaMicroBatchReader.logInfo(Logging.scala:54)
2021-07-15 18:03:34,676 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | The key carbon.enable.mv with value true added in the session param | org.apache.carbondata.core.util.SessionParams.addProperty(SessionParams.java:101)
2021-07-15 18:03:36,631 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | Starting job: getCallSite at StreamExecution.scala:173 | org.apache.spark.SparkContext.logInfo(Logging.scala:54)
2021-07-15 18:03:36,632 | INFO  | [dag-scheduler-event-loop] | Registering RDD 45 (getCallSite at StreamExecution.scala:173) as input to shuffle 3 | org.apache.spark.scheduler.DAGScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,633 | INFO  | [dag-scheduler-event-loop] | Got job 8 (getCallSite at StreamExecution.scala:173) with 1 output partitions | org.apache.spark.scheduler.DAGScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,633 | INFO  | [dag-scheduler-event-loop] | Final stage: ResultStage 13 (getCallSite at StreamExecution.scala:173) | org.apache.spark.scheduler.DAGScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,633 | INFO  | [dag-scheduler-event-loop] | Parents of final stage: List(ShuffleMapStage 12) | org.apache.spark.scheduler.DAGScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,633 | INFO  | [dag-scheduler-event-loop] | Missing parents: List(ShuffleMapStage 12) | org.apache.spark.scheduler.DAGScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,634 | INFO  | [dag-scheduler-event-loop] | Submitting ShuffleMapStage 12 (MapPartitionsRDD[45] at getCallSite at StreamExecution.scala:173), which has no missing parents | org.apache.spark.scheduler.DAGScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,638 | INFO  | [dag-scheduler-event-loop] | Block broadcast_11 stored as values in memory (estimated size 39.5 KB, free 94.5 MB) | org.apache.spark.storage.memory.MemoryStore.logInfo(Logging.scala:54)
2021-07-15 18:03:36,641 | INFO  | [dag-scheduler-event-loop] | Block broadcast_11_piece0 stored as bytes in memory (estimated size 18.7 KB, free 94.4 MB) | org.apache.spark.storage.memory.MemoryStore.logInfo(Logging.scala:54)
2021-07-15 18:03:36,641 | INFO  | [dispatcher-event-loop-1] | Added broadcast_11_piece0 in memory on node-group-1rXLd.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com:22685 (size: 18.7 KB, free: 94.5 MB) | org.apache.spark.storage.BlockManagerInfo.logInfo(Logging.scala:54)
2021-07-15 18:03:36,642 | INFO  | [dag-scheduler-event-loop] | Created broadcast 11 from broadcast at DAGScheduler.scala:1232 | org.apache.spark.SparkContext.logInfo(Logging.scala:54)
2021-07-15 18:03:36,642 | INFO  | [dag-scheduler-event-loop] | Submitting 8 missing tasks from ShuffleMapStage 12 (MapPartitionsRDD[45] at getCallSite at StreamExecution.scala:173) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7)) | org.apache.spark.scheduler.DAGScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,642 | INFO  | [dag-scheduler-event-loop] | Adding task set 12.0 with 8 tasks | org.apache.spark.scheduler.cluster.YarnClusterScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,643 | INFO  | [dispatcher-event-loop-0] | Starting task 1.0 in stage 12.0 (TID 39, node-group-1eKyO.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com, executor 4, partition 1, PROCESS_LOCAL, 8871 bytes) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,644 | INFO  | [dispatcher-event-loop-0] | Starting task 0.0 in stage 12.0 (TID 40, node-group-1ViRs.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com, executor 1, partition 0, PROCESS_LOCAL, 8871 bytes) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,644 | INFO  | [dispatcher-event-loop-0] | Starting task 6.0 in stage 12.0 (TID 41, node-group-198O4.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com, executor 2, partition 6, PROCESS_LOCAL, 8871 bytes) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,644 | INFO  | [dispatcher-event-loop-0] | Starting task 2.0 in stage 12.0 (TID 42, node-group-1jPXG.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com, executor 3, partition 2, PROCESS_LOCAL, 8871 bytes) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,644 | INFO  | [dispatcher-event-loop-0] | Starting task 3.0 in stage 12.0 (TID 43, node-group-1eKyO.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com, executor 4, partition 3, PROCESS_LOCAL, 8871 bytes) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,644 | INFO  | [dispatcher-event-loop-0] | Starting task 4.0 in stage 12.0 (TID 44, node-group-1ViRs.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com, executor 1, partition 4, PROCESS_LOCAL, 8871 bytes) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,645 | INFO  | [dispatcher-event-loop-0] | Starting task 7.0 in stage 12.0 (TID 45, node-group-198O4.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com, executor 2, partition 7, PROCESS_LOCAL, 8871 bytes) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,645 | INFO  | [dispatcher-event-loop-0] | Starting task 5.0 in stage 12.0 (TID 46, node-group-1jPXG.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com, executor 3, partition 5, PROCESS_LOCAL, 8871 bytes) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,653 | INFO  | [dispatcher-event-loop-0] | Added broadcast_11_piece0 in memory on node-group-1eKyO.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com:22816 (size: 18.7 KB, free: 2004.6 MB) | org.apache.spark.storage.BlockManagerInfo.logInfo(Logging.scala:54)
2021-07-15 18:03:36,654 | INFO  | [dispatcher-event-loop-1] | Added broadcast_11_piece0 in memory on node-group-198O4.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com:22613 (size: 18.7 KB, free: 2004.6 MB) | org.apache.spark.storage.BlockManagerInfo.logInfo(Logging.scala:54)
2021-07-15 18:03:36,655 | INFO  | [dispatcher-event-loop-0] | Added broadcast_11_piece0 in memory on node-group-1jPXG.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com:22740 (size: 18.7 KB, free: 2004.6 MB) | org.apache.spark.storage.BlockManagerInfo.logInfo(Logging.scala:54)
2021-07-15 18:03:36,669 | INFO  | [task-result-getter-2] | Finished task 6.0 in stage 12.0 (TID 41) in 25 ms on node-group-198O4.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com (executor 2) (1/8) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,669 | INFO  | [task-result-getter-0] | Finished task 3.0 in stage 12.0 (TID 43) in 25 ms on node-group-1eKyO.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com (executor 4) (2/8) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,670 | INFO  | [task-result-getter-1] | Finished task 5.0 in stage 12.0 (TID 46) in 25 ms on node-group-1jPXG.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com (executor 3) (3/8) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,670 | INFO  | [task-result-getter-3] | Finished task 7.0 in stage 12.0 (TID 45) in 25 ms on node-group-198O4.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com (executor 2) (4/8) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,670 | INFO  | [task-result-getter-2] | Finished task 1.0 in stage 12.0 (TID 39) in 27 ms on node-group-1eKyO.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com (executor 4) (5/8) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,670 | INFO  | [task-result-getter-0] | Finished task 2.0 in stage 12.0 (TID 42) in 26 ms on node-group-1jPXG.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com (executor 3) (6/8) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,678 | INFO  | [dispatcher-event-loop-1] | Added broadcast_11_piece0 in memory on node-group-1ViRs.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com:22798 (size: 18.7 KB, free: 2004.6 MB) | org.apache.spark.storage.BlockManagerInfo.logInfo(Logging.scala:54)
2021-07-15 18:03:36,694 | INFO  | [task-result-getter-1] | Finished task 0.0 in stage 12.0 (TID 40) in 50 ms on node-group-1ViRs.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com (executor 1) (7/8) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,694 | INFO  | [task-result-getter-3] | Finished task 4.0 in stage 12.0 (TID 44) in 50 ms on node-group-1ViRs.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com (executor 1) (8/8) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,694 | INFO  | [task-result-getter-3] | Removed TaskSet 12.0, whose tasks have all completed, from pool  | org.apache.spark.scheduler.cluster.YarnClusterScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,695 | INFO  | [dag-scheduler-event-loop] | ShuffleMapStage 12 (getCallSite at StreamExecution.scala:173) finished in 0.059 s | org.apache.spark.scheduler.DAGScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,695 | INFO  | [dag-scheduler-event-loop] | looking for newly runnable stages | org.apache.spark.scheduler.DAGScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,695 | INFO  | [dag-scheduler-event-loop] | running: Set() | org.apache.spark.scheduler.DAGScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,695 | INFO  | [dag-scheduler-event-loop] | waiting: Set(ResultStage 13) | org.apache.spark.scheduler.DAGScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,695 | INFO  | [dag-scheduler-event-loop] | failed: Set() | org.apache.spark.scheduler.DAGScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,695 | INFO  | [dag-scheduler-event-loop] | Submitting ResultStage 13 (MapPartitionsRDD[48] at getCallSite at StreamExecution.scala:173), which has no missing parents | org.apache.spark.scheduler.DAGScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,696 | INFO  | [dag-scheduler-event-loop] | Block broadcast_12 stored as values in memory (estimated size 10.5 KB, free 94.4 MB) | org.apache.spark.storage.memory.MemoryStore.logInfo(Logging.scala:54)
2021-07-15 18:03:36,706 | INFO  | [dag-scheduler-event-loop] | Block broadcast_12_piece0 stored as bytes in memory (estimated size 4.9 KB, free 94.4 MB) | org.apache.spark.storage.memory.MemoryStore.logInfo(Logging.scala:54)
2021-07-15 18:03:36,707 | INFO  | [dispatcher-event-loop-1] | Added broadcast_12_piece0 in memory on node-group-1rXLd.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com:22685 (size: 4.9 KB, free: 94.5 MB) | org.apache.spark.storage.BlockManagerInfo.logInfo(Logging.scala:54)
2021-07-15 18:03:36,707 | INFO  | [dag-scheduler-event-loop] | Created broadcast 12 from broadcast at DAGScheduler.scala:1232 | org.apache.spark.SparkContext.logInfo(Logging.scala:54)
2021-07-15 18:03:36,708 | INFO  | [dag-scheduler-event-loop] | Submitting 1 missing tasks from ResultStage 13 (MapPartitionsRDD[48] at getCallSite at StreamExecution.scala:173) (first 15 tasks are for partitions Vector(0)) | org.apache.spark.scheduler.DAGScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,708 | INFO  | [dag-scheduler-event-loop] | Adding task set 13.0 with 1 tasks | org.apache.spark.scheduler.cluster.YarnClusterScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,709 | INFO  | [dispatcher-event-loop-0] | Starting task 0.0 in stage 13.0 (TID 47, node-group-1eKyO.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com, executor 4, partition 0, NODE_LOCAL, 7833 bytes) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,717 | INFO  | [dispatcher-event-loop-1] | Added broadcast_12_piece0 in memory on node-group-1eKyO.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com:22816 (size: 4.9 KB, free: 2004.6 MB) | org.apache.spark.storage.BlockManagerInfo.logInfo(Logging.scala:54)
2021-07-15 18:03:36,725 | INFO  | [dispatcher-event-loop-0] | Asked to send map output locations for shuffle 3 to *.*.42.204:50614 | org.apache.spark.MapOutputTrackerMasterEndpoint.logInfo(Logging.scala:54)
2021-07-15 18:03:36,748 | INFO  | [task-result-getter-2] | Finished task 0.0 in stage 13.0 (TID 47) in 40 ms on node-group-1eKyO.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com (executor 4) (1/1) | org.apache.spark.scheduler.TaskSetManager.logInfo(Logging.scala:54)
2021-07-15 18:03:36,748 | INFO  | [task-result-getter-2] | Removed TaskSet 13.0, whose tasks have all completed, from pool  | org.apache.spark.scheduler.cluster.YarnClusterScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,748 | INFO  | [dag-scheduler-event-loop] | ResultStage 13 (getCallSite at StreamExecution.scala:173) finished in 0.052 s | org.apache.spark.scheduler.DAGScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,749 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | Job 8 finished: getCallSite at StreamExecution.scala:173, took 0.117909 s | org.apache.spark.scheduler.DAGScheduler.logInfo(Logging.scala:54)
2021-07-15 18:03:36,750 | INFO  | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | batchID => 3 | cn.t3go.bigdata.mains.IncrementBizKafka.call(IncrementBizKafka.java:99)
2021-07-15 18:03:36,841 | ERROR | [stream execution thread for [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]] | Query [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab] terminated with error | org.apache.spark.sql.execution.streaming.MicroBatchExecution.logError(Logging.scala:91)
java.lang.ClassNotFoundException: Failed to find data source: org.apache.hudi. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:704)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:265)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
    at cn.t3go.bigdata.mains.IncrementBizKafka.sinkHudi(IncrementBizKafka.java:159)
    at cn.t3go.bigdata.mains.IncrementBizKafka$1.call(IncrementBizKafka.java:104)
    at cn.t3go.bigdata.mains.IncrementBizKafka$1.call(IncrementBizKafka.java:91)
    at org.apache.spark.sql.streaming.DataStreamWriter$$anonfun$foreachBatch$1.apply(DataStreamWriter.scala:391)
    at org.apache.spark.sql.streaming.DataStreamWriter$$anonfun$foreachBatch$1.apply(DataStreamWriter.scala:391)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$19.apply(MicroBatchExecution.scala:548)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:95)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:144)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:86)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:789)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:63)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:546)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:545)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: java.lang.ClassNotFoundException: org.apache.hudi.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$12.apply(DataSource.scala:681)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$12.apply(DataSource.scala:681)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:681)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:681)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:681)
    ... 28 more
2021-07-15 18:03:36,868 | ERROR | [Driver] | User class threw exception: org.apache.spark.sql.streaming.StreamingQueryException: Failed to find data source: org.apache.hudi. Please find packages at http://spark.apache.org/third-party-projects.html
=== Streaming Query ===
Identifier: [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]
Current Committed Offsets: {KafkaV2[Subscribe[t3_ts_android_device]]: {"t3_ts_android_device":{"2":531507,"5":531508,"4":531509,"7":531507,"1":531509,"3":531506,"6":531509,"0":531506}}}
Current Available Offsets: {KafkaV2[Subscribe[t3_ts_android_device]]: {"t3_ts_android_device":{"2":531555,"5":531556,"4":531556,"7":531555,"1":531556,"3":531554,"6":531556,"0":531554}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
KafkaV2[Subscribe[t3_ts_android_device]] | org.apache.spark.deploy.yarn.ApplicationMaster.logError(Logging.scala:91)
org.apache.spark.sql.streaming.StreamingQueryException: Failed to find data source: org.apache.hudi. Please find packages at http://spark.apache.org/third-party-projects.html
=== Streaming Query ===
Identifier: [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]
Current Committed Offsets: {KafkaV2[Subscribe[t3_ts_android_device]]: {"t3_ts_android_device":{"2":531507,"5":531508,"4":531509,"7":531507,"1":531509,"3":531506,"6":531509,"0":531506}}}
Current Available Offsets: {KafkaV2[Subscribe[t3_ts_android_device]]: {"t3_ts_android_device":{"2":531555,"5":531556,"4":531556,"7":531555,"1":531556,"3":531554,"6":531556,"0":531554}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
KafkaV2[Subscribe[t3_ts_android_device]]
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: java.lang.ClassNotFoundException: Failed to find data source: org.apache.hudi. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:704)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:265)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
    at cn.t3go.bigdata.mains.IncrementBizKafka.sinkHudi(IncrementBizKafka.java:159)
    at cn.t3go.bigdata.mains.IncrementBizKafka$1.call(IncrementBizKafka.java:104)
    at cn.t3go.bigdata.mains.IncrementBizKafka$1.call(IncrementBizKafka.java:91)
    at org.apache.spark.sql.streaming.DataStreamWriter$$anonfun$foreachBatch$1.apply(DataStreamWriter.scala:391)
    at org.apache.spark.sql.streaming.DataStreamWriter$$anonfun$foreachBatch$1.apply(DataStreamWriter.scala:391)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$19.apply(MicroBatchExecution.scala:548)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:95)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:144)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:86)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:789)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:63)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:546)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:545)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    ... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.hudi.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$12.apply(DataSource.scala:681)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$12.apply(DataSource.scala:681)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:681)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:681)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:681)
    ... 28 more
2021-07-15 18:03:36,870 | INFO  | [Driver] | Final app status: FAILED, exitCode: 15, (reason: User class threw exception: org.apache.spark.sql.streaming.StreamingQueryException: Failed to find data source: org.apache.hudi. Please find packages at http://spark.apache.org/third-party-projects.html
=== Streaming Query ===
Identifier: [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = d6c5cb5d-7e6c-45b7-935e-0308327a0bab]
Current Committed Offsets: {KafkaV2[Subscribe[t3_ts_android_device]]: {"t3_ts_android_device":{"2":531507,"5":531508,"4":531509,"7":531507,"1":531509,"3":531506,"6":531509,"0":531506}}}
Current Available Offsets: {KafkaV2[Subscribe[t3_ts_android_device]]: {"t3_ts_android_device":{"2":531555,"5":531556,"4":531556,"7":531555,"1":531556,"3":531554,"6":531556,"0":531554}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
KafkaV2[Subscribe[t3_ts_android_device]]
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: java.lang.ClassNotFoundException: Failed to find data source: org.apache.hudi. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:704)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:265)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
    at cn.t3go.bigdata.mains.IncrementBizKafka.sinkHudi(IncrementBizKafka.java:159)
    at cn.t3go.bigdata.mains.IncrementBizKafka$1.call(IncrementBizKafka.java:104)
    at cn.t3go.bigdata.mains.IncrementBizKafka$1.call(IncrementBizKafka.java:91)
    at org.apache.spark.sql.streaming.DataStreamWriter$$anonfun$foreachBatch$1.apply(DataStreamWriter.scala:391)
    at org.apache.spark.sql.streaming.DataStreamWriter$$anonfun$foreachBatch$1.apply(DataStreamWriter.scala:391)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$19.apply(MicroBatchExecution.scala:548)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:95)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:144)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:86)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:789)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:63)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:546)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:545)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    ... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.hudi.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$12.apply(DataSource.scala:681)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$12.apply(DataSource.scala:681)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:681)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:681)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:681)
    ... 28 more
) | org.apache.spark.deploy.yarn.ApplicationMaster.logInfo(Logging.scala:54)
2021-07-15 18:03:36,881 | INFO  | [shutdown-hook-0] | Invoking stop() from shutdown hook | org.apache.spark.SparkContext.logInfo(Logging.scala:54)
2021-07-15 18:03:36,895 | INFO  | [shutdown-hook-0] | Stopped Spark@78afb29{HTTP/1.1, (http/1.1)}{10.4.45.248:22618} | org.spark_project.jetty.server.AbstractConnector.doStop(AbstractConnector.java:381)
2021-07-15 18:03:36,895 | INFO  | [shutdown-hook-0] | node0 Stopped scavenging | org.spark_project.jetty.server.session.stopScavenging(HouseKeeper.java:149)
2021-07-15 18:03:36,896 | INFO  | [shutdown-hook-0] | Stopped Spark web UI at http://node-group-1rXLd.1aac75b1-3981-40f3-b8f5-e41e74b4b9ba.com:22618 | org.apache.spark.ui.SparkUI.logInfo(Logging.scala:54)
2021-07-15 18:03:36,901 | INFO  | [dispatcher-event-loop-0] | Driver requested a total number of 0 executor(s). | org.apache.spark.deploy.yarn.YarnAllocator.logInfo(Logging.scala:54)
2021-07-15 18:03:36,903 | INFO  | [shutdown-hook-0] | Shutting down all executors | org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend.logInfo(Logging.scala:54)
2021-07-15 18:03:36,904 | INFO  | [dispatcher-event-loop-1] | Asking each executor to shut down | org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint.logInfo(Logging.scala:54)
2021-07-15 18:03:36,907 | INFO  | [shutdown-hook-0] | Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false) | org.apache.spark.scheduler.cluster.SchedulerExtensionServices.logInfo(Logging.scala:54)
2021-07-15 18:03:36,940 | INFO  | [dispatcher-event-loop-0] | MapOutputTrackerMasterEndpoint stopped! | org.apache.spark.MapOutputTrackerMasterEndpoint.logInfo(Logging.scala:54)
User class threw exception: org.apache.spark.sql.streaming.StreamingQueryException: Invalid value org.apache.kafka.common.serialization.ByteArrayDeserializer for configuration key.deserializer: Class org.apache.kafka.common.serialization.ByteArrayDeserializer could not be found.
=== Streaming Query ===
Identifier: [id = 6c7bb949-bcb6-494a-a576-e64b05643c31, runId = f93f604d-dfe0-4f1d-a88b-5be64d2fb853]
Current Committed Offsets: {KafkaV2[Subscribe[t3_ts_android_device]]: {"t3_ts_android_device":{"2":531555,"5":531556,"4":531556,"7":531555,"1":531556,"3":531554,"6":531556,"0":531554}}}
Current Available Offsets: {KafkaV2[Subscribe[t3_ts_android_device]]: {"t3_ts_android_device":{"2":531555,"5":531556,"4":531556,"7":531555,"1":531556,"3":531554,"6":531556,"0":531554}}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
KafkaV2[Subscribe[t3_ts_android_device]]
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArrayDeserializer for configuration key.deserializer: Class org.apache.kafka.common.serialization.ByteArrayDeserializer could not be found.
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:724)
    at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:469)
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:462)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:499)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:596)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:86)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$11.apply(KafkaOffsetReader.scala:217)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$11.apply(KafkaOffsetReader.scala:215)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:358)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:357)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:357)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:356)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:215)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:215)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:325)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:214)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$setOffsetRange$4.apply(KafkaMicroBatchReader.scala:97)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$setOffsetRange$4.apply(KafkaMicroBatchReader.scala:95)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.setOffsetRange(KafkaMicroBatchReader.scala:95)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6$$anonfun$apply$3.apply$mcV$sp(MicroBatchExecution.scala:364)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6$$anonfun$apply$3.apply(MicroBatchExecution.scala:364)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6$$anonfun$apply$3.apply(MicroBatchExecution.scala:364)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6.apply(MicroBatchExecution.scala:360)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6.apply(MicroBatchExecution.scala:352)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:352)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:348)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:348)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:568)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:348)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
liujinhui1994 commented 3 years ago

(screenshots attached)

liujinhui1994 commented 3 years ago

Versions (as posted): 2.4.5, 3.0.0, 0.9.0-SNAPSHOT, 2.3.1, 2.11.12, 2.11, 1.72, 1.10.0, 5.1.47
liujinhui1994 commented 3 years ago

@n3nash @nsivabalan Can you help me? This problem has troubled me for a week; I tried my best to find the cause, but it is still not solved. Thank you very much~

pengzhiwei2018 commented 3 years ago

java.lang.ClassNotFoundException: org.apache.hudi.DefaultSource at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

It seems that you have not put the hudi-spark bundle into $SPARK_HOME/jars.
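
The bundle can be supplied by copying hudi-spark-bundle into $SPARK_HOME/jars, or at submit time via spark-submit --jars or --packages. As a quick sanity check (a sketch): the "org.apache.hudi" format name resolves to org.apache.hudi.DefaultSource, the exact class missing from the stack trace, so loading it by hand shows whether the bundle is on that JVM's classpath.

// Throws ClassNotFoundException if the Hudi bundle is absent from the classpath
// of the JVM this runs in (the driver here, where the DataSource lookup happens).
Class.forName("org.apache.hudi.DefaultSource")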

liujinhui1994 commented 3 years ago

No, please take a look at my description. The first batch of the same program writes data normally; it is the second batch that cannot be written. If it were a jar problem, it should also fail on the first batch.

liujinhui1994 commented 3 years ago

Is there anyone who can help me?

nsivabalan commented 3 years ago

I could see two exceptions from your stacktrace:

  1. Caused by: java.lang.ClassNotFoundException: org.apache.hudi.DefaultSource
  2. User class threw exception: org.apache.spark.sql.streaming.StreamingQueryException: Invalid value org.apache.kafka.common.serialization.ByteArrayDeserializer for configuration key.deserializer: Class org.apache.kafka.common.serialization.ByteArrayDeserializer could not be found.

@liujinhui1994 :

  1. Did you change any of your Kafka configs compared to the previous version?
    1. Also, can you enable logs to see what the checkpoint value is (after the first batch in the DeltaStreamer)? See the sketch below.
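
For a Structured Streaming job, one way to surface the per-batch offsets (a sketch; assumes spark is the active SparkSession) is a query progress listener:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Prints each batch's progress JSON, which includes the per-source
// startOffset/endOffset that get recorded in the checkpoint.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(event.progress.json)
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})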

@vinothchandar @n3nash : Do you folks know of any changes that went in after 0.8.0 that might be of interest to us?

liujinhui1994 commented 3 years ago

  1. No changes to any Kafka-related configuration, or any other configuration.
  2. I am using Structured Streaming, not the Hudi DeltaStreamer.

liujinhui1994 commented 3 years ago

My latest finding is that when I turn off Hive sync I don't get the above error; when I turn it on, I do. @nsivabalan

vinothchandar commented 3 years ago

@codope can we do a git blame of the spark bundle to see what has changed since?

codope commented 3 years ago

@liujinhui1994 I tried to reproduce this locally, but for me the streaming ran fine. My setup is Spark 2.4.7, Hadoop 2.7.3, and Hive 2.3.7. I tried both COW and MOR tables:

// Imports assumed for this snippet; the option constants come from
// org.apache.hudi.DataSourceWriteOptions.
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.{DataFrame, SaveMode}

val query = df.writeStream.queryName("demo").foreachBatch { (batchDF: DataFrame, _: Long) => {
    // Cache the micro-batch so the two writes below don't each re-read Kafka.
    batchDF.persist()

    // Write the batch to a COPY_ON_WRITE table, with Hive sync enabled.
    batchDF.write.format("org.apache.hudi")
      .option(TABLE_TYPE_OPT_KEY.key, "COPY_ON_WRITE")
      .option(PRECOMBINE_FIELD_OPT_KEY.key, "kafka_timestamp")
      .option(RECORDKEY_FIELD_OPT_KEY.key, "kafka_partition_offset")
      .option(PARTITIONPATH_FIELD_OPT_KEY.key, "partition_date")
      .option(TABLE_NAME_OPT_KEY.key, "copy_on_write_table")
      .option("hoodie.table.name", "copy_on_write_table")
      .option(HIVE_SYNC_ENABLED_OPT_KEY.key, true)
      .option(HIVE_STYLE_PARTITIONING_OPT_KEY.key, true)
      .mode(SaveMode.Append)
      .save("/tmp/sparkHudi/COPY_ON_WRITE")

    // Write the same batch to a MERGE_ON_READ table.
    batchDF.write.format("org.apache.hudi")
      .option(TABLE_TYPE_OPT_KEY.key, "MERGE_ON_READ")
      .option(PRECOMBINE_FIELD_OPT_KEY.key, "kafka_timestamp")
      .option(RECORDKEY_FIELD_OPT_KEY.key, "kafka_partition_offset")
      .option(PARTITIONPATH_FIELD_OPT_KEY.key, "partition_date")
      .option(TABLE_NAME_OPT_KEY.key, "merge_on_read_table")
      .option("hoodie.table.name", "merge_on_read_table")
      .option(HIVE_SYNC_ENABLED_OPT_KEY.key, true)
      .option(HIVE_STYLE_PARTITIONING_OPT_KEY.key, true)
      .mode(SaveMode.Append)
      .save("/tmp/sparkHudi/MERGE_ON_READ")

    batchDF.unpersist()
  }}.option("checkpointLocation", "/tmp/sparkHudi/checkpoint/").start()
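
A note on the pattern above: each action on the foreachBatch DataFrame re-runs the read for that batch's offset range, so persist()/unpersist() brackets the two table writes to fetch the Kafka data only once per micro-batch. The checkpointLocation is set on the stream rather than on the batch writers, since the stream is what tracks committed offsets across restarts.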

Could you try again with the latest master?

liujinhui1994 commented 3 years ago

I re-synced to the latest code this time, and now it works. A lot of content has been updated recently, so the issue appears to have been fixed. I have informed my colleagues, and they are also updating to the latest version; it was successful for them as well. @codope

nsivabalan commented 3 years ago

thanks @codope

liujinhui1994 commented 3 years ago

@nsivabalan @codope @vinothchandar thanks