bartosz25 / spark-scala-playground

Sample processing code using Spark 2.1+ and Scala

Question about the TriggerTest #1

Closed bithw1 closed 5 years ago

bithw1 commented 5 years ago

Hi, @bartosz25

In TriggerTest.scala, there is something I don't understand. I print Container.processingTimes to the console:

/*
1534650818447
1534650813458
1534650808554
1534650803565
1534650798638 
1534650795528
 */

I would like to ask why the second processing time, 1534650798638, is only about 3 seconds larger than 1534650795528. I think it should be about 5 seconds.
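The gap can be double-checked directly from the two timestamps (a quick arithmetic check, not part of the test code):

// Difference between the two first processing times above
val diffMillis = 1534650798638L - 1534650795528L // = 3110 ms, i.e. roughly 3 seconds
// while the ingestion thread adds a record only every 5000 ms

Here is the test code: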

"trigger lower than data arrival time" should "not process rows every trigger interval" in {
    val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
    val stream = inputStream.toDS().toDF("number")

    val query = stream.writeStream.outputMode("update").trigger(Trigger.ProcessingTime("1 seconds"))
      .foreach(new ForeachWriter[Row]() {
        override def open(partitionId: Long, version: Long): Boolean = true

        override def process(value: Row): Unit = {
          val currentTime = System.currentTimeMillis()
          Container.processingTimes.append(currentTime)
        }

        override def close(errorOrNull: Throwable): Unit = {}
      })
      .start()

    new Thread(new Runnable() {
      override def run(): Unit = {
        while (!query.isActive) {Thread.sleep(50)}
        while (true) {
          inputStream.addData(1)
          Thread.sleep(5000)
        }
      }
    }).start()

    query.awaitTermination(30000)

    Container.processingTimes.foreach(println)

    val processingTimes = new ListBuffer[Long]()
    for (index <- 0 until Container.processingTimes.size - 1) {
      val currentTimestamp = Container.processingTimes(index)
      val nextTimestamp = Instant.ofEpochMilli(Container.processingTimes(index + 1))

      // difference between two consecutive processing timestamps, truncated to seconds
      val processingTimeDiffInSec = nextTimestamp.minusMillis(currentTimestamp).getEpochSecond
      // :+ on an immutable Seq returns a new collection and its result was discarded,
      // so the assertion below ran against an always-empty Seq; a mutable ListBuffer
      // accumulates the differences correctly
      processingTimes += processingTimeDiffInSec
    }
    // Even though the trigger was defined to 1 second, we can see that the data processing is not launched
    // when there is no new data to process. So logically we shouldn't find any difference
    // of 1 second between subsequent trigger executions
    processingTimes should not contain (1L)
  }
}

object Container {
  var processingTimes = new ListBuffer[Long]()
}
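As a side note, the same consecutive differences could be computed without the index loop; a minimal sketch using sliding (my own variant, not the repository's code):

// Sketch: pairwise differences in seconds via sliding(2) over the buffer
val diffsInSec: Seq[Long] = Container.processingTimes
  .sliding(2)
  .collect { case Seq(current, next) => (next - current) / 1000 }
  .toSeq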
bartosz25 commented 5 years ago

Hello, and thanks for pointing this out. I've just pushed a fix for an assertion error thanks to your report.

And to answer your question: I've observed this kind of behavior too, and it's the reason why I don't want to assert on a strict processing time difference. IMO the behavior comes from a race condition between stream ingestion and the beginning of query execution. When the query begins its real execution, it already has 2 records to process, and Spark processes them in micro-batches only about 3 seconds apart. If you run this code for much longer than 30 seconds and print processingTimes, you'll see that the difference is between 4 and 5 seconds for all records except the first two, like here:

processing times ListBuffer(3, 4, 4, 5, 4, 5, 5, 4, 5, 4, 5, 4, 5, 4, 5, 5, 4, 4, 5, 5, 4, 4, 4)
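If strict timing really mattered, one option (a sketch, assuming it is acceptable for the test to delay ingestion until the first trigger completes; StreamingQuery.lastProgress stays null until then) would be to wait for the first micro-batch instead of only for query.isActive:

// Sketch: start feeding data only once the first micro-batch has completed,
// so the ingestion thread no longer races with the query start-up
new Thread(new Runnable() {
  override def run(): Unit = {
    while (query.lastProgress == null) { Thread.sleep(50) }
    while (true) {
      inputStream.addData(1)
      Thread.sleep(5000)
    }
  }
}).start()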
bithw1 commented 5 years ago

Thanks @bartosz25 for the helpful answer, now I understand.