eugene-khyst / postgresql-event-sourcing

A reference implementation of an event-sourced system, built with Spring Boot, that uses PostgreSQL as an event store. Fork the repository and use it as a template for your projects, or clone it and run the end-to-end tests to see how everything works together.
Apache License 2.0

Does this work with two writers? #2

Closed hjohn closed 1 year ago

hjohn commented 1 year ago

Let's say I have two instances of the service. Both services are creating events. When using postgres, it's possible for there to be gaps in the bigserial id used for the event table.

For example, it's possible for the visible IDs to be 0, 1, 3, 4, 5 while the other service is still in the process of committing an event with ID 2.

eugene-khyst commented 1 year ago

In PostgreSQL, SERIAL and BIGSERIAL are built on top of SEQUENCE. Gaps in sequence values are possible when nextval was called but the transaction was rolled back: a rollback does not affect the sequence value, so the next transaction will simply get the next value from nextval.
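A minimal sketch of this non-transactional behaviour (a toy counter standing in for a PostgreSQL sequence, not PostgreSQL itself):

```python
# Illustrative model: nextval() is not transactional, so a value taken
# by a transaction that later rolls back is simply lost.
class Sequence:
    def __init__(self):
        self.value = 0

    def nextval(self):
        self.value += 1
        return self.value

seq = Sequence()
committed = []

a = seq.nextval()      # transaction A gets 1, then rolls back
# rollback: the value 1 is never returned to the sequence
b = seq.nextval()      # transaction B gets 2 and commits
committed.append(b)

print(committed)       # [2] -- a permanent gap at 1
```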

But I don't see how this affects concurrent writers.

There is optimistic concurrency control that prevents the lost update anomaly: https://github.com/evgeniy-khist/postgresql-event-sourcing/blob/main/src/main/java/com/example/eventsourcing/eventstore/service/OrderEventStore.java

hjohn commented 1 year ago

Gaps can also be temporary, depending on commit order. If there are two transactions, one which claims sequence 1 and the other claims sequence 2, if the latter commits first, your table will temporarily not have an event with id 1. If listeners query this at the wrong moment, they might conclude that 1 was rolled back, even though it may appear milliseconds later.

eugene-khyst commented 1 year ago

I don't see the problem. The ID is a surrogate key; there is no logic taking the ID value into account. You could change BIGSERIAL to UUID and everything would keep working correctly. The optimistic concurrency control is based on a version check (the VERSION column): https://github.com/evgeniy-khist/postgresql-event-sourcing/blob/0e46c4c3a6609755194822ee5526700084e55ebe/src/main/resources/db/migration/V1__init.sql#L9

The version column can also be used to get a predictable order of events within a stream: https://github.com/evgeniy-khist/postgresql-event-sourcing/blob/0e46c4c3a6609755194822ee5526700084e55ebe/src/main/java/com/example/eventsourcing/eventstore/repository/OrderEventRepository.java#L58

hjohn commented 1 year ago

What I mean is, there can be two processes doing (for different aggregates):

 UPDATE ORDER_AGGREGATE SET VERSION = VERSION + 1 WHERE ID = ? AND VERSION = ?
 INSERT INTO ORDER_EVENT(AGGREGATE_ID, VERSION, EVENT_TYPE, JSON_DATA) VALUES(?, ?, ?, ?)

The ID in ORDER_EVENT is BIGSERIAL.

This ID column can temporarily contain gaps. If the second transaction commits first, there will be a gap in ID for a short while:

  Thread 1                    Thread 2
  INSERTS 101
                               INSERTS 102
  <hiccup>
                               COMMIT
   (subscription queries run)
   COMMIT

The subscription code runs this query:

  SELECT LAST_ID FROM ORDER_EVENT_OUTBOX WHERE SUBSCRIPTION_GROUP = ? FOR UPDATE NOWAIT

Let's say LAST_ID is 100. The ID of the first ORDER_EVENT is going to be 101, and the ID of the second ORDER_EVENT is 102, and the second one is committed first.

Then these queries run:

 SELECT ID, EVENT_TYPE, JSON_DATA FROM ORDER_EVENT WHERE ID > ? ORDER BY ID ASC

This returns a single record, 102. Then you update LAST_ID with it:

 UPDATE ORDER_EVENT_OUTBOX SET LAST_ID = ? WHERE SUBSCRIPTION_GROUP = ?

Only now does the transaction commit that makes ID 101 visible. Your subscription, however, will skip it and never put it in the outbox, since it has already seen 102.
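The scenario above can be replayed as a toy model (names mirror the SQL, but this is only an illustrative simulation, not the repository's code):

```python
# Toy model of the ORDER_EVENT table and the LAST_ID-based outbox poller,
# showing how an out-of-order commit makes the poller skip an event.
visible_events = []    # committed ORDER_EVENT ids
last_id = 100          # ORDER_EVENT_OUTBOX.LAST_ID for this subscription
processed = []         # events the subscription has put in the outbox

def poll():
    """SELECT ID ... WHERE ID > LAST_ID ORDER BY ID, then update LAST_ID."""
    global last_id
    new = sorted(e for e in visible_events if e > last_id)
    processed.extend(new)
    if new:
        last_id = new[-1]

# Thread 1 was assigned id 101 from the sequence but has not committed yet.
# Thread 2 was assigned id 102 and commits first:
visible_events.append(102)

poll()                          # poller sees only 102; LAST_ID becomes 102

# Thread 1 now commits, making id 101 visible -- too late:
visible_events.append(101)

poll()                          # 101 < LAST_ID, so it is never processed
print(processed)                # [102] -- event 101 is silently skipped
```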

eugene-khyst commented 1 year ago

@hjohn, after a more detailed analysis I think it is not possible to get the ID ordering mixed up, and thus it is safe to use the solution with multiple concurrent writers.

The part that solves potential concurrency issues is https://github.com/evgeniy-khist/postgresql-event-sourcing/blob/0e46c4c3a6609755194822ee5526700084e55ebe/src/main/java/com/example/eventsourcing/eventstore/repository/OrderEventRepository.java#L32

UPDATE ORDER_AGGREGATE SET VERSION = VERSION + 1 WHERE ID = ? AND VERSION = ? acquires a lock on the ORDER_AGGREGATE row, so another transaction will have to wait to acquire the lock.

  Transaction 1                                      Transaction 2
  UPDATE ORDER_AGGREGATE SET VERSION = VERSION + 1
    WHERE ID = 101 AND VERSION = 2
  Acquired lock on ORDER_AGGREGATE                   UPDATE ORDER_AGGREGATE SET VERSION = VERSION + 1
  row with ID 101                                      WHERE ID = 101 AND VERSION = 2
                                                     Waiting to acquire a lock on ORDER_AGGREGATE
                                                     row with ID 101
  INSERT INTO ORDER_EVENT(AGGREGATE_ID, VERSION,     waiting...
    EVENT_TYPE, JSON_DATA) VALUES(101, 2, ?, ?)
  ID for ORDER_EVENT is generated from sequence      waiting...
  Transaction is committed                           waiting...
                                                     Optimistic concurrency control error in aggregate
                                                     101: actual version doesn't match expected version 2
                                                     Transaction is rolled back

Thus, I think the following situation is impossible:

For example, it's possible for the visible IDs to be 0, 1, 3, 4, 5 while the other service is still in the process of committing an event with ID 2.

hjohn commented 1 year ago

Yes, but you are updating the same aggregate here. If you update two different aggregates, then the step:

 ID for ORDER_EVENT is generated from sequence

... can happen in a different order than the final commit order. A record with a higher ORDER_EVENT.ID can therefore become visible before one with a lower ID, and a subscriber may see it first. If the subscriber is unaware of this, it may never process the event with the lower ID once it becomes visible.

The problem is described here as well, as I seem to have some trouble explaining it:

https://softwaremill.com/implementing-event-sourcing-using-a-relational-database/

Jump to part about "transactions might be committed out-of-order" and the use of advisory locks to solve it. Also this part is interesting:

If a gap in the ids sequence is found, Akka queries the database for up to 10 seconds, checking if the gap hasn't been filled. If it isn't filled, the gap is assumed to be "genuine". As all of the transactions are short-lived and fail only if there's a connection failure (there are no constraints which might fail), this rarely happens.

Alternatively, you could look at Axon's GapAwareTrackingToken: https://apidocs.axoniq.io/3.2/org/axonframework/eventsourcing/eventstore/GapAwareTrackingToken.html

In the above the listener is aware of potential gaps in the streams they're monitoring and checks every so often if they're filled (as commits become visible).
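A minimal sketch of the gap-aware idea, loosely modelled on Axon's GapAwareTrackingToken (the class and method names here are assumptions for illustration, not Axon's actual API): remember the highest id seen plus the set of lower ids not yet observed, and re-check those gaps on later polls.

```python
# Illustrative gap-aware tracking token: tracks the highest processed id
# and the set of skipped ids ("gaps") that may still become visible.
class GapAwareToken:
    def __init__(self, index=0):
        self.index = index     # highest event id processed so far
        self.gaps = set()      # ids below index never seen yet

    def advance(self, event_id):
        if event_id > self.index:
            # record any skipped ids as gaps to revisit later
            self.gaps.update(range(self.index + 1, event_id))
            self.index = event_id
        else:
            # a previously gapped id finally became visible
            self.gaps.discard(event_id)

token = GapAwareToken(index=100)
token.advance(102)             # 102 commits first: gap at 101 is remembered
print(token.gaps)              # {101}
token.advance(101)             # 101 becomes visible later and fills the gap
print(token.index, token.gaps) # 102 set()
```

A poller using such a token would query both `ID > index` and `ID IN gaps`, so a late commit is still delivered instead of being skipped.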

eugene-khyst commented 1 year ago

@hjohn, I got your point. In the article a different DB schema is proposed (only an events table, without an aggregates table to acquire a lock on), so there is a risk of building an invalid projection from the events.

  • transactions might be committed out-of-order, i.e. a slower transaction might insert an event with a lower sequence number; another transaction might insert another event with the next sequence number and commit; and only later the first transaction might commit

Because of this, reconstructing consistent projections using the order defined by sequence numbers can still yield different results than what you'd see on a "live" system, or even worse, miss some events altogether.

In my sample, the problem you describe is relevant only to the Transaction Outbox pattern implementation. I will change the implementation of the Transaction Outbox pattern to not rely on the "offset" concept (LAST_ID) but instead track all records individually. The order of events within the same aggregate is guaranteed, but not between events of different aggregates. So I don't have to do anything about ordering, only about the possible loss of event notifications.
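A sketch of the individual-tracking approach (the names here are illustrative, not the repository's actual schema): instead of a single LAST_ID offset, mark each delivered event separately, so an event that becomes visible late is still picked up.

```python
# Toy model: track delivered events per id instead of a single offset.
published = set()      # ids already delivered for this subscription
visible_events = []    # committed ORDER_EVENT ids, in commit order

def poll():
    """Deliver every visible event not yet marked as published."""
    delivered = []
    for event_id in sorted(visible_events):
        if event_id not in published:
            delivered.append(event_id)
            published.add(event_id)   # marked individually, no offset
    return delivered

visible_events.append(102)            # 102 commits before 101
first = poll()                        # delivers [102]
visible_events.append(101)            # 101 becomes visible later
second = poll()                       # delivers [101] -- nothing is lost
print(first, second)                  # [102] [101]
```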

Thanks for raising the issue.

hjohn commented 1 year ago

In my sample, the problem you describe is relevant only to the Transaction Outbox pattern implementation.

Yes, exactly.

I will change the implementation of the Transaction Outbox pattern to not rely on the "offset" concept (LAST_ID) but instead track all records individually. The order of events within the same aggregate is guaranteed, but not between events of different aggregates. So I don't have to do anything about ordering, only about the possible loss of event notifications.

Yeah, the lock per aggregate ensures that an event that is a newer version of the same aggregate as an unfilled gap can never become visible. As soon as the gap is filled, though, you must put it in the outbox before any other events :)

eugene-khyst commented 1 year ago

Fixed and described the solution: https://github.com/evgeniy-khist/postgresql-event-sourcing/tree/main#4-7-1