holunda-io / camunda-bpm-taskpool

Library for pooling user tasks and process related business objects.
https://www.holunda.io/camunda-bpm-taskpool/
Apache License 2.0

Mongo View Change Tracker Improvements #953

Closed: zambrovski closed this issue 2 months ago

zambrovski commented 4 months ago

The Mongo View test run in Polyflow logs unexpected exceptions. I started analyzing the implementation, but I need help from @lbilger because I can't fix it at the moment.

The test class is: TaskChangeTrackerTest

I also looked at the implementation and found an odd reactive code style in TaskChangeTracker itself. I believe modifying a class member as a side effect of the reactive doOnNext() is a code smell. Can this be improved too?

  private var lastSeenResumeToken: BsonValue? = null

  private val changeStream: Flux<TaskDocument> = Flux.defer { taskRepository.getTaskUpdates(lastSeenResumeToken) }
    // When there are no more subscribers to the change stream, the flux is cancelled. When a new subscriber appears, they should not get any past updates.
    // This shouldn't happen at all because the `trulyDeleteChangeStream` subscription should always stay active, but we keep it as a last resort.
    .doOnCancel { lastSeenResumeToken = null }
    // Remember the last seen resume token if one is present
    .doOnNext { event -> lastSeenResumeToken = event.resumeToken ?: lastSeenResumeToken }
    // When the resume token is out of date, Mongo will throw an error 'resume of change stream was not possible, as the resume token was not found.'
    // Unfortunately, there is no way to identify exactly this error because error codes and messages vary by Mongo server version.
    // The closest we can get is reacting on any MongoCommandException and resetting the token so that upon the next retry, we start without a token.
    .doOnError(MongoCommandException::class.java) { lastSeenResumeToken = null }
    .doOnNext { event -> logger.debug { "Got event: $event" } }
    .log(TaskChangeTracker::class.qualifiedName, Level.WARNING, SignalType.ON_ERROR)
    .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100)).maxBackoff(Duration.ofSeconds(10)))
    .concatMap { event -> Mono.justOrEmpty(event.body) }
    .share()

Steps to reproduce

Expected behaviour

Only expected exceptions (thrown in the test and logged by the corresponding statements) should appear in the log of the test execution.

Actual behaviour

This one is surprising:

13:25:07.014 [main] ERROR i.h.p.v.mongo.task.TaskChangeTracker - 
java.lang.NullPointerException: The Publisher returned by the supplier is null
    at java.base/java.util.Objects.requireNonNull(Objects.java:235)
    at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:46)
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
    at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77)
    at reactor.core.publisher.FluxRetryWhen.subscribeOrReturn(FluxRetryWhen.java:83)
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:55)
    at reactor.core.publisher.FluxPublish.connect(FluxPublish.java:106)
    at reactor.core.publisher.FluxRefCount.subscribe(FluxRefCount.java:88)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8773)
    at reactor.core.publisher.Flux.subscribeWith(Flux.java:8894)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8739)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8663)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8581)
    at io.holunda.polyflow.view.mongo.task.TaskChangeTracker.<init>(TaskChangeTracker.kt:85)
    at io.holunda.polyflow.view.mongo.service.TaskChangeTrackerTest$taskChangeTracker$2.invoke(TaskChangeTrackerTest.kt:47)
    at io.holunda.polyflow.view.mongo.service.TaskChangeTrackerTest$taskChangeTracker$2.invoke(TaskChangeTrackerTest.kt:47)
    at kotlin.SynchronizedLazyImpl.getValue(LazyJVM.kt:74)
    at io.holunda.polyflow.view.mongo.service.TaskChangeTrackerTest.getTaskChangeTracker(TaskChangeTrackerTest.kt:47)
    at io.holunda.polyflow.view.mongo.service.TaskChangeTrackerTest.schedules job for deleting leftover tasks(TaskChangeTrackerTest.kt:246)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
    at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
    at org.apache.maven.surefire.junitplatform.LazyLauncher.execute(LazyLauncher.java:56)
    at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:184)
    at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:148)
    at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:122)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:385)
    at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
    at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:507)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:495)

lbilger commented 2 months ago

This message probably means that taskRepository.getTaskUpdates(lastSeenResumeToken) returns null. I often see that when a mock is not stubbed for the given parameters.
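
To illustrate the suspected cause, here is a minimal sketch that uses only the Kotlin standard library. `StubbedRepository` and `subscribeDeferred` are hypothetical stand-ins for the mocked repository and for the null check visible in the stack trace (`FluxDefer.subscribe` calling `Objects.requireNonNull`); `String` and `List<String>` stand in for the real token and publisher types. It is not the actual test or Reactor code, only a model of what probably happens when the mock is stubbed for a different argument than the one the tracker passes.

```kotlin
// Hypothetical stand-in for a repository mock: it only answers for the
// arguments it was stubbed with and returns null for everything else.
class StubbedRepository(private val stubs: Map<String?, List<String>>) {
  fun getTaskUpdates(resumeToken: String?): List<String>? = stubs[resumeToken]
}

// Simplified stand-in for a deferred source: the supplier runs only when
// someone subscribes, and a null result is rejected with the logged message.
fun <T : Any> subscribeDeferred(supplier: () -> T?): T =
  supplier() ?: throw NullPointerException("The Publisher returned by the supplier is null")

fun main() {
  // Stubbed only for the token "abc", but the tracker starts with a null token.
  val repository = StubbedRepository(mapOf("abc" to listOf("task-1")))
  val failure = runCatching { subscribeDeferred { repository.getTaskUpdates(null) } }
  println(failure.exceptionOrNull()?.message)

  // Stubbing for the null token (the tracker's initial state) avoids the error.
  val fixed = StubbedRepository(mapOf(null to listOf("task-1")))
  println(subscribeDeferred { fixed.getTaskUpdates(null) })
}
```

If this is indeed the cause, stubbing the mock for the initial `null` resume token (or for any argument) in the affected test should make the logged exception disappear.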

lbilger commented 2 months ago

Regarding the lastSeenResumeToken: yes, it is not perfect, but I think it's ok in this context because there will only ever be one subscription per instance of TaskChangeTracker. Possibly there is a way to do it with an AtomicReference in the SubscriberContext, but I'm not sure if that's worth the effort.
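
For reference, the `AtomicReference` part of that idea could look roughly like the sketch below. `ResumeTokenHolder` is a hypothetical name, `String` stands in for `BsonValue`, and the wiring into the Reactor subscriber context is deliberately left out, so this only shows how the token updates from the snippet in the issue could be made atomic.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical holder for the last seen resume token; String stands in for BsonValue.
class ResumeTokenHolder {
  private val token = AtomicReference<String?>(null)

  fun current(): String? = token.get()

  // Keep the previous token when an event carries none, mirroring
  // `event.resumeToken ?: lastSeenResumeToken` from the snippet in the issue.
  fun remember(candidate: String?) {
    token.updateAndGet { previous -> candidate ?: previous }
  }

  // Forget the token, as the original code does on cancel or MongoCommandException.
  fun reset() = token.set(null)
}

fun main() {
  val holder = ResumeTokenHolder()
  holder.remember("token-1")
  holder.remember(null) // an event without a token keeps the previous one
  println(holder.current())
  holder.reset()
  println(holder.current())
}
```

The `doOnNext`, `doOnCancel` and `doOnError` callbacks would then call `remember` and `reset` instead of assigning to a `var`. The side effect itself remains, so whether this is worth the effort is the same judgment call as described above.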