
Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Iceberg Spark streaming skips rows of data #10156

Open cccs-jc opened 7 months ago

cccs-jc commented 7 months ago

Apache Iceberg version

1.5.0 (latest release)

Query engine

Spark

Please describe the bug 🐞

When using Spark readStream, the option stream-from-timestamp positions the read at the specified timestamp. The query below uses the current time to read from the head of the queue.

    import time

    ts = int(time.time() * 1000)
    df = (
        spark.readStream.format("iceberg")
        .option("streaming-skip-delete-snapshots", True)
        .option("streaming-skip-overwrite-snapshots", True)
        .option("streaming-max-files-per-micro-batch", max_files)
        .option("streaming-max-rows-per-micro-batch", max_rows)
        .option("stream-from-timestamp", ts)
        .load(source_table)
    )

You can kill your streaming job, wait 10 minutes, and then start it again. The readStream will load the checkpointed offset from disk and is supposed to read from that offset. However, there is a bug that causes it to skip the commits that occurred during those 10 minutes; instead, the readStream reads from the latest commit.

I can work around this bug by not specifying stream-from-timestamp when the query resumes from a checkpointed offset:

    ts = int(time.time() * 1000)
    # Only set stream-from-timestamp on the first run, before a checkpoint exists.
    use_ts = not checkpoint_dir_exists(checkpoint_location)
    reader = (
        get_spark().readStream.format("iceberg")
        .option("streaming-skip-delete-snapshots", True)
        .option("streaming-skip-overwrite-snapshots", True)
        .option("streaming-max-files-per-micro-batch", max_files)
        .option("streaming-max-rows-per-micro-batch", max_rows)
    )
    if use_ts:
        reader = reader.option("stream-from-timestamp", ts)
    df = reader.load(source_table)

But this is error-prone. As a user, I expect the readStream to continue from the last checkpointed offset. That is the behavior of the Kafka readStream:

From the Spark Kafka integration docs, on startingOffsets: "For streaming queries, this only applies when a new query is started, and that resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest."

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
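
For comparison, a Kafka readStream only consults its starting position on the very first run (the broker address and topic below are placeholders):

    # Kafka source: startingOffsets is only used when the query starts fresh;
    # on a restart the checkpointed offsets always win.
    df = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")  # placeholder broker
        .option("subscribe", "events")  # placeholder topic
        .option("startingOffsets", "latest")  # ignored once a checkpoint exists
        .load()
    )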

I suspect the issue might be here: https://github.com/apache/iceberg/blob/fc5b2b336c774b0b8b032f7d87a1fb21e76b3f20/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L326

This seems to short-circuit the saved startOffset by checking whether the last snapshot in the table is older than the requested stream-from-timestamp.
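
In pseudocode, my theory looks something like this (a sketch only, not the actual Java; offset_for_timestamp and table_start_offset are made-up helper names):

    # Sketch of the suspected logic (my theory, not the real source).
    def resolve_start_offset(table, stream_from_timestamp, saved_start_offset):
        if stream_from_timestamp is not None:
            # Suspected short-circuit: this branch seems to win even when a
            # checkpointed offset exists, repositioning the stream.
            return offset_for_timestamp(table, stream_from_timestamp)
        if saved_start_offset is not None:
            return saved_start_offset  # expected: resume from the checkpoint
        return table_start_offset(table)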

@singhpk234 I have not stepped through the code to be sure. Is my theory possible?

singhpk234 commented 7 months ago

@cccs-jc this is interesting. The question is whether the consumed offset is properly committed in this scenario:

  1. Did you check the offset JSON in the offsets directory?
  2. Is the problem only with the new latestOffset API implementation, or do you experience the same with the old latestOffset implementation?
  3. Let me take a deeper look at the code pointer you shared.

Also, could it be related to this? https://github.com/apache/iceberg/pull/4473/files. If the streaming state is reset, that could very well be the issue. Let me know.

cccs-jc commented 7 months ago

Make sure to set stream-from-timestamp to the current time.

Then start a streaming job that writes every minute (let's call it the writer). Another streaming job consumes that Iceberg table (let's call it the reader). If you stop the reader for 10 minutes and then resume it, it filters out all the data that was written during those 10 minutes. That's because it applies stream-from-timestamp when in fact it should not look at it at all, and should instead rely on the checkpoint information (which I'm sure is good).
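
A sketch of the reader side (the table name and paths below are placeholders):

    # Reproduction sketch; db.events and the /tmp paths are placeholders.
    import time

    ts = int(time.time() * 1000)  # recomputed at every launch of the script
    df = (
        spark.readStream.format("iceberg")
        .option("stream-from-timestamp", ts)
        .load("db.events")
    )
    query = (
        df.writeStream.format("parquet")
        .option("checkpointLocation", "/tmp/reader-checkpoint")
        .option("path", "/tmp/reader-output")
        .start()
    )
    # Stop this reader for ~10 minutes while the writer keeps committing every
    # minute, then relaunch the script: it should resume from the checkpoint,
    # but instead the snapshots from those 10 minutes are skipped.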

thanks for looking into it @singhpk234

cccs-jc commented 5 months ago

@singhpk234 any progress on this issue?

singhpk234 commented 5 months ago

Haven't been looking into this actively.

A couple of questions:

> That's because it applies stream-from-timestamp when in fact it should not look at it at all, and should instead rely on the checkpoint information (which I'm sure is good).

I see. So let's say there were snapshots S1, S2, S3, S4 committed at timestamps t1, t2, t3, t4, where t1 < t2 < t3 < t4. We consumed up to t2 and (hopefully) checkpointed up to t2. Now when we restart the stream, it starts at, say, S5 (which happens after S4) rather than resuming from S3? Is this understanding correct?

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SparkDataStream.java#L40

One thing that comes to mind is that we should let initialOffset proceed even when it's not passing our latest-offset check? But that's something the planFiles API would already be doing; maybe we need more log lines to see the call pattern.
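
For reference, the call pattern I have in mind, as simplified pseudocode of the micro-batch loop (not the actual Spark source; names follow the SparkDataStream/MicroBatchStream API):

    # Simplified pseudocode of Spark's micro-batch planning loop.
    start = (source.deserializeOffset(checkpointed_json)  # restart: checkpoint wins
             if checkpoint_exists
             else source.initialOffset())                 # fresh start only
    while True:
        end = source.latestOffset()  # newest offset the source can see
        for partition in source.planInputPartitions(start, end):
            process(partition)
        source.commit(end)  # cleanup hook; currently a no-op in Iceberg
        start = end         # the next batch resumes from end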

cccs-jc commented 5 months ago

The issue is that when a streaming query resumes (whether it was killed, died, or was gracefully stopped), it does not resume where it left off, but rather resumes based on stream-from-timestamp. When it resumes from a checkpoint it should ignore stream-from-timestamp.
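
In terms of your S1..S5 example:

    snapshots committed:  S1@t1  S2@t2  S3@t3  S4@t4   (t1 < t2 < t3 < t4)
    checkpoint:           consumed and committed through S2
    expected on restart:  resume at S3
    observed on restart:  the position follows stream-from-timestamp (the
                          restart time, after t4), so S3 and S4 are skipped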

singhpk234 commented 5 months ago

I see. What I was trying to say is that the latestOffset() API you linked in the ticket is working correctly, in that it returns the latest offset it can see. The question is whether planFiles(), which calls this API, is calling it from the right offset (from latestOffset() rather than the checkpointed location), or whether latestOffset() is expected to handle this.

One more issue I see is that the commit() API does nothing that would hint to Spark that the source has made progress and should resume from there.

Do you know if this is a problem with just the rate-limiting reader, or with all Spark Structured Streaming? From the history I can't see that this was ever overridden.

cccs-jc commented 3 months ago

@singhpk234 the best way to reproduce the issue is to run a streaming job that sets stream-from-timestamp to the current time.

The job will read the latest data. You then stop your job for 10 minutes, then resume it. Because stream-from-timestamp is set to the current time, it will read from that point rather than reading the 10 minutes' worth of data that has come in since the job was stopped.

cccs-jc commented 3 months ago

@singhpk234 are you able to reproduce the issue?

singhpk234 commented 3 months ago

@cccs-jc no, I wasn't. I tried this unit test:

  @TestTemplate
  public void testResumingStreamReadFromCheckpointWithStreamFromTimestamp() throws Exception {
    File writerCheckpointFolder = temp.resolve("writer-checkpoint-folder").toFile();
    File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
    File output = temp.resolve("junit").toFile();

    DataStreamWriter<Row> querySource =
        spark
            .readStream()
            .format("iceberg")
            .load(tableName)
            .writeStream()
            .option("checkpointLocation", writerCheckpoint.toString())
            .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, System.currentTimeMillis())
            .format("parquet")
            .queryName("checkpoint_test")
            .option("path", output.getPath());

    StreamingQuery startQuery = querySource.start();
    startQuery.processAllAvailable();
    startQuery.stop();

    List<SimpleRecord> expected = Lists.newArrayList();
    for (List<List<SimpleRecord>> expectedCheckpoint :
        TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
      // New data was added while the stream was down
      appendDataAsMultipleSnapshots(expectedCheckpoint);
      expected.addAll(Lists.newArrayList(Iterables.concat(Iterables.concat(expectedCheckpoint))));

      // Stream starts up again from the checkpoint, reads the newly added data, and shuts down
      StreamingQuery restartedQuery = querySource.start();
      restartedQuery.processAllAvailable();
      restartedQuery.stop();

      // Read data added by the stream
      List<SimpleRecord> actual =
          spark.read().load(output.getPath()).as(Encoders.bean(SimpleRecord.class)).collectAsList();
      assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
    }
  }

I think this may be because I am reading using the same Spark session. When you kill the job, how do you do it? Can you elaborate?

If you are starting a new Spark session, can you please apply this patch and test? See this explanation: https://github.com/apache/iceberg/pull/4473#issuecomment-1086892995

If it fixes your case, I will open a PR for it.

cccs-jc commented 3 months ago

In your test case, re-create the readStream from scratch before restarting. This will set .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, System.currentTimeMillis()) to the current time; I think right now the restarted readStream uses the "old" timestamp.

So maybe just add:

    DataStreamWriter<Row> querySource2 =
        spark
            .readStream()
            .format("iceberg")
            .load(tableName)
            .writeStream()
            .option("checkpointLocation", writerCheckpoint.toString())
            .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, System.currentTimeMillis())
            .format("parquet")
            .queryName("checkpoint_test")
            .option("path", output.getPath());

    // ...and inside the restart loop, start from this fresh builder so the
    // new timestamp is in effect:
    StreamingQuery restartedQuery = querySource2.start();

cccs-jc commented 2 months ago

@singhpk234 does my previous explanation make sense?