Open-EO / openeo-geopyspark-driver

OpenEO driver for GeoPySpark (Geotrellis)
Apache License 2.0
26 stars 4 forks source link

load_stac: support loading unfinished results #489

Closed jdries closed 1 year ago

jdries commented 1 year ago

Using load_stac, we can normally also load openEO generated results provided by a signed url. Using the 'partial' query parameter, it seems possible to get a canonical link to a job that is still running: https://api.openeo.org/#tag/Data-Processing/operation/list-results (support for 'partial' needs to be added to our backend)

So if our backend receives a process graph with a load_stac, it should only start the actual processing when all dependencies are finished! This is an important piece of the puzzle for federated processing, because it allows the aggregator to perform job splitting without keeping track of the various jobs itself: it can just schedule all jobs and forward them to the respective backends. Hence it also no longer needs a long lived token.

bossie commented 1 year ago

As discussed: works similarly to SHub batch process polling.

SHub batch processes: when starting a batch job, detect a load_collection(sh_collection, big_bbox) -> schedule corresponding batch processes, poll asynchronously and start the batch job if all dependencies have "status" == "DONE"

load_stac: when starting a batch job, detect a load_stac(canonical_partial_job_results) -> if "openeo:status" == "running", poll asynchronously and start the batch job if all dependencies have "openeo:status" == "finished".

bossie commented 1 year ago

"openeo:status" is in the root of the STAC Collection object.

Some considerations/decisions that make sense at this time but might need revisiting:

bossie commented 1 year ago

Currently implemented as such:

bossie commented 1 year ago

Did some testing and loading partial job results from OpenEO dev takes a very long time (but not always):

requests.exceptions.ConnectionError: HTTPSConnectionPool(host='openeo-dev.vito.be', port=443): Max retries exceeded with url: /openeo/jobs/j-0977e9aed9da431f872b716c20e500cf/results/...?expires=1693901122&partial=true (Caused by ReadTimeoutError("HTTPSConnectionPool(host='openeo-dev.vito.be', port=443): Read timed out. (read timeout=60)"))
bossie commented 1 year ago

Not sure what the deal is with:

  1. creating job A on openeo-dev but deliberately not starting it (= openeo:status: running);
  2. creating job B on openeo-dev that load_stac's the canonical URL of A's partial results and starting it.

In step 2, the web app driver will attempt to fetch A's openeo:status but this request never gets a response. Requests from outside of the application (think: curl) will also be stalled. :thinking:

bossie commented 1 year ago

If I load_stac a cdse-staging job on openeo-dev, things behave better.

1) create original job A on cdse-staging (back-end needs to support ?partial and have a public URL) but don't start it:

{
  "process_graph": {
    "load1": {
      "arguments": {
        "id": "SENTINEL2_L2A",
        "spatial_extent": {
          "coordinates": [
            [
              [
                14.20922527067026,
                40.855657765536336
              ],
              [
                14.20922527067026,
                40.95056915081699
              ],
              [
                14.316342442933973,
                40.95056915081699
              ],
              [
                14.316342442933973,
                40.855657765536336
              ],
              [
                14.20922527067026,
                40.855657765536336
              ]
            ]
          ],
          "type": "Polygon"
        },
        "temporal_extent": [
          "2022-04-17T00:00:00Z",
          "2022-04-17T00:00:00Z"
        ]
      },
      "process_id": "load_collection"
    },
    "save2": {
      "arguments": {
        "data": {
          "from_node": "load1"
        },
        "format": "GTIFF"
      },
      "process_id": "save_result",
      "result": true
    }
  }
}

2) get the canonical partial URL of the results of A 3) create and start job B on openeo-dev (needs to support async_task) that load_stac's A:

{
  "process_graph": {
    "load1": {
      "arguments": {
        "url": "https://openeo-staging.dataspace.copernicus.eu/openeo/1.1/jobs/j-735dfda9c31849efba3895cf8b8cf64c/results/...&partial=true"
      },
      "process_id": "load_stac"
    },
    "save2": {
      "arguments": {
        "data": {
          "from_node": "load1"
        },
        "format": "GTiff"
      },
      "process_id": "save_result",
      "result": true
    }
  }
}

4) start job A and B proceeds as soon as A is done:

