gbif / pipelines

Pipelines for data processing (GBIF and LivingAtlases)
Apache License 2.0
40 stars 28 forks source link

Improve logging in EventsInterpretationCallback.runDistributed #954

Closed timrobertson100 closed 9 months ago

timrobertson100 commented 10 months ago

During the processing of many datasets several failed in the EVENTS_INTERPRETED_TO_INDEX stage. The linked logs&_a=(columns:!(_source),filters:!(),index:'439da4d0-290a-11ed-8155-a37cb1ead50e',interval:auto,query:(language:lucene,query:'datasetKey.keyword:%224a99b94e-0c47-4a0b-bb8f-68248e484806%22%20AND%20attempt:%221%22'),sort:!(!('@timestamp',desc)))) indicate that a distributed submission failed, but don't give much further info (example below).

To aid diagnostics, especially when many datasets are being processed, it would be helpful if this could log some further information if possible; e.g. a job ID that could be used from the history server, whether the job wasn't accepted by YARN, or a timeout or so.

java.lang.IllegalStateException: Process has been finished with exit value - 1
    at org.gbif.pipelines.tasks.events.interpretation.EventsInterpretationCallback.runDistributed(EventsInterpretationCallback.java:127)
    at org.gbif.pipelines.tasks.events.interpretation.EventsInterpretationCallback.lambda$createRunnable$0(EventsInterpretationCallback.java:90)
    at org.gbif.pipelines.tasks.PipelinesCallback.handleMessage(PipelinesCallback.java:159)
    at org.gbif.pipelines.tasks.events.interpretation.EventsInterpretationCallback.handleMessage(EventsInterpretationCallback.java:52)
    at org.gbif.pipelines.tasks.events.interpretation.EventsInterpretationCallback.handleMessage(EventsInterpretationCallback.java:28)
    at org.gbif.common.messaging.MessageConsumer.handleCallback(MessageConsumer.java:129)
    at org.gbif.common.messaging.MessageConsumer.handleDelivery(MessageConsumer.java:82)
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Aside: we might consider making use of Airflow (as an Oozie replacement) to launch all Spark jobs as that would then provide the history of failed jobs and links through the history server etc, similar to how downloads work. The pipeline coordinator would then hold the Airflow job ID.

muttcg commented 10 months ago

Every job contains datasetKey_attempt, so you go to yarn ui and filter by that, fx job from logs: http://c5master2-vh.gbif.org:8088/cluster/app/application_1677232479944_1517365

fmendezh commented 10 months ago

Since we can control the application name and match that against a job id, we can also add a Spark Listener to improve the information at different stages of a Spark application https://spark.apache.org/docs/3.1.2/api/java/index.html?org/apache/spark/scheduler/SparkListener.html

muttcg commented 10 months ago

I don't see any logging issues at this moment, java issues - go to ELK, yarn - go to YARN java.lang.IllegalStateException: Process has been finished with exit value - 1 I would just corrected the logging message to check yarn logs job_name Spark logs can be redirected to ELK, we have all the code and configs. I personally found YARN UI better to read Spark issues

timrobertson100 commented 10 months ago

Thanks @muttcg

Error: Process failed in distributed Job. Check yarn logs

Would be a helpful change. I hadn't realized the dataset key could be used.

muttcg commented 9 months ago

Deployed to PROD