NASA-IMPACT / veda-pforge-job-runner

Apache Beam + EMR Serverless Job Runner for Pangeo Forge Recipes

Memory Leak Investigation #32

Closed ranchodeluxe closed 6 months ago

ranchodeluxe commented 6 months ago

Problem

A dependency of pangeo-forge-recipes might be producing a memory leak based on the memory distribution we see during job runs.

Next Steps (all can be done in parallel)

  1. Run the recipes in GCP Dataflow (on an over-provisioned instance) and check whether the same memory pattern appears.

  2. Profile the jobs locally with the DirectRunner using Scalene, or see what apache-beam itself offers for profiling, as this issue talks about.

  3. It's a guess, but this might have to do with file openers and pickling. Anyhow, some issues that might give us a bearing:

https://github.com/ecmwf/cfgrib/issues/283
https://github.com/fsspec/filesystem_spec/issues/825
https://github.com/apache/beam/issues/28246#issuecomment-1918082973
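As a low-dependency starting point for step 2, here is a minimal sketch that uses the standard library's tracemalloc (not Scalene, and not a Beam profiler) to measure how much memory a repeated step retains. The functions `leaky_step` and `clean_step` are hypothetical stand-ins for pipeline work, not code from pangeo-forge-recipes.

```python
import tracemalloc


def leaky_step(cache, i):
    # Hypothetical stand-in for a unit of work that accidentally
    # keeps a reference to what it allocated.
    cache.append(bytearray(100_000))


def clean_step(i):
    # Stand-in for work whose allocations are released on return.
    _ = bytearray(100_000)


def retained_bytes(step, iterations=50):
    """Return the net bytes still allocated after running `step` repeatedly."""
    tracemalloc.start()
    before = tracemalloc.take_snapshot()
    for i in range(iterations):
        step(i)
    after = tracemalloc.take_snapshot()
    tracemalloc.stop()
    # Sum the per-line differences between the two snapshots.
    stats = after.compare_to(before, "lineno")
    return sum(s.size_diff for s in stats)


cache = []
leaked = retained_bytes(lambda i: leaky_step(cache, i))
clean = retained_bytes(clean_step)
print(f"leaky: {leaked} bytes retained, clean: {clean} bytes retained")
```

Wrapping a single DirectRunner stage the same way would show whether memory grows with the number of processed elements; sorting `stats` by `size_diff` points at the lines that own the growth.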

Distributions

~7k time steps of MURSST data (WriteCombinedReference workflow):

https://github.com/developmentseed/pangeo-forge-staging/tree/mursst-kerchunk/recipes/mursst

Screen Shot 2024-02-03 at 10 17 49 AM

~5k time steps of GPM IMERG data (WriteCombinedReference workflow):

https://github.com/developmentseed/pangeo-forge-staging/tree/gpm_imerg/recipes/gpm_imerg

Screen Shot 2024-02-04 at 6 39 13 AM

~20 yearly time steps of LEAP data (StoreToZarr workflow):

https://github.com/ranchodeluxe/leap-pgf-example/tree/main/feedstock

Screen Shot 2024-02-05 at 12 45 02 PM

ranchodeluxe commented 6 months ago

If you look at the LEAP run above you'll see the difference. Yes, it uses a ton of memory during rechunking, but then it levels out. This means only our kerchunk workflows show the bad memory pattern, and we can start whittling it down further.
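The distinction drawn here, a high peak that levels out versus memory that keeps growing, can be checked numerically. A minimal sketch, assuming the standard library's tracemalloc and a hypothetical `transient_spike` function standing in for a memory-hungry stage such as rechunking:

```python
import tracemalloc


def transient_spike():
    # Allocates roughly 20 MB, then releases it when the function returns,
    # like a stage that is memory-hungry but cleans up after itself.
    big = [bytearray(1_000_000) for _ in range(20)]
    return len(big)


tracemalloc.start()
transient_spike()
# current = bytes still allocated now, peak = high-water mark since start().
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"current={current} bytes, peak={peak} bytes")
```

A large peak with a small current value matches the LEAP pattern; a current value that tracks the peak over the life of the job would point at retained references instead.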

ranchodeluxe commented 6 months ago

I was able to get memray's --live mode to report heap allocations and which parts of the workflow own them. To get useful information back you need to run at least a three-year subset of GPM IMERG data. At different points of the job within CombineReferences, different parts of fsspec and kerchunk own > 50% of the allocations and memory on the heap, but they seem to be cleaned up fine.

Job (configured to run as one worker process on DirectRunner)

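import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Assumed import path for the two transforms used below.
from pangeo_forge_recipes.transforms import CombineReferences, OpenWithKerchunk

# pattern, earthdata_protocol, fsspec_open_kwargs, target_fsspec_open_kwargs,
# CONCAT_DIMS and IDENTICAL_DIMS are defined by the recipe and are not shown here.


class PrintKeyValueFn(beam.DoFn):
    """Print each combined reference so the pipeline has a terminal step."""

    def process(self, element):
        for mapper in element:
            print(f"{mapper}")
            #for key, value in mapper.items():
            #    print(f"Key: {key}, Value: {value}")


# Forward any unrecognized command-line flags to Beam as pipeline options.
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args()

with beam.Pipeline(argv=pipeline_args) as p:
    (
        p
        | beam.Create(pattern.items())
        | OpenWithKerchunk(
            remote_protocol=earthdata_protocol,
            file_type=pattern.file_type,
            # lat/lon are around 5k, this is the best option for forcing kerchunk to inline them
            inline_threshold=6000,
            storage_options=fsspec_open_kwargs,
        )
        | CombineReferences(
            concat_dims=CONCAT_DIMS,
            identical_dims=IDENTICAL_DIMS,
            target_options=target_fsspec_open_kwargs,
            remote_options=target_fsspec_open_kwargs,
            remote_protocol='s3',
            mzz_kwargs={},
            precombine_inputs=False,
        )
        | "Print Key-Value Pairs" >> beam.ParDo(PrintKeyValueFn())
    )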

JH Memory Distribution

Screen Shot 2024-02-07 at 7 53 04 AM

Sorted by Own (late in the job)

Screen Shot 2024-02-07 at 7 53 12 AM

Sorted by Total (late in the job)

Screen Shot 2024-02-07 at 7 53 25 AM

Notable Other Theories

@norlandrhagen is correct that the biggest contributor is the huge reduce we do in memory on the whole PCollection. So breaking this up with parallelism across distributed machines will help, but it will still require that we provision larger instances than we expected.

https://beam.apache.org/documentation/transforms/java/aggregation/combine/

A user-defined CombineFn may be applied to combine all elements in a PCollection (global combine) or to combine all elements associated with each key

☝️ transforms.CombineReferences uses beam.CombineGlobally(CombineMultiZarrToZarr())

norlandrhagen commented 6 months ago

CombineReferences seems like the culprit to me. As far as I understand it, it takes in references as they are generated, accumulates them with add_input, and then combines them with MultiZarrToZarr. Since this is run with CombineGlobally, I think this transform wants to accumulate (and multizarrtozarr-ify) all the references before it passes the MultiZarrToZarr object to the WriteReferences transform.
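To make the accumulation pattern concrete, here is a plain-Python model of the CombineFn lifecycle. It mirrors Beam's method names (create_accumulator, add_input, merge_accumulators, extract_output) but does not import Beam, and the list accumulator and the `extract_output` body are assumptions for illustration, not the actual CombineMultiZarrToZarr implementation.

```python
class ListAccumulatingCombineFn:
    """Plain-Python model of a CombineFn that holds every input until the end."""

    def create_accumulator(self):
        return []

    def add_input(self, accumulator, element):
        # Every incoming reference is kept in memory.
        accumulator.append(element)
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = []
        for acc in accumulators:
            merged.extend(acc)
        return merged

    def extract_output(self, accumulator):
        # Hypothetical stand-in for the final MultiZarrToZarr step.
        return {"n_refs": len(accumulator)}


def combine_globally(fn, elements):
    """Drive the lifecycle on one worker and record the largest accumulator."""
    acc = fn.create_accumulator()
    peak = 0
    for element in elements:
        acc = fn.add_input(acc, element)
        peak = max(peak, len(acc))
    return fn.extract_output(acc), peak


refs = [{"time": t} for t in range(5000)]
output, peak_held = combine_globally(ListAccumulatingCombineFn(), refs)
print(output, peak_held)
```

In this model nothing is emitted until every reference has been added, so the accumulator grows with the number of time steps, which would be consistent with the memory pattern of the kerchunk workflows.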

From how I understand it, the StoreToZarr recipe builds an Xarray schema, writes an empty Zarr store, and then inserts dataset chunks into that store as they arrive, which isn't blocking.

Maybe we can rewrite the Kerchunk pipeline to act in a similar way vis-a-vis appending?
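For contrast, the write-empty-then-insert pattern described for StoreToZarr can be sketched in a few lines of plain Python. A list stands in for the Zarr store, and `write_empty_store`, `insert_chunk` and `chunks_out_of_order` are hypothetical names, not pangeo-forge-recipes APIs.

```python
def write_empty_store(n_steps):
    # Reserve one slot per time step up front (hypothetical stand-in
    # for writing an empty Zarr store from the schema).
    return [None] * n_steps


def insert_chunk(store, index, chunk):
    # Each chunk lands in its own slot, independent of the other chunks.
    store[index] = chunk


def chunks_out_of_order():
    # Generator: only one chunk is materialized at a time.
    for index in (3, 0, 5, 1, 4, 2):
        yield index, f"chunk-{index}"


store = write_empty_store(6)
for index, chunk in chunks_out_of_order():
    insert_chunk(store, index, chunk)
print(store)
```

Because each insert only needs its own chunk and a target index, no step has to wait for, or hold, the whole dataset. Whether kerchunk references can be appended the same way is the open question raised above.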

moradology commented 6 months ago

Been looking at this for a few minutes and trying to get a handle on the various pieces implicated by the CombineReferences function. I think @norlandrhagen's right that the global combine here is worth looking at. If the issue is excess shuffle (and associated memory pressure), it may be worth breaking this up into smaller pieces, and that might be a smaller lift than rewriting the zarr writing function to pre-allocate disk and write out in windows.

Perhaps something like this for CombineReferences:

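    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        # Pseudocode: batch the references and combine each batch on its worker first,
        # so the global combine only sees one partial result per batch.
        pcoll = pcoll | 'PreCombine' >> beam.GroupIntoBatches(...) | beam.ParDo(...)  # batch + combine per worker
        return pcoll | 'CombineGlobally' >> beam.CombineGlobally(CombineMultiZarrToZarr(...))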

I'm also seeing a precombine_inputs flag. It doesn't look to me like it quite does the above, but I'm not confident about kerchunk/MultiZarrToZarr behavior, to be honest.
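The batching idea above can be modeled without Beam as a two-stage reduce. This is a minimal sketch: `combine` is a hypothetical stand-in that merges reference dicts, and it assumes partial results can themselves be combined again, which is exactly the kerchunk/MultiZarrToZarr behavior that is in doubt here.

```python
from itertools import islice


def batched(iterable, size):
    """Yield successive lists of at most `size` items."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def combine(refs):
    # Hypothetical stand-in for a MultiZarrToZarr-style merge of reference dicts.
    merged = {}
    for ref in refs:
        merged.update(ref)
    return merged


def two_stage_combine(refs, batch_size):
    # Stage 1: combine each batch independently (per worker in a real job).
    partials = [combine(batch) for batch in batched(refs, batch_size)]
    # Stage 2: the global combine only sees one partial per batch.
    return combine(partials), len(partials)


refs = [{f"t{i}": i} for i in range(1000)]
result, n_partials = two_stage_combine(refs, batch_size=100)
single = combine(refs)
print(result == single, n_partials)
```

In an actual pipeline the batching step would be a Beam transform; beam.GroupIntoBatches expects keyed input, so an unkeyed PCollection would need keys added first or a transform such as beam.BatchElements. The sketch only holds if the combine is associative, so that combining partials gives the same answer as combining everything at once.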

ranchodeluxe commented 6 months ago

Good stuff in here to review later, but closing for now.