JaneliaSciComp / BigStitcher-Spark

Running compute-intense parts of BigStitcher distributed
BSD 2-Clause "Simplified" License

OutOfMemoryError caused by creation of too many N5ImageLoader fetcher threads #4

Closed trautmane closed 2 years ago

trautmane commented 2 years ago

While working through issue 2 with a larger data set, @boazmohar discovered many OutOfMemoryError: unable to create new native thread exceptions in the worker logs. These exceptions are raised because parallelized RDDs create many N5ImageLoader instances like this one and each N5ImageLoader instance in turn creates Runtime.getRuntime().availableProcessors() fetcher threads.
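The multiplication described above can be sketched as follows. This is only an illustration, not the actual N5ImageLoader code: `SketchLoader` is a hypothetical stand-in that starts one idle daemon thread per requested fetcher, and the loader count of 5 is an arbitrary small number chosen to keep the demo harmless.

```java
import java.util.ArrayList;
import java.util.List;

final class FetcherThreadCount {

    // Hypothetical stand-in for an image loader; NOT the real N5ImageLoader.
    static final class SketchLoader implements AutoCloseable {
        private final List<Thread> fetchers = new ArrayList<>();

        // Starts numFetcherThreads idle daemon threads, mimicking a per-loader fetcher pool.
        SketchLoader(final int numFetcherThreads) {
            for (int i = 0; i < numFetcherThreads; i++) {
                final Thread t = new Thread(() -> {
                    try {
                        Thread.sleep(Long.MAX_VALUE); // idle until interrupted
                    } catch (InterruptedException e) {
                        // interrupted by close(): let the thread end
                    }
                }, "sketch-fetcher-" + i);
                t.setDaemon(true);
                t.start();
                fetchers.add(t);
            }
        }

        int fetcherCount() {
            return fetchers.size();
        }

        @Override
        public void close() {
            fetchers.forEach(Thread::interrupt);
        }
    }

    // Creates numLoaders loaders and returns how many fetcher threads they started in total.
    static int threadsStarted(final int numLoaders, final int threadsPerLoader) {
        final List<SketchLoader> loaders = new ArrayList<>();
        int total = 0;
        for (int i = 0; i < numLoaders; i++) {
            final SketchLoader loader = new SketchLoader(threadsPerLoader);
            loaders.add(loader);
            total += loader.fetcherCount();
        }
        loaders.forEach(SketchLoader::close);
        return total;
    }

    public static void main(final String[] args) {
        final int perLoader = Runtime.getRuntime().availableProcessors();
        System.out.println(threadsStarted(5, perLoader) + " fetcher threads for 5 loaders");
    }
}
```

The total grows as loaders × processors, so a node with many cores and many concurrently created loaders can hit the operating system's native thread limit well before the heap fills up.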

I reduced some of the fetcher thread creation by reusing loaders in this commit. However, reusing loaders did not completely solve the problem.

I think the best solution is to parameterize the number of fetcher threads in the N5ImageLoader and then explicitly set fetcher thread counts in Spark clients. This issue can remain open until that happens or until another long-term solution is developed.

In the meantime, as a workaround, overriding the default availableProcessors value with a -XX:ActiveProcessorCount=1 JVM directive seems to fix the problem.

More specifically, here are the spark-janelia flintstone.sh environment parameters I used to successfully process @boazmohar 's larger data set:

# --------------------------------------------------------------------
# Default Spark Setup (11 cores per worker)
# --------------------------------------------------------------------
export N_EXECUTORS_PER_NODE=2
export N_CORES_PER_EXECUTOR=5
export N_OVERHEAD_CORES_PER_WORKER=1
# Note: N_CORES_PER_WORKER=$(( (N_EXECUTORS_PER_NODE * N_CORES_PER_EXECUTOR) + N_OVERHEAD_CORES_PER_WORKER ))

# To distribute work evenly, recommended number of tasks/partitions is 3 times the number of cores.
export N_TASKS_PER_EXECUTOR_CORE=3

export N_CORES_DRIVER=1

# setting ActiveProcessorCount to 1 ensures Runtime.availableProcessors() returns 1
export SUBMIT_ARGS="--conf spark.executor.extraJavaOptions=-XX:ActiveProcessorCount=1"
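One way to confirm that the flag passed through SUBMIT_ARGS changes what the JVM reports is a small check like the one below. The class name `ProcessorCountCheck` is made up for this sketch, and it assumes a JDK that supports both -XX:ActiveProcessorCount and single-file source launching.

```java
final class ProcessorCountCheck {

    // Returns the processor count the JVM reports; default fetcher pools were sized from this value.
    static int reportedProcessors() {
        return Runtime.getRuntime().availableProcessors();
    }

    public static void main(final String[] args) {
        // Compare the output of:
        //   java ProcessorCountCheck.java
        //   java -XX:ActiveProcessorCount=1 ProcessorCountCheck.java
        System.out.println("availableProcessors = " + reportedProcessors());
    }
}
```

Run with the flag, the reported value should drop to 1 regardless of how many cores the node has.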

With the limited active processor count and reusing loaders, no OutOfMemory exceptions occur and processing completes much faster. @boazmohar noted that with his original setup, it took 3.5 hours using a Spark cluster with 2011 cores. My run with the parameters above took 7 minutes using 2200 cores (on 200 11-core worker nodes). Boaz's original run might have had other configuration issues, so this isn't necessarily apples-to-apples. Nevertheless, my guess is that his performance was adversely affected by the fetcher thread problem.
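The core counts quoted above follow directly from the environment parameters. The sketch below just redoes that arithmetic; the assumption that the task count is 3 × the number of executor cores is my reading of the comment in the settings, not something checked against flintstone.sh.

```java
final class ClusterSizing {

    // Values copied from the flintstone.sh settings quoted above.
    static final int N_EXECUTORS_PER_NODE = 2;
    static final int N_CORES_PER_EXECUTOR = 5;
    static final int N_OVERHEAD_CORES_PER_WORKER = 1;
    static final int N_TASKS_PER_EXECUTOR_CORE = 3;

    // Cores requested per worker node: executor cores plus overhead.
    static int coresPerWorker() {
        return N_EXECUTORS_PER_NODE * N_CORES_PER_EXECUTOR + N_OVERHEAD_CORES_PER_WORKER;
    }

    // Total cores across all worker nodes.
    static int totalCores(final int workers) {
        return workers * coresPerWorker();
    }

    // Cores that actually run Spark tasks (overhead cores excluded).
    static int executorCores(final int workers) {
        return workers * N_EXECUTORS_PER_NODE * N_CORES_PER_EXECUTOR;
    }

    // Assumed rule: tasks/partitions = executor cores * N_TASKS_PER_EXECUTOR_CORE.
    static int recommendedTasks(final int workers) {
        return executorCores(workers) * N_TASKS_PER_EXECUTOR_CORE;
    }

    public static void main(final String[] args) {
        final int workers = 200;
        System.out.println("cores per worker:  " + coresPerWorker());
        System.out.println("total cores:       " + totalCores(workers));
        System.out.println("executor cores:    " + executorCores(workers));
        System.out.println("recommended tasks: " + recommendedTasks(workers));
    }
}
```

For 200 workers this gives 11 cores per worker and 2200 cores in total, matching the numbers in the run described above.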

Finally, @StephanPreibisch may want to revisit the getTransformedBoundingBox code and any other loading/reading to see if there are other options for reducing/reusing loaded data within the parallelized RDD loops. Broadcast variables might be suitable/helpful for this use case - but I'm not sure.

StephanPreibisch commented 2 years ago

Thanks so much Eric! We should clearly differentiate between fetching metadata (as I do in getTransformedBoundingBox) and actually reading the data, and you're right, we should be able to set the number of threads within the API. We should have a meeting with @tpietzsch to incorporate the necessary changes next year!

Happy new year :)

boazmohar commented 2 years ago

@StephanPreibisch So I confirmed that the error is solved with SUBMIT_ARGS="--conf spark.executor.extraJavaOptions=-XX:ActiveProcessorCount=1". I ran 2 volumes and didn't see any error (yay!). But the speedup is not orders of magnitude. The difference was that @trautmane was using just the affine, so without the non-rigid. It is still faster, it now took ~2h, so it definitely helps! What could be helpful for performance is maybe playing with the block size. For my volume at 128^3 I get ~130,000 blocks, which my downstream processing is also having a hard time with.

I can also see it scales with the number of points selected for the non-rigid: in the second volume (same size) there are ~5x #points and it runs ~5x slower (took 15h). I should probably open separate issues for each of these.

Thanks!!! Boaz

dpshepherd commented 2 years ago

Hi all,

We've tried running the current main branch as both a local Spark instance and spinning up a master-worker Spark instance on our 32 core / 1 TB RAM server.

We are trying to Affine fuse an image that will have ~600,000 blocks in the final N5. There are only translation transformations in the XML.

No matter how much RAM we allocate to the local instance or to the driver/executors, eventually the execution hits the memory error noted in this thread. We are passing in the JVM flag as proposed in this issue.

We are quite excited to get this working, but unless there are some further suggestions we will wait a bit until the code updates are made.

Thanks!

trautmane commented 2 years ago

Hi Doug,

You may be encountering a different issue - it's hard to know without looking at more details. Would you mind creating a new separate GitHub issue and attaching/posting relevant log and exception information there?

Helpful starting details would be the explicit exception traceback you get, the specific driver and worker launch commands you used, and basic details about your running environment (Spark version, RAM and cores for driver and workers, local vs. stand-alone cluster, ...).
Hopefully, we'll be able to use that information to find/fix the problem you are having.

Finally, this specific fetcher thread issue is still a work-in-progress since we are waiting for @tpietzsch 's return from holidays to properly fix it.

Best, Eric

tpietzsch commented 2 years ago

> We should have a meeting with @tpietzsch to incorporate the necessary changes next year!

Yes, let's set up a meeting!

A related problem/solution is that ideally multiple ImgLoaders should be able to share a FetcherThread pool and the associated queue (of blocks to load). This is something I wanted to look into anyway, and we can solve this simultaneously.
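The shared-pool idea might look roughly like the sketch below. It is not the bigdataviewer-core design: `SketchLoader` and `loadAll` are hypothetical names, and a plain `ExecutorService` stands in for the fetcher threads, with its internal work queue playing the role of the shared queue of blocks to load.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SharedFetcherPoolSketch {

    // Hypothetical loader that submits block loads to a pool it does not own.
    static final class SketchLoader {
        private final String name;
        private final ExecutorService sharedPool;

        SketchLoader(final String name, final ExecutorService sharedPool) {
            this.name = name;
            this.sharedPool = sharedPool;
        }

        // Queues one block load; the returned string stands in for the loaded block data.
        Future<String> loadBlock(final int blockIndex) {
            return sharedPool.submit(() -> name + ":block" + blockIndex);
        }
    }

    // All loaders share ONE pool, so the thread count stays at numThreads however many loaders exist.
    static List<String> loadAll(final int numLoaders, final int blocksPerLoader, final int numThreads) {
        final ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
            final List<Future<String>> pending = new ArrayList<>();
            for (int l = 0; l < numLoaders; l++) {
                final SketchLoader loader = new SketchLoader("loader" + l, pool);
                for (int b = 0; b < blocksPerLoader; b++) {
                    pending.add(loader.loadBlock(b));
                }
            }
            final List<String> loaded = new ArrayList<>();
            for (final Future<String> f : pending) {
                loaded.add(f.get()); // results are collected in submission order
            }
            return loaded;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("block loading failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(final String[] args) {
        System.out.println(loadAll(3, 2, 2));
    }
}
```

With this arrangement the number of threads is a property of the pool rather than of each loader, so it no longer grows with the number of loader instances.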

tpietzsch commented 2 years ago

> I reduced some of the fetcher thread creation by reusing loaders in this commit. However, reusing loaders did not completely solve the problem.

Did you check that this actually reduces the number of created threads? This is very surprising to me. The dataLocal should always return the same instance for getSequenceDescription(), and that should in turn always return the same instance for getImgLoader(). What am I missing here?

trautmane commented 2 years ago

Good point @tpietzsch - I think I got lost while tracing instances and did not realize the imgLoader was held/cached in the dataLocal object. That would explain why my commit did not solve the problem and I needed to resort to the active processor count workaround. Let's discuss whether rolling back my unhelpful commit makes sense when we discuss other related changes at our meeting today.
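The caching behavior discussed here can be illustrated with a small sketch. The classes below are hypothetical stand-ins that only borrow the getter names from the thread; they are not the spim_data implementation.

```java
final class CachedLoaderSketch {

    // Hypothetical stand-in: the loader object is created once and then handed out repeatedly.
    static final class SketchSequenceDescription {
        private final Object imgLoader = new Object();

        Object getImgLoader() {
            return imgLoader;
        }
    }

    // Hypothetical stand-in for the loaded data object (dataLocal in the thread).
    static final class SketchSpimData {
        private final SketchSequenceDescription sequenceDescription = new SketchSequenceDescription();

        SketchSequenceDescription getSequenceDescription() {
            return sequenceDescription;
        }
    }

    // True if two independent getter chains on the same data object yield the identical loader.
    static boolean sameLoaderReturned(final SketchSpimData data) {
        final Object first = data.getSequenceDescription().getImgLoader();
        final Object second = data.getSequenceDescription().getImgLoader();
        return first == second;
    }

    public static void main(final String[] args) {
        System.out.println("same loader instance: " + sameLoaderReturned(new SketchSpimData()));
    }
}
```

If the real getters behave this way, holding on to the loader reference on the caller side cannot reduce the number of loaders, which is consistent with the commit not fixing the problem.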

tpietzsch commented 2 years ago

With https://github.com/bigdataviewer/bigdataviewer-core/pull/130 it is possible now to specify the number of threads via

spimData = new XmlIoSpimDataMinimal().load( xmlFilename, NUMBER_OF_THREADS );

For spark, because it does not involve visualization, I would just use NUMBER_OF_THREADS==0. @StephanPreibisch @trautmane Could you check whether this works?

tpietzsch commented 2 years ago

> @StephanPreibisch @trautmane Could you check whether this works?

Ah... actually, you're using XmlIoSpimData2... https://github.com/PreibischLab/BigStitcher-Spark/blob/fa17fca84156d5f8560b0c50c8f7b9b32c89b997/src/main/java/net/preibisch/bigstitcher/spark/NonRigidFusionSpark.java#L102-L103

So for that, you would do it like

final XmlIoSpimData2 io = new XmlIoSpimData2( "" );
final SpimData2 data = io.load( xmlPath );
final BasicImgLoader imgLoader = data.getSequenceDescription().getImgLoader();
if ( imgLoader instanceof ViewerImgLoader )
    ( ( ViewerImgLoader ) imgLoader ).setNumFetcherThreads( NUMBER_OF_THREADS );

trautmane commented 2 years ago

Hi @tpietzsch, I finally got time to test your changes tonight (using NUMBER_OF_THREADS=0). After running @boazmohar 's small and big test cases, I did not see any OOM exceptions so I think your fix worked. Please merge and deploy your fix at your earliest convenience (along with the spim_data thread safety change). Once the updated bigdataviewer-core and spim_data packages are deployed, I'll update BigStitcher-Spark to use them. Thanks!