Open-EO / openeo-geopyspark-driver

OpenEO driver for GeoPySpark (Geotrellis)
Apache License 2.0

Sync requests: more accurate usage logging #734

Open jdries opened 3 months ago

jdries commented 3 months ago

We currently cannot accurately measure the usage of sync requests. Via a detour, however, we might be getting there: Spark has a JSON API, example requests are below. I already configured jobGroup to point to the request id (not always filled in, it can also be empty). From there we get the stage ids, and the stage JSON contains CPU time!

So we can either use a Spark listener for finished jobs, or somehow set up an external scraper for the jobs endpoint?

https://spark-ui-0.stag.warsaw.openeo.dataspace.copernicus.eu/api/v1/applications/spark-1be174b9cf8248438bf18097063c7f54/jobs/0

{
"jobId": 0,
"name": "collectAsMap at package.scala:330",
"description": "Write geotiff GridExtent(Extent(642040.0, 5672800.0, 647160.0, 5677920.0), CellSize(10.0,10.0), 512x512) of type float32",
"submissionTime": "2024-03-20T15:30:46.581GMT",
"completionTime": "2024-03-20T15:31:06.625GMT",
"stageIds": [
0,
1,
5,
2,
3,
4
],
"jobGroup": "r-240320e5a0bc42229f6b612eba56833b",
"status": "SUCCEEDED",
"numTasks": 36,
"numActiveTasks": 0,
"numCompletedTasks": 36,
"numSkippedTasks": 0,
"numFailedTasks": 0,
"numKilledTasks": 0,
"numCompletedIndices": 36,
"numActiveStages": 0,
"numCompletedStages": 6,
"numSkippedStages": 0,
"numFailedStages": 0,
"killedTasksSummary": {}
}

https://spark-ui-0.stag.warsaw.openeo.dataspace.copernicus.eu/api/v1/applications/spark-1be174b9cf8248438bf18097063c7f54/stages/4?details=false

[
{
"status": "COMPLETE",
"stageId": 4,
"attemptId": 0,
"numTasks": 10,
"numActiveTasks": 0,
"numCompleteTasks": 10,
"numFailedTasks": 0,
"numKilledTasks": 0,
"numCompletedIndices": 10,
"submissionTime": "2024-03-20T15:30:55.234GMT",
"firstTaskLaunchedTime": "2024-03-20T15:30:55.240GMT",
"completionTime": "2024-03-20T15:31:05.531GMT",
"executorDeserializeTime": 6928,
"executorDeserializeCpuTime": 3672368720,
"executorRunTime": 16000,
"executorCpuTime": 1610058957,
"resultSize": 21562,
"jvmGcTime": 414,
"resultSerializationTime": 5,
"memoryBytesSpilled": 0,
"diskBytesSpilled": 0,
"peakExecutionMemory": 167888,
"inputBytes": 0,
"inputRecords": 0,
"outputBytes": 0,
"outputRecords": 0,
"shuffleRemoteBlocksFetched": 9,
"shuffleLocalBlocksFetched": 17,
"shuffleFetchWaitTime": 51,
"shuffleRemoteBytesRead": 14517,
"shuffleRemoteBytesReadToDisk": 0,
"shuffleLocalBytesRead": 27662,
"shuffleReadBytes": 42179,
"shuffleReadRecords": 60,
"shuffleCorruptMergedBlockChunks": 0,
"shuffleMergedFetchFallbackCount": 0,
"shuffleMergedRemoteBlocksFetched": 0,
"shuffleMergedLocalBlocksFetched": 0,
"shuffleMergedRemoteChunksFetched": 0,
"shuffleMergedLocalChunksFetched": 0,
"shuffleMergedRemoteBytesRead": 0,
"shuffleMergedLocalBytesRead": 0,
"shuffleRemoteReqsDuration": 206,
"shuffleMergedRemoteReqsDuration": 0,
"shuffleWriteBytes": 495811,
"shuffleWriteTime": 23188637,
"shuffleWriteRecords": 60,
"name": "flatMap at OpenEOProcesses.scala:484",
"description": "Write geotiff GridExtent(Extent(642040.0, 5672800.0, 647160.0, 5677920.0), CellSize(10.0,10.0), 512x512) of type float32",
"details": "org.apache.spark.rdd.RDD.flatMap(RDD.scala:421)\norg.openeo.geotrellis.OpenEOProcesses.aggregateTemporal(OpenEOProcesses.scala:484)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:566)\npy4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\npy4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\npy4j.Gateway.invoke(Gateway.java:282)\npy4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\npy4j.commands.CallCommand.execute(CallCommand.java:79)\npy4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\npy4j.ClientServerConnection.run(ClientServerConnection.java:106)\njava.base/java.lang.Thread.run(Thread.java:829)",
"schedulingPool": "default",
"rddIds": [
32,
30,
23,
21,
22
],
"accumulatorUpdates": [
{
"id": 2,
"name": "SecondsPerChunk_FileCollection-Sentinel2",
"value": "2.305875"
},
{
"id": 1,
"name": "ChunkCount_FileCollection-Sentinel2",
"value": "59"
}
],
"killedTasksSummary": {},
"resourceProfileId": 0,
"peakExecutorMetrics": {
"JVMHeapMemory": 279591536,
"JVMOffHeapMemory": 123360192,
"OnHeapExecutionMemory": 0,
"OffHeapExecutionMemory": 0,
"OnHeapStorageMemory": 41968,
"OffHeapStorageMemory": 0,
"OnHeapUnifiedMemory": 41968,
"OffHeapUnifiedMemory": 0,
"DirectPoolMemory": 8471305,
"MappedPoolMemory": 0,
"ProcessTreeJVMVMemory": 6216826880,
"ProcessTreeJVMRSSMemory": 1269485568,
"ProcessTreePythonVMemory": 0,
"ProcessTreePythonRSSMemory": 0,
"ProcessTreeOtherVMemory": 0,
"ProcessTreeOtherRSSMemory": 0,
"MinorGCCount": 200,
"MinorGCTime": 355,
"MajorGCCount": 3,
"MajorGCTime": 284,
"TotalGCTime": 626
},
"isShufflePushEnabled": false,
"shuffleMergersCount": 0
}
]

bossie commented 3 months ago

The problem is that there's a mismatch between a synchronous request (in which you want to return the OpenEO-Costs header) and something asynchronous like a SparkListener.

jdries commented 3 months ago

Perhaps we can skip the listener and just do a REST API request before returning the result. (When the result is ready, the job is finished anyway, so we don't really need a listener for that, and I guess we also know the request id at that point.)
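
A minimal sketch of that lookup, assuming the jobs and stages endpoints return payloads shaped like the examples earlier in this ticket. The function names, the `api_base`/`app_id` parameters and the choice to count `executorDeserializeCpuTime` alongside `executorCpuTime` are assumptions made for illustration, not the driver's actual code. Spark reports both CPU fields in nanoseconds, hence the conversion to seconds.

```python
import json
from urllib.request import urlopen

NANOS_PER_SECOND = 1_000_000_000


def fetch_json(url, timeout=10):
    """GET a Spark REST API URL and decode the JSON body."""
    with urlopen(url, timeout=timeout) as response:
        return json.load(response)


def stage_ids_for_request(jobs, request_id):
    """Collect the stage ids of all jobs whose jobGroup equals the request id."""
    stage_ids = set()
    if not request_id:
        # Without a request id there is nothing reliable to match on.
        return stage_ids
    for job in jobs:
        # jobGroup can be missing or empty, so use .get() rather than indexing.
        if job.get("jobGroup") == request_id:
            stage_ids.update(job.get("stageIds", []))
    return stage_ids


def cpu_seconds(stage_attempts):
    """Sum executor CPU time (reported in nanoseconds) over stage attempts."""
    total_ns = sum(
        attempt.get("executorCpuTime", 0)
        + attempt.get("executorDeserializeCpuTime", 0)  # assumption: count this too
        for attempt in stage_attempts
    )
    return total_ns / NANOS_PER_SECOND


def request_cpu_seconds(api_base, app_id, request_id, fetch=fetch_json):
    """Total CPU seconds for one sync request, via the jobs and stages endpoints."""
    app_url = f"{api_base}/applications/{app_id}"
    jobs = fetch(f"{app_url}/jobs")
    total = 0.0
    for stage_id in sorted(stage_ids_for_request(jobs, request_id)):
        # The stage endpoint returns a list with one entry per stage attempt.
        total += cpu_seconds(fetch(f"{app_url}/stages/{stage_id}?details=false"))
    return total
```

The `fetch` parameter is only there so the aggregation can be exercised without a running Spark UI. In the driver, something like `request_cpu_seconds(api_base, app_id, request_id)` could be called right before the sync response is returned.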

bossie commented 3 months ago

Makes me wonder if there's a Scala API we can use and do away with the REST call (it only makes sense that the latter is implemented in terms of the former). :thinking:

jdries commented 3 months ago

Yeah, I also searched for that Scala API quite a bit. If it exists, it's probably going to be an internal Spark API...
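
For reference, one public entry point that does exist is the status tracker (`SparkContext.statusTracker()` in PySpark). As far as its documented fields go, `SparkJobInfo` carries `jobId`, `stageIds` and `status`, and `SparkStageInfo` carries task counts but no CPU time. The sketch below therefore only covers the step from job group to stage ids; the helper name is hypothetical, and the CPU time itself would still have to come from somewhere else, such as the REST API.

```python
def stage_ids_for_job_group(tracker, job_group):
    """Return the sorted stage ids of all jobs in a job group.

    `tracker` is expected to behave like pyspark's StatusTracker,
    e.g. the object returned by SparkContext.statusTracker().
    """
    stage_ids = set()
    for job_id in tracker.getJobIdsForGroup(job_group):
        info = tracker.getJobInfo(job_id)
        # getJobInfo returns None when the tracker no longer knows the job.
        if info is not None:
            stage_ids.update(info.stageIds)
    return sorted(stage_ids)
```

With a live context this would be called as `stage_ids_for_job_group(sc.statusTracker(), request_id)`, where `sc` and `request_id` are whatever the driver has at hand at that point.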

soxofaan commented 1 month ago

(putting some keywords here so I can more easily find this ticket next time :smile: ETL API credit credits billing)