nats-io / nats-spark-connector


After application restart, total number of processed messages at the end is not same as input messages #11

Closed. vivekgh24 closed this issue 4 months ago.

vivekgh24 commented 9 months ago

What version were you using?

nats-server: v2.0.0 Jetstream Version: 2.10.4

What environment was the server running in?

Local - On Eclipse IDE running in my system

Is this defect reproducible?

  1. Run the Java code below to pull messages from NATS JetStream, process them, and save them:
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_unixtime;
import static org.apache.spark.sql.functions.unix_timestamp;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

private static void sparkNatsTester() {
    SparkSession spark = SparkSession.builder()
            .appName("spark-with-nats")
            .master("local")
            .config("spark.jars",
                    "libs/nats-spark-connector-balanced_2.12-1.1.4.jar,libs/jnats-2.17.1.jar")
            .config("spark.sql.streaming.checkpointLocation", "tmp/checkpoint")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .getOrCreate();
    System.out.println("sparkSession : " + spark);

    // Read from NATS JetStream via the balanced connector, using a durable consumer.
    Dataset<Row> df = spark.readStream()
            .format("nats")
            .option("nats.host", "localhost")
            .option("nats.port", 4222)
            .option("nats.stream.name", "newstream")
            .option("nats.stream.subjects", "newsub")
            .option("nats.durable.name", "cons1")
            .option("nats.msg.ack.wait.secs", 120)
            .load();

    StreamingQuery query;
    try {
        // Derive a date partition column and append the stream to a Delta sink.
        query = df.withColumn("date_only",
                        from_unixtime(unix_timestamp(col("dateTime"), "MM/dd/yyyy - HH:mm:ss Z"), "MM/dd/yyyy"))
                .writeStream()
                .outputMode("append")
                .partitionBy("date_only")
                .format("delta")
                .option("path", "tmp/outputdelta")
                .start();
        query.awaitTermination();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
  2. From the NATS CLI, run the command below to push 10k messages for testing:

     nats pub newsub --count=10000 "test #{{Count}}"

  3. While the Spark application is processing the messages, stop it.

  4. After some time, restart the Spark application.

Given the capability you are leveraging, describe your expectation?

Since 10k messages were pushed to NATS JetStream as input, once the Spark application has processed all of them (including the stop and restart in between), the number of processed messages in the output folder "tmp/outputdelta" should be exactly 10k. That is, the number of output/processed messages should equal the number of input messages.

Given the expectation, what is the defect you are observing?

The number of output/processed messages in the output folder is always greater than the number of input messages. In the scenario above the output contained 10,100 messages, whereas only 10,000 messages were pushed to NATS JetStream as input: 100 messages were duplicated!
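For reference, this is roughly how the sink row count can be checked once the stream is idle (a minimal sketch; it assumes the same local setup and Delta jars as in the repro code above, and the tmp/outputdelta path from the writeStream options):

import org.apache.spark.sql.SparkSession;

// Minimal sketch: count the rows that actually landed in the Delta sink.
SparkSession spark = SparkSession.builder()
        .appName("count-delta-output")
        .master("local")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate();

long outputRows = spark.read().format("delta").load("tmp/outputdelta").count();
System.out.println("rows in tmp/outputdelta: " + outputRows); // 10100 observed vs 10000 published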

Observations: As I can see in the tmp/outputdelta/_delta_log folder, the last file generated before stopping the application contains the following:

{"commitInfo":{"timestamp":1701081648739,"operation":"STREAMING UPDATE","operationParameters":{"outputMode":"Append","queryId":"193fec24-78ea-4ac0-87af-7b6232e748ff","epochId":"20"},"readVersion":19,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numRemovedFiles":"0","numOutputRows":"100","numOutputBytes":"1753","numAddedFiles":"1"},"engineInfo":"Apache-Spark/3.3.3 Delta-Lake/2.3.0","txnId":"36a435e7-f775-4f80-a637-48a40c23dc87"}}
{"txn":{"appId":"193fec24-78ea-4ac0-87af-7b6232e748ff","version":20,"lastUpdated":1701081648739}}
{"add":{"path":"date_only=11%252F27%252F2023/part-00000-5c9955af-06c7-4da6-991a-2412f033cd38.c000.snappy.parquet","partitionValues":{"date_only":"11/27/2023"},"size":1753,"modificationTime":1701081648739,"dataChange":true,"stats":"{\**"numRecords\":100**,\"minValues\":{\"subject\":\"newsub\",\"dateTime\":\"11/27/2023 - 16:10:16 +0530\",\"content\":\"**test #3001**\"},\"maxValues\":{\"subject\":\"newsub\",\"dateTime\":\"11/27/2023 - 16:10:16 +0530\",\"content\":\"**test #3100**\"},\"nullCount\":{\"subject\":0,\"dateTime\":0,\"content\":0}}"}}

And the first file generated right after restarting the Spark application contains the following:

{"commitInfo":{"timestamp":1701081772472,"operation":"STREAMING UPDATE","operationParameters":{"outputMode":"Append","queryId":"f8d338e3-9eb2-49a2-96df-2a4c0e368285","epochId":"10"},"readVersion":30,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numRemovedFiles":"0","numOutputRows":"1000","numOutputBytes":"6250","numAddedFiles":"1"},"engineInfo":"Apache-Spark/3.3.3 Delta-Lake/2.3.0","txnId":"d1a5e794-6827-4c3d-bfa0-e1fd9e3474c9"}}
{"txn":{"appId":"f8d338e3-9eb2-49a2-96df-2a4c0e368285","version":10,"lastUpdated":1701081772472}}
{"add":{"path":"date_only=11%252F27%252F2023/part-00000-8e45d89b-6426-43f5-8144-ee826874dcfb.c000.snappy.parquet","partitionValues":{"date_only":"11/27/2023"},"size":6250,"modificationTime":1701081772455,"dataChange":true,"stats":"{\"**numRecords\":1000**,\"minValues\":{\"subject\":\"newsub\",\"dateTime\":\"11/27/2023 - 16:10:16 +0530\",\"content\":\"**test #3001**\"},\"maxValues\":{\"subject\":\"newsub\",\"dateTime\":\"11/27/2023 - 16:10:16 +0530\",\"content\":\"**test #4000**\"},\"nullCount\":{\"subject\":0,\"dateTime\":0,\"content\":0}}"}}

As you can see, before the application was stopped it had processed messages up to #3100. But after the Spark application was restarted, it started again from #3001 instead of resuming at #3101; that is where the 100 duplicated messages in the output folder come from. I am also using a durable consumer, which redelivers messages whose acknowledgments were not received after consumption. So it looks like the last micro-batch processed just before the shutdown (messages #3001 to #3100) was never acknowledged, and those messages were redelivered and ended up duplicated. How can this issue be fixed?
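One way to confirm this reading (assuming the NATS CLI is pointed at the same server; newstream and cons1 are the stream and durable names from the repro code) is to inspect the durable consumer's state right after stopping the Spark app:

nats consumer info newstream cons1

The acknowledgment floor and outstanding-ack counts reported there should show the last micro-batch as delivered but not yet acknowledged.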

vivekgh24 commented 9 months ago

@jnmoyne @gcolliso @sergiosennder @scottf @bruth Can you please help us with this?

jnmoyne commented 4 months ago

The messages in the batch are acknowledged when commit is invoked for the batch, so if the app is killed before that happens, those unacknowledged messages will be redelivered. I do not believe this behavior should be changed, as it is the whole point of commit: to ack the messages only when commit is invoked.

vivekgh24 commented 4 months ago

@jnmoyne Thanks for the response. The problem was that the Spark Structured Streaming app consuming these messages from NATS.io was saving them to the output sink, but it looks like the app was shut down before they were acknowledged, so when we restarted it those unacknowledged messages (even though they had already been saved to the output sink) were reprocessed. OK, so that means we need to handle those duplicates in the consuming application, which in our case is the Spark Structured Streaming app. Closing the ticket.
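For others hitting the same behavior, one way to absorb JetStream redeliveries on the consuming side (a minimal sketch, not connector functionality; it assumes the message body in the content column uniquely identifies a message, otherwise a dedicated message-id column should be used) is to deduplicate the stream before the sink:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

// Stateful streaming deduplication on an assumed unique key ("content").
// Without a watermark the dedup state grows with the number of distinct keys,
// so long-running jobs should bound it with withWatermark on an event-time column.
Dataset<Row> deduped = df.dropDuplicates("content");

StreamingQuery query = deduped
        .writeStream()
        .outputMode("append")
        .format("delta")
        .option("path", "tmp/outputdelta")
        .start();

An alternative is an idempotent sink, for example foreachBatch with a Delta MERGE keyed on a unique message id, so that redelivered rows update existing ones instead of appending duplicates.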