Open bbsun92 opened 3 years ago
Hey @bbsun92, the org.apache.arrow.vector.util.OversizedAllocationException is caused by having hundreds of thousands of genotypes in an array, which exceeds the allocation Arrow expects by default. The solution is to lower the Arrow batch size so that fewer rows are processed per batch. The default configuration is 10,000 records per batch (docs).
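For example, a minimal sketch (assuming an active PySpark session named spark), using the value that ended up working later in this thread:
# Lower the Arrow batch size from the default of 10,000 records per batch.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 100)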
1,000 phenotypes simultaneously is a lot! In internal testing we've not gone beyond 15-20 yet.
Also, please ensure that the genotypes dataframe conforms to the schema expected by Glow: the values_column ("n_alt") should be an array of doubles rather than ints. Cast the array as follows:
genotypes2 = genotypes2.withColumn("n_alt", genotypes2.n_alt.cast("array<double>"))
And please cast the index of the covariates, phenotypes and offsets dataframes to string:
phenotypes.index = phenotypes.index.map(str)
covariates.index = covariates.index.map(str)
offsets.index = offsets.index.map(str)
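As a quick sanity check (a sketch using the dataframe names from the snippets above), you can confirm the casts took effect:
# n_alt should now show up as array<double>, and the pandas indexes as object (string) dtype.
genotypes2.select("n_alt").printSchema()
print(phenotypes.index.dtype, covariates.index.dtype, offsets.index.dtype)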
Yes, it didn't work with 10 traits either.
OK, let me do some testing.
Setting maxRecordsPerBatch to 100 worked (it failed at 1,000):
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 100)
On a 256-core cluster, 2 phenotypes took 6 minutes and 20 phenotypes took 62 minutes.
Note: on a 64-core cluster, or when repartitioning to the size of the cluster, the Spark executors will blow up with:
Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues
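For reference, "repartitioning to the size of the cluster" here means something like the following sketch (assuming sc is the active SparkContext and genotypes2 is the genotype dataframe from above):
# Repartition down to one partition per available core; in this thread, doing so
# (or running on a 64-core cluster) is what triggered the executor failures above.
genotypes2 = genotypes2.repartition(sc.defaultParallelism)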
I will file a ticket with the Spark team to address the scalability limitations for this use case.
I published a notebook to reproduce (link), please run in your own environment and confirm it works!
@bbsun92, Spark 3.1 has an upgraded version of Arrow (v2.0). https://issues.apache.org/jira/browse/SPARK-33213
Tested on Databricks Runtime 8.0, and it works for two phenotypes without needing to adjust the maxRecordsPerBatch Spark configuration.
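To check which versions a cluster is running (a minimal sketch; note the JVM-side Arrow version is tied to the Spark release, while pyarrow is the Python-side package installed on the driver):
import pyarrow
print(spark.version)        # 3.1+ ships with Arrow 2.0 on the JVM side
print(pyarrow.__version__)  # Python-side Arrow version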
Does it write out quickly (in terms of scalability and speed)? What cluster set-up was used? It would be nice to have the engineers work the magic they did with genotype_states(genotypes).
Yeah, so it looks like the issue is only partly alleviated by upgrading the Spark and Arrow versions.
For 2 phenotypes it works.
For 20 phenotypes I am getting a new failure:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 37 in stage 32.0 failed 4 times, most recent failure: Lost task 37.3 in stage 32.0 (TID 966) (10.126.238.198 executor 7): java.net.SocketException: Connection reset
This is resolved by setting maxRecordsPerBatch back to 100.
Let's keep this issue open for now.
On a 256-core cluster with maxRecordsPerBatch of 100:
2 phenotypes took 6 minutes
20 phenotypes took 62 minutes
Hi,
I tried to run the Glow GWAS logistic regression but am running into errors when writing out the results at the end:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 11.0 failed 4 times, most recent failure: Lost task 3.3 in stage 11.0 (TID 1147, 10.248.243.193, executor 10): org.apache.arrow.vector.util.OversizedAllocationException: Memory required for vector capacity 264305678 is (2147483648), which is more than max allowed (2147483647)
I tried various large cluster set-ups on Databricks. It writes OK with very small sets, e.g. 1 or 2 phenotypes with 1,000 "simulated counts", but crashes with bigger ones. Here's the simulated data:
import numpy as np
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.functions import expr
import glow.gwas

phenotypes = pd.DataFrame(np.random.randint(0, 2, size=(400000, 1000)), columns=['P' + str(i) for i in range(1000)])
phenotypes.index.name = "sampleid"
covariates = pd.DataFrame(np.random.random((400000, 20)))
covariates.index.name = "sampleid"
offset = pd.DataFrame(np.random.random(size=(400000, 1000)), columns=['P' + str(i) for i in range(1000)])
offset.index.name = "sampleid"
genotypes = spark.createDataFrame([Row(contigName = "1",
start = 123456,
genotypes2 = genotypes.withColumn('n_alt', expr('explode(array_repeat(n_alt,1000))'))
results = glow.gwas.logistic_regression(genotype_df=genotypes2, phenotype_df=phenotypes, covariate_df=covariates, offset_df=offset, correction = 'approx-firth', pvalue_threshold = 1, values_column="n_alt")
results.write.mode('overwrite').parquet("test/res.parquet")
Any help would be great. Thanks!