gbif / pipelines

Pipelines for data processing (GBIF and LivingAtlases)
Apache License 2.0

Fragmenter stage doesn't update status in registry correctly #957

Open muttcg opened 9 months ago

muttcg commented 9 months ago

For some datasets (usually small ones), the registry still shows 'State: QUEUED', even though the logs show that processing completed without any issues and the metrics file is present with the correct statistics.

muttcg commented 9 months ago

https://registry.gbif.org/dataset/04b949ba-e65a-427c-acce-8a5df15605e8/ingestion-history

timrobertson100 commented 9 months ago

Just spotted in logs - don't know if this is related:

ERROR [10-12 11:45:38,539+0000] [pipelines_verbatim_fragmenter-3] 34d91d0e-49e1-4d8f-9cdc-25c3bf9f322f 1 FRAGMENTER org.gbif.pipelines.tasks.PipelinesCallback: Couldn't track pipeline step for message {"datasetUuid":"34d91d0e-49e1-4d8f-9cdc-25c3bf9f322f","attempt":1,"pipelineSteps":["FRAGMENTER","HDFS_VIEW","INTERPRETED_TO_INDEX","DWCA_TO_VERBATIM","VERBATIM_TO_INTERPRETED"],"numberOfRecords":0,"numberOfEventRecords":1,"runner":"STANDALONE","repeatAttempt":true,"resetPrefix":null,"executionId":null,"endpointType":"DWC_ARCHIVE","validationResult":{"tripletValid":false,"occurrenceIdValid":false,"useExtendedRecordId":null,"numberOfRecords":0,"numberOfEventRecords":1},"interpretTypes":["LOCATION","TEMPORAL","GRSCICOLL","MULTIMEDIA","BASIC","TAXONOMY","AMPLIFICATION","IMAGE","IDENTIFIER_ABSENT","CLUSTERING","OCCURRENCE","VERBATIM","MULTIMEDIA_TABLE","MEASUREMENT_OR_FACT","LOCATION_FEATURE","AUDUBON","METADATA"],"datasetType":"OCCURRENCE"}
java.lang.NullPointerException: null
    at org.gbif.api.model.pipelines.PipelinesWorkflow$Graph.bfs(PipelinesWorkflow.java:230)
    at org.gbif.api.model.pipelines.PipelinesWorkflow$Graph.lambda$getAllNodesFor$4(PipelinesWorkflow.java:165)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Collections$2.tryAdvance(Collections.java:4719)
    at java.util.Collections$2.forEachRemaining(Collections.java:4727)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.gbif.api.model.pipelines.PipelinesWorkflow$Graph.getAllNodesFor(PipelinesWorkflow.java:167)
    at org.gbif.pipelines.tasks.PipelinesCallback.trackPipelineStep(PipelinesCallback.java:294)
    at org.gbif.pipelines.tasks.PipelinesCallback.handleMessage(PipelinesCallback.java:152)
    at org.gbif.pipelines.tasks.verbatims.fragmenter.FragmenterCallback.handleMessage(FragmenterCallback.java:68)
    at org.gbif.pipelines.tasks.verbatims.fragmenter.FragmenterCallback.handleMessage(FragmenterCallback.java:42)
    at org.gbif.common.messaging.MessageConsumer.handleCallback(MessageConsumer.java:129)
    at org.gbif.common.messaging.MessageConsumer.handleDelivery(MessageConsumer.java:82)
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
INFO  [10-12 11:45:38,540+0000] [pipelines_verbatim_fragmenter-3] 34d91d0e-49e1-4d8f-9cdc-25c3bf9f322f 1 FRAGMENTER org.gbif.pip
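
For readers unfamiliar with this kind of failure: the trace above ends in `Graph.bfs`, called from `getAllNodesFor`. The sketch below is a generic illustration of one common way a breadth-first search can throw a `NullPointerException`, namely when a node has no entry in the adjacency map. It is not the `PipelinesWorkflow` code, and the logs do not confirm that this is the cause here. The class `BfsSketch`, its methods, and the edges between the step names are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not taken from gbif-api.
class BfsSketch {

  // Unguarded traversal: edges.get(node) returns null for a node that has
  // no adjacency entry, and the for-each loop over null throws an NPE.
  static List<String> unsafeBfs(Map<String, List<String>> edges, String start) {
    Set<String> visited = new LinkedHashSet<>();
    Deque<String> queue = new ArrayDeque<>();
    visited.add(start);
    queue.add(start);
    while (!queue.isEmpty()) {
      String node = queue.poll();
      for (String next : edges.get(node)) {
        if (visited.add(next)) {
          queue.add(next);
        }
      }
    }
    return new ArrayList<>(visited);
  }

  // Guarded traversal: a node without an entry is treated as having no children.
  static List<String> safeBfs(Map<String, List<String>> edges, String start) {
    Set<String> visited = new LinkedHashSet<>();
    Deque<String> queue = new ArrayDeque<>();
    visited.add(start);
    queue.add(start);
    while (!queue.isEmpty()) {
      String node = queue.poll();
      for (String next : edges.getOrDefault(node, Collections.<String>emptyList())) {
        if (visited.add(next)) {
          queue.add(next);
        }
      }
    }
    return new ArrayList<>(visited);
  }

  public static void main(String[] args) {
    // Assumed edges, for illustration; FRAGMENTER is deliberately left out.
    Map<String, List<String>> edges = new HashMap<>();
    edges.put("DWCA_TO_VERBATIM", Arrays.asList("VERBATIM_TO_INTERPRETED"));
    edges.put("VERBATIM_TO_INTERPRETED", Arrays.asList("INTERPRETED_TO_INDEX", "HDFS_VIEW"));
    edges.put("INTERPRETED_TO_INDEX", Collections.<String>emptyList());
    edges.put("HDFS_VIEW", Collections.<String>emptyList());

    System.out.println(safeBfs(edges, "FRAGMENTER"));
    try {
      unsafeBfs(edges, "FRAGMENTER");
    } catch (NullPointerException e) {
      System.out.println("NPE for a node without an adjacency entry");
    }
  }
}
```

If something similar is happening in the real graph, the tracking call would fail before the step status is written, which would fit the "Couldn't track pipeline step" message. That remains a guess until the code at `PipelinesWorkflow.java:230` is checked.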

muttcg commented 9 months ago

The extra logging I added shows that the Fragmenter tries to update the status, but the status is not updated in the registry:

History client: update pipeline step: 
PipelineStep[key=11901653, 
type=FRAGMENTER, 
runner=STANDALONE, 
started=2023-10-19T11:56:33.452, 
finished=null, 
state=RUNNING, 
message='{"datasetUuid":"6cd91be8-f762-11e1-a439-00145eb45e9a","attempt":229,"pipelineSteps":["FRAGMENTER","HDFS_VIEW","INTERPRETED_TO_INDEX","DWCA_TO_VERBATIM","VERBATIM_TO_INTERPRETED"],"runner":"STANDALONE","numberOfRecords":40,"numberOfEventRecords":null,"repeatAttempt":true,"resetPrefix":null,"executionId":3352231,"endpointType":"DWC_ARCHIVE","validationResult":{"tripletValid":true,"occurrenceIdValid":true,"useExtendedRecordId":null,"numberOfRecords":40,"numberOfEventRecords":null},"interpretTypes":["LOCATION","TEMPORAL","GRSCICOLL","MULTIMEDIA","BASIC","TAXONOMY","IDENTIFIER_ABSENT","IMAGE","AMPLIFICATION","CLUSTERING","OCCURRENCE","VERBATIM","AUDUBON","MEASUREMENT_OR_FACT","LOCATION_FEATURE","METADATA"],"datasetType":"OCCURRENCE","routingKey":"occurrence.pipelines.interpretation.finished.standalone","datasetInfo":{"datasetType":"OCCURRENCE","containsOccurrences":true,"containsEvents":false}}', 
numberRecords='null', 
pipelinesVersion='2.17.7', 
createdBy='crawler.gbif.org', 
modified=null, modifiedBy='null', 
metrics=[]]