apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Behavior of streaming read table A join with streaming read table B with Flink #11933

Closed bithw1 closed 1 month ago

bithw1 commented 1 month ago

Hi, I have the following SQL snippet that joins two streaming read tables. I am currently unable to run the experiment myself to observe the behavior, so I am asking here.

  1. How does the join work, i.e. what data participates in the join computation?
     1.1. When new data arrives from A, is it joined with the full data of Z?
     1.2. When new data arrives from Z, is it joined with the full data of A?
     1.3. When new data arrives from A and Z at the same time, is the new data from A joined with the full Z, and the new data from Z joined with the full A?
     1.4. Or does only the new data from the two tables participate in the join? If so, very few rows would join successfully, because matching rows arrive at different times.
  2. Does "new data comes from A" mean that there are new commits in A? Will the query not see data that has not been committed yet?
    -- 1. streaming read table A
      create table A(
        a  string,
        b  string,
        c  string,
        d  string
      ) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://tmp/hudi_table_a',
      'table.type' = 'MERGE_ON_READ',
      'read.streaming.enabled' = 'true',
      'read.streaming.start-commit' = 'earliest',
      'write.precombine.field' = 'a',
      'hoodie.datasource.write.keygenerator.type' = 'complex',
      'hoodie.datasource.write.recordkey.field' = 'b,c',
      'read.streaming.check-interval' = '5',
      'read.tasks' = '4',
      'read.rate.limit' = '10000'
      );

     -- 2. streaming read table Z
      create table Z(
        a  string,
        x  string,
        y  string,
        z  string
      ) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://tmp/hudi_table_z',
      'table.type' = 'MERGE_ON_READ',
      'read.streaming.enabled' = 'true',
      'read.streaming.start-commit' = 'earliest',
      'write.precombine.field' = 'a',
      'hoodie.datasource.write.keygenerator.type' = 'complex',
      'hoodie.datasource.write.recordkey.field' = 'x,y',
      'read.streaming.check-interval' = '5',
      'read.tasks' = '4',
      'read.rate.limit' = '10000'
      );

    -- join the two streaming read tables A and Z
      select A.a, b, c, d, x, y, z from A join Z
      on A.a = Z.a;
danny0405 commented 1 month ago

I think you are talking about a regular join, that is, select ... from A join B on ... where .... If both A and B are streaming, then each time a new record arrives from one side, it triggers a join against the full data of the other side.
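To make the regular-join behavior concrete, here is a minimal Python sketch. It is not Flink or Hudi code: the class `RegularJoin` and its methods are hypothetical names invented for illustration, and it assumes append-only inputs and an inner equi-join on a single key. It only models the idea that every row seen so far on each side is retained, so a new row on either side is matched against everything accumulated on the other side.

```python
class RegularJoin:
    """Toy model of an unbounded two-input inner equi-join (illustrative only)."""

    def __init__(self):
        # Rows seen so far on each side, grouped by join key.
        self.left_state = {}
        self.right_state = {}

    def on_left(self, key, row):
        # Remember the new left row, then match it against all right rows seen so far.
        self.left_state.setdefault(key, []).append(row)
        return [(row, r) for r in self.right_state.get(key, [])]

    def on_right(self, key, row):
        # Remember the new right row, then match it against all left rows seen so far.
        self.right_state.setdefault(key, []).append(row)
        return [(l, row) for l in self.left_state.get(key, [])]


join = RegularJoin()
out = []
out += join.on_left("k1", {"b": "b1"})    # no output yet: nothing from Z for k1
out += join.on_right("k1", {"x": "x1"})   # matches the A row that arrived earlier
out += join.on_left("k1", {"b": "b2"})    # matches the Z row accumulated so far
print(out)
```

In this model, rows that arrive at different times still join, because the earlier row is waiting in the retained state of the other side. The cost is that the retained state grows with the input unless something expires it.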

There is also a dimension table join, which uses only the records from the left stream to join against the right table; the right table needs to refresh itself at some interval.
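The dimension table join can be sketched the same way. Again this is only a toy Python model, not Flink or Hudi code: `LookupJoin`, `load_snapshot`, and `refresh_every` are hypothetical names, and the refresh is triggered by a record count rather than a wall-clock interval so that the example stays deterministic. Only left-side records produce output, and they are matched against a cached copy of the right table that is reloaded periodically.

```python
class LookupJoin:
    """Toy model of a dimension (lookup) join driven by the left stream only."""

    def __init__(self, load_snapshot, refresh_every):
        self.load_snapshot = load_snapshot    # hypothetical callable returning {key: row}
        self.refresh_every = refresh_every    # reload the cache every N left records
        self.seen = 0
        self.cache = load_snapshot()

    def on_left(self, key, row):
        # Reload the right-side snapshot once every `refresh_every` left records.
        if self.seen and self.seen % self.refresh_every == 0:
            self.cache = self.load_snapshot()
        self.seen += 1
        match = self.cache.get(key)
        return [(row, match)] if match is not None else []


dim = {"k1": {"x": "x1"}}                  # stands in for the right (dimension) table
lookup = LookupJoin(lambda: dict(dim), refresh_every=2)

r1 = lookup.on_left("k2", {"b": "b1"})     # k2 is not in the cached snapshot
dim["k2"] = {"x": "x2"}                    # right table changes; nothing is emitted
r2 = lookup.on_left("k2", {"b": "b2"})     # cache is still stale, so no match
r3 = lookup.on_left("k2", {"b": "b3"})     # cache was refreshed, so k2 now matches
print(r1, r2, r3)
```

Unlike the regular join, a change on the right side never produces output by itself here, and a left record that arrives before the next refresh can miss a right-side row that already exists.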

bithw1 commented 1 month ago

Thanks @danny0405. For the regular join, where each new record from one side triggers a join against the full data of the other side, it looks to me like both tables are kept in Flink state. Is the data of the underlying Hudi table also saved in Flink state? Are there Hudi configurations that support this kind of streaming table join, such as index.state.ttl?