Some context: the original job by Darius C. was as follows:
# Now we extract the same input cube with openeo gfmap
import openeo
from openeo_gfmap import Backend, BackendContext  # assumed: Backend/BackendContext come from openeo-gfmap

connection = openeo.connect("openeofed.dataspace.copernicus.eu").authenticate_oidc()
backend_context = BackendContext(Backend.FED)  # not used further in this snippet
EXTENT = dict(zip(["west", "south", "east", "north"], [5.318868004541495, 50.628576059801816, 5.3334400271343725, 50.637843899562576]))
EXTENT['crs'] = "EPSG:4326"
STARTDATE = '2022-01-01'
ENDDATE = '2022-03-31'
s2_cube = connection.load_collection("SENTINEL2_L2A", spatial_extent=EXTENT, temporal_extent=[STARTDATE, ENDDATE], bands=["B04"])
meteo_cube = connection.load_collection("AGERA5", spatial_extent=EXTENT, temporal_extent=[STARTDATE, ENDDATE], bands=["temperature-mean"])
s2_cube = s2_cube.aggregate_temporal_period(period='month', reducer='median', dimension='t')
meteo_cube = meteo_cube.aggregate_temporal_period(period='month', reducer='mean', dimension='t')
inputs = s2_cube.merge_cubes(meteo_cube)
job = inputs.create_job(
    out_format="NetCDF",
    title="Test extraction job",
    job_options={
        "split_strategy": "crossbackend",
        "driver-memory": "2G",
        "driver-memoryOverhead": "2G",
        "driver-cores": "1",
        "executor-memory": "1800m",
        "executor-memoryOverhead": "1900m",
    },
)
job.start_and_wait()
The openeofed aggregator split up this job between j-240528eeb7ce4b8bbd00e51746f0d01b (CDSE) and j-24052834d2d340839c3d047f60d4b8da (Terrascope), where it essentially replaced the load_collection("AGERA5") in the CDSE job with a load_stac("j-24052834d2d340839c3d047f60d4b8da?partial=true"), with the intention that j-240528eeb7ce4b8bbd00e51746f0d01b waits until the j-24052834d2d340839c3d047f60d4b8da results are available before resuming (#489).
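For illustration only, a rough sketch of what the rewritten CDSE sub-graph amounts to in openeo Python client terms (the exact partial results URL and node wiring are the aggregator's; the URL below is a placeholder):
# Hedged sketch, reusing the `connection` from the script above: the aggregator effectively
# swaps the AGERA5 load_collection for a load_stac on the (not yet finished) results of the
# Terrascope sub-job.
partial_results_url = "https://openeo-cdse.vito.be/openeo/1.1/jobs/j-24052834d2d340839c3d047f60d4b8da/results/...?partial=true"  # placeholder
meteo_cube = connection.load_stac(partial_results_url)  # stands in for load_collection("AGERA5")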
Unfortunately, j-240528eeb7ce4b8bbd00e51746f0d01b failed with this error:
load_stac from url 'https://openeo-cdse.vito.be/openeo/1.1/jobs/j-24052834d2d340839c3d047f60d4b8da/results/YmJlNzQ2YzAtMzRjYy00NTFiLTk5YzYtNDhjYTU1Y2RhMzQ5/0be8a4541d6c7a44e276d2c44b13655d?expires=1717501136&partial=true' with load params {'temporal_extent': ['1970-01-01', '2070-01-01'], 'spatial_extent': {}, 'global_extent': None, 'bands': None, 'properties': {}, 'aggregate_spatial_geometries': None, 'sar_backscatter': None, 'process_types': set(), 'custom_mask': {}, 'data_mask': None, 'target_crs': None, 'target_resolution': None, 'resample_method': 'near', 'pixel_buffer': None}
Traceback (most recent call last):
File "/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/deploy/batch_job.py", line 1374, in <module>
main(sys.argv)
File "/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/deploy/batch_job.py", line 1039, in main
run_driver()
File "/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/deploy/batch_job.py", line 1010, in run_driver
run_job(
File "/opt/openeo/lib/python3.8/site-packages/openeogeotrellis/utils.py", line 56, in memory_logging_wrapper
return function(*args, **kwargs)
File "/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/deploy/batch_job.py", line 1103, in run_job
result = ProcessGraphDeserializer.evaluate(process_graph, env=env, do_dry_run=tracer)
File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 377, in evaluate
result = convert_node(result_node, env=env)
File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 402, in convert_node
process_result = apply_process(process_id=process_id, args=processGraph.get('arguments', {}),
File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1581, in apply_process
args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1581, in <dictcomp>
args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 416, in convert_node
return convert_node(processGraph['node'], env=env)
File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 402, in convert_node
process_result = apply_process(process_id=process_id, args=processGraph.get('arguments', {}),
File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1581, in apply_process
args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1581, in <dictcomp>
args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 416, in convert_node
return convert_node(processGraph['node'], env=env)
File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 402, in convert_node
process_result = apply_process(process_id=process_id, args=processGraph.get('arguments', {}),
File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1581, in apply_process
args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1581, in <dictcomp>
args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 416, in convert_node
return convert_node(processGraph['node'], env=env)
File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 402, in convert_node
process_result = apply_process(process_id=process_id, args=processGraph.get('arguments', {}),
File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1613, in apply_process
return process_function(args=ProcessArgs(args, process_id=process_id), env=env)
File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 2234, in load_stac
return env.backend_implementation.load_stac(url=url, load_params=load_params, env=env)
File "/opt/openeo/lib/python3.8/site-packages/openeogeotrellis/backend.py", line 760, in load_stac
return load_stac.load_stac(url, load_params, env, layer_properties={}, batch_jobs=self.batch_jobs)
File "/opt/openeo/lib/python3.8/site-packages/openeogeotrellis/load_stac.py", line 342, in load_stac
raise no_data_available_exception
openeo_driver.errors.OpenEOApiException: There is no data available for the given extents.
In the end, Terrascope job j-24052834d2d340839c3d047f60d4b8da finished successfully and produced 89 STAC Items.
At this moment load_stac is able to load its canonical URL into a data cube (without specifying a spatial or temporal extent), but apparently it couldn't in the context of j-240528eeb7ce4b8bbd00e51746f0d01b.
It should be noted that the dependent (main) job ran on CDSE, but this environment doesn't have the necessary infrastructure to poll its dependency job. This seems to be the point where it decides that CDSE does not in fact support dependencies: https://github.com/Open-EO/openeo-geopyspark-driver/blob/facd8189384857f7b9cb4d1c1aff9645c16e4035/openeogeotrellis/backend.py#L1765-L1809
CDSE batch job j-240528eeb7ce4b8bbd00e51746f0d01b therefore just proceeded with evaluating its process graph; at the point where it tried to load_stac("j-24052834d2d340839c3d047f60d4b8da?partial=true"), the latter's results were not yet computed and there were no STAC Items to load, hence the original error:
File "/opt/openeo/lib/python3.8/site-packages/openeogeotrellis/load_stac.py", line 342, in load_stac
raise no_data_available_exception
openeo_driver.errors.OpenEOApiException: There is no data available for the given extents.
This also explains why the CDSE job did not fail fast: it skipped putting the poll-message on Kafka altogether.
async_tasks_supported = not ConfigParams().is_kube_deploy
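A simplified, hedged sketch of what that flag effectively does in the linked code path (only the ConfigParams().is_kube_deploy check is taken from the actual code; the helper names below are made up for illustration):
async_tasks_supported = not ConfigParams().is_kube_deploy  # False on a Kubernetes deploy such as CDSE
if dependencies and async_tasks_supported:
    # YARN-style deployments: put a poll message on Kafka and keep the job queued
    # until the dependency job's results are available.
    schedule_dependency_poll(job_id, dependencies)  # hypothetical helper
else:
    # CDSE: dependencies are effectively ignored and the batch job starts right away,
    # so its load_stac may find no STAC Items yet.
    submit_batch_job(job_id)  # hypothetical helper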
I was afraid there would be something like that somewhere in the code path. I'm really not a fan of such bazooka-style configs.
Reversing the arguments of merge_cubes (inputs = meteo_cube.merge_cubes(s2_cube)) will run the main job on Terrascope, but it does not get into the "running" state because of this EJR error upon starting the job:
Traceback (most recent call last):
File "/opt/venv/lib64/python3.8/site-packages/flask/app.py", line 1484, in full_dispatch_request
rv = self.dispatch_request()
File "/opt/venv/lib64/python3.8/site-packages/flask/app.py", line 1469, in dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/users/auth.py", line 88, in decorated
return f(*args, **kwargs)
File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/views.py", line 939, in queue_job
backend_implementation.batch_jobs.start_job(job_id=job_id, user=user)
File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/backend.py", line 1745, in start_job
self._start_job(job_id, user, _get_vault_token)
File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/backend.py", line 1812, in _start_job
dbl_registry.set_dependencies(
File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/job_registry.py", line 851, in set_dependencies
self.elastic_job_registry.set_dependencies(
File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/jobregistry.py", line 498, in set_dependencies
return self._update(job_id=job_id, data={"dependencies": dependencies})
File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/jobregistry.py", line 493, in _update
return self._do_request("PATCH", f"/jobs/{job_id}", json=data)
File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/jobregistry.py", line 330, in _do_request
raise EjrHttpError.from_response(response=response)
openeo_driver.jobregistry.EjrHttpError: EJR API error: 400 'Bad Request' on `PATCH 'https://jobregistry.vgt.vito.be/jobs/j-2405308c6a2b4f1197bc4b429e7199b2'`: {"statusCode":400,"message":["dependencies.0.property partial_job_results_url should not exist"],"error":"Bad Request"}
In this case, dependencies look like this:
[{'partial_job_results_url': 'https://openeo.dataspace.copernicus.eu/openeo/1.1/jobs/j-2405304298c14e609b85ffc70b5b6382/results/OTc2MGU1OWItNTY2ZC00MmQxLWI2NWItMzRlZDE4NzlkYThh/a21f52059a2d099abf9ff73539d4618b?expires=1717678778&partial=true'}]
It will happily accept dependencies that look like:
[{'collection_id': 'SENTINEL1_GRD', 'batch_request_ids': ['9ecd1d54-3b44-492f-9021-6eed00ea8a30'], 'results_location': 's3://openeo-sentinelhub/9ecd1d54-3b44-492f-9021-6eed00ea8a30', 'card4l': False}]
or even
[{'collection_id': 'SENTINEL1_GRD'}]
even though the ES mapping type for dependencies is flattened, like e.g. job_options, but that one does seem to allow arbitrary properties. :shrug:
Maybe @JanssenBrm can explain this?
Possible courses of action:
- fix partial_job_results_url in ES: should work regardless of where the main job runs;
- no async_task on CDSE: consider polling from within the main batch job;
- async_task on CDSE.

A new version has been deployed that should fix this issue from the EJR side. Can you verify if you still see the bad request error?
EJR is fixed, dependencies are good.
This job ran to completion; the only difference is the order of the arguments of merge_cubes, so there's that:
import openeo

connection = openeo.connect("openeofed.dataspace.copernicus.eu").authenticate_oidc()
EXTENT = dict(zip(["west", "south", "east", "north"],
                  [5.318868004541495, 50.628576059801816, 5.3334400271343725, 50.637843899562576]))
EXTENT['crs'] = "EPSG:4326"
STARTDATE = '2022-01-01'
ENDDATE = '2022-03-31'
s2_cube = connection.load_collection("SENTINEL2_L2A", spatial_extent=EXTENT,
                                     temporal_extent=[STARTDATE, ENDDATE], bands=["B04"])
meteo_cube = connection.load_collection("AGERA5", spatial_extent=EXTENT, temporal_extent=[STARTDATE, ENDDATE],
                                        bands=["temperature-mean"])
s2_cube = s2_cube.aggregate_temporal_period(period='month', reducer='median', dimension='t')
meteo_cube = meteo_cube.aggregate_temporal_period(period='month', reducer='mean', dimension='t')
inputs = meteo_cube.merge_cubes(s2_cube)
job = inputs.create_job(
    out_format="NetCDF",
    title="Test extraction job",
    job_options={
        "split_strategy": "crossbackend",
        "driver-memory": "2G",
        "driver-memoryOverhead": "2G",
        "driver-cores": "1",
        "executor-memory": "1800m",
        "executor-memoryOverhead": "1900m",
    },
)
job.start_and_wait()
To avoid confusion and wrong results in the short term, main jobs on CDSE will now fail fast with an error like this:
E openeo.rest.OpenEoApiError: [501] Internal: this backend does not support loading unfinished results from https://openeo.dataspace.copernicus.eu/openeo/1.2/jobs/j-240531df993e4a1eaf8f149ecd40e053/results/OTc2MGU1OWItNTY2ZC00MmQxLWI2NWItMzRlZDE4NzlkYThh/7fc817d5f9b343abd84d7b89acfe5bfe?expires=1717766014&partial=true with load_stac (ref: r-24053154a1224203a1a1e9b462da4797)
429 errors are being handled in https://github.com/Open-EO/openeo-geotrellis-extensions/issues/299.
Darius is unblocked so this became less urgent.
As discussed: as a first implementation, try polling from within the batch job. This then becomes a default implementation that requires no extra infrastructure and is effectively platform-agnostic.
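A minimal sketch of what such polling could look like, assuming the partial job results document exposes an "openeo:status" field as described for partial results in the openEO API (function name, interval and timeout are made up; this is not the actual load_stac implementation):
import time
import requests

def wait_for_dependency(partial_results_url, poll_interval=60, timeout=6 * 3600):
    """Poll the (partial) job results document until the dependency job is done.

    Sketch only: assumes the results document carries an "openeo:status" field.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        status = requests.get(partial_results_url).json().get("openeo:status", "finished")
        if status == "finished":
            return  # results are complete; proceed with the actual STAC loading
        if status == "error":
            raise Exception("dependency job failed")
        time.sleep(poll_interval)
    raise Exception(f"timed out waiting for dependency job results at {partial_results_url}")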
Random ramblings to summarize things and refresh my memory.

In the case of OG SHub:
- start_job interprets load_collection(some_extent) and schedules an appropriate SHub batch process to dump its results at a particular S3 "directory";
- a poll message is put on Kafka for async_task;
- upon DONE, async_task submits a batch job on YARN, passing said S3 "directory" as a parameter;
- the batch job maps load_collection to the S3 "directory" and loads a data cube from there.

In the case of unfinished job results:
- start_job interprets load_stac(partial_results_url); there is nothing to schedule because the dependency job is managed externally;
- a poll message is put on Kafka for async_task;
- upon finished: async_task submits a batch job on YARN; no extra parameters are passed because they are unnecessary;
- the batch job does load_stac(partial_results_url); this will succeed because the dependency job has finished while polling.

The easiest way to do the polling in the main batch job seems to be to just poll partial_results_url in the load_stac method itself, actually loading a data cube if the dependency job eventually finishes, because:
- load_stac already has the partial_results_url so there's no mapping involved;
- with an async_task that did the polling in advance (like on YARN), nothing has to be changed in load_stac: its polling will finish immediately so there's no additional delay.

Notes:
- async_task essentially becomes a way to do the polling in a separate process rather than in the batch job itself.
- this does not support load_stac from actual partial results (load_stac will await their completion instead), but that was also not the case in the current implementation (and would probably require an additional parameter to control this behavior too).

Original job ran successfully on openeofed-staging (agg-pj-20240627-113805).
The aggregator delegates a load_collection to Terrascope and then loads it with load_stac, but this gives a "NoDataAvailable" error. Adding a spatial and temporal extent to the load_stac node in the JSON makes the job go through.
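Roughly, that workaround amounts to patching the generated process graph before resubmitting it; a hedged example of such a patch (the "loadstac1" node id and the process_graph variable are made up, EXTENT/STARTDATE/ENDDATE are the ones from the scripts above):
# Hypothetical example of the JSON tweak described above: give the load_stac node
# explicit extents instead of the unbounded defaults.
process_graph["loadstac1"]["arguments"].update({
    "spatial_extent": EXTENT,
    "temporal_extent": [STARTDATE, ENDDATE],
})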
meteo_cube = connection.load_stac("https://stac.openeo.vito.be/collections/agera5_daily", spatial_extent=EXTENT, temporal_extent=[STARTDATE, ENDDATE], bands=["2m_temperature_mean"])
gives a 429 error.