johanhaleby / occurrent

Unintrusive Event Sourcing Library for the JVM
https://occurrent.org
120 stars 16 forks source link

Fresh CatchupSubscriptionModel does not write subscription position if Mongo user does not have rights for 'hostInfo' and no new events arrived #148

Closed gavvvr closed 6 months ago

gavvvr commented 8 months ago

Given:

val connectionString = System.getenv("MONGO_CONNECTION_STRING") ?: DEFAULT_MONGO_CONNECTION_STRING

val mongoClient = MongoClients.create(connectionString)
val dbName = DB_NAME
val database = mongoClient.getDatabase(dbName)

val eventsCollectionName = EVENTS_COLLECTION
val eventStore =
    MongoEventStore(mongoClient, dbName, eventsCollectionName, EventStoreConfig(TimeRepresentation.RFC_3339_STRING))

val nativeMongoSubscriptionModel = NativeMongoSubscriptionModel(
    database,
    eventsCollectionName,
    TimeRepresentation.RFC_3339_STRING,
    Executors.newCachedThreadPool(),
    RetryStrategy.retry().maxAttempts(1),
)
val positionStorage =
    NativeMongoSubscriptionPositionStorage(database, "subscription-position-collection")
val wrappedSubscriptionModel = DurableSubscriptionModel(
    nativeMongoSubscriptionModel,
    positionStorage,
)
val catchupSubscriptionModel = CatchupSubscriptionModel(
    wrappedSubscriptionModel,
    eventStore,
    CatchupSubscriptionModelConfig(useSubscriptionPositionStorage(positionStorage))
)

catchupSubscriptionModel.subscribe("subscription") { e ->
    println("Event number=${e.id} received!")
}.waitUntilStarted()

When:

Then: subscription's position does not get written to subscription-position-collection (which leads to duplicated event processing if you restart your code before new events arrive)

gavvvr commented 8 months ago

I created a repo to easily experiment with this situation: https://github.com/gavvvr/occurrent-148 If you just start docker-compose stack and wait a bit for completion you will see that subscription-position-collection is not created... unless you generate one more event with the test 🤔

johanhaleby commented 8 months ago

@gavvvr Thanks for you report! I'm on vacation and I don't have my computer with me. I'll look at this when I come back, but it'll probably take 2 weeks or so.

johanhaleby commented 7 months ago

I'm sorry for taking this long to look into this. At least I've managed to confirm it (thanks A LOT for your example project). I think I've fixed this problem in the spring implementation so need to look at how it's solved there.

johanhaleby commented 7 months ago

@gavvvr Could you please try the very latest (non-released version) and see you if get the same problem?

I.e. you need to check-out and build Occurrent, it should work by just doing mvn clean install -Dmaven.test.skip=true.

gavvvr commented 6 months ago

Hi @johanhaleby

I've just tested it from the top of master and, unfortunately, the issue is still present. To test it on your own easily, don't built the app's image from sources with Docker.

Add mavenLocal() to repositories {} and use jib (./gradlew :jibDockerBuild) to build app's image with Gradle:

// build.gradle.kts changes

plugins {
    application
    kotlin("jvm") version "1.9.22"
    id("com.google.cloud.tools.jib") version "3.4.0" // <--- add jib plugin
}

// add the below jib configuration
jib {
    from {
        platforms {
            platform {
                architecture = "arm64" // <--- make sure to change this, if you have amd64
                os = "linux"
            }
        }
        image = "gcr.io/distroless/java21-debian12:debug-nonroot"
    }
    to {
        image = "dockerized-occurrent-app"
    }
}
johanhaleby commented 6 months ago

@gavvvr Okay, now I finally understand the issue :) The reason why the events will be replayed again when no global position is found is that, well there's no global position to store.. So unless a new event comes in, there's no position to continue from. Thus all the events will be replayed again (until the first new event arrives because then we can get the position of the event in the oplog).

A workaround for this is to instruct the catchup subscription to save the position of the events that it reads from the event store while it's in the catchup phase. In your example, you would need to change:

 val catchupSubscriptionModel = CatchupSubscriptionModel(
        wrappedSubscriptionModel,
        eventStore,
        CatchupSubscriptionModelConfig(useSubscriptionPositionStorage(positionStorage)
    )

to

 val catchupSubscriptionModel = CatchupSubscriptionModel(
        wrappedSubscriptionModel,
        eventStore,
        CatchupSubscriptionModelConfig(useSubscriptionPositionStorage(positionStorage)
                              // This is the workaround!
                              .andPersistSubscriptionPositionDuringCatchupPhaseForEveryNEvents(1))
    )

But even if you did this there was another bug that caused an IllegalArgumentException to be thrown. I've fixed this and I plan to release a new version now :)

johanhaleby commented 6 months ago

I've released a new version, 0.17.1 now so I'm going to close this issue. Re-open if you don't agree! And thanks for our patience :)