Open-EO / openeo-geotrellis-extensions

Java/Scala extensions for GeoTrellis, for use with the OpenEO GeoPySpark backend.
Apache License 2.0

netCDF writer out of driver memory #199

Closed — jdries closed this issue 9 months ago

jdries commented 1 year ago

Writing large netCDFs still makes the driver go out of memory, which should not happen because we write in a streaming manner. Investigate whether something is still suboptimal.

```json
{
  "process_graph": {
    "loadco1": {
      "arguments": {
        "bands": ["B03", "B04", "B02"],
        "id": "SENTINEL2_L2A",
        "spatial_extent": {
          "east": 13.132874787754762,
          "north": 42.192832302555644,
          "south": 41.200957744003716,
          "west": 11.966782528409608
        },
        "temporal_extent": ["2023-08-18T00:00:00Z", null]
      },
      "description": "Load the data, including the bands:\r\n- G = B03\r\n- R = B04\r\n- B = B02",
      "process_id": "load_collection"
    },
    "reduce1": {
      "arguments": {
        "data": {"from_node": "loadco1"},
        "dimension": "bands",
        "reducer": {
          "process_graph": {
            "add1": {
              "arguments": {"x": {"from_node": "multip2"}, "y": {"from_node": "arraye2"}},
              "process_id": "add"
            },
            "add2": {
              "arguments": {"x": {"from_node": "add1"}, "y": {"from_node": "arraye3"}},
              "process_id": "add"
            },
            "arraye1": {
              "arguments": {"data": {"from_parameter": "data"}, "index": 0},
              "process_id": "array_element"
            },
            "arraye2": {
              "arguments": {"data": {"from_parameter": "data"}, "index": 1},
              "process_id": "array_element"
            },
            "arraye3": {
              "arguments": {"data": {"from_parameter": "data"}, "index": 2},
              "process_id": "array_element"
            },
            "divide1": {
              "arguments": {"x": {"from_node": "subtra2"}, "y": {"from_node": "add2"}},
              "process_id": "divide",
              "result": true
            },
            "multip1": {
              "arguments": {"x": 2, "y": {"from_node": "arraye1"}},
              "process_id": "multiply"
            },
            "multip2": {
              "arguments": {"x": 2, "y": {"from_node": "arraye1"}},
              "process_id": "multiply"
            },
            "subtra1": {
              "arguments": {"x": {"from_node": "multip1"}, "y": {"from_node": "arraye2"}},
              "process_id": "subtract"
            },
            "subtra2": {
              "arguments": {"x": {"from_node": "subtra1"}, "y": {"from_node": "arraye3"}},
              "process_id": "subtract"
            }
          }
        }
      },
      "description": "Compute the GLI (Green Leaf Index) for the bands dimension\r\nFormula: (2.0 * G - R - B) / (2.0 * G + R + B)",
      "process_id": "reduce_dimension"
    },
    "savere1": {
      "arguments": {
        "data": {"from_node": "reduce1"},
        "format": "NETCDF"
      },
      "description": "Store as NETCDF",
      "process_id": "save_result",
      "result": true
    }
  }
}
```
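
For reference, the same GLI workflow can also be expressed with the openeo Python client. A minimal sketch, assuming the Copernicus Data Space Ecosystem backend URL; the `band()` arithmetic below generates a `reduce_dimension` over the band dimension equivalent to the explicit sub-graph above:

```python
import openeo

# Backend URL is an assumption; any backend serving SENTINEL2_L2A works similarly
connection = openeo.connect("https://openeo.dataspace.copernicus.eu").authenticate_oidc()

cube = connection.load_collection(
    "SENTINEL2_L2A",
    bands=["B03", "B04", "B02"],  # G, R, B
    spatial_extent={"west": 11.9668, "south": 41.2010, "east": 13.1329, "north": 42.1928},
    temporal_extent=["2023-08-18", None],  # open-ended, like the original graph
)

# GLI = (2.0 * G - R - B) / (2.0 * G + R + B)
g, r, b = cube.band("B03"), cube.band("B04"), cube.band("B02")
gli = (2 * g - r - b) / (2 * g + r + b)

gli.execute_batch(outputfile="gli.nc", out_format="netCDF")
```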
Dave0178 commented 10 months ago

Hello, I'm still unable to run this:

```python
import glob
import time

# `connection` is an authenticated openeo connection (setup omitted here)
s2_cube = connection.load_collection(
    "SENTINEL2_L2A",
    bands=["B04", "B03", "B02", "B08", "SCL"],
    temporal_extent=("2022-01-01", "2022-12-31"),
    spatial_extent={
        "west": -0.166208030000000,
        "south": 11.825353154999900,
        "east": 0.083791970000000,
        "north": 12.075353154999900,
        "crs": "EPSG:4326",
    },
    max_cloud_cover=50,
)

# Options to increase the available memory
job_options = {"driver-memory": "16G"}

# Path patterns for the GeoTIFF and NetCDF files
output_pattern_tiff = "output/batch_job_nc_gtiff/*.tif"
output_pattern_netcdf = "output/batch_job_nc_gtiff/*.nc"

# Get the list of files matching each path pattern
tiff_files = glob.glob(output_pattern_tiff)
netcdf_files = glob.glob(output_pattern_netcdf)

# Check whether GeoTIFF files already exist
if not tiff_files:
    # Save the cube in GeoTIFF format
    s2_cube = s2_cube.save_result(format="GTiff")

    # Run the batch job for the GeoTIFF cube
    job = s2_cube.execute_batch(title="TIFF Slice of S2 data", job_options=job_options)

    # Download the GeoTIFF data into the dedicated folder
    results = job.get_results()
    results.download_files("output/batch_job_nc_gtiff")
else:
    print(f"The following files already exist: {', '.join(tiff_files)}. Not saving again.")

if not netcdf_files:
    # Save the cube in NetCDF format
    s2_cube_netcdf = s2_cube.save_result(format="netCDF")

    # Run the batch job for the NetCDF cube
    job_netcdf = s2_cube_netcdf.execute_batch(title="NetCDF Slice of S2 data")

    # Wait a while (e.g. 10 seconds) before continuing
    time.sleep(10)

    # Wait for the job to finish (the wait time depends on the job duration)
    # (note: execute_batch above already started the job and waited for it,
    # so this call is redundant)
    job_netcdf.start_and_wait()

    # Download the NetCDF data into a dedicated folder
    results_netcdf = job_netcdf.get_results()
    results_netcdf.download_files("output/batch_job_nc_gtiff")
else:
    print(f"The following files already exist: {', '.join(netcdf_files)}. Not saving again.")
```

I'm getting this message: "0:09:48 Job 'j-5721ff00c1cc4c7fa4f642b6b97d93eb': error (progress N/A) Your batch job 'j-5721ff00c1cc4c7fa4f642b6b97d93eb' failed. Error logs: [{'id': '[1697904790296, 1023937]', 'time': '2023-10-21T16:13:10.296Z', 'level': 'error', 'message': "OpenEO batch job failed: Your batch job failed because the 'driver' used too much java memory. Consider increasing driver-memory or contact the developers to investigate."}]". Is this still under investigation?
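
A side note on the snippet above: the `job_options` with the increased `driver-memory` are only passed to the GeoTIFF job, not to the netCDF one. Until the driver memory usage is fixed, passing them to the netCDF job as well may help; a minimal sketch, reusing the names from the snippet:

```python
# Also give the netCDF batch job the increased driver memory
job_netcdf = s2_cube_netcdf.execute_batch(
    title="NetCDF Slice of S2 data",
    job_options={"driver-memory": "16G"},
)
```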

EmileSonneveld commented 10 months ago

As @jdries mentioned: maybe we can automatically determine the required memory when NetCDF output is requested?

jdries commented 10 months ago

Yes, we cannot determine the memory exactly, but we know it should be increased for netCDF.

jdries commented 10 months ago

@Dave0178 yes, we hadn't planned it yet, but we are pushing it higher on the roadmap, as it's annoying to have to change the memory setting manually.

@EmileSonneveld we could try inserting a netcdfFile.flush() call, in the hope that it dumps data to disk and frees memory.

jdries commented 10 months ago

We got one step further: it no longer fails while writing the netCDF, but while uploading to object storage:


  File "/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/deploy/batch_job.py", line 1278, in <module>
    main(sys.argv)
  File "/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/deploy/batch_job.py", line 1013, in main
    run_driver()
  File "/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/deploy/batch_job.py", line 984, in run_driver
    run_job(
  File "/opt/openeo/lib/python3.8/site-packages/openeogeotrellis/utils.py", line 54, in memory_logging_wrapper
    return function(*args, **kwargs)
  File "/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/deploy/batch_job.py", line 1121, in run_job
    the_assets_metadata = result.write_assets(str(output_file))
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/save_result.py", line 113, in write_assets
    return self.cube.write_assets(filename=directory, format=self.format, format_options=self.options)
  File "/opt/openeo/lib/python3.8/site-packages/openeogeotrellis/geopysparkdatacube.py", line 1825, in write_assets
    asset_paths = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.writeRasters(
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.openeo.geotrellis.netcdf.NetCDFRDDWriter.writeRasters.
: java.lang.OutOfMemoryError: Java heap space
    at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
    at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120)
    at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95)
    at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156)
    at java.base/sun.net.www.http.PosterOutputStream.write(PosterOutputStream.java:78)
    at software.amazon.awssdk.utils.IoUtils.copy(IoUtils.java:113)
    at software.amazon.awssdk.utils.IoUtils.copy(IoUtils.java:99)
    at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable.lambda$null$0(UrlConnectionHttpClient.java:208)
    at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable$$Lambda$3191/0x00000008413af040.get(Unknown Source)
    at software.amazon.awssdk.utils.FunctionalUtils.lambda$safeSupplier$4(FunctionalUtils.java:108)
    at software.amazon.awssdk.utils.FunctionalUtils$$Lambda$1623/0x0000000840d67840.get(Unknown Source)
    at software.amazon.awssdk.utils.FunctionalUtils.invokeSafely(FunctionalUtils.java:136)
    at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable.lambda$call$1(UrlConnectionHttpClient.java:208)
    at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable$$Lambda$1866/0x0000000840e3e040.accept(Unknown Source)
    at java.base/java.util.Optional.ifPresent(Optional.java:183)
    at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable.call(UrlConnectionHttpClient.java:207)
    at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable.call(UrlConnectionHttpClient.java:193)
    at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:64)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:76)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:55)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:77)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:39)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:64)
jdries commented 10 months ago

There's an example of uploading a large file; this is what we need: https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/javav2/example_code/s3/src/main/java/com/example/s3/transfermanager/UploadFile.java

jdries commented 10 months ago

I committed it to a branch! It required an AWS SDK upgrade, so we'll have to see how that goes.

jdries commented 9 months ago

The netCDF driver memory issue is resolved on the staging instance, so it is ready for release. The original process graph in this issue still has another memory issue, potentially caused by the open-ended temporal extent (no end date). I created a separate issue for that: https://github.com/Open-EO/openeo-geotrellis-extensions/issues/238

jdries commented 9 months ago

Additional note: we now bump into the limits of object storage, which caps the file size we can upload. At this point we are talking about netCDF files beyond 1 GB, which are also increasingly hard to manage on the user side. For the original example, we would typically advise the user to apply cloud masking and conversion to byte to drastically reduce the file size and avoid such large extractions.
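
For illustration, such masking and byte conversion could look roughly as follows with the Python client. A sketch only: the SCL class values and the input scale range are assumptions, not prescribed values:

```python
# Continuing from a SENTINEL2_L2A cube loaded with the "SCL" band, e.g. `s2_cube` above
scl = s2_cube.band("SCL")

# Mask cloud-related SCL classes: 3 = cloud shadow, 8/9 = cloud medium/high probability
cloud_mask = (scl == 3) | (scl == 8) | (scl == 9)

# Drop cloudy pixels and rescale reflectance to a byte range to shrink the output file;
# the 0..6000 input range is an assumption about typical reflectance values
small = (
    s2_cube.filter_bands(["B04", "B03", "B02"])
    .mask(cloud_mask)
    .linear_scale_range(0, 6000, 0, 254)
)

small.execute_batch(outputfile="masked_byte.nc", out_format="netCDF")
```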