memsql / singlestore-spark-connector

A connector for SingleStore and Spark
Apache License 2.0

Parallel Read from Aggregators not working in Aws Glue #85

Closed akshaysyaduvanshi closed 6 months ago

akshaysyaduvanshi commented 7 months ago

SingleStore DB version: 7.6.5
AWS Glue version: 3.0
Marketplace SingleStore connector for AWS Glue: 4.0.0

I am loading the data like this:

sparkSession.read
  .format("singlestore")
  .option("ddlEndpoint", "")
  .option("database", "")
  .option("user", "")
  .option("password", "")
  .option("query", "simple select query")
  .option("enableParallelRead", "forced")
  .option("parallelRead.Features", "readFromAggregators")
  .option("parallelRead.repartition", "true")
  .option("parallelRead.repartition.columns", "col1, col2")
  .option("parallelRead.maxNumPartitions", "100")
  .option("parallelRead.TableCreationTimeout", "1000")
  .load()

I enabled trace logs for the SingleStore Spark connector and I am getting the trace message below, which shows that parallel read is being disabled:

TRACE SinglestorePartitioner: readFromAggregators disabled for this query: maximum number of concurrent tasks that can be launched in the cluster is 0 when the required amount is 32

I checked the connector code to see what returns 0, and the following code is the cause:

import org.apache.spark.rdd.RDD

object MaxNumConcurrentTasks {
  def get(rdd: RDD[_]): Int = {
    val (_, resourceProfiles) =
      rdd.sparkContext.dagScheduler.getShuffleDependenciesAndResourceProfiles(rdd)
    val resourceProfile =
      rdd.sparkContext.dagScheduler.mergeResourceProfilesForStage(resourceProfiles)
    rdd.sparkContext.maxNumConcurrentTasks(resourceProfile)
  }
}

Can someone please help here? Am I missing any config while loading the data from singlestore?

AdalbertMemSQL commented 7 months ago

This may happen because AWS Glue adds executors to Spark dynamically. readFromAggregators mode checks that it has enough resources to start reading from all SingleStore partitions at the same time (this is a requirement of non-materialized parallel read). You can try using readFromAggregatorsMaterialized. It should work well in this situation.
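As a minimal sketch of that suggestion, and assuming the rest of the read stays as in the original post, only the `parallelRead.Features` value changes. The `ReadOptions` object and its helper are hypothetical names used for illustration; the option keys and values are the ones quoted in this thread.

```scala
object ReadOptions {
  // Parallel-read options as quoted in the original post (connection options omitted).
  val original: Map[String, String] = Map(
    "enableParallelRead"    -> "forced",
    "parallelRead.Features" -> "readFromAggregators"
  )

  // Hypothetical helper: swap in the materialized feature, leave everything else untouched.
  def useMaterialized(opts: Map[String, String]): Map[String, String] =
    opts.updated("parallelRead.Features", "readFromAggregatorsMaterialized")
}

// Usage, assuming an existing SparkSession named `spark` and the connection
// options (ddlEndpoint, database, user, password, query) set as in the original post:
//   spark.read.format("singlestore")
//     .options(ReadOptions.useMaterialized(ReadOptions.original))
//     .load()
```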

BTW, what version of the SingleStore Spark connector are you using? If I remember correctly, the latest version handles this situation in another way.

akshaysyaduvanshi commented 7 months ago

I am using the AWS Marketplace SingleStore connector, whose latest version there is 4.0.0. There is no update to 4.1.5 on AWS Marketplace yet.

https://aws.amazon.com/marketplace/pp/prodview-yb46ffbibyjj2?sr=0-1&ref_=beagle&applicationId=GlueStudio#pdp-support

The problem with using readFromAggregatorsMaterialized is that this query loads a lot of data (~2 billion rows), which results in the aggregators going OOM.

AdalbertMemSQL commented 7 months ago

Oh, I see. I will figure out how to publish a newer version of the connector to AWS Glue. But most likely, it won't resolve the issue.

Do you have auto-scaling enabled in AWS Glue? Can you try disabling it and configuring Spark to have enough workers?

Regarding the OOM: it may also be caused by a lack of memory on the leaf nodes. I have seen cases where scaling the leaves helped.

akshaysyaduvanshi commented 7 months ago

Thanks. I will watch out for a newer version in case it helps.

Autoscaling is disabled on Glue and I have set up around 50 G.2X workers, which should be enough for 32 executors, I guess.

The leaf nodes have 128 GB of memory. I will see if increasing that helps.

AdalbertMemSQL commented 7 months ago

Unfortunately, the newer version of the connector won't resolve this issue. The SingleStore Spark connector uses barrier execution mode for readFromAggregators (it is needed to guarantee that all reading tasks are started at the same time). This mode doesn't support dynamic allocation of executors, which AWS Glue enables by default.

https://books.japila.pl/apache-spark-internals/barrier-execution-mode/
https://books.japila.pl/apache-spark-internals/dynamic-allocation/
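To illustrate why the trace message appears, here is a simplified model of the check, not the connector's actual code: a barrier stage can only start if every task gets a slot at the same moment. `BarrierSlotCheck` and `canStart` are hypothetical names; the numbers in the usage comment come from the trace message in the original post.

```scala
object BarrierSlotCheck {
  // Simplified model (an assumption, not connector code): barrier execution
  // launches all tasks together, so the required task count must fit into
  // the number of task slots the cluster reports.
  def canStart(requiredTasks: Int, maxConcurrentTasks: Int): Boolean =
    requiredTasks <= maxConcurrentTasks
}

// Usage with the values from the trace message (0 slots reported, 32 required):
//   BarrierSlotCheck.canStart(requiredTasks = 32, maxConcurrentTasks = 0)
```

With the reported slot count at 0, the check cannot pass regardless of how many workers are actually provisioned, which matches the behavior described in this thread.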

I'm going to investigate if it is possible to disable dynamic executor allocation in AWS Glue.

AdalbertMemSQL commented 7 months ago

It turns out that even when dynamic allocation of executors is disabled, the SingleStore Spark connector fails to get information about the number of free task slots in the Spark cluster. I will try to work around this by adding a connector parameter that lets you specify this value.

akshaysyaduvanshi commented 7 months ago

Thanks @AdalbertMemSQL.

AdalbertMemSQL commented 7 months ago

Hey @akshaysyaduvanshi, I created a demo version of the connector with a new parameter. Here is a link for AWS Glue: https://console.aws.amazon.com/gluestudio/home?region=us-east-1#/connector/add-connection?connectorName=%22SingleStore%20connector%20for%20AWS%20Glue%22&connectorType=%22Spark%22&connectorDescription=%22The%20SingleStore%20connector%20for%20AWS%20Glue%20allows%20you%20to%20connect%20to%20data%20in%20AWS%20Glue%22&connectorUrl=%22https://709825985650.dkr.ecr.us-east-1.amazonaws.com/singlestore/marketplace-spark:4.1.6-beta3%22&connectorVersion=%224.1.6-beta3%22&connectorClassName=%22com.singlestore.spark%22

Can you please try it and let me know if it works well? The new parameter is parallelRead.numPartitions, and in your case it should be set to 32.

Note 1: this version of the connector works only with AWS Glue 3 (with AWS Glue 4 it will throw an error).
Note 2: this demo version is not intended to be used in a production environment.

akshaysyaduvanshi commented 7 months ago

Hi @AdalbertMemSQL, I tried the connector above but I am getting the error below. I think Glue is not able to fetch the required jars for this connector.

Exception in User Class: java.lang.ClassNotFoundException : Failed to find data source: singlestore. Please find packages at http://spark.apache.org/third-party-projects.html

Caused by: java.lang.ClassNotFoundException: singlestore.DefaultSource

AdalbertMemSQL commented 7 months ago

Hmm... Can you try using format("com.singlestore.spark") instead of format("singlestore")?

akshaysyaduvanshi commented 7 months ago

It works with format("com.singlestore.spark"). The job started reading in parallel, so this is working.
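For readers following along, the working setup from this thread can be sketched roughly as below. `GlueParallelRead` and its `options` helper are hypothetical names for illustration; the format string, option keys, and the value 32 are the ones mentioned in the comments above, and `parallelRead.numPartitions` is the parameter introduced in the 4.1.6-beta3 demo build.

```scala
object GlueParallelRead {
  // Fully qualified data source name that resolved in AWS Glue in this thread.
  val format: String = "com.singlestore.spark"

  // Hypothetical helper: options for a non-materialized parallel read with an
  // explicitly supplied partition count instead of one detected from the cluster.
  def options(numPartitions: Int): Map[String, String] = Map(
    "enableParallelRead"         -> "forced",
    "parallelRead.Features"      -> "readFromAggregators",
    "parallelRead.numPartitions" -> numPartitions.toString
  )
}

// Usage, assuming an existing SparkSession named `spark` and the connection
// options (ddlEndpoint, database, user, password, query) from the original post:
//   spark.read.format(GlueParallelRead.format)
//     .options(GlueParallelRead.options(32))
//     .load()
```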

AdalbertMemSQL commented 7 months ago

Nice! I will work on getting these changes reviewed. The release should be available sometime early next week.

akshaysyaduvanshi commented 7 months ago

@AdalbertMemSQL Thanks for all the help.

akshaysyaduvanshi commented 7 months ago

@AdalbertMemSQL I hope this will also be released on AWS Marketplace next week.

AdalbertMemSQL commented 6 months ago

The release of the SingleStore Spark Connector is ready. The release of the SingleStore AWS Glue connector has been submitted and has an "Under review" status.

AdalbertMemSQL commented 6 months ago

@akshaysyaduvanshi Version 4.1.6 of the AWS Glue connector should be available.

akshaysyaduvanshi commented 6 months ago

@AdalbertMemSQL Thanks. I will upgrade the connector version.