apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [seatunnel-flink-starter] If checkpoint is not set, data will be lost #6986

Closed: zhangshenghang closed this issue 1 week ago

zhangshenghang commented 1 month ago

Search before asking

What happened

This behavior is described in the documentation (screenshot omitted).

I submitted the Flink job to YARN for execution with the following command:

bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/test.config --master yarn-per-job

test.config

env {
  parallelism = 2
  job.mode = "STREAMING"
#   flink.execution.checkpointing.interval=5000
}
source {
  Kafka {
    schema = {
      fields {
        comment_num = string
        insert_time = string
        user_info = {
            username = string,
            age = string
        }
      }
    }
    topic = "test-topic"
    consumer.group  = "test-group"
    bootstrap.servers = "xxxx"
    kafka.config = {
      client.id = client_1
      max.poll.records = 500
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    }
    result_table_name = "kafka_table"
  }
}

sink {
    Elasticsearch {
        source_table_name = "kafka_table"
        hosts = ["xxxx"]
        index = "test-588"
    }
}

I did not set the flink.execution.checkpointing.interval parameter in the configuration file, and the checkpoint.interval parameter is not set in the Flink configuration file either. In this case, writing to Elasticsearch loses data.
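As a workaround until this is addressed, explicitly enabling checkpointing in the env block (the key that is commented out in test.config above) should make the sink flush on every checkpoint; the 5000 ms interval here is only an example value:

env {
  parallelism = 2
  job.mode = "STREAMING"
  # enable periodic checkpoints so the Elasticsearch sink's prepareCommit() runs
  flink.execution.checkpointing.interval = 5000
}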

The Elasticsearch sink uses the maxBatchSize parameter to submit requests in batches, and any remaining uncommitted data is only flushed through prepareCommit:

    @Override
    public Optional<ElasticsearchCommitInfo> prepareCommit() {
        // Invoked when a checkpoint is triggered: flushes whatever is currently buffered
        bulkEsWithRetry(this.esRestClient, this.requestEsList);
        return Optional.empty();
    }

    @Override
    public void write(SeaTunnelRow element) {
        if (RowKind.UPDATE_BEFORE.equals(element.getRowKind())) {
            return;
        }

        String indexRequestRow = seaTunnelRowSerializer.serializeRow(element);
        requestEsList.add(indexRequestRow);
        // Flushes only when the buffer reaches maxBatchSize; a smaller tail stays buffered
        if (requestEsList.size() >= maxBatchSize) {
            bulkEsWithRetry(this.esRestClient, this.requestEsList);
        }
    }
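Since prepareCommit() only runs when a checkpoint fires, any rows that never reach maxBatchSize sit in requestEsList indefinitely once checkpointing is disabled. A minimal sketch of one possible mitigation, not the actual SeaTunnel implementation, assuming the writer's close() hook and the existing bulkEsWithRetry / esRestClient / requestEsList members, would be to flush the remaining buffer on shutdown:

    @Override
    public void close() throws IOException {
        // Hypothetical mitigation sketch: flush rows still buffered below maxBatchSize
        // before the writer shuts down, so a graceful stop does not drop them.
        if (!requestEsList.isEmpty()) {
            bulkEsWithRetry(this.esRestClient, this.requestEsList);
        }
    }

This only protects a graceful shutdown; periodic checkpoints are still needed for the buffer to survive failures.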

This may be because the code does not set a default value for checkpoint.interval in Flink STREAMING mode (screenshot omitted).
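For illustration, a hedged sketch of how a starter could fall back to a default interval when none is configured; the helper name, its parameters, and the default value are hypothetical, while enableCheckpointing(long) is the standard Flink StreamExecutionEnvironment API:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Hypothetical helper, not existing SeaTunnel code: make sure checkpointing is enabled
    // in STREAMING mode so that sink prepareCommit() is still triggered periodically.
    private static void ensureCheckpointing(
            StreamExecutionEnvironment env, boolean isStreamingMode, Long configuredIntervalMs) {
        long assumedDefaultIntervalMs = 10_000L; // illustrative default, not a real SeaTunnel value
        if (configuredIntervalMs != null) {
            env.enableCheckpointing(configuredIntervalMs);
        } else if (isStreamingMode) {
            env.enableCheckpointing(assumedDefaultIntervalMs);
        }
    }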

If this is confirmed as a problem, please assign it to me.

SeaTunnel Version

2.3.5

SeaTunnel Config

seatunnel.yaml

seatunnel:
  engine:
    history-job-expire-minutes: 1440
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /tmp/seatunnel/checkpoint_snapshot
          storage.type: hdfs

Running Command

```shell
bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/test.config --master yarn-per-job
```
Error Exception

When the buffered data does not reach maxBatchSize, the writer does not write it out, so the remaining records are never flushed.

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

Carl-Zhou-CN commented 4 weeks ago

That seems to be a problem