apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.37k stars 2.42k forks

[SUPPORT] Flink Table planner not loading problem #8265

Open huyuanfeng2018 opened 1 year ago

huyuanfeng2018 commented 1 year ago

Describe the problem you faced

Version information: Hudi 0.13.0, Flink 1.16

Flink SQL (Hudi CREATE DDL):

CREATE TABLE `ods_action_log_huya_hudi_nopro_test` (
  `stime` VARCHAR PRIMARY KEY NOT ENFORCED,
  `product` VARCHAR,
  `eid` VARCHAR,
  `curpage` VARCHAR,
  `curlocation` VARCHAR,
  `mid` VARCHAR,
  `yyuid` BIGINT,
  `prop` VARCHAR,
  `dt` VARCHAR,
  `hour` VARCHAR
) PARTITIONED BY (`dt`, `hour`) 
WITH (
  'connector' = 'hudi',
  'write.tasks' = '64',
  'write.operation' = 'insert',  -- The write operation, that this write should do (insert or upsert is supported)
  'path' = 'hdfs://huyaopclusternew/user/hive/warehouse/dw_rt_ods.db/ods_action_log_huya_hudi_nopro_test',  
  'table.type' = 'COPY_ON_WRITE',  -- If MERGE_ON_READ, hive query will not have output until the parquet file is generated
  'hoodie.bucket.index.num.buckets' = '1',
  'hoodie.bucket.index.hash.field' = 'stime',
  'hoodie.clean.async' = 'true',
  'hoodie.cleaner.commits.retained' = '5',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'clustering.async.enabled' = 'true'
);

When I turn on 'clustering.async.enabled' = 'true', startup fails with:

Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.planner.codegen.sort.SortCodeGenerator

Reasons and suggestions

Since Flink 1.15, the JVM runtime environment no longer contains flink-table-planner by default; the planner is loaded and used through flink-table-planner-loader via a sub-classpath. Hudi uses SortCodeGenerator for part of its clustering logic, which is why this exception occurs.

Specific link: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/advanced/

So I think Hudi needs to adapt its code for Flink 1.15+, or at least warn users that they must move the flink-table-planner jar from the opt directory to the lib directory of the Flink release so that Hudi's runtime environment can find it.
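One form the suggested warning could take is a fail-fast probe before clustering starts, instead of a late ClassNotFoundException deep in the job. This is only a sketch, not actual Hudi code: the class `PlannerClassCheck` and the method `requirePlanner` are illustrative names; only the probed class name comes from the stack trace above.

```java
public class PlannerClassCheck {
    // The class Hudi's clustering path needs (from the stack trace above).
    static final String SORT_CODEGEN =
            "org.apache.flink.table.planner.codegen.sort.SortCodeGenerator";

    /** Returns true if the named class can be resolved by the given loader. */
    static boolean isPresent(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            return false;
        }
    }

    /** Fail fast with an actionable message instead of a late ClassNotFoundException. */
    static void requirePlanner(ClassLoader loader) {
        if (!isPresent(SORT_CODEGEN, loader)) {
            throw new IllegalStateException(
                    "flink-table-planner is not on the classpath. Since Flink 1.15 only "
                  + "flink-table-planner-loader ships in lib/; move the flink-table-planner "
                  + "jar from opt/ to lib/ before enabling 'clustering.async.enabled'.");
        }
    }

    public static void main(String[] args) {
        ClassLoader cl = PlannerClassCheck.class.getClassLoader();
        // A JDK class is always resolvable; the planner class is not on a plain JVM.
        System.out.println("java.lang.String present: " + isPresent("java.lang.String", cl));
        System.out.println("SortCodeGenerator present: " + isPresent(SORT_CODEGEN, cl));
    }
}
```

Run on a JVM without the Flink planner jars, the probe reports the planner class missing, which is exactly the situation this issue describes.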

huyuanfeng2018 commented 1 year ago

cc @danny0405

danny0405 commented 1 year ago

Thanks for the reminder; maybe we can add this note to the Hudi website: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/advanced/#anatomy-of-table-dependencies

huyuanfeng2018 commented 1 year ago

I think adding a reminder is good, but with our current operating environment we would need to package two different Flink images for production to tell the setups apart. Could you consider shading these dependencies into Hudi instead?

danny0405 commented 1 year ago

No, Hudi does not package engine-specific jars in the uber jar.

huyuanfeng2018 commented 1 year ago

No, Hudi does not package engine-specific jars in the uber jar.

This will result in a large package

danny0405 commented 1 year ago

The more serious problem is that once you shade a jar, you need to shade all the jars it depends on, or there could be conflicts because of the indirectly introduced classes.

ertanden commented 1 year ago

I hit this issue on the Amazon Kinesis Analytics (Flink) service. I tried to include flink-table-planner_2.12-1.15.2.jar in the shadowJar, but that doesn't work.

Is there a known solution for fixing this on Amazon Kinesis Analytics?

danny0405 commented 1 year ago

Did you have the flink-table-planner-loader in the classpath?

ertanden commented 1 year ago

Do you mean I should put flink-table-planner-loader in the job's shadowJar? I didn't try that.

But my understanding from the Flink documentation is that flink-table-planner-loader is already on the classpath by default (the /lib folder). We have no control over the Flink distribution setup in Amazon Kinesis Analytics; it currently uses Flink 1.15.2, so I assume flink-table-planner-loader should be on the classpath.

danny0405 commented 1 year ago

Did you use the Flink session cluster? What class is missing here?

ertanden commented 1 year ago

Did you use the Flink session cluster? What class is missing here?

Amazon Kinesis Analytics is an Application cluster as far as I know, not a session cluster.

The missing class is the one given in the description of this issue:

Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.planner.codegen.sort.SortCodeGenerator

This happens when clustering.async.enabled is enabled. Otherwise the job runs fine.

danny0405 commented 1 year ago

The class is located in flink-table-planner-loader; can AWS support folks give some help here?

ertanden commented 1 year ago

Any update on this?

Currently it is impossible to run a Hudi job in append mode (COW and insert) with clustering enabled on AWS Kinesis Analytics. The job runs fine with clustering.async.enabled=false, but then we get many, many small files.

The class org.apache.flink.table.planner.codegen.sort.SortCodeGenerator is actually in flink-table-planner_2.12, not in flink-table-planner-loader. That's why this issue happens.

However, as documented by Flink, depending on flink-table-planner_2.12 has been deprecated since 1.15, and projects should refactor away from it.

Is there an idea of how to remove the dependency on org.apache.flink.table.planner.codegen.sort.SortCodeGenerator? I could give it a shot, but the code is not that familiar to me right now, so I would need a clue where to start.
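The claim about which jar bundles the class is easy to verify by scanning jar entries. A minimal sketch (class and method names are illustrative): the demo builds a throwaway jar so it runs anywhere, but against a real distribution you would point `jarContains` at the planner jars.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipOutputStream;

public class JarClassLocator {

    /** Returns true if the jar at jarPath bundles the given class. */
    static boolean jarContains(Path jarPath, String className) throws IOException {
        String entry = className.replace('.', '/') + ".class";
        try (ZipFile zf = new ZipFile(jarPath.toFile())) {
            return zf.getEntry(entry) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Against a real Flink distribution you would check, for example,
        //   lib/flink-table-planner-loader-1.15.2.jar  -> expected false
        //   opt/flink-table-planner_2.12-1.15.2.jar    -> expected true
        // Self-contained demo: build a throwaway jar with one fake class entry.
        Path tmp = Files.createTempFile("demo", ".jar");
        try (ZipOutputStream out = new ZipOutputStream(Files.newOutputStream(tmp))) {
            out.putNextEntry(new ZipEntry("com/example/Demo.class"));
            out.closeEntry();
        }
        System.out.println("com.example.Demo: " + jarContains(tmp, "com.example.Demo"));
        System.out.println("SortCodeGenerator: " + jarContains(tmp,
                "org.apache.flink.table.planner.codegen.sort.SortCodeGenerator"));
        Files.delete(tmp);
    }
}
```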

Zouxxyy commented 1 year ago

@ertanden I also hit this problem; it may be solved by:

(1) copying the codegen module from Flink into Hudi, but it is written in Scala;
(2) trying to use PlannerCodeLoader in Flink to load these classes;
(3) looking for other APIs instead of the SortCodeGenerator API.

Currently I'm trying the second approach.

@danny0405 do you have any other suggestions? This should be a blocker; it is 100% reproducible on Flink 1.15+.
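Option (2) roughly means resolving the planner classes through a dedicated class loader layered over the planner jar, which is the mechanism flink-table-planner-loader itself uses internally. This is only a hedged sketch of that mechanism (class and method names here are made up, and the real Flink loader is more involved, with its own delegation rules); the demo resolves a JDK class via parent delegation so it runs without Flink on the classpath.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;

public class PlannerLoaderSketch {

    /** Resolve a class through a URLClassLoader layered over the given jar. */
    static Class<?> loadVia(Path jar, String className) throws Exception {
        URLClassLoader loader = new URLClassLoader(
                new URL[] { jar.toUri().toURL() },
                PlannerLoaderSketch.class.getClassLoader());
        return Class.forName(className, false, loader);
    }

    public static void main(String[] args) throws Exception {
        // With a real planner jar on disk, this could resolve the missing class:
        //   loadVia(Paths.get("opt/flink-table-planner_2.12-1.16.0.jar"),
        //           "org.apache.flink.table.planner.codegen.sort.SortCodeGenerator");
        // Demo with a class the parent loader can always resolve:
        Class<?> c = loadVia(Paths.get("."), "java.util.ArrayList");
        System.out.println("loaded: " + c.getName());
    }
}
```

The catch, as discussed below, is that any class obtained this way can only be used reflectively or through interfaces visible to the parent loader, which is why this is not a drop-in fix.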

danny0405 commented 1 year ago

@ertanden @Zouxxyy

dependency on flink-table-planner_2.12 is deprecated since 1.15, and projects should refactor out of it.

That's correct. We introduced SortCodeGenerator in the first place to gain flexibility in sorting at the operator level, and Flink seems to have no better solution for it. Even though we depend on it, the flink-table-planner classes should still be on the classpath, right? I'm not that familiar with PlannerCodeLoader either; maybe that's a direction worth exploring.

Zouxxyy commented 1 year ago

@danny0405 flink-table-planner has been removed from the classpath since Flink 1.15

danny0405 commented 1 year ago

flink-table-planner has been removed from the classpath since flink1.15

Then how are the classes in flink-table-planner loaded? Can we do similar loading on the Hudi side?

huyuanfeng2018 commented 1 year ago

flink-table-planner has been removed from the classpath since flink1.15

Then how are the classes in flink-table-planner loaded? Can we do similar loading on the Hudi side?

This logic runs when the Flink job graph is built, not in the JobManager (except in application mode). The classpath that runs the Hudi-related logic does not load the table planner by default, because since Flink 1.15 flink-table-planner-loader loads flink-table-planner dynamically at runtime (the Table API is not necessarily bound to the Blink planner). Hudi's code, however, binds directly to the Blink sort implementation, so I think Hudi may need to copy the SortCodeGenerator-related logic (I am not sure).

danny0405 commented 1 year ago

SortCodeGenerator is written in Scala, and we do not want to introduce Scala code into the hudi-flink module.

vkhoroshko commented 8 months ago

Hello,

Is there any solution for this? I'm running the Flink SQL client locally, and it has flink-table-planner-loader-1.17.1.jar in the /opt/flink/lib folder (I'm using Docker).

However, if async clustering is enabled I receive the same error as above:

java.lang.ClassNotFoundException: org.apache.flink.table.planner.codegen.sort.SortCodeGenerator

If I replace flink-table-planner-loader-1.17.1.jar with the flink-table-planner_2.12-1.17.1.jar from opt folder, then the following error occurs:

Caused by: java.lang.NoSuchFieldError: CHAR_LENGTH
    at org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable.<clinit>(FlinkSqlOperatorTable.java:1156)
    at org.apache.flink.table.planner.delegation.PlannerContext.getBuiltinSqlOperatorTable(PlannerContext.java:322)
    at java.base/java.util.Optional.orElseGet(Unknown Source)
    at org.apache.flink.table.planner.delegation.PlannerContext.getSqlOperatorTable(PlannerContext.java:311)
    at org.apache.flink.table.planner.delegation.PlannerContext.createFrameworkConfig(PlannerContext.java:147)
    at org.apache.flink.table.planner.delegation.PlannerContext.<init>(PlannerContext.java:124)
    at org.apache.flink.table.planner.delegation.PlannerBase.<init>(PlannerBase.scala:121)
    at org.apache.flink.table.planner.delegation.StreamPlanner.<init>(StreamPlanner.scala:65)
    at org.apache.flink.table.planner.delegation.DefaultPlannerFactory.create(DefaultPlannerFactory.java:65)
    at org.apache.flink.table.factories.PlannerFactoryUtil.createPlanner(PlannerFactoryUtil.java:58)
    at org.apache.flink.table.gateway.service.operation.OperationExecutor.createStreamTableEnvironment(OperationExecutor.java:369)
    at org.apache.flink.table.gateway.service.operation.OperationExecutor.getTableEnvironment(OperationExecutor.java:326)
    at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:185)
    at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
    at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
    at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
    ... 7 more