johanhaleby / occurrent

Unintrusive Event Sourcing Library for the JVM
https://occurrent.org
124 stars 16 forks source link

Add "catch-up" blocking subscription support #10

Closed johanhaleby closed 4 years ago

johanhaleby commented 4 years ago

I.e. a new util that would wrap a subscription to allow supplying something like "StartAt.date(new Date(..))". This utility should automatically use the EventStoreQueries API to read all events up to the globalSubscriptionPosition if Date(..) is too long back and not found in the oplog.

johanhaleby commented 4 years ago
  1. Kolla om subscription finns i storage, om den finns, pipe:a bara till underliggande subscription
  2. Om den inte finns läs ut globalPosition (A)
  3. Läs sen senaste event som matchar filter från all ström
  4. Läs sen alla events fram till och pipe:a till subscriptionen's action. Hur löses SubscriptionPosition? Kanske skicka med "time" som "MongoDBOperationTimeBasedSubscriptionPosition"?
  5. När den är pipe är färdig, byt till att starta subscriptionen från globalPosition (A), försök eventuellt att filtrera bort duplicates.

    Alt

    1. Läs tidigaste oplog tidsspämpeln som finns (https://stackoverflow.com/questions/25276725/mongodb-read-search-timestamp-based-on-oplog). Kör: db.oplog.rs.find({},{"ts":1}).limit(1) under db "local" Kolla upp om "msg: "initiating set") (när man gör db.oplog.rs.find({}).limit(1)) betyder att det är första write i db! Isf vet vi att allt finns i oplog och behöver inte göra query!
    2. Om positionen man skickat in inte finns i oplog så läs från eventstore queries (skicka till action), så läs till först globalPosition. while-loop krävs om inläsning tar tid och globalPosition inte finns kvar i oplog när det är dags att starta från den.

    Alt 2.

    Om man får reda på vilket exception som händer när oplog är full så kan man använda det för att veta att man ska läsa från db:n istället.

johanhaleby commented 4 years ago

Doesn't seem like one has access to database "local" on e.g. Atlas so we can't use the oplog.rs to get the latest oplog :/

This means that we probably have to always use EventStoreQueries API to catch-up. We can first read the "globalSubscriptionPosition" and stream from the EventStoreQueries API until time <= bsontimestamp of globalSubscriptionPosition.

johanhaleby commented 4 years ago

Fixed