cccs-jc opened 2 months ago
To use multiple sessions you would essentially move this part of your code into the start_streaming_query function:
# Initialize Spark Session
spark = (
    SparkSession.builder
    .appName("streaming merg into")
    .config("spark.sql.shuffle.partitions", "2")
    .config("spark.log.level", "ERROR")
    .getOrCreate()
)
You might also want to make some changes such as appending the thread ID to the app name.
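For illustration, a minimal sketch of that suggestion, assuming the start_streaming_query shape from the example further down but with the session built inside the function instead of passed in (the appName suffix is just the suggested thread/query ID; note the concern about getOrCreate raised in the next reply):

from pyspark.sql import SparkSession

def start_streaming_query(query_id):
    # Suggested change: build the session inside the function and tag the
    # app name with the thread/query ID. NOTE: getOrCreate() hands back the
    # already-active session if one exists, which is the concern raised below.
    spark = (
        SparkSession.builder
        .appName(f"streaming merge into {query_id}")
        .config("spark.sql.shuffle.partitions", "2")
        .config("spark.log.level", "ERROR")
        .getOrCreate()
    )
    # ... define the foreachBatch sink and start the streaming query with `spark` ...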
@eric-maynard I have other examples where I do what you suggest. If I move this inside start_streaming_query, the builder's getOrCreate will simply return the existing Spark session; it will not create a new Spark application. When I do so I see the same behavior.
You need to create a new session object, not get the active one like so:
scala> spark.newSession()
res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5d643896
Ha, a new session... I did not know you could create multiple sessions, is this possible in pyspark? Are multiple sessions within the same Spark application (i.e. sharing the same CPU/memory resources used by the application)?
Well, what do you know: in pyspark you can create a new session with spark.newSession(), which isolates views and UDFs and has a separate SQLConf. For example:
s1 = spark.newSession()
s2 = spark.newSession()
print(s1)
df1 = s1.sql("select 1")
df1.createOrReplaceTempView("view1")
s1.sql("show tables").show()
print(s2)
df2 = s2.sql("select 2")
df2.createOrReplaceTempView("view2")
s2.sql("show tables").show()
outputs:
<pyspark.sql.session.SparkSession object at 0x7f50688726e0>
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
| | view1| true|
+---------+---------+-----------+
<pyspark.sql.session.SparkSession object at 0x7f5068870040>
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
| | view2| true|
+---------+---------+-----------+
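The isolation also covers SQLConf, while the underlying SparkContext (and the application's CPU/memory) stays shared; a quick sketch to confirm, with the config key chosen only for illustration:

s1 = spark.newSession()
s2 = spark.newSession()

# Each session carries its own SQLConf, so runtime conf changes stay session-local
s1.conf.set("spark.sql.shuffle.partitions", "2")
s2.conf.set("spark.sql.shuffle.partitions", "8")
print(s1.conf.get("spark.sql.shuffle.partitions"))  # 2
print(s2.conf.get("spark.sql.shuffle.partitions"))  # 8

# Both sessions are backed by the same SparkContext, i.e. the same Spark application and executors
print(s1.sparkContext is s2.sparkContext)  # True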
When we modify the streaming MERGE INTO example above to use a separate session for each streaming query, there are no more errors :-) Thank you @RussellSpitzer and @eric-maynard
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import time

table_name = <put target table name here>

# Initialize Spark Session
g_spark = (
    SparkSession.builder
    .appName("streaming merg into")
    .config("spark.sql.shuffle.partitions", "2")
    .config("spark.log.level", "ERROR")
    .getOrCreate()
)
g_spark.sparkContext.setLogLevel("ERROR")

g_spark.sql(f"drop table if exists {table_name}")
g_spark.sql(f"""
    create table if not exists {table_name} (
        timestamp timestamp,
        value long,
        query_id integer
    )
    using iceberg
    partitioned by (query_id)
    tblproperties (
        "commit.retry.num-retries"="25",
        "write.delete.isolation-level"="snapshot",
        "write.update.mode"="copy-on-write",
        "write.update.isolation-level"="snapshot",
        "write.merge.mode"="copy-on-write",
        "write.merge.isolation-level"="snapshot"
    )
""")
g_spark.table(table_name).printSchema()


# Function to run a streaming query with foreachBatch
def start_streaming_query(q_spark, query_id):

    # Define a write function using foreachBatch
    def write_to_table(df, batch_id):
        view_name = f"updates_{query_id}"
        full_view_name = "global_temp." + view_name
        q_spark.sql(f"uncache table if exists {full_view_name}")
        df.createOrReplaceGlobalTempView(view_name)
        q_spark.sql(f"cache table {full_view_name}")
        df.show(truncate=False, n=10000)

        # Execute the MERGE INTO statement
        merge_query = f"""
            MERGE INTO {table_name} AS t
            USING {full_view_name} AS s
            ON (t.query_id = s.query_id and t.value = s.value)
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
        """
        num_retry = 0
        while True:
            try:
                q_spark.sql(merge_query)
                break
            except Exception as e:
                num_retry += 1
                error_message = str(e)
                print(f"Query {query_id} is retrying {num_retry} error was {error_message}")
                if num_retry > 20:
                    raise e

    # Simulate a streaming DataFrame
    (
        q_spark
        .readStream
        .format("rate")
        .option("rowsPerSecond", 1)
        .load()
        .withColumn("query_id", F.lit(query_id))
        .writeStream
        .foreachBatch(write_to_table)
        .outputMode("append")
        .trigger(processingTime="10 seconds")
        .start()
    )


num_queries = 10  # Number of concurrent streaming queries
for i in range(num_queries):
    start_streaming_query(g_spark.newSession(), i)

g_spark.streams.awaitAnyTermination()
g_spark.stop()
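One caveat with this pattern (general PySpark behavior, not specific to this issue): global temp views live in the shared global_temp database and are visible from every session, so the per-query updates_{query_id} suffix is still what keeps the queries from clobbering each other's views; ordinary temp views would be isolated per session automatically. A tiny sketch of the difference, with hypothetical view names:

s1 = g_spark.newSession()
s2 = g_spark.newSession()

# Global temp views are shared across all sessions of the application...
s1.range(1).createOrReplaceGlobalTempView("shared_updates")
s2.sql("select * from global_temp.shared_updates").show()  # visible from s2

# ...whereas a regular temp view is only visible in the session that created it
s1.range(1).createOrReplaceTempView("local_updates")
# s2.sql("select * from local_updates")  # would raise AnalysisException: table or view not found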
Query engine
Question
I’m trying to use the MERGE INTO statement within a Spark streaming application that runs several streaming queries. These queries all write to the same target table.

I understand that the MERGE INTO statement performs a copy-on-write operation on files affected by updated or deleted rows. To prevent multiple queries from modifying the same files, I have partitioned the target table by query_id.

Additionally, I’ve configured the target table with snapshot isolation, which is less strict than the serializable isolation level.

However, despite these precautions, the commit operation occasionally fails with the following error:
@RussellSpitzer explained on a Slack channel
My question is: How can I avoid this error when committing? Specifically, what does it mean to "use independent Spark sessions in each thread"?
I have attached a simple PySpark application illustrating the problem and I'm curious how this example could be modified to prevent the error. How would I introduce "independent Spark sessions" in this example?
Is my issue related to this issue https://github.com/apache/iceberg/issues/11066#issue-2500468234 ?