bartosz25 / spark-scala-playground

Sample processing code using Spark 2.1+ and Scala

Stream-static join are not stateful, so no state management is necessary #13

Closed bithw1 closed 5 years ago

bithw1 commented 5 years ago

Hi, @bartosz25

I am reading your posts about joins in Structured Streaming. Spark's official documentation on stream-static joins (http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#stream-static-joins) says:

Note that stream-static joins are not stateful, so no state management is necessary

I don't understand what this means, so I wrote a simple test case that runs:

select t1.id, count(t2.id) from t1 join t2 on t1.id = t2.id group by t1.id

Full code is:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FlatSpec, Matchers}
import scala.reflect.io.{File, Path}
class StreamJoin_Then_Aggregation_Test  extends FlatSpec with Matchers with BeforeAndAfterAll with BeforeAndAfter  {
  val sparkSession: SparkSession = SparkSession
    .builder()
    .config("spark.sql.shuffle.partitions", 1)
    .appName("Spark Structured Streaming state store")
    .master("local[1]").getOrCreate()
  import sparkSession.sqlContext.implicits._
  val input_dir = "D:/applog/spark/structuredstreaming/ids"
  before {
    // Make sure the input directory exists; writeAll does not create parent directories
    Path(input_dir).createDirectory()
    File(input_dir + "/1.txt").writeAll(Seq("1", "2","3","4","2","1","6").mkString("\n"))
    File(input_dir + "/2.txt").writeAll(Seq("4", "1","8","4","9","5","6").mkString("\n"))
    File(input_dir + "/3.txt").writeAll(Seq("1", "2","3","4","2","1","6").mkString("\n"))
  }

  after {
    Path(input_dir).deleteRecursively()
  }

  "stream-static join and then aggregate" should "work" in {

    sparkSession.range(0,10).toDF("id").createOrReplaceTempView("t1")
    sparkSession.readStream
      .option("maxFilesPerTrigger", 1)
      .text(s"file:///$input_dir")
      .as[String]
      .map(_.toInt)
      .toDF("id")
      .createOrReplaceTempView("t2")

    val df = sparkSession.sql("select t1.id, count(t2.id) from t1 join t2 on t1.id = t2.id group by t1.id")
    val query = df.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start()

    query.awaitTermination()
  }

}

I think this is a stateful stream-static join (an aggregation after the join), so I don't understand what the above note means. Could you please take a look and explain? Thanks!

bartosz25 commented 5 years ago

Hi @bithw1,

In my understanding the note says two things. The first is that you don't need to worry about watermarks. Watermarks are there to clean up the state kept in memory when two streaming sources are joined: one of them can be very late, and you probably don't want to accumulate several hours of data just to perform the join.
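To make the watermark point concrete, here is a minimal sketch of a stream-stream join where both sides declare a watermark, so Spark knows when buffered rows can be dropped. It is an illustration only, not code from this repository: the object name, the column names (`leftId`, `leftTime`, `rightId`, `rightTime`), the watermark delays and the one-minute range condition are all assumptions, and the built-in `rate` source stands in for real inputs. Stream-stream joins require Spark 2.3 or later.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr

// Hypothetical sketch: names, delays and the time range are illustrative assumptions.
object WatermarkedStreamStreamJoinSketch {

  // Builds an inner join between two streaming DataFrames.
  def build(spark: SparkSession): DataFrame = {
    // The "rate" source emits rows with two columns: timestamp and value.
    val left = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
      .selectExpr("value AS leftId", "timestamp AS leftTime")
      .withWatermark("leftTime", "10 seconds") // tolerated lateness on the left side

    val right = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
      .selectExpr("value AS rightId", "timestamp AS rightTime")
      .withWatermark("rightTime", "20 seconds") // tolerated lateness on the right side

    // The time-range condition, together with the watermarks, bounds how long
    // each side has to stay buffered in the join state.
    left.join(right, expr(
      """leftId = rightId AND
        |rightTime >= leftTime AND
        |rightTime <= leftTime + interval 1 minute""".stripMargin))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Watermarked stream-stream join sketch")
      .master("local[2]")
      .getOrCreate()

    val query = build(spark).writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination(30000) // let the sketch run for about 30 seconds
    query.stop()
    spark.stop()
  }
}
```

None of this is needed when one side of the join is a static dataset, which is the case discussed next.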

A stream-static join is different: on one side you have a static dataset, which is never late. On the other side you have a streaming dataset, which may arrive late, but not from the static Dataset's perspective.

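The second point is that, since the join keeps no state, each micro-batch of the stream is joined only against the static dataset and not against the streaming data from earlier batches. Any state in your example comes from the aggregation that follows the join (the count in complete output mode), not from the join itself.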
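One way to picture this for the test in the question is a plain Scala model with no Spark involved. It is only a sketch under assumptions: the object and function names are hypothetical, and it assumes the three files are consumed in the order 1, 2, 3, one per micro-batch. The join step looks only at the current batch, while the count carries its totals from one batch to the next.

```scala
// Hypothetical plain-Scala model of the test case; not Spark code.
object StreamStaticJoinSketch {

  // Static side: ids 0 to 9, like sparkSession.range(0, 10) in the question.
  val staticIds: Set[Int] = (0 until 10).toSet

  // Stateless step: join one micro-batch against the static side.
  // Nothing from earlier batches is needed to compute the result.
  def joinBatch(batch: Seq[Int]): Seq[Int] =
    batch.filter(staticIds.contains)

  // Stateful step: merge the joined rows of one batch into the running counts.
  def updateCounts(state: Map[Int, Long], joined: Seq[Int]): Map[Int, Long] =
    joined.foldLeft(state) { (acc, id) =>
      acc.updated(id, acc.getOrElse(id, 0L) + 1L)
    }

  def main(args: Array[String]): Unit = {
    // Same contents as 1.txt, 2.txt and 3.txt in the question.
    val batches = Seq(
      Seq(1, 2, 3, 4, 2, 1, 6),
      Seq(4, 1, 8, 4, 9, 5, 6),
      Seq(1, 2, 3, 4, 2, 1, 6)
    )

    // scanLeft keeps every intermediate state, one per micro-batch.
    val states = batches.scanLeft(Map.empty[Int, Long]) { (state, batch) =>
      updateCounts(state, joinBatch(batch))
    }

    states.tail.zipWithIndex.foreach { case (counts, i) =>
      println(s"after batch ${i + 1}: ${counts.toSeq.sorted.mkString(", ")}")
    }
  }
}
```

In this model `joinBatch` could be called on the batches in any order, or in parallel, with the same per-batch output. Only `updateCounts` depends on what came before, which is the part a state store has to remember.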

Long story short, a stream-static join will (should) output the joined entries as soon as a match between the static and the streaming dataset is found, looking only at the data of the current micro-batch. With stream-stream joins, the buffered state and its retention are controlled by the watermark.

That said, I'd like to check whether this behavior can be overridden so that watermarks apply to stream-static joins. Maybe I'll investigate it at the beginning of 2019.

Best regards, Bartosz.

bithw1 commented 5 years ago

Thanks @bartosz25 for the detailed explanation. Stream joins are still a bit difficult for me to understand thoroughly.