apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Iceberg streaming streaming-skip-overwrite-snapshots SparkMicroBatchStream only skips over one file per trigger #8902

Closed cccs-jc closed 9 months ago

cccs-jc commented 1 year ago

Apache Iceberg version

1.3.1

Query engine

Spark

Please describe the bug 🐞

@singhpk234 I think you might know how to fix this.

The implementation of streaming-skip-overwrite-snapshots is not what I expected. At this location it does skip over rewrite snapshots, but only a single file at a time.

https://github.com/apache/iceberg/blob/b1f7008517bf9da0fe4eea6755878a87cf64341d/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L230C9-L230C17

Suppose you trigger every minute and that you encounter a rewrite snapshot with 300 files. This means it will take 300 x 1 minute (300 minutes) to finally skip over the snapshot and start progressing again.

I think that once a rewrite snapshot is detected we should exhaust all the positions (all the files) in that commit to position ourselves for the next commit.
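To make the arithmetic above concrete, here is a minimal toy model of the two behaviours. It is not Iceberg code: the `(operation, file_count)` tuples, the function name `triggers_to_reach_end`, and the rule that a batch never spans two snapshots are all simplifying assumptions made for illustration.

```python
from typing import List, Tuple

# Toy model (not Iceberg code): a snapshot is (operation, number_of_files).
Snapshot = Tuple[str, int]


def triggers_to_reach_end(
    snapshots: List[Snapshot], max_files: int, skip_whole_rewrite: bool
) -> int:
    """Count the triggers needed to move the offset past every snapshot."""
    triggers = 0
    for operation, file_count in snapshots:
        if operation == "replace":
            if skip_whole_rewrite:
                # Proposed behaviour: step over all positions of the rewrite at once.
                continue
            # Behaviour described in this report: one file position per trigger.
            triggers += file_count
        else:
            # Regular append: up to max_files files are consumed per trigger.
            triggers += -(-file_count // max_files)  # ceiling division
    return triggers


snapshots = [("append", 200), ("replace", 300), ("append", 200)]
print(triggers_to_reach_end(snapshots, 200, skip_whole_rewrite=False))  # 302
print(triggers_to_reach_end(snapshots, 200, skip_whole_rewrite=True))   # 2
```

With a one-minute trigger, the first number corresponds to roughly five hours of stalled progress, the second to two minutes.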

This is how my readStream is configured.

    # connect to source table
    df = spark.readStream.format("iceberg")
    if reset_checkpoint:
        # current time in milliseconds
        ts = int(time.time() * 1000)
        print(f"Reading {source_table} from ts {ts}")
        df = df.option("stream-from-timestamp", ts)

    df = (
        df
        .option("split-size", 16 * 1024 * 1024)
        .option("streaming-skip-delete-snapshots", True)
        .option("streaming-skip-overwrite-snapshots", True)
        .option("streaming-max-files-per-micro-batch", 200)
        .option("streaming-max-rows-per-micro-batch", 2000000)
        .load(source_table)
        .withWatermark(
            "timestamp", "10 minutes"
        )  # enable the watermark so that Spark keeps track of min/max/avg/watermark eventTime.
        # Note: we do not use the watermark to evict rows from an aggregation window, only to keep track of eventTime metrics.
    )

singhpk234 commented 1 year ago

ACK @cccs-jc let me take a closer look.

cccs-jc commented 1 year ago

Yes, I would skip over all the files of a rewrite snapshot.

Also, I've noticed that when you calculate the latestOffset you call skippedManifestIndexesFromSnapshot here:

https://github.com/apache/iceberg/blob/e2b56daf35724700a9b57dbeee5fe23f99c592c4/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L350

However, that function does not skip over rewrite snapshots. I think it should?

And here, I'm not sure why you add the current number of files to your currentFileIndex counter.

https://github.com/apache/iceberg/blob/e2b56daf35724700a9b57dbeee5fe23f99c592c4/core/src/main/java/org/apache/iceberg/MicroBatches.java#L95

cccs-jc commented 12 months ago

@singhpk234 just a few quick questions. See above.

singhpk234 commented 12 months ago

Yes @cccs-jc, I think the right way to skip over snapshots (the overwrite ones) is in the latestOffset API, so that we could just say the new offset is the one that ignores the unnecessary snapshots.

Would you like to add a CL for this? At present I am a bit tied up with some internal work; otherwise I should be able to pick this up sometime next week.
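The idea of letting the latest-offset computation step over skipped snapshots can be sketched as below. This is a simplified model, not the SparkMicroBatchStream implementation: `Snap`, `Offset`, `latest_offset`, and the default `skipped_ops` tuple are hypothetical names and values chosen for illustration.

```python
from typing import List, NamedTuple, Tuple


class Snap(NamedTuple):
    operation: str  # e.g. "append", "replace", "overwrite"
    files: int      # number of data files added by the snapshot


class Offset(NamedTuple):
    snapshot_index: int  # snapshot in which the next batch starts
    position: int        # index of the next file within that snapshot


def latest_offset(
    snaps: List[Snap],
    start: Offset,
    max_files: int,
    skipped_ops: Tuple[str, ...] = ("replace", "overwrite"),  # assumed for illustration
) -> Offset:
    """Return the end offset of the next batch, stepping over skipped snapshots whole."""
    idx, pos, budget = start.snapshot_index, start.position, max_files
    while idx < len(snaps):
        snap = snaps[idx]
        if snap.operation in skipped_ops:
            idx, pos = idx + 1, 0  # skip every file of the snapshot at once
            continue
        remaining = snap.files - pos
        if remaining > budget:
            return Offset(idx, pos + budget)  # batch fills up inside this snapshot
        budget -= remaining
        idx, pos = idx + 1, 0
        if budget == 0:
            break
    return Offset(idx, pos)


snaps = [Snap("append", 3), Snap("replace", 300), Snap("append", 2)]
first = latest_offset(snaps, Offset(0, 0), max_files=4)
print(first)                                      # Offset(snapshot_index=2, position=1)
print(latest_offset(snaps, first, max_files=4))   # Offset(snapshot_index=3, position=0)
```

In this model the 300-file rewrite costs no extra trigger: the first batch already ends inside the snapshot that follows it.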

cccs-jc commented 12 months ago

A CL, is that a pull request?

Right now I'm trying to add unit tests, but it's going slowly. I don't have too many free cycles.

cccs-jc commented 12 months ago

I did manage to reproduce the issue. So we'll see how it goes.


  @Test
  public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception {
    // fill table with some data
    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
    appendDataAsMultipleSnapshots(expected);

    // this should create a snapshot with type Replace.
    table.rewriteManifests().clusterBy(f -> 1).commit();
    RewriteFiles rewrite = table.newRewrite();

    Iterable<Snapshot> it = table.snapshots();
    for (Snapshot snapshot : it) {
      Iterable<DataFile> datafiles = snapshot.addedDataFiles(table.io());
      for (DataFile datafile : datafiles) {
        rewrite.addFile(datafile);
        rewrite.deleteFile(datafile);
      }
    }
    rewrite.commit();

    // check pre-condition
    Assert.assertEquals(DataOperations.REPLACE, table.currentSnapshot().operation());

    // StreamingQuery query = startStream();
    // List<SimpleRecord> actual = rowsAvailable(query);
    // Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));

    Assert.assertEquals(
        5,
        microBatchCount(
            ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")));
  }

singhpk234 commented 12 months ago

Yeah, sure, take your time, you are pretty close :) Let me know if you need some help here. Apologies, I am short on bandwidth too due to the upcoming re:Invent.

cccs-jc commented 12 months ago

@singhpk234 do you know why existingFilesCount is added to the count? It seems like it should only add addedFilesCount.

https://github.com/apache/iceberg/blob/e2b56daf35724700a9b57dbeee5fe23f99c592c4/core/src/main/java/org/apache/iceberg/MicroBatches.java#L95

In the test cases there are no snapshots with multiple manifest lists, so I'm not 100% sure whether it's correct.
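A small toy example shows how the two counting rules diverge once a snapshot has more than one manifest. It only illustrates the question and does not settle which rule is right; `Manifest`, `start_indexes`, and the sample counts are hypothetical and not taken from MicroBatches.java.

```python
from typing import List, NamedTuple, Tuple


class Manifest(NamedTuple):
    name: str
    added_files: int     # files added by the snapshot that wrote the manifest
    existing_files: int  # files carried over from earlier snapshots


def start_indexes(manifests: List[Manifest], count_existing: bool) -> List[Tuple[str, int]]:
    """Pair each manifest with the file index at which its entries begin."""
    indexes, current = [], 0
    for m in manifests:
        indexes.append((m.name, current))
        current += m.added_files + (m.existing_files if count_existing else 0)
    return indexes


manifests = [Manifest("m0", 2, 3), Manifest("m1", 4, 0)]
print(start_indexes(manifests, count_existing=True))   # [('m0', 0), ('m1', 5)]
print(start_indexes(manifests, count_existing=False))  # [('m0', 0), ('m1', 2)]
```

With a single manifest both rules give the same starting index (0), which would explain why tests without multi-manifest snapshots cannot tell them apart.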

cccs-jc commented 11 months ago

@singhpk234 I have a fix. Unit tested and ran it on our live data.

I will soon create a PR for you to review.