apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: KafkaIO missing initial messages when using automatic topic creation #27246

Open bvolpato opened 1 year ago

bvolpato commented 1 year ago

What happened?

I noticed missing sequence numbers when using GenerateSequence + KafkaIO to produce records to Kafka while relying on the broker's automatic topic creation feature (auto.create.topics.enable).

What I've used to reproduce:

    Pipeline p = Pipeline.create(pipelineOptions);

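    // Generate an unbounded sequence of longs starting at 0, at a rate of 100 elements
    // every 100 ms, and write them to Kafka as KV<Long, String>.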
    PCollection<Long> numbers =
        p.apply(GenerateSequence.from(0).withRate(100, Duration.millis(100)));
    numbers
        .apply(
            MapElements.into(
                    TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.strings()))
                .via(x -> KV.of(x, String.valueOf(x))))
        .apply(
            KafkaIO.<Long, String>write()
                .withTopic(TOPIC_NAME)
                .withBootstrapServers(BOOTSTRAP_SERVERS)
                .withKeySerializer(LongSerializer.class)
                .withValueSerializer(StringSerializer.class));

    p.run();

And consuming with KafkaIO.read() and writing the values to files:

    Pipeline p = Pipeline.create(pipelineOptions);

    List<TopicPartition> partitions = new ArrayList<>();
    for (int i = 0; i < 20; i++) {
      partitions.add(new TopicPartition(TOPIC_NAME, i));
    }

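    // Read from the 20 explicitly assigned partitions and keep only the String values of the records.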
    PCollection<String> ticks = p.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers(BOOTSTRAP_SERVERS)
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withTopicPartitions(partitions))
        .apply(MapElements.into(TypeDescriptors.strings()).via(record -> record.getKV().getValue()));

    ticks.apply("Window", Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(
            TextIO.write().to("gs://test-bucket/" + TOPIC_NAME + "/keys-")
                .withNumShards(0)
                .withWindowedWrites());

    p.run();

Then I copied the files from GCS (gsutil -m cp gs://test-bucket/{TOPIC}/* /tmp/test) and created a simple class to check for continuity:

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.TreeSet;

public class TestContiguous {

  public static void main(String[] args) {
    findMissingNumbersInFiles(new File("/tmp/example-stream/"));
  }

  public static List<Integer> findMissingNumbersInFiles(File folder) {
    File[] files = folder.listFiles();

    TreeSet<Integer> numbers = new TreeSet<>();
    for (File file : files) {
      numbers.addAll(readNumbersFromFile(file));
    }

    List<Integer> missingNumbers = new ArrayList<>();
    int minNumber = numbers.first();
    int maxNumber = numbers.last();

    for (int i = minNumber; i <= maxNumber; i++) {
      if (!numbers.contains(i)) {
        missingNumbers.add(i);
      }
    }

    System.out.println(
        "Min: " + minNumber + ", Max: " + maxNumber + ". Missing: " + missingNumbers);
    return missingNumbers;
  }

  public static List<Integer> readNumbersFromFile(File file) {
    List<Integer> numbers = new ArrayList<>();
    try (Scanner scanner = new Scanner(file)) {
      while (scanner.hasNextLine()) {
        String line = scanner.nextLine().trim();
        if (!line.isEmpty()) {
          try {
            int number = Integer.parseInt(line.split(",")[0]);
            numbers.add(number);
          } catch (NumberFormatException e) {
            e.printStackTrace();
          }
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
    return numbers;
  }

}

It is expected that some records will be missing at the tail end due to parallelism, but I have consistently seen around 1k sequence numbers missing at the beginning of the pipeline:

Min: 1268, Max: 780327. Missing: [1269, 1270, 1271, 1272, 1273, 1276, 1277, 1280, 1282, 1283, 1284, 1293, 780319, 780322, 780323, 780324, 780326]

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

aromanenko-dev commented 1 year ago

Could you try to rerun this test with .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest")) for KafkaIO.read(), please?
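
Applied to the repro's read pipeline, that suggestion would look roughly like the sketch below (assuming Guava's com.google.common.collect.ImmutableMap; everything else is unchanged from the original snippet):

    PCollection<String> ticks = p.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers(BOOTSTRAP_SERVERS)
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withTopicPartitions(partitions)
                // When the consumer group has no committed offsets, start from the earliest
                // available offset instead of the default "latest".
                .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest")))
        .apply(MapElements.into(TypeDescriptors.strings()).via(record -> record.getKV().getValue()));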