Open-EO / openeo-geopyspark-driver

OpenEO driver for GeoPySpark (Geotrellis)
Apache License 2.0

udf_apply_feature_dataframe UDF in executor? #458

Open soxofaan opened 1 year ago

soxofaan commented 1 year ago

(I stumbled on this issue while working on #437 / https://github.com/Open-EO/openeo-python-driver/issues/197)

#251 / #262 added parallelized UDF execution on vector cubes (the udf_apply_feature_dataframe and udf_apply_udf_data entrypoints), as documented at https://github.com/Open-EO/openeo-geopyspark-driver/blob/1f0ad56cc749d9f3ade315a85f39f1200f74168c/docs/vectorcube-run_udf.md . The idea was to get parallelization and executor isolation automatically by running the UDF through a pyspark.pandas apply.

However, it seems that a pyspark.pandas apply callback does not run in the executors, but just in the driver.

Example snippet to illustrate:

import openeo
import openeo.processes
connection = openeo.connect("openeo.vito.be").authenticate_oidc()
cube = connection.load_collection(
    "TERRASCOPE_S2_TOC_V2",
    temporal_extent=["2023-03-01", "2023-03-20"],
    bands=["B02"],
)
geometries = {"type": "Polygon", "coordinates": [[[3.68, 51.04], [3.69, 51.04], [3.69, 51.05], [3.68, 51.05], [3.68, 51.04]]]}
aggregates = cube.aggregate_spatial(geometries=geometries, reducer="mean")
udf_code = """
import pandas as pd
import pyspark

def udf_apply_feature_dataframe(df: pd.DataFrame):
    # Executor detection based on pyspark.SparkContext._assert_on_driver
    in_executor = (pyspark.TaskContext.get() is not None)
    raise ValueError(f"{in_executor=}")
"""
processed = openeo.processes.run_udf(data=aggregates, udf=udf_code, runtime="Python")
connection.download(processed, outputfile="tmp.json")

This fails with Internal: Server error: ValueError('in_executor=False'), indicating the callback did not run in an executor.
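As an aside, the executor-detection check used in these snippets can be wrapped in a small standalone helper. A minimal sketch (the in_executor name is my own, not part of any API):

```python
def in_executor() -> bool:
    """Return True when running inside a Spark task, i.e. on an executor.

    Based on the same check as pyspark.SparkContext._assert_on_driver:
    pyspark.TaskContext.get() returns a TaskContext inside a running task
    and None on the driver.
    """
    try:
        import pyspark
    except ImportError:
        # No Spark available at all: plain local execution.
        return False
    return pyspark.TaskContext.get() is not None
```

Raising ValueError(f"{in_executor()=}") from the UDF, as done above, is just a quick-and-dirty way to surface the result through the server's error message.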

soxofaan commented 1 year ago

For comparison, here is UDF usage with openEO apply, where the UDF does run in an executor:

s2_cube = connection.load_collection(
    "TERRASCOPE_S2_TOC_V2",
    spatial_extent={"west": 4.00, "south": 51.00, "east": 4.01, "north": 51.01},
    temporal_extent=["2022-03-01", "2022-03-31"],
    bands=["B02"]
)
udf = openeo.UDF("""
import pyspark
from openeo.udf import XarrayDataCube

def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube:
    # Executor detection based on pyspark.SparkContext._assert_on_driver
    in_executor = (pyspark.TaskContext.get() is not None)
    raise ValueError(f"{in_executor=}")
""")
rescaled = s2_cube.apply(process=udf)
rescaled.download("udf-in-executor-apply_datacube-tmp.nc")

which fails with [500] Internal: Server error: UDF Exception during Spark execution: ... ValueError: in_executor=True, confirming that this UDF ran in an executor.