OpenEO batch job results statuses for batch job j-7c5e66fbba9f402b9ce7c19f3d42c29c: {'https://openeo-staging.dataspace.copernicus.eu/openeo/1.1/jobs/j-735dfda9c31849efba3895cf8b8cf64c/results/...&partial=true': 'running'} ... OpenEO batch job results statuses for batch job j-7c5e66fbba9f402b9ce7c19f3d42c29c: {'https://openeo-staging.dataspace.copernicus.eu/openeo/1.1/jobs/j-735dfda9c31849efba3895cf8b8cf64c/results/ZGY3ZWE0NWQtZWNjNC00NTNmLThhZjktZGU4Y2ZiMTA1OGIx/...&partial=true': None} ... Submitting job with command ['/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/deploy/submit_batch_job_spark3.sh', 'openEO batch_test_debug_load_stac_partial_job_results: load_stac_j-7c5e66fbba9f402b9ce7c19f3d42c29c_user 7d523381374b62e6f2aad1f2f9fb6eddf624d382a8f71db6dcf5788e3aae0af3@egi.eu', '/data/projects/OpenEO/j-7c5e66fbba9f402b9ce7c19f3d42c29c_rj0d0rou.in', '/data/projects/OpenEO/j-7c5e66fbba9f402b9ce7c19f3d42c29c', 'out', 'log', 'job_metadata.json', 'openeo@VGT.VITO.BE', 'openeo.keytab', 'vdboschj', '1.1.0', '8G', '2G', '3G', '5', '2', '2G', 'default', 'false', '[]', 'custom_processes.py', '100', '7d523381374b62e6f2aad1f2f9fb6eddf624d382a8f71db6dcf5788e3aae0af3@egi.eu', 'j-7c5e66fbba9f402b9ce7c19f3d42c29c', '0.0', '1', 'default', '/data/projects/OpenEO/j-7c5e66fbba9f402b9ce7c19f3d42c29c_rqtx6d0z.properties', '', 'INFO', '']

However, B does not yet run to completion; there are 2 errors in its logs:

OpenEO batch job failed: java.lang.IllegalArgumentException: requirement failed: Server doesn't support ranged byte reads

