gchq / stroom

Stroom is a highly scalable data storage, processing and analysis platform.
https://gchq.github.io/stroom-docs/
Apache License 2.0

Make Stroom resilient to bad processor filters. #4276

Open: gcdev373 opened this issue 1 month ago

If a processor filter is enabled for a feed that doesn't exist (this can happen after an import, for example), Stroom can fail to perform any processing at all and instead errors endlessly because of that one bad filter.
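One way to catch a filter like this before it reaches task creation would be to check, at import or enable time, that every feed an enabled filter references actually exists. The sketch below assumes a hypothetical `ProcessorFilter` record that reduces a filter to its id, enabled flag and referenced feed names. Real Stroom filters hold an expression tree with `DocRef`s, which this ignores, and `FilterFeedValidator` is not a Stroom class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: not Stroom's real classes or API.
public class FilterFeedValidator {

    // A processor filter reduced to the fields needed here (hypothetical).
    public record ProcessorFilter(int id, boolean enabled, List<String> feedNames) { }

    // Returns the ids of enabled filters that reference at least one feed
    // that is not in knownFeeds.
    public static List<Integer> findBadFilters(List<ProcessorFilter> filters, Set<String> knownFeeds) {
        List<Integer> bad = new ArrayList<>();
        for (ProcessorFilter filter : filters) {
            if (!filter.enabled()) {
                continue; // disabled filters never reach task creation
            }
            boolean missingFeed = filter.feedNames().stream()
                    .anyMatch(name -> !knownFeeds.contains(name));
            if (missingFeed) {
                bad.add(filter.id());
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        List<ProcessorFilter> filters = List.of(
                new ProcessorFilter(15, true, List.of("EVENTS")),
                new ProcessorFilter(16, true, List.of("TEST")),   // feed missing after import
                new ProcessorFilter(17, false, List.of("TEST"))); // disabled, so ignored
        System.out.println(findBadFilters(filters, Set.of("EVENTS"))); // [16]
    }
}
```

Flagging the filter at this point would let it be disabled (or the import rejected) before it can occupy any task-creation slots.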

 ERROR  [2024-05-14T11:19:19.593Z] [Create Processor Tasks #4] stroom.processor.impl.ProcessorTaskCreatorImpl - 
 Error creating tasks for filter: 
filter id=16, filter uuid=0cbd118e-ea16-40f1-a19a-8f7f7f4acd7a, 
pipeline uuid=5722027f-5546-44dc-9a8e-dfd3af9b62ef 
RuntimeException - Unable to find doc with reference 
'DocRef{type='Feed', uuid='a2604bae-5ded-425d-b488-78eaec2f368d', name='TEST'}' 
for term: Feed is TEST 
    at stroom.db.util.TermHandler.getDocValue(TermHandler.java:172)
    at stroom.db.util.TermHandler.apply(TermHandler.java:130)
    at stroom.db.util.TermHandler.apply(TermHandler.java:29)
    at stroom.db.util.CommonExpressionMapper.innerApply(CommonExpressionMapper.java:103)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    at stroom.db.util.CommonExpressionMapper.innerApply(CommonExpressionMapper.java:121)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    at stroom.db.util.CommonExpressionMapper.innerApply(CommonExpressionMapper.java:121)
    at stroom.db.util.CommonExpressionMapper.apply(CommonExpressionMapper.java:65)
    at stroom.db.util.ExpressionMapper.apply(ExpressionMapper.java:85)
    at stroom.meta.impl.db.MetaDaoImpl.createCondition(MetaDaoImpl.java:1660)
    at stroom.meta.impl.db.MetaDaoImpl.createCondition(MetaDaoImpl.java:1656)
    at stroom.meta.impl.db.MetaDaoImpl.find(MetaDaoImpl.java:1374)
    at stroom.meta.impl.MetaServiceImpl.secureFind(MetaServiceImpl.java:379)
    at stroom.meta.impl.MetaServiceImpl.lambda$find$6(MetaServiceImpl.java:324)
    at stroom.util.logging.LocationAwareLambdaLogger.logDurationIfTraceEnabled(LocationAwareLambdaLogger.java:275)
    at stroom.util.logging.LambdaLogger.logDurationIfTraceEnabled(LambdaLogger.java:80)
    at stroom.meta.impl.MetaServiceImpl.find(MetaServiceImpl.java:316)
    at stroom.processor.impl.ProcessorTaskCreatorImpl.runSelectMetaQuery(ProcessorTaskCreatorImpl.java:737)
    at stroom.processor.impl.ProcessorTaskCreatorImpl.createTasksFromCriteria(ProcessorTaskCreatorImpl.java:412)
    at stroom.processor.impl.ProcessorTaskCreatorImpl.doCreateTasksForFilter(ProcessorTaskCreatorImpl.java:352)
    at stroom.processor.impl.ProcessorTaskCreatorImpl.lambda$createTasksForFilter$8(ProcessorTaskCreatorImpl.java:285)
    at java.base/java.util.Optional.ifPresent(Optional.java:178)
    at stroom.processor.impl.ProcessorTaskCreatorImpl.createTasksForFilter(ProcessorTaskCreatorImpl.java:267)
    at stroom.processor.impl.ProcessorTaskCreatorImpl.lambda$createTasksForFilter$6(ProcessorTaskCreatorImpl.java:241)
    at stroom.task.impl.TaskContextFactoryImpl.lambda$createFromConsumer$0(TaskContextFactoryImpl.java:181)
    at stroom.task.impl.TaskContextFactoryImpl.lambda$wrap$2(TaskContextFactoryImpl.java:253)
    at stroom.util.logging.LocationAwareLambdaLogger.logDurationIfDebugEnabled(LocationAwareLambdaLogger.java:307)
    at stroom.task.impl.TaskContextFactoryImpl.lambda$wrap$4(TaskContextFactoryImpl.java:253)
    at stroom.util.pipeline.scope.PipelineScopeRunnable.scopeResult(PipelineScopeRunnable.java:39)
    at stroom.task.impl.TaskContextFactoryImpl.lambda$wrap$5(TaskContextFactoryImpl.java:250)
    at stroom.task.impl.TaskContextFactoryImpl.lambda$wrap$7(TaskContextFactoryImpl.java:264)
    at stroom.security.impl.SecurityContextImpl.asUserResult(SecurityContextImpl.java:322)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at com.google.inject.internal.DelegatingInvocationHandler.invoke(DelegatingInvocationHandler.java:50)
    at jdk.proxy2/jdk.proxy2.$Proxy122.asUserResult(Unknown Source)
    at stroom.task.impl.TaskContextFactoryImpl.lambda$wrap$11(TaskContextFactoryImpl.java:260)
    at stroom.processor.impl.ProcessorTaskCreatorImpl.lambda$createTasksForFilter$7(ProcessorTaskCreatorImpl.java:247)
    at stroom.security.impl.SecurityContextImpl.asUser(SecurityContextImpl.java:340)
    at stroom.processor.impl.ProcessorTaskCreatorImpl.createTasksForFilter(ProcessorTaskCreatorImpl.java:229)
    at stroom.processor.impl.ProcessorTaskCreatorImpl.lambda$createNewTasks$3(ProcessorTaskCreatorImpl.java:189)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
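The trace shows the failure starting in `TermHandler.getDocValue`, which throws when the `Feed` term's `DocRef` can't be resolved while the meta query condition is being built. Because no stream can belong to a feed that doesn't exist, one option would be to resolve the reference without throwing and map a missing feed to a condition that matches nothing. This is a minimal sketch under that assumption: `FeedTermResolver`, `Meta` and the UUID-to-id map are hypothetical, and a plain `Predicate` stands in for Stroom's database query condition.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch: not Stroom's TermHandler or its API.
public class FeedTermResolver {

    // Stand-in for a stream's metadata row (hypothetical).
    public record Meta(long id, int feedId) { }

    // Feed doc UUID -> internal feed id (hypothetical lookup table).
    private final Map<String, Integer> feedIdsByUuid;

    public FeedTermResolver(Map<String, Integer> feedIdsByUuid) {
        this.feedIdsByUuid = feedIdsByUuid;
    }

    // Looks up the feed without throwing; empty means the doc can't be found.
    public Optional<Integer> findFeedId(String feedUuid) {
        return Optional.ofNullable(feedIdsByUuid.get(feedUuid));
    }

    // Builds a predicate for "Feed is <feed>". A missing feed yields a
    // predicate that matches nothing instead of throwing.
    public Predicate<Meta> feedIs(String feedUuid) {
        Optional<Integer> feedId = findFeedId(feedUuid);
        if (feedId.isEmpty()) {
            return meta -> false;
        }
        int id = feedId.get();
        return meta -> meta.feedId() == id;
    }

    public static void main(String[] args) {
        FeedTermResolver resolver = new FeedTermResolver(Map.of("known-uuid", 7));
        List<Meta> metas = List.of(new Meta(1, 7), new Meta(2, 8));
        // Feed exists: one matching stream.
        System.out.println(metas.stream().filter(resolver.feedIs("known-uuid")).count()); // 1
        // Feed missing: no match and no exception.
        System.out.println(metas.stream()
                .filter(resolver.feedIs("a2604bae-5ded-425d-b488-78eaec2f368d")).count()); // 0
    }
}
```

On its own, matching nothing would silence the error and hide the misconfiguration, so it would probably need to be paired with a warning or a visible status on the filter.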

Possible explanation: Stroom creates tasks in blocks of a fixed size. On the first run it creates that many threads, and each one is allocated a task for the pathological filter. Each runs for ~100 ms or so and fails, which uses up the whole block, i.e. the whole opportunity for processor task creation in that run, so no other processors get a look in. The next run has another block of the same fixed size, creates the same number of threads, and the same logic fills the block with tasks for the same dodgy filter again. So Stroom never does any other processing and can't recover without manual intervention to identify the bad filter and disable it.
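The explanation above can be illustrated with a toy simulation. It assumes, hypothetically, that each slot in a fixed-size block goes to the first eligible filter in priority order and that the bad filter sorts first; `BlockStarvationSim`, `run` and `MAX_BACKOFF_CYCLES` are illustrative names, not Stroom code. It also shows one possible mitigation, an exponential per-filter backoff, which is a swapped-in technique rather than anything proposed in this issue.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the fixed-size-block hypothesis; not Stroom code.
public class BlockStarvationSim {

    // Hypothetical cap on how many cycles a failing filter can be skipped for.
    static final int MAX_BACKOFF_CYCLES = 16;

    // Simulates `cycles` task-creation runs, each with `blockSize` slots.
    // Each slot goes to the first eligible filter in priority order.
    // `badFilter` fails every time it is tried; all other filters succeed.
    // Returns the number of slots each filter received.
    public static Map<String, Integer> run(List<String> filtersByPriority, String badFilter,
                                           int blockSize, int cycles, boolean useBackoff) {
        Map<String, Integer> slotsUsed = new LinkedHashMap<>();
        filtersByPriority.forEach(f -> slotsUsed.put(f, 0));
        Map<String, Integer> consecutiveFailures = new HashMap<>();
        Map<String, Integer> nextEligibleCycle = new HashMap<>();

        for (int cycle = 0; cycle < cycles; cycle++) {
            for (int slot = 0; slot < blockSize; slot++) {
                String chosen = null;
                for (String f : filtersByPriority) {
                    if (nextEligibleCycle.getOrDefault(f, 0) <= cycle) {
                        chosen = f;
                        break;
                    }
                }
                if (chosen == null) {
                    break; // every filter is backing off this cycle
                }
                slotsUsed.merge(chosen, 1, Integer::sum);
                if (useBackoff && chosen.equals(badFilter)) {
                    // Exponential backoff: skip the filter for 1, 2, 4, ... cycles (capped).
                    int n = consecutiveFailures.merge(chosen, 1, Integer::sum);
                    int delay = Math.min(1 << Math.min(n - 1, 30), MAX_BACKOFF_CYCLES);
                    nextEligibleCycle.put(chosen, cycle + delay);
                }
            }
        }
        return slotsUsed;
    }

    public static void main(String[] args) {
        List<String> filters = List.of("bad", "healthy");
        System.out.println(run(filters, "bad", 10, 5, false)); // {bad=50, healthy=0}
        System.out.println(run(filters, "bad", 10, 5, true));  // {bad=3, healthy=47}
    }
}
```

Without backoff every slot in every block goes to the bad filter, matching the starvation described above. A backoff, unlike auto-disabling, would let the filter recover on its own if the feed reappears (for example after a re-import), at the cost of still retrying it occasionally.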