johanhaleby / occurrent

Unintrusive Event Sourcing Library for the JVM
https://occurrent.org

CatchupSubscriptionModel does not persist position and stops working for new events without 'hostinfo' #142

Closed: gavvvr closed this issue 10 months ago

gavvvr commented 11 months ago

Hi

This one is easy to reproduce. Just create a free shared MongoDB instance in Atlas; it does not allow the 'hostinfo' command. Then run the following code 3 times to reproduce the 2 issues from the title (I guess both are caused by an uncaught NPE). Here is the code:

val converter = JacksonCloudEventConverter<SomeSpecificDomainEvent>(jacksonObjectMapper(), URI("urn:test"))

data class SomeSpecificDomainEvent(val id: Int)

fun main() {
    val connectionString =
        "mongodb+srv://?:?@occurrent-test.g1kojdq.mongodb.net/?retryWrites=true&w=majority"
    val mongoClient = MongoClients.create(connectionString)
    val dbName = "occurrent-test"
    val database = mongoClient.getDatabase(dbName)

    val eventsCollectionName = "events"
    val eventStore =
        MongoEventStore(mongoClient, dbName, eventsCollectionName, EventStoreConfig(TimeRepresentation.RFC_3339_STRING))

    val nativeMongoSubscriptionModel = NativeMongoSubscriptionModel(
        database,
        eventsCollectionName,
        TimeRepresentation.RFC_3339_STRING,
        Executors.newCachedThreadPool(),
        RetryStrategy.retry().maxAttempts(1),
    )
    val positionStorage =
        NativeMongoSubscriptionPositionStorage(database, "subscription-position-collection")
    val wrappedSubscriptionModel = DurableSubscriptionModel(
        nativeMongoSubscriptionModel,
        positionStorage,
    )
    val catchupSubscriptionModel = CatchupSubscriptionModel(
        wrappedSubscriptionModel,
        eventStore,
        CatchupSubscriptionModelConfig(useSubscriptionPositionStorage(positionStorage))
    )

    // Now subscribe!
    catchupSubscriptionModel.subscribe("logger-subscription") { e ->
        println("Event number=${converter.toDomainEvent(e).id} received!")
    }

    when (eventStore.count()) {
        0L -> {
            println("Writing very first event to db")
            eventStore.write("some-stream", converter.toCloudEvent(SomeSpecificDomainEvent(1)))
        }

        1L -> {
            println("Writing very 2nd event to db")
            eventStore.write("some-stream", converter.toCloudEvent(SomeSpecificDomainEvent(2)))
        }
    }
}

On the first (and every) run you will see a WARN about 'hostinfo', followed soon after by an NPE:

[main] WARN  o.o.s.m.n.b.NativeMongoSubscriptionModel - Failed to get global subscription position from MongoDB, probably because the server doesn't allow to execute the "hostinfo" command. This only affects the very first event received by the subscription. If the processing of this event fails _and_ the application is restarted the event cannot be retried. If this is major concern, consider upgrading your MongoDB server to a non-shared environment that supports the "hostinfo" command. Error is:
Command failed with error 8000 (AtlasError): 'CMD_NOT_ALLOWED: hostInfo' on server ac-udi6xr8-shard-00-01.g1kojdq.mongodb.net:27017. The full response is {"ok": 0, "errmsg": "CMD_NOT_ALLOWED: hostInfo", "code": 8000, "codeName": "AtlasError"}
Exception in thread "pool-1-thread-1" java.lang.NullPointerException: SubscriptionPosition cannot be null
    at java.base/java.util.Objects.requireNonNull(Objects.java:235)
    at org.occurrent.subscription.StartAt$StartAtSubscriptionPosition.<init>(StartAt.java:99)
    at org.occurrent.subscription.StartAt.subscriptionPosition(StartAt.java:134)
    at org.occurrent.subscription.blocking.durable.catchup.CatchupSubscriptionModel.lambda$subscribe$7(CatchupSubscriptionModel.java:186)
    at org.occurrent.subscription.StartAt$Dynamic.get(StartAt.java:85)
    at org.occurrent.subscription.mongodb.nativedriver.blocking.NativeMongoSubscriptionModel.newInternalSubscription(NativeMongoSubscriptionModel.java:172)
    at org.occurrent.subscription.mongodb.nativedriver.blocking.NativeMongoSubscriptionModel.lambda$subscribe$1(NativeMongoSubscriptionModel.java:156)
    at org.occurrent.retry.internal.RetryExecution.lambda$executeWithRetry$2(RetryExecution.java:75)
    at org.occurrent.retry.internal.RetryExecution.lambda$executeWithRetry$4(RetryExecution.java:82)
    at org.occurrent.retry.internal.RetryExecution.lambda$executeWithRetry$3(RetryExecution.java:76)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)

If you run the code for the 2nd time, you will see:

Event number=1 received!
Writing very 2nd event to db

followed by the NPE again. You would also expect "Event number=2 received!", but nothing happens. This means that after the NPE the subscription model is broken.
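To make this failure mode concrete, here is a simplified, hypothetical Kotlin model; it is not Occurrent's code. It assumes, as the reporter guesses, that the start position resolves to null when 'hostinfo' is rejected. The names globalPositionOrNull, startAtPosition and runSubscription are made up; only the eager null check and its message come from the stack trace. Because the check throws before the worker starts draining events, nothing is delivered afterwards. In the report the exception surfaced as uncaught on pool-1-thread-1; the sketch uses submit and Future.get instead so the failure can be observed.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

// Hypothetical position lookup: returns null to mimic a server that rejects "hostinfo".
fun globalPositionOrNull(hostInfoAllowed: Boolean): String? =
    if (hostInfoAllowed) "position-from-server" else null

// Mirrors the eager null check from the stack trace; the message is copied from it.
fun startAtPosition(position: String?): String =
    position ?: throw NullPointerException("SubscriptionPosition cannot be null")

data class RunResult(val delivered: List<String>, val failure: Throwable?)

// Resolves the start position on a worker thread, then drains the pending events.
fun runSubscription(hostInfoAllowed: Boolean, pendingEvents: List<String>): RunResult {
    val executor = Executors.newSingleThreadExecutor()
    val queue = LinkedBlockingQueue<String>(pendingEvents)
    val task = executor.submit(Callable {
        // If this line throws, the draining below is never reached and nothing is delivered.
        val start = startAtPosition(globalPositionOrNull(hostInfoAllowed))
        generateSequence<String> { queue.poll() }.map { "$it (after $start)" }.toList()
    })
    return try {
        RunResult(task.get(5, TimeUnit.SECONDS), null)
    } catch (e: ExecutionException) {
        // FutureTask wraps whatever the task threw; unwrap it for the caller.
        RunResult(emptyList(), e.cause)
    } finally {
        executor.shutdown()
    }
}
```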

Run the above code for the 3rd time and you will see:

Event number=1 received!
Event number=2 received!

Receiving the 1st event a second time means that the subscription's position was not saved, so the subscription is not durable. Every subsequent run of the code above processes all events from the very beginning, as if the subscription were not durable.
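For contrast, the durable catch-up behaviour the reporter expected can be sketched with hypothetical in-memory stand-ins. InMemoryPositionStorage and catchUp are illustrative names, not Occurrent APIs, and a list index plays the role of the subscription position. The idea: replay stored events that come after the last saved position, and persist the position after each handled event, so a second run does not see event 1 again.

```kotlin
// Hypothetical stand-in for a subscription position store (for example a "subscriptions"
// collection); here the position is simply the index of the last handled event.
class InMemoryPositionStorage {
    private val positions = mutableMapOf<String, Int>()

    fun read(subscriptionId: String): Int? = positions[subscriptionId]

    fun save(subscriptionId: String, position: Int) {
        positions[subscriptionId] = position
    }
}

// Replays stored events that come after the saved position, persisting progress
// after each handled event so that a restart resumes instead of starting over.
fun catchUp(
    subscriptionId: String,
    storedEvents: List<String>,
    storage: InMemoryPositionStorage,
    handle: (String) -> Unit
) {
    // No saved position yet: start from the very first event.
    val firstIndex = storage.read(subscriptionId)?.plus(1) ?: 0
    for (index in firstIndex until storedEvents.size) {
        handle(storedEvents[index])
        storage.save(subscriptionId, index)
    }
}
```

In the report's third run event 1 was handled again, which in this model corresponds to save never having been called.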

gavvvr commented 11 months ago

The same problem is observed in the example-domain-word-guessing-game-es-mongodb-spring-blocking example. The Spring context fails to start when it tries to create the whenGameWasWonThenSendEmailToWinner bean, throwing a similar NPE:

org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.occurrent.subscription.api.blocking.Subscription]: Factory method 'whenGameWasWonThenSendEmailToWinner' threw exception with message: SubscriptionPosition cannot be null
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:171) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:655) ~[spring-beans-6.0.11.jar:6.0.11]
    ... 19 common frames omitted
Caused by: java.lang.NullPointerException: SubscriptionPosition cannot be null
    at java.base/java.util.Objects.requireNonNull(Objects.java:235) ~[na:na]
    at org.occurrent.subscription.StartAt$StartAtSubscriptionPosition.<init>(StartAt.java:99) ~[classes/:na]
    at org.occurrent.subscription.StartAt.subscriptionPosition(StartAt.java:134) ~[classes/:na]
    at org.occurrent.subscription.blocking.durable.catchup.CatchupSubscriptionModel.lambda$subscribe$7(CatchupSubscriptionModel.java:186) ~[classes/:na]
    at org.occurrent.subscription.StartAt$Dynamic.get(StartAt.java:85) ~[classes/:na]
    at org.occurrent.subscription.mongodb.internal.MongoCommons.applyStartPosition(MongoCommons.java:86) ~[classes/:na]
    at org.occurrent.subscription.mongodb.spring.blocking.SpringMongoSubscriptionModel.lambda$subscribe$0(SpringMongoSubscriptionModel.java:151) ~[classes/:na]
    at org.occurrent.subscription.mongodb.spring.blocking.SpringMongoSubscriptionModel.lambda$subscribe$5(SpringMongoSubscriptionModel.java:177) ~[classes/:na]
    at org.occurrent.subscription.mongodb.spring.blocking.SpringMongoSubscriptionModel.subscribe(SpringMongoSubscriptionModel.java:178) ~[classes/:na]
    at org.occurrent.subscription.blocking.durable.DurableSubscriptionModel.subscribe(DurableSubscriptionModel.java:100) ~[classes/:na]
    at org.occurrent.subscription.blocking.durable.catchup.CatchupSubscriptionModel.subscribe(CatchupSubscriptionModel.java:199) ~[classes/:na]
    at org.occurrent.subscription.api.blocking.Subscribable.subscribe(Subscribable.java:48) ~[classes/:na]
    at org.occurrent.dsl.subscription.blocking.Subscriptions.subscribe(Subscriptions.kt:104) ~[classes/:na]
    at org.occurrent.dsl.subscription.blocking.Subscriptions.subscribe(Subscriptions.kt:94) ~[classes/:na]
    at org.occurrent.example.domain.wordguessinggame.mongodb.spring.blocking.features.emailwinner.SendEmailToWinner.whenGameWasWonThenSendEmailToWinner(SendEmailToWinner.kt:42) ~[classes/:na]
    at org.occurrent.example.domain.wordguessinggame.mongodb.spring.blocking.features.emailwinner.SendEmailToWinner$$SpringCGLIB$$0.CGLIB$whenGameWasWonThenSendEmailToWinner$0(<generated>) ~[classes/:na]
    at org.occurrent.example.domain.wordguessinggame.mongodb.spring.blocking.features.emailwinner.SendEmailToWinner$$SpringCGLIB$$2.invoke(<generated>) ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:258) ~[spring-core-6.0.10.jar:6.0.10]
    at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331) ~[spring-context-6.0.11.jar:6.0.11]
    at org.occurrent.example.domain.wordguessinggame.mongodb.spring.blocking.features.emailwinner.SendEmailToWinner$$SpringCGLIB$$0.whenGameWasWonThenSendEmailToWinner(<generated>) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:139) ~[spring-beans-6.0.11.jar:6.0.11]
    ... 20 common frames omitted
johanhaleby commented 11 months ago

Hi,

Thanks for the very detailed description. I'm running applications with Occurrent on the Atlas free tier myself, and I remember having problems with it in the past. Have you tried "Restart Subscription when Oplog Lost" (search for it in the docs), or adding this to the subscription model:

var subscriptionModel = new SpringMongoSubscriptionModel(mongoTemplate, SpringMongoSubscriptionModelConfig.withConfig("events", TimeRepresentation.RFC_3339_STRING).restartSubscriptionsOnChangeStreamHistoryLost(true));

What's important is restartSubscriptionsOnChangeStreamHistoryLost(true).

Please try and see if that helps.

gavvvr commented 11 months ago

Hi @johanhaleby, thank you for the fast response!

My original app is non-Spring based and there is no restartSubscriptionsOnChangeStreamHistoryLost property on NativeMongoSubscriptionModel.

Anyway, this does not help me, even with Spring. The NPE and both problems still exist (the position is not saved and no new events are processed). Here is my Spring setup with restartSubscriptionsOnChangeStreamHistoryLost(true):

val converter = JacksonCloudEventConverter<SomeSpecificDomainEvent>(jacksonObjectMapper(), URI("urn:test"))
data class SomeSpecificDomainEvent(val id: Long)

@SpringBootApplication
class OccurrentSpringMongoApplication {
    @Bean
    fun eventStore(mongoTemplate: MongoTemplate, dbFactory: MongoDatabaseFactory): SpringMongoEventStore {
        val eventStoreConfig =
            EventStoreConfig.Builder()
                .eventStoreCollectionName("events")
                .transactionConfig(MongoTransactionManager(dbFactory))
                .timeRepresentation(TimeRepresentation.RFC_3339_STRING)
                .build()
        return SpringMongoEventStore(mongoTemplate, eventStoreConfig)
    }

    @Bean
    fun subscriptionPositionStorage(mongoTemplate: MongoTemplate): SubscriptionPositionStorage =
        SpringMongoSubscriptionPositionStorage(mongoTemplate, "subscriptions")

    @Bean
    fun subscriptionModel(
        storage: SubscriptionPositionStorage,
        mongoTemplate: MongoTemplate,
        eventStoreQueries: EventStoreQueries
    ): SubscriptionModel {
        val config = SpringMongoSubscriptionModelConfig.withConfig("events", TimeRepresentation.RFC_3339_STRING).restartSubscriptionsOnChangeStreamHistoryLost(true)
        val mongoSubscriptionModel =
            SpringMongoSubscriptionModel(mongoTemplate, config)
        val durableSubscriptionModel = DurableSubscriptionModel(mongoSubscriptionModel, storage)
        return CatchupSubscriptionModel(
            durableSubscriptionModel, eventStoreQueries,
            CatchupSubscriptionModelConfig(SubscriptionPositionStorageConfig.useSubscriptionPositionStorage(storage))
        )
    }
}

@Component
class Actions(val subscriptionModel: SubscriptionModel, val eventStore: SpringMongoEventStore) {
    @PostConstruct
    fun doWork() {
        val currentEventCount = eventStore.count()
        when (currentEventCount) {
            0L -> {
                println("Writing very first event to db")
                eventStore.write("some-stream", converter.toCloudEvent(SomeSpecificDomainEvent(1)))
            }

            1L -> {
                println("Writing 2nd event to db")
                eventStore.write("some-stream", converter.toCloudEvent(SomeSpecificDomainEvent(2)))
            }

            else -> {
                println("Writing a couple more events")
                eventStore.write("some-stream", converter.toCloudEvent(SomeSpecificDomainEvent(currentEventCount + 1)))
                eventStore.write("some-stream", converter.toCloudEvent(SomeSpecificDomainEvent(currentEventCount + 2)))
            }
        }

        subscriptionModel.subscribe("logging-subscription") { e ->
            println("Event number=${converter.toDomainEvent(e).id} received!")
        }

    }
}

fun main(args: Array<String>) {
    runApplication<OccurrentSpringMongoApplication>(*args)
}
johanhaleby commented 11 months ago

Thank you again! The non-Spring-based implementation is not as battle-proven (I've only used Spring Boot in production), but it's very interesting that you get the same problem with the Spring implementation. Maybe I've introduced a regression somewhere, because I currently have Occurrent subscriptions running on Atlas in both the free and paid tiers.

However, I'm currently not using the CatchupSubscription in any of my apps. I know it may be a lot to ask, but do you get the same issue if you're not using the "catchup subscription model"? Maybe there's a bug in the CatchupSubscriptionModel.

johanhaleby commented 11 months ago

Oh, sorry I didn't see that the issue was indeed mentioning the CatchupSubscriptionModel 🤦‍♂️ But then it would be even more interesting to see if it works without it. My guess/hope is that it will, and if you stop your app, and restart it with the CatchupSubscriptionModel enabled again, it'll work. But it definitely sounds like a bug!

gavvvr commented 11 months ago

@johanhaleby

... do you get the same issue if you're not using the "catchup subscription model"?

But then it would be even more interesting to see if it works without it.

If I use the raw DurableSubscriptionModel without wrapping it in a CatchupSubscriptionModel, it mostly works as expected and saves the "subscription position". One slightly unexpected thing I noticed is that if you call eventStore.write() right after wrappedSubscriptionModel.subscribe returns, the 1st event is missed. But if you add Thread.sleep(1000) between wrappedSubscriptionModel.subscribe and eventStore.write(), all events get captured by the subscription.

My guess/hope is that it will, and if you stop your app, and restart it with the CatchupSubscriptionModel enabled again, it'll work

Yes, if the position was previously persisted by the raw DurableSubscriptionModel, the CatchupSubscriptionModel continues where the DurableSubscriptionModel left off.

johanhaleby commented 11 months ago

If I use the raw DurableSubscriptionModel without wrapping it in a CatchupSubscriptionModel, it mostly works as expected and saves the "subscription position".

Thanks for investigating this.

One slightly unexpected thing I noticed is that if you call eventStore.write() right after wrappedSubscriptionModel.subscribe returns, the 1st event is missed. But if you add Thread.sleep(1000) between wrappedSubscriptionModel.subscribe and eventStore.write(), all events get captured by the subscription.

Is this also the case even if you do:

wrappedSubscriptionModel.subscribe(..).waitUntilStarted()?

The reason for this is mentioned in the Javadoc for the Subscription:

Subscriptions are typically started in a background thread and you may wish to wait ({@link #waitUntilStarted(Duration)} for them to start before continuing.
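The race behind the missed first event can be modelled in plain Kotlin. In Occurrent the call is subscribe(..).waitUntilStarted() on the returned Subscription, as discussed in this thread; the LiveFeed and BackgroundSubscription classes below are hypothetical stand-ins. The sketch assumes that, as with a change stream, only listeners registered at write time see an event. Registration happens on a background thread, and a CountDownLatch plays the role of waitUntilStarted, so in this model writing only after it returns means the event is delivered.

```kotlin
import java.time.Duration
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical live feed: like a change stream, an event only reaches listeners
// that are already registered when it is published.
class LiveFeed {
    private val listeners = CopyOnWriteArrayList<(String) -> Unit>()

    fun register(listener: (String) -> Unit) {
        listeners.add(listener)
    }

    fun publish(event: String) {
        for (listener in listeners) listener(event)
    }
}

// Hypothetical subscription handle: registration happens on a background thread,
// and the latch is released only once the listener is actually registered.
class BackgroundSubscription(feed: LiveFeed, onEvent: (String) -> Unit) {
    private val started = CountDownLatch(1)

    init {
        Thread {
            feed.register(onEvent)
            started.countDown()
        }.start()
    }

    // Blocks until the listener is registered; returns false if the timeout elapses first.
    fun waitUntilStarted(timeout: Duration): Boolean =
        started.await(timeout.toMillis(), TimeUnit.MILLISECONDS)
}
```

Without the waitUntilStarted call, publish can run before the background thread has registered the listener, which matches the first event being missed in the report; a Thread.sleep(1000) only makes that window unlikely rather than closing it.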

Yes, if the position was previously persisted by the raw DurableSubscriptionModel, the CatchupSubscriptionModel continues where the DurableSubscriptionModel left off.

Thank you for letting me know.

gavvvr commented 11 months ago

Is this also the case even if you do:

wrappedSubscriptionModel.subscribe(..).waitUntilStarted()?

@johanhaleby the waitUntilStarted() worked out! Did not know about this method :)

So, the CatchupSubscriptionModel is buggy for me in both cases (Spring and native) and the raw DurableSubscriptionModel works as expected.

johanhaleby commented 11 months ago

Is this also the case even if you do: wrappedSubscriptionModel.subscribe(..).waitUntilStarted()?

@johanhaleby the waitUntilStarted() worked out! Did not know about this method :)

Glad to hear that it works. waitUntilStarted() is easy to miss, I suppose; I see that it's not well documented outside the Javadoc.

So, the CatchupSubscriptionModel is buggy for me in both cases (Spring and native) and the raw DurableSubscriptionModel works as expected.

So CatchupSubscriptionModel is the culprit! I'll try to fix the bug when I get some time, hopefully on Friday.

johanhaleby commented 10 months ago

@gavvvr I think I've fixed the issue now in the main branch. I have some other things I want to change/fix as well, but then I'll make a new release, hopefully during the day.

johanhaleby commented 10 months ago

It has now been released in version 0.16.8.