apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.4k stars 2.43k forks source link

[SUPPORT] Is it allowed using Flink Table API sqlQuery() to read data from hudi tables? #9093

Open gamblewin opened 1 year ago

gamblewin commented 1 year ago

Describe the problem you faced

I'm trying to use flink table api sqlQuery to read data from hudi table but not working, so am i doing it wrong or hudi doesn't support this way to query data.

Code

sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
sTableEnv = StreamTableEnvironment.create(sEnv);
sEnv.setParallelism(1);
sEnv.enableCheckpointing(3000);
// create table
String createTabelSql = "create table dept(\n" +
        "  dept_id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
        "  dept_name varchar(10),\n" +
        "  ts timestamp(3)\n" +
        ")\n" +
        "with (\n" +
        "  'connector' = 'hudi',\n" +
        "  'path' = 'hdfs://localhost:9000/hudi/dept',\n" +
        "  'table.type' = 'MERGE_ON_READ'\n" +
        ")";
sTableEnv.executeSql(createTabelSql);
// insert data
sTableEnv.executeSql("insert into dept values (1, 'a', NOW()), (2, 'b', NOW())");
// query data
Table table = sTableEnv.sqlQuery("select * from dept");
DataStream<Row> dataStream = sTableEnv.toDataStream(table);
// there's nothing to print
dataStream.print();

Environment Description

gamblewin commented 1 year ago

One more question, does hudi have any flink api for bulk insert? image The example on official website is inserting data into Hudi table one by one, what if i want to split data source stream into different windows and when each window closes, bulk insert all data in that window into Hudi table.

For now, the only way i can think of bulk insert is use the executeSql() method of StreamTableEnvironment to execute SQL statements by concatenating the SQL string.

danny0405 commented 1 year ago

It seems you are using the batch query, can you check whether the data is committed into the table (by checking new commit meta files on .hoodie folder).

We can enable bulk_insert mode for Flink with option write.operation = 'BULK_INSERT', the bulk_insert only works for batch execution mode.

gamblewin commented 1 year ago

@danny0405 Thx for replying.

  1. Data is committed into the table, but can not be queried by using sTableEnv.sqlQuery(select * from dept). image

  2. If i use sql way, which is inserting multiple rows in one sql and executing this sql, is this way bulk insert or not?

    sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    sEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);    // set execution mode as batch
    sTableEnv = StreamTableEnvironment.create(sEnv);
    sEnv.setParallelism(1);
    sEnv.enableCheckpointing(3000);
    
    // SQL way: insert multiple rows in one sql without explicitly configuring write option as bulk insert
    sTableEnv.executeSql("insert into dept values (1, 'a', NOW()), (2, 'b', NOW())");
  3. If the above sql way is not bulk insert, is there any way i can bulk insert data by using sql? I know that for query sql, we can add options to set up some configurations, but i tried add options to insert data sql, it's not working.

    insert into dept values
    (1, 'a', NOW()),
    (2, 'b', NOW())
    /*+
    options (
    'write.operation' = 'bulk_insert'
    )*/
  4. I think what u really mean is using streaming API to bulk insert data. In my understanding, bulk insert means insert a batch of data at a time, but in the following code, source data is an unbounded stream, how does sink function split source data into different batches?

    DataStream<RowData> dataStream = env.addSource(...);
    Map<String, String> options = new HashMap<>();
    // other option configurations ......
    options.put("write.operation", "bulk_insert");
    DataStream<RowData> dataStream = sEnv.addSource(...);
    HoodiePipeline.Builder builder = HoodiePipeline.builder("dept")
          .column(...)
          .options(options);
    builder.sink(dataStream, false);  
danny0405 commented 1 year ago

You should define the bulk_insert option while initializing the table with sql:

    String createTabelSql = "create table dept(\n" +
        "  dept_id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
        "  dept_name varchar(10),\n" +
        "  ts timestamp(3)\n" +
        ")\n" +
        "with (\n" +
        "  'connector' = 'hudi',\n" +
        "  'path' = 'hdfs://localhost:9000/hudi/dept',\n" +
        "  'table.type' = 'MERGE_ON_READ'\n" +
        ")";

It's weird you can't query the data, is there any exception thrown out?

gamblewin commented 1 year ago

@danny0405 Thx, I have reviewed the documentation on the Hudi website regarding bulk insert, and it states that bulk insert "implements a sort-based data writing algorithm", which means bulk insert and batch insert are actually not the same concept? For example, if I insert 100 records into a Hudi table in one sql, bulk insert does not optimize the performance of batch insertion. It simply provides a sorting operation during the data insertion process?

I check the web ui, there's no exception. job graph

image

exception page

image
danny0405 commented 1 year ago

@gamblewin From the runtime DAG, it seems you are using MOR table with upsert operation. For bulk_insert, it is expected to be executed in flink batch runtime mode, and the write.operation should be set up as bulk_insert.