Traceback (most recent call last):
  File "batch_job.py", line 1291, in <module>
    main(sys.argv)
  File "batch_job.py", line 1028, in main
    run_driver()
  File "batch_job.py", line 999, in run_driver
    run_job(
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/utils.py", line 52, in memory_logging_wrapper
    return function(*args, **kwargs)
  File "batch_job.py", line 1092, in run_job
    result = ProcessGraphDeserializer.evaluate(process_graph, env=env, do_dry_run=tracer)
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 348, in evaluate
    result = convert_node(result_node, env=env)
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 368, in convert_node
    process_result = apply_process(process_id=process_id, args=processGraph.get('arguments', {}),
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1480, in apply_process
    args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1480, in <dictcomp>
    args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 380, in convert_node
    return convert_node(processGraph['node'], env=env)
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 368, in convert_node
    process_result = apply_process(process_id=process_id, args=processGraph.get('arguments', {}),
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1512, in apply_process
    return process_function(args=ProcessArgs(args, process_id=process_id), env=env)
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 2091, in load_stac
    return env.backend_implementation.load_stac(url=url, load_params=load_params, env=env)
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/backend.py", line 1078, in load_stac
    pyramid = pyramid_factory.datacube_seq(projected_polygons, from_date, to_date, metadata_properties,
  File "/opt/spark3_4_0/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/opt/spark3_4_0/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o1635.datacube_seq.
: java.io.IOException: Exception while determining data type of collection https://openeo-staging.dataspace.copernicus.eu/openeo/1.1/jobs/j-735dfda9c31849efba3895cf8b8cf64c/results/...&partial=true and item https://openeo-staging.dataspace.copernicus.eu/openeo/1.1/jobs/j-735dfda9c31849efba3895cf8b8cf64c/results/assets/.../openEO_2022-04-17Z.tif?expires=1694006445. Detailed message: requirement failed: Server doesn't support ranged byte reads
    at org.openeo.geotrellis.layers.FileLayerProvider.determineCelltype(FileLayerProvider.scala:662)
    at org.openeo.geotrellis.layers.FileLayerProvider.readKeysToRasterSources(FileLayerProvider.scala:690)
    at org.openeo.geotrellis.layers.FileLayerProvider.readMultibandTileLayer(FileLayerProvider.scala:862)
    at org.openeo.geotrellis.file.PyramidFactory.datacube(PyramidFactory.scala:111)
    at org.openeo.geotrellis.file.PyramidFactory.datacube_seq(PyramidFactory.scala:84)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: requirement failed: Server doesn't support ranged byte reads
    at scala.Predef$.require(Predef.scala:281)
    at org.openeo.geotrellis.CustomizableHttpRangeReader.totalLength$lzycompute(CustomizableHttpRangeReader.scala:31)
    at org.openeo.geotrellis.CustomizableHttpRangeReader.totalLength(CustomizableHttpRangeReader.scala:10)
    at geotrellis.util.StreamingByteReader.ensureChunk(StreamingByteReader.scala:109)
    at geotrellis.util.StreamingByteReader.get(StreamingByteReader.scala:130)
    at geotrellis.raster.io.geotiff.reader.GeoTiffInfo$.read(GeoTiffInfo.scala:127)
    at geotrellis.raster.io.geotiff.reader.GeoTiffReader$.readMultiband(GeoTiffReader.scala:211)
    at geotrellis.raster.geotiff.GeoTiffReprojectRasterSource.$anonfun$tiff$1(GeoTiffReprojectRasterSource.scala:46)
    at scala.Option.getOrElse(Option.scala:189)
    at geotrellis.raster.geotiff.GeoTiffReprojectRasterSource.tiff$lzycompute(GeoTiffReprojectRasterSource.scala:43)
    at geotrellis.raster.geotiff.GeoTiffReprojectRasterSource.tiff(GeoTiffReprojectRasterSource.scala:40)
    at geotrellis.raster.geotiff.GeoTiffReprojectRasterSource.$anonfun$cellType$1(GeoTiffReprojectRasterSource.scala:50)
    at scala.Option.getOrElse(Option.scala:189)
    at geotrellis.raster.geotiff.GeoTiffReprojectRasterSource.cellType(GeoTiffReprojectRasterSource.scala:50)
    at org.openeo.geotrellis.layers.BandCompositeRasterSource.$anonfun$cellType$1(FileLayerProvider.scala:79)
    at cats.data.NonEmptyList.map(NonEmptyList.scala:87)
    at org.openeo.geotrellis.layers.BandCompositeRasterSource.cellType(FileLayerProvider.scala:79)
    at org.openeo.geotrellis.layers.FileLayerProvider.determineCelltype(FileLayerProvider.scala:656)
    ... 16 more
Failed status sync for job_id='j-7c5e66fbba9f402b9ce7c19f3d42c29c': unexpected KeyError: 'batch_request_id'

Traceback (most recent call last):
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/job_tracker_v2.py", line 383, in update_statuses
    self._sync_job_status(
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/job_tracker_v2.py", line 460, in _sync_job_status
    dependency_sources = list(set(get_dependency_sources(job_info)))
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/job_registry.py", line 433, in get_dependency_sources
    return [source for dependency in (job_info.get("dependencies") or []) for source in sources(dependency)]
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/job_registry.py", line 433, in <listcomp>
    return [source for dependency in (job_info.get("dependencies") or []) for source in sources(dependency)]
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/job_registry.py", line 427, in sources
    subfolder = dependency.get("subfolder") or dependency["batch_request_id"]
KeyError: 'batch_request_id
bossie commented 1 year ago

A load_stac on Terrascope from an unfinished job on CDSE works after the latest fixes. :partying_face:

bossie commented 1 year ago

With both jobs on Terrascope, it still hangs.

TimingLogger when starting the load_stac batch job (backend.py):

load_stac(https://openeo-dev.vito.be/openeo/1.1/jobs/j-dcc5727733a34956a34ca97a16ef57ef/results/...&partial=true): extract "openeo:status": start 2023-09-01 06:38:21.490097

TimingLogger in the subsequent get partial job results call (views.py):

backend_implementation.batch_jobs.get_job_info(job_id='j-dcc5727733a34956a34ca97a16ef57ef', user_id='7d523381374b62e6f2aad1f2f9fb6eddf624d382a8f71db6dcf5788e3aae0af3@egi.eu'): start 2023-09-01 06:39:14.346796

In both cases, there's no corresponding "end/elapsed" log.

Added some more logging to the implementation of get_job_info.

bossie commented 1 year ago

I think the mutex introduced in 656b7ceb2a658834ea374cdab1ac182bb2126843 leads to a deadlock.

GeoPySparkBackendImplementation is a singleton, therefore GpsBatchJobs is a singleton, therefore DoubleJobRegistry is a singleton, and the mutex prevents it from being used as a context manager in multiple threads at the same time.

The mutex is acquired for the whole of start_job, which includes an HTTP call to get the batch job details and that will also try to acquire the same mutex.

bossie commented 1 year ago

Confirmed and subsequently fixed by splitting up the DoubleJobRegistry context manager blocks in _start_job().

Why was this RLock introduced @soxofaan ?

bossie commented 1 year ago

Both original + load_stac jobs on Terrascope works as well.