apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Trino can't read tables created by Flink Hudi connector #9435

Open Riddle4045 opened 1 year ago

Riddle4045 commented 1 year ago

I am creating a Hudi table using the Flink Hudi connector:

CREATE TABLE flink.flink_hudi_hms3 (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' =  'abfs://flink@test.dfs.core.windows.net/hudi/t1hms3',
  'table.type' = 'COPY_ON_WRITE',  
  'hive_sync.enable' = 'true',    
  'hive_sync.mode' = 'hms',     
  'hive_sync.metastore.uris' = 'thrift://hive-metastore:9083' 
);

I insert data into the table:

INSERT INTO  flink_hudi_hms3 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

I can see the registered table in Trino, but when I try to read data from the table, I get the following error:

io.trino.spi.TrinoException: line 1:8: SELECT * not allowed from relation that has no columns
    at io.trino.sql.analyzer.SemanticExceptions.semanticException(SemanticExceptions.java:48)
    at io.trino.sql.analyzer.SemanticExceptions.semanticException(SemanticExceptions.java:43)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.analyzeSelectAllColumns(StatementAnalyzer.java:4249)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.analyzeSelect(StatementAnalyzer.java:4185)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuerySpecification(StatementAnalyzer.java:2913)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuerySpecification(StatementAnalyzer.java:479)
    at io.trino.sql.tree.QuerySpecification.accept(QuerySpecification.java:155)
    at io.trino.sql.tree.AstVisitor.process(AstVisitor.java:27)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:496)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:504)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuery(StatementAnalyzer.java:1470)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuery(StatementAnalyzer.java:479)
    at io.trino.sql.tree.Query.accept(Query.java:107)
    at io.trino.sql.tree.AstVisitor.process(AstVisitor.java:27)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:496)
    at io.trino.sql.analyzer.StatementAnalyzer.analyze(StatementAnalyzer.java:458)
    at io.trino.sql.analyzer.Analyzer.analyze(Analyzer.java:79)
    at io.trino.sql.analyzer.Analyzer.analyze(Analyzer.java:71)
    at io.trino.execution.SqlQueryExecution.analyze(SqlQueryExecution.java:268)
    at io.trino.execution.SqlQueryExecution.<init>(SqlQueryExecution.java:203)
    at io.trino.execution.SqlQueryExecution$SqlQueryExecutionFactory.createQueryExecution(SqlQueryExecution.java:850)
    at io.trino.dispatcher.LocalDispatchQueryFactory.lambda$createDispatchQuery$0(LocalDispatchQueryFactory.java:149)
    at io.trino.$gen.Trino_0_410_0_1_0_5_2____20230812_012523_2.call(Unknown Source)
    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:74)
    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

SHOW CREATE TABLE in Trino returns no details for the table:

trino:flink> show create table flink_hudi_hms3;
               Create Table
-------------------------------------------
 CREATE TABLE hive.flink.flink_hudi_hms3 (

 )
(1 row)

What am I missing here? The above is with the Trino Hive catalog; when I use the Trino Hudi connector instead, I get this error:

trino:flink> select * from  flink_hudi_hms3;
Query 20230812_014715_00026_st296 failed: Not a Hudi table: flink.flink_hudi_hms3 
danny0405 commented 1 year ago

Did you check the log of JM to ensure the hive sync works as expected?

Riddle4045 commented 1 year ago

> Did you check the log of JM to ensure the hive sync works as expected?

@danny0405 just checked JM logs. I do see some errors, and they seem to be related to HMS DDL execution failure.

2023-08-14 06:23:44.846 [] pool-44124-thread-1 INFO  flink org.apache.hudi.client.HoodieTimelineArchiver 179 No Instants to archive
2023-08-14 06:23:44.916 [] pool-44124-thread-1 INFO  flink org.apache.hudi.sink.StreamWriteOperatorCoordinator 542 Commit instant [20230814062340927] success!
2023-08-14 06:23:44.921 [] pool-44124-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 130 Loading HoodieTableMetaClient from abfs://flink@test.dfs.core.windows.net/hudi/t1hms3
2023-08-14 06:23:44.934 [] pool-44124-thread-1 INFO  flink apache.hudi.common.table.HoodieTableConfig 268 Loading table properties from abfs://flink@test.dfs.core.windows.net/hudi/t1hms3/.hoodie/hoodie.properties
2023-08-14 06:23:44.960 [] pool-44124-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 149 Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from abfs://flink@test.dfs.core.windows.net/hudi/t1hms3
2023-08-14 06:23:44.960 [] pool-44124-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 152 Loading Active commit timeline for abfs://flink@test.dfs.core.windows.net/hudi/t1hms3
2023-08-14 06:23:44.979 [] pool-44124-thread-1 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 171 Loaded instants upto : Option{val=[20230814062340927__commit__COMPLETED]}
2023-08-14 06:23:44.982 [] pool-44124-thread-1 ERROR flink org.apache.hudi.sink.StreamWriteOperatorCoordinator 140 Executor executes action [handle end input event for instant 20230814062340927] error
java.lang.NoClassDefFoundError: org/apache/calcite/plan/RelOptRule
    at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
    at java.lang.Class.privateGetDeclaredMethods(Unknown Source) ~[?:?]
    at java.lang.Class.getMethodsRecursive(Unknown Source) ~[?:?]
    at java.lang.Class.getMethod0(Unknown Source) ~[?:?]
    at java.lang.Class.getMethod(Unknown Source) ~[?:?]
    at org.apache.hudi.hive.ddl.HMSDDLExecutor.<init>(HMSDDLExecutor.java:86) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-f555f20a32e666dc6370967ae417e027:0.13.0]
    at org.apache.hudi.hive.HoodieHiveSyncClient.<init>(HoodieHiveSyncClient.java:87) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-f555f20a32e666dc6370967ae417e027:0.13.0]
    at org.apache.hudi.hive.HiveSyncTool.initSyncClient(HiveSyncTool.java:119) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-f555f20a32e666dc6370967ae417e027:0.13.0]
    at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:113) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-f555f20a32e666dc6370967ae417e027:0.13.0]
    at org.apache.hudi.sink.utils.HiveSyncContext.hiveSyncTool(HiveSyncContext.java:81) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-f555f20a32e666dc6370967ae417e027:0.13.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doSyncHive(StreamWriteOperatorCoordinator.java:338) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-f555f20a32e666dc6370967ae417e027:0.13.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.syncHive(StreamWriteOperatorCoordinator.java:329) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-f555f20a32e666dc6370967ae417e027:0.13.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.handleEndInputEvent(StreamWriteOperatorCoordinator.java:433) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-f555f20a32e666dc6370967ae417e027:0.13.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$handleEventFromOperator$3(StreamWriteOperatorCoordinator.java:281) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-f555f20a32e666dc6370967ae417e027:0.13.0]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-f555f20a32e666dc6370967ae417e027:0.13.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.ClassNotFoundException: org.apache.calcite.plan.RelOptRule
    at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
    at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) ~[?:?]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    ... 20 more
2023-08-14 06:23:44.983 [] flink-akka.actor.default-dispatcher-20 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 stream_write: flink_hudi_hms3 -> Sink: clean_commits (1/1) (95e11513c14db026e5f45d6c8222d2aa_8d3451599e14a899830b68533f64c818_0_0) switched from RUNNING to FINISHED.
2023-08-14 06:23:44.983 [] flink-akka.actor.default-dispatcher-15 INFO  flink flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager 292 Clearing resource requirements of job feae5f184bade9247fe744f103bc101a
2023-08-14 06:23:44.984 [] flink-akka.actor.default-dispatcher-20 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1152 Job insert-into_myhive.flink.flink_hudi_hms3 (feae5f184bade9247fe744f103bc101a) switched from state RUNNING to FINISHED.
2023-08-14 06:23:44.984 [] flink-akka.actor.default-dispatcher-20 INFO  flink apache.flink.runtime.checkpoint.CheckpointCoordinator 411 Stopping checkpoint coordinator for job feae5f184bade9247fe744f103bc101a.
2023-08-14 06:23:44.985 [] flink-akka.actor.default-dispatcher-20 INFO  flink apache.flink.runtime.jobmaster.JobMaster 297 Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'stream_write: flink_hudi_hms3 -> Sink: clean_commits' (operator 8d3451599e14a899830b68533f64c818).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:617) ~[flink-dist-1.16.0-0.0.18.jar:1.16.0-0.0.18]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:190) ~[?:?]
    at org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142) ~[?:?]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133) ~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [handle end input event for instant 20230814062340927] error
    ... 8 more
Caused by: java.lang.NoClassDefFoundError: org/apache/calcite/plan/RelOptRule
    at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
    at java.lang.Class.privateGetDeclaredMethods(Unknown Source) ~[?:?]
    at java.lang.Class.getMethodsRecursive(Unknown Source) ~[?:?]
    at java.lang.Class.getMethod0(Unknown Source) ~[?:?]
    at java.lang.Class.getMethod(Unknown Source) ~[?:?]
    at org.apache.hudi.hive.ddl.HMSDDLExecutor.<init>(HMSDDLExecutor.java:86) ~[?:?]
    at org.apache.hudi.hive.HoodieHiveSyncClient.<init>(HoodieHiveSyncClient.java:87) ~[?:?]
    at org.apache.hudi.hive.HiveSyncTool.initSyncClient(HiveSyncTool.java:119) ~[?:?]
    at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:113) ~[?:?]
    at org.apache.hudi.sink.utils.HiveSyncContext.hiveSyncTool(HiveSyncContext.java:81) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doSyncHive(StreamWriteOperatorCoordinator.java:338) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.syncHive(StreamWriteOperatorCoordinator.java:329) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.handleEndInputEvent(StreamWriteOperatorCoordinator.java:433) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$handleEventFromOperator$3(StreamWriteOperatorCoordinator.java:281) ~[?:?]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[?:?]
    ... 5 more
Caused by: java.lang.ClassNotFoundException: org.apache.calcite.plan.RelOptRule
    at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
    at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) ~[?:?]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
    at java.lang.Class.privateGetDeclaredMethods(Unknown Source) ~[?:?]
    at java.lang.Class.getMethodsRecursive(Unknown Source) ~[?:?]
    at java.lang.Class.getMethod0(Unknown Source) ~[?:?]
    at java.lang.Class.getMethod(Unknown Source) ~[?:?]
    at org.apache.hudi.hive.ddl.HMSDDLExecutor.<init>(HMSDDLExecutor.java:86) ~[?:?]
    at org.apache.hudi.hive.HoodieHiveSyncClient.<init>(HoodieHiveSyncClient.java:87) ~[?:?]
    at org.apache.hudi.hive.HiveSyncTool.initSyncClient(HiveSyncTool.java:119) ~[?:?]
    at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:113) ~[?:?]
    at org.apache.hudi.sink.utils.HiveSyncContext.hiveSyncTool(HiveSyncContext.java:81) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doSyncHive(StreamWriteOperatorCoordinator.java:338) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.syncHive(StreamWriteOperatorCoordinator.java:329) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.handleEndInputEvent(StreamWriteOperatorCoordinator.java:433) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$handleEventFromOperator$3(StreamWriteOperatorCoordinator.java:281) ~[?:?]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[?:?]
    ... 5 more
2023-08-14 06:23:44.991 [] flink-akka.actor.default-dispatcher-20 INFO  flink apache.flink.runtime.dispatcher.StandaloneDispatcher 1113 Job feae5f184bade9247fe744f103bc101a reached terminal state FINISHED.
2023-08-14 06:23:45.108 [] cluster-io-thread-1 INFO  flink apache.flink.runtime.history.FsJobArchivist 91 Job feae5f184bade9247fe744f103bc101a has been archived at abfs://flink@test.dfs.core.windows.net/completed-jobs/feae5f184bade9247fe744f103bc101a.
2023-08-14 06:23:45.162 [] cluster-io-thread-3 INFO  flink apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionHaServices 214 Clean up the high availability data for job 8b89dcbd2662be08651d7a1fcacb1c4f.
2023-08-14 06:23:45.164 [] cluster-io-thread-3 WARN  flink flink.runtime.dispatcher.cleanup.DefaultResourceCleaner 223 Cleanup of HighAvailabilityServices failed for job 8b89dcbd2662be08651d7a1fcacb1c4f due to a CompletionException: java.util.concurrent.ExecutionException: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: DELETE at: https://10.0.0.1/api/v1/namespaces/97e6cfd01ddd497db1cfdd13ed38d5c9/configmaps/flink-97e6cfd01ddd497db1cfdd13ed38d5c9-8b89dcbd2662be08651d7a1fcacb1c4f-config-map. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. configmaps "flink-97e6cfd01ddd497db1cfdd13ed38d5c9-8b89dcbd2662be08651d7a1fcacb1c4f-config-map" is forbidden: User "system:serviceaccount:97e6cfd01ddd497db1cfdd13ed38d5c9:97e6cfd01ddd497db1cfdd13ed38d5c9" cannot delete resource "configmaps" in API group "" in the namespace "97e6cfd01ddd497db1cfdd13ed38d5c9".
2023-08-14 06:23:45.258 [] cluster-io-thread-4 INFO  flink apache.flink.runtime.dispatcher.StandaloneDispatcher 1161 Job feae5f184bade9247fe744f103bc101a has been registered for cleanup in the JobResultStore after reaching a terminal state.
2023-08-14 06:23:45.258 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.flink.runtime.jobmaster.JobMaster 434 Stopping the JobMaster for job 'insert-into_myhive.flink.flink_hudi_hms3' (feae5f184bade9247fe744f103bc101a).
2023-08-14 06:23:45.259 [] flink-akka.actor.default-dispatcher-15 INFO  flink org.apache.hudi.client.BaseHoodieClient 111 Stopping Timeline service !!
2023-08-14 06:23:45.260 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.hudi.client.embedded.EmbeddedTimelineService 151 Closing Timeline server
2023-08-14 06:23:45.260 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.hudi.timeline.service.TimelineService 399 Closing Timeline Service
2023-08-14 06:23:45.260 [] flink-akka.actor.default-dispatcher-15 INFO  flink io.javalin.Javalin 22 Stopping Javalin ...
2023-08-14 06:23:45.264 [] flink-akka.actor.default-dispatcher-15 INFO  flink io.javalin.Javalin 22 Javalin has stopped
2023-08-14 06:23:45.265 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.hudi.timeline.service.TimelineService 408 Closed Timeline Service
2023-08-14 06:23:45.265 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.hudi.client.embedded.EmbeddedTimelineService 155 Closed Timeline server
2023-08-14 06:23:45.266 [] flink-akka.actor.default-dispatcher-15 INFO  flink org.apache.hudi.client.BaseHoodieClient 111 Stopping Timeline service !!
2023-08-14 06:23:45.266 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore 182 Shutting down
Riddle4045 commented 1 year ago

@danny0405 I checked the table props in the metastore for a table synced using the Hudi HMS sync tool vs. the Flink table I mentioned above. The properties are very different:

Table props for a table created using the Hudi HMS sync tool:

TBL_ID  PARAM_KEY   PARAM_VALUE
250 EXTERNAL    TRUE
250 last_commit_time_sync   20230601210025262
250 numFiles    0
250 spark.sql.sources.provider  hudi
250 spark.sql.sources.schema.numParts   1
250 spark.sql.sources.schema.part.0 {"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"rideId","type":"long","nullable":true,"metadata":{}},{"name":"driverId","type":"long","nullable":false,"metadata":{}},{"name":"taxiId","type":"long","nullable":true,"metadata":{}},{"name":"startTime","type":"long","nullable":true,"metadata":{}},{"name":"tip","type":"float","nullable":true,"metadata":{}},{"name":"tolls","type":"float","nullable":true,"metadata":{}},{"name":"totalFare","type":"float","nullable":true,"metadata":{}}]}
250 totalSize   0
250 transient_lastDdlTime   1685653353

HMS props for the Hudi table created using Flink SQL:

TBL_ID  PARAM_KEY   PARAM_VALUE
335 flink.comment   
335 flink.connector hudi
335 flink.hive_sync.enable  true
335 flink.hive_sync.metastore.uris  thrift://hive-metastore:9083
335 flink.hive_sync.mode    hms
335 flink.partition.keys.0.name partition
335 flink.path  abfs://flink@test.dfs.core.windows.net/hudi/t1hms4
335 flink.schema.0.data-type    VARCHAR(20)
335 flink.schema.0.name uuid
335 flink.schema.1.data-type    VARCHAR(10)
335 flink.schema.1.name name
335 flink.schema.2.data-type    INT
335 flink.schema.2.name age
335 flink.schema.3.data-type    TIMESTAMP(3)
335 flink.schema.3.name ts
335 flink.schema.4.data-type    VARCHAR(20)
335 flink.schema.4.name partition
335 flink.table.type    COPY_ON_WRITE
335 transient_lastDdlTime   1691804292
danny0405 commented 1 year ago

> HMS props for the Hudi table created using Flink SQL

You are using the Flink Hive catalog, so the table is actually created by the Hive catalog. We have a separate Hudi Hive catalog instead; the syntax looks like:

  CREATE CATALOG hoodie_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = '${catalog root path}',
    'hive.conf.dir' = '${hive-site.xml dir}',
    'mode'='hms'
  );

The error log in the JM indicates a missing calcite-core jar; you can fix it by adding that jar to the classpath.
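
For illustration, a minimal sketch of creating a table through such a catalog, assuming the catalog above was registered; the database and table names here are hypothetical, and with the Hudi catalog the 'connector' option and table path are derived from the catalog configuration, so they need not be repeated:

  USE CATALOG hoodie_catalog;
  CREATE DATABASE IF NOT EXISTS hudi_db;
  CREATE TABLE hudi_db.users_cow (
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
  )
  PARTITIONED BY (`partition`)
  WITH (
    'table.type' = 'COPY_ON_WRITE'  -- table path resolves under ${catalog.path}/hudi_db/users_cow
  );

In 'hms' mode the catalog registers the table in the Hive Metastore with Hudi table metadata, rather than the generic flink.schema.* properties shown above, which is what query engines such as Trino need to resolve the columns.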

Riddle4045 commented 1 year ago

> > HMS props for the Hudi table created using Flink SQL
>
> You are using the Flink Hive catalog, so the table is actually created by the Hive catalog. We have a separate Hudi Hive catalog instead; the syntax looks like:
>
>   CREATE CATALOG hoodie_catalog
>   WITH (
>     'type'='hudi',
>     'catalog.path' = '${catalog root path}',
>     'hive.conf.dir' = '${hive-site.xml dir}',
>     'mode'='hms'
>   );
>
> The error log in the JM indicates a missing calcite-core jar; you can fix it by adding that jar to the classpath.

Thanks, I'll give it a try! @danny0405, in the table definition I specified connector=hudi; is that not sufficient? Also, are there any public-facing docs for the Hudi catalog, e.g. which properties are supported? Could you please share? I couldn't find anything in the Flink guide.

danny0405 commented 1 year ago

I didn't write that up on the Hudi website; I will file a fix for it.

Riddle4045 commented 1 year ago

> I didn't write that up on the Hudi website; I will file a fix for it.

@danny0405 - Also, would it be possible for the hive.conf.dir param in the catalog definition to be a directory? Looking explicitly for hive-site.xml is a bit restrictive, as other HDFS-based setups in Flink make do with core-site.xml. Thoughts?

danny0405 commented 1 year ago

> hive.conf.dir param

Sorry for the obscureness; the param value is expected to be a directory.

danny0405 commented 1 year ago

DOC is here: https://github.com/apache/hudi/pull/9453

Riddle4045 commented 1 year ago

Thank you @danny0405 - I was able to write the data using Hudi catalog!

For some reason, Trino (410) returns 0 rows when reading the data from these tables.

I noticed that for MOR tables, the compaction doesn't happen after 5 commits.

As a result, there are no base files, and the Trino tables return 0 rows when trying to read.


Is that expected? I am also seeing that the COW tables don't return any rows.
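
For context, a sketch of the Flink write options that govern MOR compaction; the values shown are the documented defaults, and the table and path below are hypothetical examples rather than the exact table above:

  CREATE TABLE mor_sketch (
    uuid VARCHAR(20),
    name VARCHAR(10),
    `partition` VARCHAR(20)
  )
  PARTITIONED BY (`partition`)
  WITH (
    'connector' = 'hudi',
    'path' = 'abfs://flink@test.dfs.core.windows.net/hudi/mor_sketch',  -- placeholder path
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'true',           -- default: run async compaction inside the streaming job
    'compaction.trigger.strategy' = 'num_commits', -- default trigger strategy
    'compaction.delta_commits' = '5'               -- default: schedule compaction every 5 delta commits
  );

Until a compaction actually completes, a MOR table holds only log files and no Parquet base files, so a read path that only consumes base files returns nothing.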

danny0405 commented 1 year ago

Can you show me the runtime DAG of the flink streaming job?

Riddle4045 commented 1 year ago

@danny0405 sorry for the late response, here is the DAG. From the DAG it looks like compaction is scheduled.

An interesting note here: I am running into the same issue with COW tables written by Flink too; Trino returns an empty result when querying them.


Riddle4045 commented 1 year ago

@danny0405 I looked at the JM logs, and it looks like the same issue with calcite/plan/RelOptRule not being found. What's interesting is that I see this exception even when I manually ADD JAR in my SQL client. Does the classpath for the compaction task differ from the job's at all?

Stack trace:

2023-08-21 23:56:26.647 [] flink-akka.actor.default-dispatcher-19 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 561 Deploying Source: Values[1] -> Calc[2] -> row_data_to_hoodie_record (1/1) (attempt #0) with attempt id 1ce0c00a771c0214a3141bd885bcfdb0_cbc357ccb763df2852fee8c4fc7d55f2_0_0 and vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to 10.244.7.3:6122-d80422 @ 10.244.7.3 (dataPort=40143) with allocation id 2786e9f15bcf024e1208268a4eb1a9f4
2023-08-21 23:56:26.656 [] flink-akka.actor.default-dispatcher-19 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 bucket_assigner (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_e5df0a348bd9bf84b2755743802fc12d_0_0) switched from SCHEDULED to DEPLOYING.
2023-08-21 23:56:26.656 [] flink-akka.actor.default-dispatcher-19 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 561 Deploying bucket_assigner (1/1) (attempt #0) with attempt id 1ce0c00a771c0214a3141bd885bcfdb0_e5df0a348bd9bf84b2755743802fc12d_0_0 and vertex id e5df0a348bd9bf84b2755743802fc12d_0 to 10.244.7.3:6122-d80422 @ 10.244.7.3 (dataPort=40143) with allocation id 2786e9f15bcf024e1208268a4eb1a9f4
2023-08-21 23:56:26.656 [] flink-akka.actor.default-dispatcher-19 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 stream_write: mor (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_de00fca8057a992b9689a76776a87e88_0_0) switched from SCHEDULED to DEPLOYING.
2023-08-21 23:56:26.657 [] flink-akka.actor.default-dispatcher-19 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 561 Deploying stream_write: mor (1/1) (attempt #0) with attempt id 1ce0c00a771c0214a3141bd885bcfdb0_de00fca8057a992b9689a76776a87e88_0_0 and vertex id de00fca8057a992b9689a76776a87e88_0 to 10.244.7.3:6122-d80422 @ 10.244.7.3 (dataPort=40143) with allocation id 2786e9f15bcf024e1208268a4eb1a9f4
2023-08-21 23:56:26.657 [] flink-akka.actor.default-dispatcher-19 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 compact_plan_generate (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_454d170138cbd567f9c11b29e690b935_0_0) switched from SCHEDULED to DEPLOYING.
2023-08-21 23:56:26.657 [] flink-akka.actor.default-dispatcher-19 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 561 Deploying compact_plan_generate (1/1) (attempt #0) with attempt id 1ce0c00a771c0214a3141bd885bcfdb0_454d170138cbd567f9c11b29e690b935_0_0 and vertex id 454d170138cbd567f9c11b29e690b935_0 to 10.244.7.3:6122-d80422 @ 10.244.7.3 (dataPort=40143) with allocation id 2786e9f15bcf024e1208268a4eb1a9f4
2023-08-21 23:56:26.657 [] flink-akka.actor.default-dispatcher-19 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 compact_task -> Sink: compact_commit (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_848dfd76319f29eb5fbacd6c182b1d35_0_0) switched from SCHEDULED to DEPLOYING.
2023-08-21 23:56:26.657 [] flink-akka.actor.default-dispatcher-19 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 561 Deploying compact_task -> Sink: compact_commit (1/1) (attempt #0) with attempt id 1ce0c00a771c0214a3141bd885bcfdb0_848dfd76319f29eb5fbacd6c182b1d35_0_0 and vertex id 848dfd76319f29eb5fbacd6c182b1d35_0 to 10.244.7.3:6122-d80422 @ 10.244.7.3 (dataPort=40143) with allocation id 2786e9f15bcf024e1208268a4eb1a9f4
2023-08-21 23:56:28.173 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 stream_write: mor (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_de00fca8057a992b9689a76776a87e88_0_0) switched from DEPLOYING to INITIALIZING.
2023-08-21 23:56:28.173 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 compact_task -> Sink: compact_commit (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_848dfd76319f29eb5fbacd6c182b1d35_0_0) switched from DEPLOYING to INITIALIZING.
2023-08-21 23:56:28.187 [] flink-akka.actor.default-dispatcher-14 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 compact_plan_generate (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_454d170138cbd567f9c11b29e690b935_0_0) switched from DEPLOYING to INITIALIZING.
2023-08-21 23:56:28.187 [] flink-akka.actor.default-dispatcher-14 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 Source: Values[1] -> Calc[2] -> row_data_to_hoodie_record (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from DEPLOYING to INITIALIZING.
2023-08-21 23:56:28.188 [] flink-akka.actor.default-dispatcher-14 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 bucket_assigner (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_e5df0a348bd9bf84b2755743802fc12d_0_0) switched from DEPLOYING to INITIALIZING.
2023-08-21 23:56:28.305 [] flink-akka.actor.default-dispatcher-15 WARN  flink apache.flink.runtime.taskmanager.TaskManagerLocation 267 No hostname could be resolved for the IP address 10.244.7.3, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
2023-08-21 23:56:28.307 [] flink-akka.actor.default-dispatcher-14 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 Source: Values[1] -> Calc[2] -> row_data_to_hoodie_record (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to RUNNING.
2023-08-21 23:56:28.329 [] flink-akka.actor.default-dispatcher-14 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 Source: Values[1] -> Calc[2] -> row_data_to_hoodie_record (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FINISHED.
2023-08-21 23:56:28.605 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 compact_plan_generate (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_454d170138cbd567f9c11b29e690b935_0_0) switched from INITIALIZING to RUNNING.
2023-08-21 23:56:28.806 [] JettyServerThreadPool-8637 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 130 Loading HoodieTableMetaClient from abfs://flink@test.dfs.core.windows.net/hudi/mor
2023-08-21 23:56:28.849 [] JettyServerThreadPool-8637 INFO  flink apache.hudi.common.table.HoodieTableConfig 268 Loading table properties from abfs://flink@test.dfs.core.windows.net/hudi/mor/.hoodie/hoodie.properties
2023-08-21 23:56:28.876 [] JettyServerThreadPool-8637 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 149 Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from abfs://flink@test.dfs.core.windows.net/hudi/mor
2023-08-21 23:56:28.876 [] JettyServerThreadPool-8637 INFO  flink hudi.common.table.view.FileSystemViewManager 165 Creating InMemory based view for basePath abfs://flink@test.dfs.core.windows.net/hudi/mor
2023-08-21 23:56:28.896 [] JettyServerThreadPool-8637 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 171 Loaded instants upto : Option{val=[20230821235145161__deltacommit__COMPLETED]}
2023-08-21 23:56:28.899 [] JettyServerThreadPool-8637 INFO  flink hudi.common.table.view.AbstractTableFileSystemView 252 Took 0 ms to read  0 instants, 0 replaced file groups
2023-08-21 23:56:28.926 [] JettyServerThreadPool-8637 INFO  flink apache.hudi.common.util.ClusteringUtils 139 Found 0 files in pending clustering operations
2023-08-21 23:56:30.251 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 compact_task -> Sink: compact_commit (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_848dfd76319f29eb5fbacd6c182b1d35_0_0) switched from INITIALIZING to RUNNING.
2023-08-21 23:56:30.326 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 130 Loading HoodieTableMetaClient from abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:30.326 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 stream_write: mor (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_de00fca8057a992b9689a76776a87e88_0_0) switched from INITIALIZING to RUNNING.
2023-08-21 23:56:30.345 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableConfig 268 Loading table properties from abfs://flink@test.dfs.core.windows.net/hudi/mor/.hoodie/hoodie.properties
2023-08-21 23:56:30.371 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 149 Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:30.392 [] pool-5111-thread-1 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 171 Loaded instants upto : Option{val=[20230821235145161__deltacommit__COMPLETED]}
2023-08-21 23:56:30.392 [] pool-5111-thread-1 INFO  flink org.apache.hudi.sink.StreamWriteOperatorCoordinator 131 Executor executes action [handle write metadata event for instant ] success!
2023-08-21 23:56:30.393 [] pool-5111-thread-1 INFO  flink apache.hudi.common.util.CleanerUtils 155 Cleaned failed attempts if any
2023-08-21 23:56:30.394 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 130 Loading HoodieTableMetaClient from abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:30.405 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableConfig 268 Loading table properties from abfs://flink@test.dfs.core.windows.net/hudi/mor/.hoodie/hoodie.properties
2023-08-21 23:56:30.428 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 149 Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:30.428 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 152 Loading Active commit timeline for abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:30.447 [] pool-5111-thread-1 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 171 Loaded instants upto : Option{val=[20230821235145161__deltacommit__COMPLETED]}
2023-08-21 23:56:30.448 [] pool-5111-thread-1 INFO  flink hudi.common.table.view.FileSystemViewManager 245 Creating View Manager with storage type :REMOTE_FIRST
2023-08-21 23:56:30.448 [] pool-5111-thread-1 INFO  flink hudi.common.table.view.FileSystemViewManager 265 Creating remote first table view
2023-08-21 23:56:30.450 [] pool-5111-thread-1 INFO  flink org.apache.hudi.client.BaseHoodieWriteClient 841 Generate a new instant time: 20230821235630450 action: deltacommit
2023-08-21 23:56:30.471 [] pool-5111-thread-1 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 171 Loaded instants upto : Option{val=[20230821235145161__deltacommit__COMPLETED]}
2023-08-21 23:56:30.471 [] pool-5111-thread-1 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 202 Creating a new instant [==>20230821235630450__deltacommit__REQUESTED]
2023-08-21 23:56:30.527 [] pool-5111-thread-1 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 632 Checking for file exists ?abfs://flink@test.dfs.core.windows.net/hudi/mor/.hoodie/20230821235630450.deltacommit.requested
2023-08-21 23:56:30.595 [] pool-5111-thread-1 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 640 Create new file for toInstant ?abfs://flink@test.dfs.core.windows.net/hudi/mor/.hoodie/20230821235630450.deltacommit.inflight
2023-08-21 23:56:30.651 [] flink-akka.actor.default-dispatcher-14 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 bucket_assigner (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_e5df0a348bd9bf84b2755743802fc12d_0_0) switched from INITIALIZING to RUNNING.
2023-08-21 23:56:30.658 [] JettyServerThreadPool-8636 INFO  flink hudi.common.table.view.AbstractTableFileSystemView 418 Building file system view for partition ()
2023-08-21 23:56:30.688 [] pool-5111-thread-1 INFO  flink org.apache.hudi.sink.StreamWriteOperatorCoordinator 380 Create instant [20230821235630450] for table [mor] with type [MERGE_ON_READ]
2023-08-21 23:56:30.689 [] pool-5111-thread-1 INFO  flink org.apache.hudi.sink.StreamWriteOperatorCoordinator 131 Executor executes action [initialize instant ] success!
2023-08-21 23:56:30.730 [] JettyServerThreadPool-8636 INFO  flink hudi.common.table.view.AbstractTableFileSystemView 160 addFilesToView: NumFiles=9, NumFileGroups=1, FileGroupsCreationTime=3, StoreTimeTaken=0
2023-08-21 23:56:30.775 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 bucket_assigner (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_e5df0a348bd9bf84b2755743802fc12d_0_0) switched from RUNNING to FINISHED.
2023-08-21 23:56:31.705 [] pool-5111-thread-1 INFO  flink org.apache.hudi.client.BaseHoodieWriteClient 219 Committing 20230821235630450 action deltacommit
2023-08-21 23:56:31.705 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 130 Loading HoodieTableMetaClient from abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:31.718 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableConfig 268 Loading table properties from abfs://flink@test.dfs.core.windows.net/hudi/mor/.hoodie/hoodie.properties
2023-08-21 23:56:31.744 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 149 Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:31.744 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 152 Loading Active commit timeline for abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:31.769 [] pool-5111-thread-1 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 171 Loaded instants upto : Option{val=[==>20230821235630450__deltacommit__INFLIGHT]}
2023-08-21 23:56:31.769 [] pool-5111-thread-1 INFO  flink hudi.common.table.view.FileSystemViewManager 245 Creating View Manager with storage type :REMOTE_FIRST
2023-08-21 23:56:31.769 [] pool-5111-thread-1 INFO  flink hudi.common.table.view.FileSystemViewManager 265 Creating remote first table view
2023-08-21 23:56:31.770 [] pool-5111-thread-1 INFO  flink apache.hudi.common.util.CommitUtils 115 Creating  metadata for UPSERT numWriteStats:1 numReplaceFileIds:0
2023-08-21 23:56:31.770 [] pool-5111-thread-1 INFO  flink org.apache.hudi.client.BaseHoodieWriteClient 272 Committing 20230821235630450 action deltacommit
2023-08-21 23:56:31.812 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 130 Loading HoodieTableMetaClient from abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:31.831 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableConfig 268 Loading table properties from abfs://flink@test.dfs.core.windows.net/hudi/mor/.hoodie/hoodie.properties
2023-08-21 23:56:31.859 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 149 Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:31.859 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 152 Loading Active commit timeline for abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:31.878 [] pool-5111-thread-1 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 171 Loaded instants upto : Option{val=[==>20230821235630450__deltacommit__INFLIGHT]}
2023-08-21 23:56:31.878 [] pool-5111-thread-1 INFO  flink hudi.common.table.view.FileSystemViewManager 245 Creating View Manager with storage type :REMOTE_FIRST
2023-08-21 23:56:31.878 [] pool-5111-thread-1 INFO  flink hudi.common.table.view.FileSystemViewManager 265 Creating remote first table view
2023-08-21 23:56:31.888 [] pool-5111-thread-1 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 220 Marking instant complete [==>20230821235630450__deltacommit__INFLIGHT]
2023-08-21 23:56:31.888 [] pool-5111-thread-1 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 632 Checking for file exists ?abfs://flink@test.dfs.core.windows.net/hudi/mor/.hoodie/20230821235630450.deltacommit.inflight
2023-08-21 23:56:31.982 [] pool-5111-thread-1 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 640 Create new file for toInstant ?abfs://flink@test.dfs.core.windows.net/hudi/mor/.hoodie/20230821235630450.deltacommit
2023-08-21 23:56:31.982 [] pool-5111-thread-1 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 224 Completed [==>20230821235630450__deltacommit__INFLIGHT]
2023-08-21 23:56:31.982 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 130 Loading HoodieTableMetaClient from abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:31.993 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableConfig 268 Loading table properties from abfs://flink@test.dfs.core.windows.net/hudi/mor/.hoodie/hoodie.properties
2023-08-21 23:56:32.018 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 149 Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:32.018 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 152 Loading Active commit timeline for abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:32.036 [] pool-5111-thread-1 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 171 Loaded instants upto : Option{val=[20230821235630450__deltacommit__COMPLETED]}
2023-08-21 23:56:32.036 [] pool-5111-thread-1 INFO  flink hudi.common.table.view.FileSystemViewManager 245 Creating View Manager with storage type :REMOTE_FIRST
2023-08-21 23:56:32.037 [] pool-5111-thread-1 INFO  flink hudi.common.table.view.FileSystemViewManager 265 Creating remote first table view
2023-08-21 23:56:32.297 [] pool-5111-thread-1 INFO  flink apache.hudi.common.fs.FSUtils 732 Removed directory at abfs://flink@test.dfs.core.windows.net/hudi/mor/.hoodie/.temp/20230821235630450
2023-08-21 23:56:32.297 [] pool-5111-thread-1 INFO  flink org.apache.hudi.client.BaseHoodieWriteClient 235 Committed 20230821235630450
2023-08-21 23:56:32.298 [] pool-5111-thread-1 INFO  flink org.apache.hudi.client.BaseHoodieWriteClient 571 Start to archive synchronously.
2023-08-21 23:56:32.321 [] pool-5111-thread-1 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 171 Loaded instants upto : Option{val=[20230821235630450__deltacommit__COMPLETED]}
2023-08-21 23:56:32.321 [] pool-5111-thread-1 INFO  flink org.apache.hudi.client.HoodieTimelineArchiver 179 No Instants to archive
2023-08-21 23:56:32.426 [] pool-5111-thread-1 INFO  flink org.apache.hudi.sink.StreamWriteOperatorCoordinator 542 Commit instant [20230821235630450] success!
2023-08-21 23:56:32.431 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 130 Loading HoodieTableMetaClient from abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:32.445 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableConfig 268 Loading table properties from abfs://flink@test.dfs.core.windows.net/hudi/mor/.hoodie/hoodie.properties
2023-08-21 23:56:32.475 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 149 Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:32.475 [] pool-5111-thread-1 INFO  flink apache.hudi.common.table.HoodieTableMetaClient 152 Loading Active commit timeline for abfs://flink@test.dfs.core.windows.net/hudi/mor/
2023-08-21 23:56:32.497 [] pool-5111-thread-1 INFO  flink hudi.common.table.timeline.HoodieActiveTimeline 171 Loaded instants upto : Option{val=[20230821235630450__deltacommit__COMPLETED]}
2023-08-21 23:56:32.501 [] pool-5111-thread-1 ERROR flink org.apache.hudi.sink.StreamWriteOperatorCoordinator 140 Executor executes action [handle end input event for instant 20230821235630450] error
java.lang.NoClassDefFoundError: org/apache/calcite/plan/RelOptRule
    at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
    at java.lang.Class.privateGetDeclaredMethods(Unknown Source) ~[?:?]
    at java.lang.Class.getMethodsRecursive(Unknown Source) ~[?:?]
    at java.lang.Class.getMethod0(Unknown Source) ~[?:?]
    at java.lang.Class.getMethod(Unknown Source) ~[?:?]
    at org.apache.hudi.hive.ddl.HMSDDLExecutor.<init>(HMSDDLExecutor.java:86) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-cd8c32c0bf152df1c672bb269a800586:0.13.0]
    at org.apache.hudi.hive.HoodieHiveSyncClient.<init>(HoodieHiveSyncClient.java:87) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-cd8c32c0bf152df1c672bb269a800586:0.13.0]
    at org.apache.hudi.hive.HiveSyncTool.initSyncClient(HiveSyncTool.java:119) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-cd8c32c0bf152df1c672bb269a800586:0.13.0]
    at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:113) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-cd8c32c0bf152df1c672bb269a800586:0.13.0]
    at org.apache.hudi.sink.utils.HiveSyncContext.hiveSyncTool(HiveSyncContext.java:81) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-cd8c32c0bf152df1c672bb269a800586:0.13.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doSyncHive(StreamWriteOperatorCoordinator.java:338) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-cd8c32c0bf152df1c672bb269a800586:0.13.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.syncHive(StreamWriteOperatorCoordinator.java:329) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-cd8c32c0bf152df1c672bb269a800586:0.13.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.handleEndInputEvent(StreamWriteOperatorCoordinator.java:433) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-cd8c32c0bf152df1c672bb269a800586:0.13.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$handleEventFromOperator$3(StreamWriteOperatorCoordinator.java:281) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-cd8c32c0bf152df1c672bb269a800586:0.13.0]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[blob_p-f6d17d46787238a244a654163d9291f028bd4b50-cd8c32c0bf152df1c672bb269a800586:0.13.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.ClassNotFoundException: org.apache.calcite.plan.RelOptRule
    at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
    at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) ~[?:?]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    ... 20 more
2023-08-21 23:56:32.501 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 stream_write: mor (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_de00fca8057a992b9689a76776a87e88_0_0) switched from RUNNING to FINISHED.
2023-08-21 23:56:32.501 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 compact_plan_generate (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_454d170138cbd567f9c11b29e690b935_0_0) switched from RUNNING to FINISHED.
2023-08-21 23:56:32.501 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1435 compact_task -> Sink: compact_commit (1/1) (1ce0c00a771c0214a3141bd885bcfdb0_848dfd76319f29eb5fbacd6c182b1d35_0_0) switched from RUNNING to FINISHED.
2023-08-21 23:56:32.502 [] flink-akka.actor.default-dispatcher-14 INFO  flink flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager 292 Clearing resource requirements of job b38f5c29d67259e28847e178a6145495
2023-08-21 23:56:32.502 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.flink.runtime.executiongraph.ExecutionGraph 1152 Job insert-into_hudi_catalog.hudicatalogdb.mor (b38f5c29d67259e28847e178a6145495) switched from state RUNNING to FINISHED.
2023-08-21 23:56:32.502 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.flink.runtime.checkpoint.CheckpointCoordinator 411 Stopping checkpoint coordinator for job b38f5c29d67259e28847e178a6145495.
2023-08-21 23:56:32.503 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.flink.runtime.jobmaster.JobMaster 297 Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'stream_write: mor' (operator de00fca8057a992b9689a76776a87e88).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:617) ~[flink-dist-1.16.0-0.0.18.jar:1.16.0-0.0.18]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:190) ~[?:?]
    at org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142) ~[?:?]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133) ~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [handle end input event for instant 20230821235630450] error
    ... 8 more
Caused by: java.lang.NoClassDefFoundError: org/apache/calcite/plan/RelOptRule
    at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
    at java.lang.Class.privateGetDeclaredMethods(Unknown Source) ~[?:?]
    at java.lang.Class.getMethodsRecursive(Unknown Source) ~[?:?]
    at java.lang.Class.getMethod0(Unknown Source) ~[?:?]
    at java.lang.Class.getMethod(Unknown Source) ~[?:?]
    at org.apache.hudi.hive.ddl.HMSDDLExecutor.<init>(HMSDDLExecutor.java:86) ~[?:?]
    at org.apache.hudi.hive.HoodieHiveSyncClient.<init>(HoodieHiveSyncClient.java:87) ~[?:?]
    at org.apache.hudi.hive.HiveSyncTool.initSyncClient(HiveSyncTool.java:119) ~[?:?]
    at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:113) ~[?:?]
    at org.apache.hudi.sink.utils.HiveSyncContext.hiveSyncTool(HiveSyncContext.java:81) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doSyncHive(StreamWriteOperatorCoordinator.java:338) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.syncHive(StreamWriteOperatorCoordinator.java:329) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.handleEndInputEvent(StreamWriteOperatorCoordinator.java:433) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$handleEventFromOperator$3(StreamWriteOperatorCoordinator.java:281) ~[?:?]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[?:?]
    ... 5 more
Caused by: java.lang.ClassNotFoundException: org.apache.calcite.plan.RelOptRule
    at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
    at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) ~[?:?]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
    at java.lang.Class.privateGetDeclaredMethods(Unknown Source) ~[?:?]
    at java.lang.Class.getMethodsRecursive(Unknown Source) ~[?:?]
    at java.lang.Class.getMethod0(Unknown Source) ~[?:?]
    at java.lang.Class.getMethod(Unknown Source) ~[?:?]
    at org.apache.hudi.hive.ddl.HMSDDLExecutor.<init>(HMSDDLExecutor.java:86) ~[?:?]
    at org.apache.hudi.hive.HoodieHiveSyncClient.<init>(HoodieHiveSyncClient.java:87) ~[?:?]
    at org.apache.hudi.hive.HiveSyncTool.initSyncClient(HiveSyncTool.java:119) ~[?:?]
    at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:113) ~[?:?]
    at org.apache.hudi.sink.utils.HiveSyncContext.hiveSyncTool(HiveSyncContext.java:81) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doSyncHive(StreamWriteOperatorCoordinator.java:338) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.syncHive(StreamWriteOperatorCoordinator.java:329) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.handleEndInputEvent(StreamWriteOperatorCoordinator.java:433) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$handleEventFromOperator$3(StreamWriteOperatorCoordinator.java:281) ~[?:?]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[?:?]
    ... 5 more
2023-08-21 23:56:32.510 [] flink-akka.actor.default-dispatcher-15 INFO  flink apache.flink.runtime.dispatcher.StandaloneDispatcher 1113 Job b38f5c29d67259e28847e178a6145495 reached terminal state FINISHED.
2023-08-21 23:56:32.671 [] cluster-io-thread-4 INFO  flink apache.flink.runtime.history.FsJobArchivist 91 Job b38f5c29d67259e28847e178a6145495 has been archived at abfs://flink@test.dfs.core.windows.net/completed-jobs/b38f5c29d67259e28847e178a6145495.
2023-08-21 23:56:32.820 [] cluster-io-thread-3 INFO  flink apache.flink.runtime.dispatcher.StandaloneDispatcher 1161 Job b38f5c29d67259e28847e178a6145495 has been registered for cleanup in the JobResultStore after reaching a terminal state.
2023-08-21 23:56:32.820 [] flink-akka.actor.default-dispatcher-13 INFO  flink apache.flink.runtime.jobmaster.JobMaster 434 Stopping the JobMaster for job 'insert-into_hudi_catalog.hudicatalogdb.mor' (b38f5c29d67259e28847e178a6145495).
2023-08-21 23:56:32.821 [] flink-akka.actor.default-dispatcher-13 INFO  flink org.apache.hudi.client.BaseHoodieClient 111 Stopping Timeline service !!
2023-08-21 23:56:32.821 [] flink-akka.actor.default-dispatcher-13 INFO  flink apache.hudi.client.embedded.EmbeddedTimelineService 151 Closing Timeline server
2023-08-21 23:56:32.821 [] flink-akka.actor.default-dispatcher-13 INFO  flink apache.hudi.timeline.service.TimelineService 399 Closing Timeline Service
2023-08-21 23:56:32.821 [] flink-akka.actor.default-dispatcher-13 INFO  flink io.javalin.Javalin 22 Stopping Javalin ...
2023-08-21 23:56:32.826 [] flink-akka.actor.default-dispatcher-13 INFO  flink io.javalin.Javalin 22 Javalin has stopped
2023-08-21 23:56:32.826 [] flink-akka.actor.default-dispatcher-13 INFO  flink apache.hudi.timeline.service.TimelineService 408 Closed Timeline Service
2023-08-21 23:56:32.826 [] flink-akka.actor.default-dispatcher-13 INFO  flink apache.hudi.client.embedded.EmbeddedTimelineService 155 Closed Timeline server
2023-08-21 23:56:32.827 [] flink-akka.actor.default-dispatcher-13 INFO  flink org.apache.hudi.client.BaseHoodieClient 111 Stopping Timeline service !!
2023-08-21 23:56:32.827 [] flink-akka.actor.default-dispatcher-13 INFO  flink apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore 182 Shutting down
2023-08-21 23:56:32.836 [] flink-akka.actor.default-dispatcher-13 INFO  flink apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter 82 Shutting down.
2023-08-21 23:56:32.836 [] flink-akka.actor.default-dispatcher-13 INFO  flink apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter 84 Removing counter from ConfigMap flink-d064232b26594088ae78032391934433-b38f5c29d67259e28847e178a6145495-config-map
2023-08-21 23:56:32.845 [] flink-akka.actor.default-dispatcher-13 INFO  flink apache.flink.runtime.jobmaster.JobMaster 525 Disconnect TaskExecutor 10.244.7.3:6122-d80422 because: Stopping JobMaster for job 'insert-into_hudi_catalog.hudicatalogdb.mor' (b38f5c29d67259e28847e178a6145495).

Adding the jar to the sql-client: [screenshot]

My directory: [screenshot]

danny0405 commented 1 year ago

Instead of ADD JAR, did you try putting these two jars under the Flink lib directory?

Riddle4045 commented 1 year ago

> Instead of ADD JAR, did you try putting these two jars under the Flink lib directory?

@danny0405 do you mean the Flink lib directory on the JM/TM? My directory snapshot above is from the SQL client's lib directory.

danny0405 commented 1 year ago

> do you mean the Flink lib directory in the JM/TM

Yes

Riddle4045 commented 1 year ago

> do you mean the Flink lib directory in the JM/TM
>
> Yes

@danny0405 unfortunately, the Flink runtime is pre-packaged and I don't have control over it. It has deps from Hadoop 3.x and Hive 3.1.2. Based on this document, it looks like Hudi doesn't work with Hadoop 3.3.2; is that still true, or is the doc stale?

danny0405 commented 1 year ago

Hudi can work with Hadoop 3.3.2.

galadrielwithlaptop commented 1 year ago

@danny0405 Hi.

Flink: 1.16.0, Hudi: 0.13.0, Hive Metastore: 3.1.2

Steps to repro:

  1. We need to add the calcite-core dependency to the server classpath.
  2. We need to follow this doc: https://hudi.apache.org/docs/syncing_metastore#flink-setup, which basically says to use a Hudi jar compiled with the hive3 and hadoop3 profiles, and to keep the hudi-mr-bundle in the Hive Metastore auxlib directory.

PS: this is to get rid of the class-loading exceptions we got in the process.
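
A minimal sketch of those two build/placement steps, assuming a Hudi 0.13.0 source tree; the profile flags and paths come from the doc above and may differ per environment:

# build the Flink bundle with the hive3 shade profile
mvn clean package -DskipTests -Dflink1.16 -Pflink-bundle-shade-hive3
# place the MR bundle where the Hive Metastore can load it
cp packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.13.0.jar $HIVE_HOME/auxlib/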

Commands:

  1. Create the catalog:

     CREATE CATALOG hive WITH (
       'type' = 'hudi',
       'mode' = 'hms',
       'hive.conf.dir' = '/opt/hive-conf'
     );

  2. Use it:

     USE CATALOG hive;

  3. Add the bundle jar in order to create a new table:

     ADD JAR 'hudi-flink-0.3.0-1.16.0.jar';

  4. Create the table:

     CREATE TABLE hive3t9 (
       uuid VARCHAR(20),
       name VARCHAR(10),
       age INT,
       `partition` VARCHAR(20)
     )
     PARTITIONED BY (`partition`)
     WITH (
       'connector' = 'hudi',
       'path' = 'abfs://xx@xx.dfs.core.windows.net/hive3t9',
       'hoodie.table.base.file.format' = 'PARQUET',
       'table.type' = 'MERGE_ON_READ',  -- if MERGE_ON_READ, hive query will not have output until the parquet file is generated
       'hive_sync.enable' = 'true',     -- required, to enable hive synchronization
       'hive_sync.mode' = 'hms',        -- required, setting hive sync mode to hms, default jdbc
       'hive_sync.metastore.uris' = 'thrift://hive-metastore:9083/'  -- required, the port needs to be set
     );

Insert Data into Table:

  1. INSERT INTO hive3t8 VALUES ('8722', 'ehkjhdwumj', 87, 'Maths');

COW tables: after these steps, COW just works fine and we are able to see the data in Trino.

MOR tables: we see the job as successful in the Flink UI, and the data also gets written to the ABFS directory, but when Trino tries to read the data, it fails. The catch is that the data gets compacted into a column-based parquet file format after every 5 commits (consistent with the default compaction.delta_commits of 5), and this compaction job fails.
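
As an aside, a quick way to double-check what Trino actually sees for the synced table; the catalog and schema names here are assumptions, adjust them to your setup:

trino --execute "SHOW COLUMNS FROM hudi.default.hive3t9"
trino --execute "SELECT * FROM hudi.default.hive3t9 LIMIT 10"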

Digging into the TM logs, we found these exceptions:

Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source) ~[?:?]
    at java.util.concurrent.Semaphore.acquire(Unknown Source) ~[?:?]
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3556) ~[flink-hadoop-dep-1.16.0-0.0.18.jar:?]
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521) ~[flink-hadoop-dep-1.16.0-0.0.18.jar:?]
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540) ~[flink-hadoop-dep-1.16.0-0.0.18.jar:?]
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-hadoop-dep-1.16.0-0.0.18.jar:?]

Solution to the above problem: we upgraded Hadoop to 3.3.5 in the JM/TM and compiled hudi-flink against the same version, owing to https://issues.apache.org/jira/browse/HADOOP-17779
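
A sketch of that rebuild, assuming a Hudi 0.13.0 source tree; the exact flags may differ:

mvn clean package -DskipTests -Dflink1.16 -Dhadoop.version=3.3.5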

Exception 2: coming out of the previous error, we got this.

2023-09-21 06:11:22.564 [] pool-14-thread-1 ERROR flink apache.hudi.sink.compact.CompactOperator 140 Executor executes action [Execute compaction for instant 20230917184914418 from task 0] error
org.apache.hudi.exception.HoodieInsertException: Failed to close the Insert Handle for path abfs://xx@xx.dfs.core.windows.net/hive3t8/Maths/cfe09673-7c1c-4e6c-9758-a44d531a1a03_0-1-0_20230917184914418.parquet
    at org.apache.hudi.io.HoodieCreateHandle.close(HoodieCreateHandle.java:217) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleInsert(HoodieFlinkCopyOnWriteTable.java:408) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.table.action.compact.CompactionExecutionHelper.writeFileAndGetWriteStats(CompactionExecutionHelper.java:68) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:231) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:144) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.sink.compact.CompactOperator.doCompaction(CompactOperator.java:136) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.sink.compact.CompactOperator.lambda$processElement$0(CompactOperator.java:119) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.io.IOException: Failed with java.io.IOException while processing file/directory :[/hive3t8/Maths/cfe09673-7c1c-4e6c-9758-a44d531a1a03_0-1-0_20230917184914418.parquet] in method:[java.lang.InterruptedException]
    at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
    at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
    at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
    at org.apache.hadoop.io.IOUtils.wrapWithMessage(IOUtils.java:514) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.io.IOUtils.wrapException(IOUtils.java:497) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.close(AbfsOutputStream.java:495) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.close(SizeAwareFSDataOutputStream.java:75) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1106) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:132) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.io.storage.HoodieAvroParquetWriter.close(HoodieAvroParquetWriter.java:84) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.io.HoodieCreateHandle.close(HoodieCreateHandle.java:205) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    ... 10 more
Caused by: java.io.IOException: java.lang.InterruptedException
    at org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.waitForAppendsToComplete(AbfsOutputStream.java:608) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.flushWrittenBytesToService(AbfsOutputStream.java:615) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.flushInternal(AbfsOutputStream.java:530) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.close(AbfsOutputStream.java:489) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.close(SizeAwareFSDataOutputStream.java:75) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1106) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:132) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.io.storage.HoodieAvroParquetWriter.close(HoodieAvroParquetWriter.java:84) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.io.HoodieCreateHandle.close(HoodieCreateHandle.java:205) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    ... 10 more
Caused by: java.lang.InterruptedException
    at org.apache.hadoop.thirdparty.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:523) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:88) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.waitForAppendsToComplete(AbfsOutputStream.java:596) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.flushWrittenBytesToService(AbfsOutputStream.java:615) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.flushInternal(AbfsOutputStream.java:530) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.close(AbfsOutputStream.java:489) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) ~[flink-hadoop-dep-1.16-SNAPSHOT.jar:?]
    at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.close(SizeAwareFSDataOutputStream.java:75) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1106) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:132) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.io.storage.HoodieAvroParquetWriter.close(HoodieAvroParquetWriter.java:84) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.io.HoodieCreateHandle.close(HoodieCreateHandle.java:205) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    ... 10 more

I tried to repro exception 2 on my local system. The error persists, and it fails with this exception:

org.apache.hudi.exception.HoodieIOException: Could not load Hoodie properties from file:/home/walls/hive3t9/.hoodie/hoodie.properties
    at org.apache.hudi.common.table.HoodieTableConfig.&lt;init&gt;(HoodieTableConfig.java:289) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.common.table.HoodieTableMetaClient.&lt;init&gt;(HoodieTableMetaClient.java:138) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.common.table.HoodieTableMetaClient.newMetaClient(HoodieTableMetaClient.java:689) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.common.table.HoodieTableMetaClient.access$000(HoodieTableMetaClient.java:81) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.common.table.HoodieTableMetaClient$Builder.build(HoodieTableMetaClient.java:770) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.table.HoodieFlinkTable.create(HoodieFlinkTable.java:62) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.client.HoodieFlinkWriteClient.getHoodieTable(HoodieFlinkWriteClient.java:467) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.sink.compact.CompactOperator.reloadWriteConfig(CompactOperator.java:151) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.sink.compact.CompactOperator.lambda$processElement$0(CompactOperator.java:119) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_382]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_382]
Caused by: java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) ~[?:1.8.0_382]
    at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:293) ~[?:1.8.0_382]
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.seek(RawLocalFileSystem.java:157) ~[flink-hadoop-dep-1.16.0-0.0.18.jar:?]
    at org.apache.hadoop.fs.BufferedFSInputStream.seek(BufferedFSInputStream.java:102) ~[flink-hadoop-dep-1.16.0-0.0.18.jar:?]
    at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:71) ~[flink-hadoop-dep-1.16.0-0.0.18.jar:?]
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:251) ~[flink-hadoop-dep-1.16.0-0.0.18.jar:?]
    at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300) ~[flink-hadoop-dep-1.16.0-0.0.18.jar:?]
    at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252) ~[flink-hadoop-dep-1.16.0-0.0.18.jar:?]
    at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197) ~[flink-hadoop-dep-1.16.0-0.0.18.jar:?]
    at java.io.DataInputStream.read(DataInputStream.java:149) ~[?:1.8.0_382]
    at java.io.DataInputStream.read(DataInputStream.java:100) ~[?:1.8.0_382]
    at java.util.Properties$LineReader.readLine(Properties.java:435) ~[?:1.8.0_382]
    at java.util.Properties.load0(Properties.java:353) ~[?:1.8.0_382]
    at java.util.Properties.load(Properties.java:341) ~[?:1.8.0_382]
    at org.apache.hudi.common.table.HoodieTableConfig.fetchConfigs(HoodieTableConfig.java:337) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    at org.apache.hudi.common.table.HoodieTableConfig.&lt;init&gt;(HoodieTableConfig.java:270) ~[hudi-flink1.16-bundle-0.13.0.jar:0.13.0]
    ... 12 more

PS: on my local system the job finishes in 2s, compared to 10s on the Azure system.

danny0405 commented 1 year ago

@galadrielwithlaptop Looks like there are many fs-related errors. In your local env, which filesystem did you use?

galadrielwithlaptop commented 1 year ago

@danny0405 Local filesystem only; the path was file://home/walls/hive3t9

danny0405 commented 1 year ago

Were you able to debug the local fs test failures?

ramkrish86 commented 1 year ago

@danny0405 - There are two stack traces in the above comment from @galadrielwithlaptop. What we can see is that, with both ABFS and the local filesystem, an InterruptedException is thrown before the compaction can complete. The compaction job does get picked up in the Flink job graph, but the parent thread appears to be closed before the compaction finishes. When we run the compaction tool manually, it succeeds. Is there some config missing here? Any inputs would help, as from a code perspective we are not able to see any issue.
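
For context, a minimal sketch of running the compaction tool manually via Hudi's standalone Flink compactor; the jar name and table path are assumptions matching the versions in this thread:

./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor \
  lib/hudi-flink1.16-bundle-0.13.0.jar --path abfs://xx@xx.dfs.core.windows.net/hive3t8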