ibis-project / ibis

the portable Python dataframe library
https://ibis-project.org
Apache License 2.0

[EPIC] add Spark streaming support #8868

Open mfatihaktas opened 2 months ago

mfatihaktas commented 2 months ago

Is your feature request related to a problem?

This meta-issue is for the Spark Streaming epic, through which we plan to add Spark streaming support in Ibis. It is meant to contain the general information and notes we collect while breaking down the epic; more specifically, we will use it to record and share our findings.

Once we have clarity on the high-level tasks, they will be assigned to individual owners, who will then create the corresponding GitHub issues.

High-level design decisions

We should

Initial exploration (~~Ongoing~~ Done)

@chloeh13q has been experimenting with Spark streaming. Initial findings:

Update: See comment below for the summary of the exploration outcome.

Breakdown into issues

As mentioned above, our initial understanding is that Spark SQL uses the same syntax for both batch and streaming, so anything at the intersection of the two should just work. The majority of the work in this epic will be adding support for streaming-specific features in pyspark/streaming.
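As a quick illustration of that intersection, the same SQL text can be issued against both a batch view and a streaming view, and Spark chooses eager vs. incremental execution based on the nature of the view. A minimal sketch (the table names and the toy "rate" source are illustrative only, not part of this epic):

from pyspark.sql import SparkSession

session = SparkSession.builder.getOrCreate()

# Batch: a small in-memory table registered as a view.
session.createDataFrame(
    [(1, 10.0), (2, 5.0)], "provinceId INT, payAmount DOUBLE"
).createOrReplaceTempView("payments_batch")

# Streaming: Spark's built-in "rate" test source, just to obtain a streaming view.
session.readStream.format("rate").load().createOrReplaceTempView("payments_stream")

# The same SQL text is valid against either view.
sql = "SELECT count(*) AS n FROM {view}"
batch_plan = session.sql(sql.format(view="payments_batch"))
stream_plan = session.sql(sql.format(view="payments_stream"))
print(batch_plan.isStreaming, stream_plan.isStreaming)  # False True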

Note: The to-do list below includes all the tasks that need to be completed to support Spark streaming with sufficient feature coverage. We do not expect to finish all of them in a single quarter.

Note that the following streaming queries are not supported in Spark:

Note: Issues created for the items above will be linked here.

What version of ibis are you running?

8.0.0

What backend(s) are you using, if any?

Spark

Code of Conduct

chloeh13q commented 2 months ago

I can't seem to modify the description - can you add the window op GitHub issue? It's #8847

Going to add some more details here based on my investigation, since I can't directly modify the issue. I also figured it may be easier to track discussions over time -

High-level observations & summaries

I'm going to use "spark streaming" below to refer to "Spark in streaming mode", as opposed to the legacy Spark Streaming (DStreams) API, which is no longer maintained in favor of the Spark Structured Streaming API.

We're looking to support Spark Structured Streaming through Spark SQL rather than the DataFrame API, because our current pyspark backend is a string-generating backend. This lets us leverage the existing work we have done for Spark batch.
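To sketch how that could fit together (the schema and table name below are hypothetical, and ibis.to_sql with a backend-name dialect is an assumed compile path, not a settled design): compile the Ibis expression to a Spark SQL string, then hand the string to the session, where the referenced view may be batch or streaming.

import ibis

# Hypothetical table mirroring the Kafka example further down.
t = ibis.table(
    {"createTime": "timestamp", "provinceId": "int32", "payAmount": "float64"},
    name="streaming_df",
)
expr = t.group_by("provinceId").agg(totalPayAmount=t.payAmount.sum())

# Compile to a Spark SQL string ...
sql = ibis.to_sql(expr, dialect="pyspark")
# ... and execute it; "streaming_df" can be a batch or a streaming temp view
# registered on an existing SparkSession `session`.
result = session.sql(sql)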

Unfortunately, Spark SQL for streaming is neither well documented nor widely used. I have not come across companies that use Spark SQL for streaming, and Stack Overflow posts on the topic are also sparse.

Ops

There are some operations that are not supported on streaming DataFrames/Datasets:

Sources

Sinks

Task breakdown

A list of tasks needed to support Spark streaming:

My understanding is that over aggregation, as supported by Flink, needs to be accomplished with arbitrary stateful operations in Spark streaming; a sketch follows below.
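For reference, a minimal sketch of what "arbitrary stateful operations" look like in PySpark (3.4+), using applyInPandasWithState to keep per-key state across micro-batches. The running-total logic, column names, and schemas here are hypothetical and only illustrate the mechanism, not a design proposal for Ibis; streaming_df is assumed to be the streaming dataframe from the workflow below:

import pandas as pd
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

def running_total(key, pdfs, state: GroupState):
    # Load the per-key state (stored as a 1-tuple) or initialize it.
    total = state.get[0] if state.exists else 0.0
    for pdf in pdfs:
        total += float(pdf["payAmount"].sum())
    state.update((total,))
    yield pd.DataFrame({"provinceId": [key[0]], "runningTotal": [total]})

stateful = (
    streaming_df.groupBy("provinceId")
    .applyInPandasWithState(
        running_total,
        outputStructType="provinceId INT, runningTotal DOUBLE",
        stateStructType="total DOUBLE",
        outputMode="update",
        timeoutConf=GroupStateTimeout.NoTimeout,
    )
)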

Example workflow

I have set up a streaming window aggregation example using pyspark. This example reads from an upstream Kafka source, computes a windowed aggregation, and then writes the output to a downstream Kafka sink. I'm using a Kafka sink here because it's easy to set up and does not require spinning up additional infrastructure. This is very similar to the example we used for Flink and is (somewhat) representative of a real-world use case.

Detailed steps as follows (with notes):

  1. Connect to a Spark session:
from pyspark.sql import SparkSession

# The Kafka connector package makes the "kafka" source/sink formats available.
session = SparkSession.builder \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") \
    .getOrCreate()

[NOTES]

  2. Define the upstream source table:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructField, StructType, TimestampType, LongType, DoubleType, IntegerType

schema = StructType(
    [
        StructField('createTime', TimestampType(), True), 
        StructField('orderId', LongType(), True), 
        StructField('payAmount', DoubleType(), True), 
        StructField('payPlatform', IntegerType(), True), 
        StructField('provinceId', IntegerType(), True),
    ])

streaming_df = (
    session.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "payment_msg")
    .option("startingOffsets", "earliest")
    .load()
    # Kafka delivers raw bytes; parse the value column as JSON using the schema above.
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
    .select("parsed_value.*")
    # The watermark bounds aggregation state and is required for append-mode windowed output.
    .withWatermark("createTime", "10 seconds")
)

[NOTES]

  3. Define a view on top of the parsed dataframe so that we can write SQL queries against it:
streaming_df.createOrReplaceTempView("streaming_df")
  4. Write the windowed aggregation in Spark SQL:
window_agg = session.sql("""
    SELECT
        window.start, window.end, provinceId, sum(payAmount) AS totalPayAmount
    FROM streaming_df
    GROUP BY provinceId, window(createTime, '1 minute', '30 seconds')
""")

[NOTES]

  5. Convert the output into JSON so that it can be written to a downstream Kafka sink (this essentially reverses the parsing we did on the upstream source, i.e. we have to do the serialization ourselves).

This is how steps 4-5 would look in a single query. When we compile it with Ibis, it will most likely be a two-step process.

window_agg = session.sql("""
    SELECT
        to_json(named_struct(
            'start', window.start,
            'end', window.end,
            'provinceId', provinceId,
            'totalPayAmount', sum(payAmount)
        )) AS value
    FROM streaming_df
    GROUP BY provinceId, window(createTime, '1 minute', '30 seconds')
""")
  6. Write to the sink:
# start() returns a StreamingQuery handle, not a dataframe.
query = (window_agg
    .writeStream
    .outputMode("append")
    .format("kafka")
    .option("checkpointLocation", "checkpoint")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "sink")
    .start())

[NOTES]
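One note on the handle returned by start() (not part of the original steps): it returns immediately, so a standalone script would typically block on the query handle. A minimal sketch:

# Block until the streaming query stops (e.g. on error or an explicit query.stop()).
query.awaitTermination()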

Helpful reads:

mfatihaktas commented 2 months ago

Weekly Update, 2024-04-03

mfatihaktas commented 2 months ago

Weekly Update, 2024-04-10

chloeh13q commented 2 months ago

Doesn't this refer to the same operation as the windowing functions above?

Yes, it's the same. I grouped everything under op logic.

mfatihaktas commented 1 month ago

Weekly Update, 2024-04-25

chloeh13q commented 1 month ago

Weekly update, 5/2/2024

chloeh13q commented 1 month ago

Weekly update, 5/9/24

chloeh13q commented 1 month ago

Weekly update, 5/15/24

chloeh13q commented 3 weeks ago

Weekly update, 5/23/24

chloeh13q commented 2 weeks ago

Weekly update, 5/30/24