tidb-incubator / TiBigData

TiDB connectors for Flink/Hive/Presto
Apache License 2.0

[Enhancement] Flink-tidb-connector: batch flush should also be triggered by a timeout #245

Open Xuxiaotuan opened 1 year ago

Xuxiaotuan commented 1 year ago

Enhancement

When I started using flink-tidb-connector-1.14 to sink data into TiDB, I followed README_unified_batch_streaming.md.

However, I only inserted a small amount of data (three rows), while tikv.sink.buffer-size defaults to 1000. The buffer never reaches that threshold, so the flush in processElement is never triggered.

The relevant code in TiDBWriteOperator:

@Override
public void processElement(StreamRecord<Row> element) throws Exception {
  Row row = element.getValue();
  // Flush is triggered only by the row count (tikv.sink.buffer-size); elapsed time is never checked.
  if (buffer.isFull()) {
    flushRows();
  }
  boolean added = buffer.add(row);
  // A rejected add means the row duplicates an index already in this batch.
  if (!added && !sinkOptions.isDeduplicate()) {
    throw new IllegalStateException(
        "Duplicate index in one batch, please enable deduplicate, row = " + row);
  }
}
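To make the failure mode concrete, the following is a minimal standalone model of the count-only check above. CountOnlyBuffer is a hypothetical stand-in, since the connector's real buffer class is not shown in this issue. Like processElement, it flushes only when the buffer is already full before a row is added, so three rows against the default size of 1000 never trigger a flush.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the connector's row buffer: flushes are driven
// only by the row count, mirroring the isFull() check in processElement.
class CountOnlyBuffer {
    private final int maxRows;                  // plays the role of tikv.sink.buffer-size
    private final List<String> rows = new ArrayList<>();
    private int flushCount = 0;

    CountOnlyBuffer(int maxRows) {
        this.maxRows = maxRows;
    }

    boolean isFull() {
        return rows.size() >= maxRows;
    }

    // Same order of operations as processElement: flush if full, then add.
    void process(String row) {
        if (isFull()) {
            flush();
        }
        rows.add(row);
    }

    void flush() {
        // A real sink would write the buffered rows to TiDB here.
        rows.clear();
        flushCount++;
    }

    int bufferedRows() { return rows.size(); }

    int flushCount() { return flushCount; }

    public static void main(String[] args) {
        CountOnlyBuffer buffer = new CountOnlyBuffer(1000); // default buffer size
        for (String row : new String[] {"r1", "r2", "r3"}) {
            buffer.process(row);
        }
        // Three rows never reach the threshold, so nothing has been flushed.
        System.out.println("buffered=" + buffer.bufferedRows() + ", flushes=" + buffer.flushCount());
    }
}
```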

Proposal: add a default maximum wait time before buffered rows are committed, for example tikv.sink.max.wait.ms = 5000. Two possible approaches:

  1. Each time the row count is checked, also check whether the maximum wait time has elapsed. If the row limit has not been reached but the wait time has, flush the buffered rows.
  2. Check the timeout separately: run an independent task that periodically checks whether the wait time has elapsed and flushes the buffered rows when it has.
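Both approaches can be sketched in plain Java as follows. TimedRowBuffer, its methods, and the check interval are hypothetical names chosen for illustration, not connector APIs. The clock is injected so the timing can be tested, and the sink callback stands in for the actual write to TiDB. The wait time is measured from the oldest buffered row, which bounds how long any row can sit unflushed. add implements approach 1; flushIfExpired, scheduled by startTimer, implements approach 2. The methods are synchronized because the timer runs on a different thread than add.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical buffer that flushes on row count OR on elapsed time.
class TimedRowBuffer<T> {
    private final int maxRows;                 // like tikv.sink.buffer-size
    private final long maxWaitMs;              // like the proposed tikv.sink.max.wait.ms (assumed name)
    private final LongSupplier clockMs;        // injected so tests can control time
    private final Consumer<List<T>> sink;      // stands in for the write to TiDB
    private final List<T> rows = new ArrayList<>();
    private long firstRowAtMs = -1;            // -1 means the buffer is empty

    TimedRowBuffer(int maxRows, long maxWaitMs, LongSupplier clockMs, Consumer<List<T>> sink) {
        this.maxRows = maxRows;
        this.maxWaitMs = maxWaitMs;
        this.clockMs = clockMs;
        this.sink = sink;
    }

    private boolean expired(long nowMs) {
        return firstRowAtMs >= 0 && nowMs - firstRowAtMs >= maxWaitMs;
    }

    // Approach 1: on every record, flush if the buffer is full OR the oldest row has waited too long.
    synchronized void add(T row) {
        long now = clockMs.getAsLong();
        if (rows.size() >= maxRows || expired(now)) {
            flush();
        }
        if (rows.isEmpty()) {
            firstRowAtMs = now;                // start the wait clock at the first buffered row
        }
        rows.add(row);
    }

    // Approach 2: called periodically, independent of incoming records.
    synchronized void flushIfExpired() {
        if (expired(clockMs.getAsLong())) {
            flush();
        }
    }

    synchronized void flush() {
        if (rows.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(rows));    // hand over a copy, then reset
        rows.clear();
        firstRowAtMs = -1;
    }

    // Schedules approach 2 on the given executor; the check interval is a tuning choice.
    void startTimer(ScheduledExecutorService timer, long checkIntervalMs) {
        timer.scheduleWithFixedDelay(this::flushIfExpired, checkIntervalMs, checkIntervalMs, TimeUnit.MILLISECONDS);
    }

    synchronized int size() {
        return rows.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        TimedRowBuffer<String> buffer = new TimedRowBuffer<>(
                1000, 200, () -> System.nanoTime() / 1_000_000,
                batch -> System.out.println("flushed " + batch));
        buffer.startTimer(timer, 50);
        buffer.add("r1");
        buffer.add("r2");
        buffer.add("r3");
        // No more input arrives, but the timer should flush the three rows after about 200 ms.
        Thread.sleep(500);
        timer.shutdownNow();
    }
}
```

Approach 1 alone does not fix the case in this issue: its time check only runs when a new record arrives, so if the three rows are followed by no further input, they stay in the buffer. Approach 2 covers that case. Inside a Flink operator, the periodic check would more naturally be a processing-time timer registered through the operator's ProcessingTimeService, whose callbacks in recent Flink versions run on the task thread and so do not race with processElement; that adaptation is not shown here.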