Open-EO / openeo-geotrellis-extensions

Java/Scala extensions for Geotrellis, for use with OpenEO GeoPySpark backend.

Better error messages for executor errors #115

Closed jdries closed 1 year ago

jdries commented 1 year ago

Executor errors currently show up to the user as:

  {
    "id": "[1673931603101, 438194]",
    "time": "2023-01-17T05:00:03.101Z",
    "level": "error",
    "message": "Exception in task 55.0 in stage 82.0 (TID 2035)"
  },

Can we translate this into something better, perhaps by using a Spark listener?

The full stack trace is in Elasticsearch; another option would be to filter the relevant messages out of the stack traces. This is the full log content:

{
  "_index": "openeo-creo-index-1m-2022.10.18-000015",
  "_type": "_doc",
  "_id": "B_gZvoUBVWXUH_mWapCL",
  "_version": 1,
  "_score": null,
  "_source": {
    "process": 14,
    "@timestamp": "2023-01-17T05:00:13.553Z",
    "log": {
      "file": {
        "path": "/var/log/containers/job-541a56bc5f-dfa678cb9ab17f65d4f0-f10c0985be09161c-exec-14_spark-jobs_spark-kubernetes-executor-eb9a4a56e40119aad54a4a2d70cdaf96705339205ae9df2799f63c5d49fa4ed3.log"
      },
      "offset": 495456
    },
    "filename": "Logging.scala",
    "name": "org.apache.spark.executor.Executor",
    "host": {
      "architecture": "x86_64",
      "containerized": false,
      "name": "filebeat-filebeat-vbbtk",
      "os": {
        "version": "7 (Core)",
        "platform": "centos",
        "codename": "Core",
        "name": "CentOS Linux",
        "family": "redhat",
        "kernel": "4.18.0-348.2.1.el8_5.x86_64"
      },
      "hostname": "filebeat-filebeat-vbbtk"
    },
    "lineno": 94,
    "user_id": "dfa678cb9ab17f65d4f025e30fac5e0d90116176e44fd17d703419322747cbbd@egi.eu",
    "message": "Exception in task 40.3 in stage 82.0 (TID 2040)",
    "exc_info": "org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n  File \"/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py\", line 619, in main\n    process()\n  File \"/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py\", line 611, in process\n    serializer.dump_stream(out_iter, outfile)\n  File \"/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py\", line 132, in dump_stream\n    for obj in iterator:\n  File \"/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py\", line 74, in wrapper\n    return f(*args, **kwargs)\n  File \"/opt/openeo/lib/python3.8/site-packages/openeogeotrellis/utils.py\", line 49, in memory_logging_wrapper\n    return function(*args, **kwargs)\n  File \"/opt/openeo/lib/python3.8/site-packages/epsel.py\", line 44, in wrapper\n    return _FUNCTION_POINTERS[key](*args, **kwargs)\n  File \"/opt/openeo/lib/python3.8/site-packages/epsel.py\", line 37, in first_time\n    return f(*args, **kwargs)\n  File \"/opt/openeo/lib/python3.8/site-packages/openeogeotrellis/geopysparkdatacube.py\", line 519, in tile_function\n    result_data = run_udf_code(code=udf_code, data=data)\n  File \"/opt/openeo/lib64/python3.8/site-packages/openeo/udf/run_code.py\", line 175, in run_udf_code\n    result_cube = func(data.get_datacube_list()[0], data.user_context)\n  File \"<string>\", line 326, in apply_datacube\n  File \"<string>\", line 289, in classify\n  File \"<string>\", line 112, in _make_prediction\n  File \"tmp/venv/vito_crop_classification/predict.py\", line 46, in main\n    return run_inference(\n  File \"tmp/venv/vito_crop_classification/inference/main.py\", line 87, in main\n    pred_df = predictor(df, transform=transform)\n  File \"tmp/venv/vito_crop_classification/inference/predictor.py\", line 77, in __call__\n    batch_embs = self.model.enc(df_seg)\n  File \"tmp/venv/vito_crop_classification/model/encoders/base.py\", line 67, in __call__\n    return self.forward_process(self.preprocess_df(df))\n  File \"tmp/venv/vito_crop_classification/model/encoders/concatenate.py\", line 85, in preprocess_df\n    results = [[r] for r in enc.preprocess_df(df)]\n  File \"tmp/venv/vito_crop_classification/model/encoders/base.py\", line 76, in preprocess_df\n    return self._process_f(df)\n  File \"tmp/venv/vito_crop_classification/model/encoders/utils.py\", line 25, in extract_sc\n    return sc_to_tensor(df=df, cols=cols)\n  File \"tmp/venv/vito_crop_classification/utils.py\", line 69, in sc_to_tensor\n    cols = [c for c in cols if type(df[c].iloc[0]) != str] if cols is not None else df.columns\n  File \"tmp/venv/vito_crop_classification/utils.py\", line 69, in <listcomp>\n    cols = [c for c in cols if type(df[c].iloc[0]) != str] if cols is not None else df.columns\n  File \"/opt/openeo/lib64/python3.8/site-packages/pandas/core/indexing.py\", line 931, in __getitem__\n    return self._getitem_axis(maybe_callable, axis=axis)\n  File \"/opt/openeo/lib64/python3.8/site-packages/pandas/core/indexing.py\", line 1566, in _getitem_axis\n    self._validate_integer(key, axis)\n  File \"/opt/openeo/lib64/python3.8/site-packages/pandas/core/indexing.py\", line 1500, in _validate_integer\n    raise IndexError(\"single positional indexer is out-of-bounds\")\nIndexError: single positional indexer is out-of-bounds\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)\n\tat org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)\n\tat 
org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)\n\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)\n\tat scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)\n\tat scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)\n\tat scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)\n\tat org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)\n\tat org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)\n\tat org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1481)\n\tat org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)\n\tat org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)\n\tat org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)\n\tat org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:335)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:131)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)",
    "levelname": "ERROR",
    "kubernetes": {
      "labels": {
        "spark-role": "executor",
        "version": "3.2.0",
        "spark-app-selector": "spark-3d6abe9ec9df48e8ac24fabd4db0c554",
        "spark-exec-id": "14",
        "sparkoperator_k8s_io/submission-id": "5f097cc2-adf0-49f8-a236-e2c0fdd52a31",
        "sparkoperator_k8s_io/launched-by-spark-operator": "true",
        "sparkoperator_k8s_io/app-name": "job-541a56bc5f-dfa678cb9ab17f65d4f0",
        "spark-exec-resourceprofile-id": "0"
      },
      "namespace": "spark-jobs",
      "pod": {
        "name": "job-541a56bc5f-dfa678cb9ab17f65d4f0-f10c0985be09161c-exec-14",
        "uid": "5e19fc11-dcfe-48f9-8999-b596f5883d28"
      },
      "container": {
        "name": "spark-kubernetes-executor",
        "image": "vito-docker.artifactory.vgt.vito.be/openeo-geotrellis-kube:20230116-337"
      },
      "node": {
        "name": "kube-node-dev-36"
      }
    },
    "input": {
      "type": "container"
    },
    "@version": "1",
    "created": 1673931613.55,
    "tags": [
      "1m",
      "openeo",
      "openeo-creo"
    ],
    "agent": {
      "hostname": "filebeat-filebeat-vbbtk",
      "ephemeral_id": "e94038a7-2b65-4f99-a06b-562648fcd3ef",
      "version": "7.5.1",
      "type": "filebeat",
      "id": "0614c2d8-5763-4f16-ba8d-2f70bc90612c"
    },
    "job_id": "j-541a56bc5f934e6cb69dc87477282f79",
    "stream": "stdout",
    "ecs": {
      "version": "1.1.0"
    }
  },
  "fields": {
    "@timestamp": [
      "2023-01-17T05:00:13.553Z"
    ]
  },
  "highlight": {
    "job_id": [
      "@kibana-highlighted-field@j-541a56bc5f934e6cb69dc87477282f79@/kibana-highlighted-field@"
    ]
  },
  "sort": [
    1673931613553
  ]
}
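
A minimal sketch of what such a Spark listener could look like (assuming SLF4J for logging; the actual LogErrorSparkListener mentioned further down in this thread may differ):

import org.apache.spark.ExceptionFailure
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.slf4j.LoggerFactory

// Sketch only: re-log executor-side task failures at error level, so the
// user sees the cause instead of only "Exception in task 55.0 in stage 82.0".
class LogErrorSparkListener extends SparkListener {
  private val logger = LoggerFactory.getLogger(classOf[LogErrorSparkListener])

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    taskEnd.reason match {
      case e: ExceptionFailure =>
        // toErrorString includes the exception class, message and stack trace.
        logger.error(s"LogErrorSparkListener.onTaskEnd(...) error: ${e.toErrorString}")
      case _ => // ignore successful tasks and other end reasons
    }
}

Registering it on the driver with sparkContext.addSparkListener(new LogErrorSparkListener()) would make every executor-side task failure surface in the driver logs.
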
EmileSonneveld commented 1 year ago

@jdries When I test, I do get a nice error message, not the unclear one. Example code:

import openeo

eoconn = openeo.connect("openeo-dev.vito.be").authenticate_oidc()

udf_code = """
from openeo.udf import UdfData, StructuredData
def transform(data: UdfData) -> UdfData:
    raise Exception("This error message should be visible to user")
    res = [
        StructuredData(description="res", data=[x * x for x in sd.data], type="list")
        for sd in data.get_structured_data_list()
    ]
    data.set_structured_data_list(res)
"""

datacube = eoconn.load_collection(
    "SENTINEL1_GRD",
    spatial_extent={"west": 16.06, "south": 48.06, "east": 16.15, "north": 48.15},
    temporal_extent=["2017-03-01", "2017-04-01"],
    bands=["VV", "VH"]
)

udf = openeo.UDF(udf_code)
rescaled_cube = datacube.apply(process=udf)
rescaled_cube.download("test.tiff")

The output ends with this nice error message: [screenshot]

Is there a way I can better test this ticket? I have the feeling that the SparkListener was not needed to produce this error message.

jdries commented 1 year ago

That's good already, maybe also try with batch jobs?

EmileSonneveld commented 1 year ago

When running as a batch job, there are many logs, and the thrown error is shown 17 times. But those get logged at info and debug level, while the SparkListener logs I added are at error level. So maybe my change did not have an impact.

Snippet used to run as batch job:

...
rescaled_cube = datacube.apply(process=udf)
# rescaled_cube.download("test.tiff")

# Creating a new job at the back-end by sending the datacube information.
job = rescaled_cube.create_job()

# Start the job and wait until it finishes, then download the results.
job.start_and_wait()
job.get_results().download_files("output")

jdries commented 1 year ago

A batch job on creo just gave me a better message, like the one below. There's still a lot of less useful info in the stack traces; I'm wondering if we can put the most important part first. In this case, that would be: FileNotFoundError: [Errno 2] No such file or directory: '/eodata/auxdata/SRTMGL1/dem/N64E024.SRTMGL1.hgt.zip'

The part below org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException is also quite useless.

 {
    "id": "[1675450345603, 653605]",
    "time": "2023-02-03T18:52:25.603Z",
    "level": "error",
    "message": "LogErrorSparkListener.onTaskEnd(...) error: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n  File \"/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py\", line 619, in main\n    process()\n  File \"/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py\", line 611, in process\n    serializer.dump_stream(out_iter, outfile)\n  File \"/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py\", line 132, in dump_stream\n    for obj in iterator:\n  File \"/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py\", line 74, in wrapper\n    return f(*args, **kwargs)\n  File \"/opt/openeo/lib/python3.8/site-packages/epsel.py\", line 44, in wrapper\n    return _FUNCTION_POINTERS[key](*args, **kwargs)\n  File \"/opt/openeo/lib/python3.8/site-packages/epsel.py\", line 37, in first_time\n    return f(*args, **kwargs)\n  File \"/opt/openeo/lib/python3.8/site-packages/openeo/util.py\", line 362, in wrapper\n    return f(*args, **kwargs)\n  File \"/opt/openeo/lib/python3.8/site-packages/openeogeotrellis/collections/s1backscatter_orfeo.py\", line 794, in process_product\n    dem_dir_context = S1BackscatterOrfeo._get_dem_dir_context(\n  File \"/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/collections/s1backscatter_orfeo.py\", line 258, in _get_dem_dir_context\n    dem_dir_context = S1BackscatterOrfeo._creodias_dem_subset_srtm_hgt_unzip(\n  File \"/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/collections/s1backscatter_orfeo.py\", line 664, in _creodias_dem_subset_srtm_hgt_unzip\n    with zipfile.ZipFile(zip_filename, 'r') as z:\n  File \"/usr/lib64/python3.8/zipfile.py\", line 1251, in __init__\n    self.fp = io.open(file, filemode)\nFileNotFoundError: [Errno 2] No such file or directory: '/eodata/auxdata/SRTMGL1/dem/N64E024.SRTMGL1.hgt.zip'\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)\n\tat org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)\n\tat org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)\n\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)\n\tat scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)\n\tat scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)\n\tat scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)\n\tat org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)\n\tat org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:131)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)\n\tat 
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n"
  },
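
As a rough illustration of the reordering suggested above, a hypothetical helper (summarizePythonError is an invented name, not existing code in this repo) could drop the JVM frames and promote the last line of the Python traceback:

// Hypothetical sketch: put the root cause first and cut off the JVM frames,
// which start at BasePythonRunner.handlePythonException and add no
// user-relevant information.
def summarizePythonError(excInfo: String): String = {
  // Everything from the first wrapped JVM frame onwards is Spark plumbing.
  val pythonPart = excInfo
    .split("\n\tat org\\.apache\\.spark\\.api\\.python\\.BasePythonRunner")
    .head
  // The last non-empty line of a Python traceback is the actual error, e.g.
  // "FileNotFoundError: [Errno 2] No such file or directory: ...".
  val lines = pythonPart.split("\n").map(_.trim).filter(_.nonEmpty)
  val rootCause = if (lines.nonEmpty) lines.last else excInfo
  s"$rootCause\n\nFull Python traceback:\n$pythonPart"
}

Applied to the message above, this would put the FileNotFoundError line first, followed by only the Python part of the trace.
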
EmileSonneveld commented 1 year ago

Non-batch jobs can show a nice, to-the-point error message. Would it be possible to show the same kind of error for batch jobs, considering the more complex job pipeline? [screenshot]

Adding more logs to the already enormous output might still keep it hard to use for users who don't have access to Kibana.

Errors that happen before running in the executors get shown nicely in both batch and non-batch job requests.

jdries commented 1 year ago

FYI: verbose log printing by the Python client for batch jobs got handled here: https://github.com/Open-EO/openeo-python-client/issues/332

EmileSonneveld commented 1 year ago

Errors now look like this to the user:

OpenEO batch job failed: UDF Exception during Spark execution:
  File "<string>", line 8, in transform
  File "<string>", line 7, in function_in_transform
  File "<string>", line 4, in function_in_root
Exception: This error message should be visible to user

This is visible to the user, even in, for example, https://editor.openeo.org/?server=https%3A%2F%2Fopeneo-dev.vito.be