johanhaleby / occurrent

Unintrusive Event Sourcing Library for the JVM
https://occurrent.org

Add support for "Dynamic Consistency Boundary" (DCB) #166

Open johanhaleby opened 2 days ago

johanhaleby commented 2 days ago

The goal of this issue is to support the approach discussed in various talks such as this.

First of all, WriteCondition should be renamed to ConsistencyCondition, and a "condition" method should be added:

interface ConsistencyCondition {
    fun streamVersionEq(...)
    fun condition(filter: Filter) // The filter is where we specify types etc., e.g. Filter.type(in("A", "B"))
}
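The renamed type could be modelled in Java roughly as below. This is a minimal sketch, assuming a sealed interface with one variant per kind of condition. `StreamVersionEq`, `Matching`, `typeIn` and the use of a `Predicate<String>` on the event type (as a stand-in for Occurrent's `Filter`) are hypothetical names for illustration, not Occurrent's API.

```java
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical model: a consistency condition is either the classic
// "stream version equals" check or a filter over the events to consider.
sealed interface ConsistencyCondition
        permits ConsistencyCondition.StreamVersionEq, ConsistencyCondition.Matching {

    // Backward-compatible variant: the whole stream must be at this version.
    record StreamVersionEq(long version) implements ConsistencyCondition {}

    // New variant: only events whose type satisfies the filter are considered.
    // Predicate<String> on the event type stands in for Occurrent's Filter.
    record Matching(Predicate<String> typeFilter) implements ConsistencyCondition {}

    static ConsistencyCondition streamVersionEq(long version) {
        return new StreamVersionEq(version);
    }

    static ConsistencyCondition condition(Predicate<String> typeFilter) {
        return new Matching(typeFilter);
    }

    // Convenience mirroring Filter.type(in("A", "B")).
    static ConsistencyCondition typeIn(String... types) {
        Set<String> allowed = Set.of(types);
        return condition(allowed::contains);
    }
}
```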

Actual Example

A "User" has a name and an address. At the moment, there are no invariants between name and address, so we decide to create two deciders/aggregates/functions for the User: one that deals with name changes (i.e. it's only interested in name-related events), and another that deals with address changes (i.e. it's only interested in address-related events).

Now, let's say we have a stream for this User, stream1, that contains the following events:

stream1: NameDefined, AddressDefined, AddressChanged, NameChanged, AddressRemoved

If the user wants to change his/her name again, this is how it can be done without taking the address events into account:

val eventStream = eventStore.read("stream1", condition(Filter.type(in("NameDefined", "NameChanged", "NameRemoved"))))
val newEvents = NameAggregate.changeName(eventStream.events(), "John")
eventStore.write("stream1", eventStream.consistencyCondition(), newEvents)

The EventStream returned by read looks something like this today:

public interface EventStream<T> {
    String id();
    long version();
    Stream<T> events();
}

This should be changed into something like this:

public interface EventStream<T> {
    String id();
    long version();
    String consistencyTag();
    ConsistencyCondition consistencyCondition();
    Stream<T> events();
}

Now, in the event store implementation, we can no longer just check, when writing new events, that the supplied stream version equals the stream version in the event store. Instead, before returning the EventStream, we need to calculate the consistencyTag (MongoDB example):

db.events.aggregate([
    {
        $match: { /* "streamId + consistencyCondition" query */ }
    },
    {
        $project: { id: 1 }
    },
    {
        $group: {
            _id: null,
            consistencyTag: { $md5: { $toString: "$id" } } // pseudocode: MongoDB has no built-in $md5 operator
        }
    }
]);

This generates a consistencyTag that changes whenever the set of events matching the condition changes. Note that we don't store the consistency tag in the database; I don't think that's required, since it's calculated on the fly and only lives in memory. So, instead of reading the stream version and comparing it to the stream version supplied in the WriteCondition, we recalculate the consistencyTag when writing and check that it matches the one returned by read. This also keeps us backward compatible, because the default streamVersionEq condition simply yields a "streamId + streamVersion" consistency condition.

Now, if the business rule changes so that you cannot change the name to "John Doe" if the address is "Address street", we can change the consistency condition to include more events (e.g. the address events, or the entire stream like before).
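The tag calculation and the write-time check can be tried out in memory. This is a minimal Java sketch, assuming the tag is the MD5 of the ids of the matching events (computed in application code rather than in MongoDB) and using `Predicate<Event>` as a stand-in for Occurrent's `Filter`. `InMemoryEventStore`, `Event` and `ReadResult` are hypothetical names, not Occurrent's API.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HexFormat;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical event shape: just enough to evaluate conditions and hash ids.
record Event(String id, String streamId, String type) {}

// Hypothetical read result: the matching events plus the tag to write with.
record ReadResult(List<Event> events, String consistencyTag) {}

class InMemoryEventStore {
    private final List<Event> events = new ArrayList<>();

    // Returns the events of the stream that match the condition, plus their tag.
    synchronized ReadResult read(String streamId, Predicate<Event> condition) {
        List<Event> matching = events.stream()
                .filter(e -> e.streamId().equals(streamId))
                .filter(condition)
                .toList();
        return new ReadResult(matching, consistencyTag(matching));
    }

    // Appends only if the set of matching events is unchanged since the read.
    // synchronized keeps "recompute tag, compare, append" atomic in this JVM only.
    synchronized void write(String streamId, Predicate<Event> condition,
                            String expectedTag, List<Event> newEvents) {
        String currentTag = read(streamId, condition).consistencyTag();
        if (!currentTag.equals(expectedTag)) {
            throw new IllegalStateException("Consistency condition violated for " + streamId);
        }
        events.addAll(newEvents);
    }

    // MD5 over the ids of the matching events, in the order they were written.
    static String consistencyTag(List<Event> matching) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            for (Event e : matching) {
                md5.update(e.id().getBytes(StandardCharsets.UTF_8));
                md5.update((byte) 0); // separator, so ["ab", "c"] and ["a", "bc"] differ
            }
            return HexFormat.of().formatHex(md5.digest());
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException(ex);
        }
    }
}
```

With a name condition such as `e -> e.type().startsWith("Name")`, writing an AddressChanged event between the read and the write leaves the name tag unchanged, so the name write goes through. Widening the condition to also match address events, as the new business rule requires, makes the same interleaving fail with an IllegalStateException.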

Note 1

While the above (hopefully) solves the case where a logical aggregate is smaller than the stream (i.e. it only cares about a subset of the event types in the stream), it doesn't solve the reverse problem, where one logical aggregate is spread over several streams. For example, if you represent the "User" in two different streams, one for name and one for address:

user1:name: NameDefined, NameChanged
user1:address: AddressDefined, AddressChanged, AddressRemoved

If the new business rule is applied in this case, the consistencyCondition must span multiple streams, so the EventStream model above needs to change, maybe to this:

public interface EventStream<T> {
    List<StreamInfo> streams();
    String consistencyTag();
    ConsistencyCondition consistencyCondition();
    Stream<T> events();
}

where StreamInfo contains the "streamId" and "version" of each involved stream. Then we can perform the use case like this:

val eventStream = eventStore.read(streamId(in("user1:name", "user1:address")).and(type(in("NameDefined", "NameChanged", "NameRemoved", "AddressDefined", "AddressChanged", "AddressRemoved"))))
val newEvents = NameAggregate.changeName(eventStream.events(), "John")
eventStore.write("user1:name", eventStream.consistencyCondition(), newEvents)

Note the subtle difference in the "read" method here: we don't pass a "streamId" explicitly, only a ConsistencyCondition. This means that read(<streamId>, <consistencyCondition>) will just do:

read(streamId(<streamId>).and(<consistencyCondition>))
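The shorthand can be illustrated with plain `java.util.function.Predicate` composition. This is a sketch, assuming `Predicate<Event>` as a hypothetical stand-in for Occurrent's `Filter`; `Filters`, `streamId`, `typeIn` and `read` are illustrative names. The two-argument read simply combines the stream filter and the condition with `and` and delegates to the general, condition-only read.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical event shape for this sketch.
record Event(String id, String streamId, String type) {}

class Filters {
    // Hypothetical stand-in for Filter.streamId(...)
    static Predicate<Event> streamId(String streamId) {
        return e -> e.streamId().equals(streamId);
    }

    // Hypothetical stand-in for Filter.type(in(...))
    static Predicate<Event> typeIn(String... types) {
        List<String> allowed = List.of(types);
        return e -> allowed.contains(e.type());
    }

    // read(<streamId>, <consistencyCondition>) == read(streamId(<streamId>).and(<consistencyCondition>))
    static List<Event> read(List<Event> all, String streamId, Predicate<Event> condition) {
        return read(all, streamId(streamId).and(condition));
    }

    // The general form: only a condition, which may span several streams.
    static List<Event> read(List<Event> all, Predicate<Event> condition) {
        return all.stream().filter(condition).toList();
    }
}
```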

Another, simpler and arguably better, way (if only these two streams exist for a User) would be to change the consistency condition to use a "subject" filter:

val eventStream = eventStore.read(subject("user1"))
val newEvents = NameAggregate.changeName(eventStream.events(), "John")
eventStore.write("user1:name", eventStream.consistencyCondition(), newEvents)

Note 2

A nice consequence of implementing what's implied by "Note 1" is that we can have consistency conditions that selectively pick specific events from different streams, like this:

val nameChangesForUser1 = stream("user1:name").and(type("NameChanged"))
val addressChangesForUser1 = stream("user1:address").and(type("AddressChanged"))
val eventStream = eventStore.read(nameChangesForUser1.or(addressChangesForUser1))
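Since each event belongs to exactly one stream, the two per-stream filters have to be combined with "or"; combining them with "and" would match no events at all. A minimal sketch, again assuming `Predicate<Event>` as a hypothetical stand-in for Occurrent's `Filter` (`SelectiveCondition`, `stream` and `type` are illustrative names):

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical event shape for this sketch.
record Event(String id, String streamId, String type) {}

class SelectiveCondition {
    static Predicate<Event> stream(String streamId) {
        return e -> e.streamId().equals(streamId);
    }

    static Predicate<Event> type(String type) {
        return e -> e.type().equals(type);
    }

    // NameChanged events from user1:name OR AddressChanged events from user1:address.
    static final Predicate<Event> CONDITION =
            stream("user1:name").and(type("NameChanged"))
                    .or(stream("user1:address").and(type("AddressChanged")));

    // Keeps only the events picked by the condition, preserving their order.
    static List<Event> select(List<Event> all) {
        return all.stream().filter(CONDITION).toList();
    }
}
```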

Note 3

We should NOT replace "version" with the consistency tag. That would be bad, because projections would then have no way to compare etags "in time": today you can check whether one version is greater than another, but a checksum has no such ordering.

Note 4

Consistency conditions will only perform well if the right indexes exist for the fields they filter on.

Note 5

Maybe it would be a good idea to include some sort of "global time" when a consistency tag is generated:

db.events.aggregate([
    {
        $match: { /* "consistencyCondition" query */ }
    },
    {
        $project: { id: 1 }
    },
    {
        $group: {
            _id: null,
            consistencyTag: { $md5: { $toString: "$id" } }, 
            timestamp: { $first: "$$NOW" }
        }
    }
]);

Then the EventStream could contain a ConsistencyTag object:

record ConsistencyTag(String tag, ZonedDateTime globalTimestamp) {}

Idea: Could this replace the "stream version" altogether? For example, instead of doing:

val eventStream = eventStore.read("streamId")
..
eventStore.write("streamId", eventStream.version(), newEvents);

you could do:

val eventStream = eventStore.read("streamId")
..
eventStore.write("streamId", eventStream.consistencyTag(), newEvents);

Here, the "consistencyTag" is computed by the MongoDB aggregation above, using an (implicitly created) consistency condition that matches the "streamId".

Maybe it would be a good idea to write the consistencyTag, including the timestamp, instead of the version. That way you would get a global position as well!? (Though it breaks as soon as the time is changed on the server, which is bad.)

Note 6

This is a variant of "Note 5", but here we combine the MD5 checksum and the timestamp into the "consistency tag" itself:

db.events.aggregate([
    {
        $match: { /* "consistencyCondition" query */ }
    },
    {
        $project: { id: 1 }
    },
    {
        $group: {
            _id: null,
            checksum: { $md5: { $toString: "$id" } }, 
            timestamp: { $first: "$$NOW" }
        }
    },
    {
        $project: {
            rawConsistencyTag: {
                $concat: [
                    "$checksum",
                    ":",
                    { $dateToString: { format: "%Y-%m-%dT%H:%M:%S.%LZ", date: "$timestamp" } }
                ]
            }
        }
    }
]);

Then, in application code, we could Base64-encode the rawConsistencyTag so that it can easily be sent in HTTP query parameters etc. (this cannot be done in MongoDB). The ConsistencyTag object in EventStream may look like this:

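import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
import java.util.Base64;

// consistencyTag is the URL-safe Base64 encoding of "<checksum>:<timestamp>"
record ConsistencyTag(String consistencyTag) {
    String checksum() {
        String raw = decoded();
        return raw.substring(0, raw.indexOf(':')); // the hex checksum contains no ':'
    }

    ZonedDateTime timestamp() {
        String raw = decoded();
        return ZonedDateTime.parse(raw.substring(raw.indexOf(':') + 1)); // e.g. 2024-05-01T10:00:00.123Z
    }

    boolean isAfter(ConsistencyTag ct) { return timestamp().isAfter(ct.timestamp()); }
    boolean isBefore(ConsistencyTag ct) { return timestamp().isBefore(ct.timestamp()); }

    private String decoded() {
        return new String(Base64.getUrlDecoder().decode(consistencyTag), StandardCharsets.UTF_8);
    }
}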

The nice thing about this is that projections (or anyone else) that need to check whether one ConsistencyTag is before or after another can easily do so, and it works globally (as long as the server clock isn't changed, see "Note 5").
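The record above only covers decoding. The encoding side could look like the following minimal sketch, assuming the raw tag is `<md5-hex>:<ISO-8601 UTC timestamp>` as produced by the Note 6 pipeline; `ConsistencyTagEncoder` is a hypothetical name. It uses the URL-safe Base64 alphabet without padding (our assumption), because standard Base64 output may contain `+`, `/` and `=`, which need escaping in HTTP query parameters; the decoding side must then use `Base64.getUrlDecoder()`.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;

class ConsistencyTagEncoder {
    // Hypothetical helper: turns the pipeline's checksum and timestamp into the
    // Base64 string carried by ConsistencyTag. URL-safe alphabet, no padding.
    static String encode(String checksum, Instant timestamp) {
        // Instant.toString() is ISO-8601 in UTC, e.g. 2024-05-01T10:00:00.123Z
        String raw = checksum + ":" + timestamp;
        return Base64.getUrlEncoder().withoutPadding()
                .encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    // Inverse of encode: returns the raw "<checksum>:<timestamp>" string.
    static String decodeRaw(String consistencyTag) {
        return new String(Base64.getUrlDecoder().decode(consistencyTag), StandardCharsets.UTF_8);
    }
}
```

Because the ordering in isAfter/isBefore rests entirely on the timestamp part, it inherits the clock caveat from Note 5.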

johanhaleby commented 12 hours ago

Also note that if we generate the consistencyTag in the database, it should be based on "id + source", since a CloudEvent is uniquely identified by the combination of its id and source.
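In application code, an id + source based checksum could look like the sketch below. Per the CloudEvents specification, producers must ensure that source + id is unique for each distinct event, so hashing id alone could collide across sources. Sorting the pairs makes the tag independent of the order in which the database returns them (an assumption; the query could sort instead). `EventRef` and `IdSourceChecksum` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Comparator;
import java.util.HexFormat;
import java.util.List;

// Hypothetical minimal view of a stored CloudEvent: only source and id matter here.
record EventRef(String source, String id) {}

class IdSourceChecksum {
    static String checksum(List<EventRef> matching) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            matching.stream()
                    // Sort so the result doesn't depend on the order rows are returned in.
                    .sorted(Comparator.comparing(EventRef::source).thenComparing(EventRef::id))
                    .forEach(e -> {
                        md5.update(e.source().getBytes(StandardCharsets.UTF_8));
                        md5.update((byte) 0); // separator between source and id
                        md5.update(e.id().getBytes(StandardCharsets.UTF_8));
                        md5.update((byte) 0); // separator between events
                    });
            return HexFormat.of().formatHex(md5.digest());
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException(ex);
        }
    }
}
```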

Also, how well does the consistencyTag calculation perform for large streams?

johanhaleby commented 12 hours ago

We must also consider backward compatibility with old versions that use "stream version".