electric-sql / electric

Sync little subsets of your Postgres data into local apps and services.
https://electric-sql.com
Apache License 2.0

Periodically compact the server shape log #1582

Open KyleAMathews opened 2 months ago

KyleAMathews commented 2 months ago

Every insert, update, and delete operation is written to the log. Eventually this means every shape log will hold a lot of redundant information, e.g. 10 updates to the same row that could be merged so there's just one message per row.

At some point, these redundant updates would significantly increase the amount of time spent loading and processing the shape log.

To avoid this, we'll implement compaction. If you're familiar with Kafka, this is a similar feature — https://docs.confluent.io/kafka/design/log_compaction.html

We'll want the same guarantees — https://docs.confluent.io/kafka/design/log_compaction.html#compaction-guarantees

We'll want to ship something very simple and minimal first. So no configuration.

We'll need to figure out what the trigger for starting compaction should be, and then how to do it as efficiently as possible with zero downtime.

Kafka uses what they call the "dirty ratio" to trigger compaction, i.e. the ratio of "dirty" messages to total messages (or, for us, extra operation messages for a row).

Their default ratio is 50%, which seems reasonable to start with.
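
As a rough TypeScript sketch of that trigger (hypothetical types and names, not Electric's actual API; the 0.5 default mirrors Kafka's):

// A shape-log entry reduced to the only field the ratio needs: the row key.
interface LogEntry {
  key: string
}

function dirtyRatio(log: LogEntry[]): number {
  if (log.length === 0) return 0
  const distinctKeys = new Set(log.map((entry) => entry.key)).size
  // "dirty" entries are the extra operations beyond one message per row
  return (log.length - distinctKeys) / log.length
}

function shouldCompact(log: LogEntry[], threshold = 0.5): boolean {
  return dirtyRatio(log) >= threshold
}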

kevin-dp commented 2 months ago

One thing to keep in mind is the change we made in https://github.com/electric-sql/electric/pull/1555, which works because we know we have the full log (i.e. we don't do any compaction) and thus can resume from any point. I can imagine a compaction algorithm where this approach still works because we only remove redundant information. But let's keep this in mind.

I'm not sure if Kafka's guarantees are enough for Electric; in particular, the data consistency guarantees provided by its compaction algorithm are a bit weak. Say we have the following log (time goes from top to bottom):

INSERT INTO issues VALUES (1, 'title 1', 'some description');
UPDATE issues SET title = 'issue 1 title' WHERE id = 1;
INSERT INTO issues VALUES (2, 'title 2', 'foo');
UPDATE issues SET description = 'issue 1 description' WHERE id = 1;

Now say we compact this to:

INSERT INTO issues VALUES (1, 'issue 1 title', 'issue 1 description');
INSERT INTO issues VALUES (2, 'title 2', 'foo');

With the compacted log, the end view of the data is the same as with the original log, but we can get inconsistent intermediate views on the data. For instance, we can observe a DB state where issue 1 has the updated title and updated description and issue 2 does not yet exist. In the original log, you can only observe the fully updated issue 1 after issue 2 was inserted.

Imagine for a second that the last update to issue 1 added an FK that points to issue 2; then after compaction we would be able to see a state of the DB where referential integrity is broken.

We could try to alleviate this issue by swapping the order of the inserts in the compacted log, but that leads to a similar problem where we can observe issue 2 without issue 1, which was also not possible in the original log.

Kafka's algorithm seems to have that "problem"; it's probably not a problem for them, as they only guarantee that users will at least see the final state of all records and don't provide any further consistency guarantees.

If we want to avoid such inconsistencies, we can only compact consecutive sequences of operations that affect the same row. So for the previous example:

INSERT INTO issues VALUES (1, 'issue 1 title', 'some description');
INSERT INTO issues VALUES (2, 'title 2', 'foo');
UPDATE issues SET description = 'issue 1 description' WHERE id = 1;
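
To make the restriction concrete, here is a minimal TypeScript sketch (hypothetical types, not Electric's internals) that only folds an update into an immediately preceding operation on the same row, so the relative order of operations on different rows is preserved:

interface Op {
  kind: "insert" | "update" | "delete"
  key: string
  value?: Record<string, unknown> // changed column values; absent for deletes
}

function compactAdjacent(log: Op[]): Op[] {
  const out: Op[] = []
  for (const op of log) {
    const prev = out[out.length - 1]
    if (prev && prev.key === op.key && prev.kind !== "delete" && op.kind === "update") {
      // fold the update's changed columns into the preceding insert/update on the same row
      prev.value = { ...prev.value, ...op.value }
    } else {
      out.push({ ...op })
    }
  }
  return out
}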

thruflo commented 2 months ago

Is this only a problem if you have a compaction boundary mid-transaction? If compaction always compacts to the end of a transaction, and anyone requesting a previous offset gets the later data (which is valid and just a faster way for them to catch up than getting the whole log), then it's all fine?

samwillis commented 2 months ago

Yep, this is where having transaction boundaries in the log would be useful. Essentially everything within the compacted segment of the log becomes a single transaction.

samwillis commented 2 months ago

Also worth noting that the current theory is that most applications don't really care about transactional consistency. That's why we don't yet have transaction boundaries in the log. They would be useful for transactional databases like PGlite, but we can work around not having them by only determining that we have consistency when we are "up to date".

kevin-dp commented 2 months ago

Indeed, this is important wrt transaction boundaries. You could put the entire compacted portion of the log in 1 transaction to avoid this (given you defer FK constraints till commit), but as pointed out by @samwillis we don't indicate transaction boundaries in the log yet. Also, there's another issue about serving logs in chunks: if we have transaction boundaries, then we need to respect them when chunking. All doable, but something to keep in mind; it may warrant further discussion.

samwillis commented 2 months ago

Until we have transactional boundary messages, can we have all messages that are part of a compaction share the same LSN offset? I.e. all messages prior to the end of the compacted segment have the LSN prefix of the last message in the compacted segment? That will ensure the logic we are planning for PGlite to determine "transactions" still works.
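
As a rough TypeScript sketch of that idea (hypothetical message type, and assuming an offset format of <lsn>_<index>, which may not match Electric's real offsets):

interface LogMessage {
  offset: string // assumed "<lsn>_<index>" format
  key: string
  value: Record<string, unknown>
}

// Rewrite every message in the compacted segment so it carries the LSN prefix of the
// segment's final message, keeping a per-message index so offsets stay unique. A client
// that groups messages by LSN prefix then sees the whole segment as one "transaction".
function stampWithFinalLsn(segment: LogMessage[]): LogMessage[] {
  if (segment.length === 0) return segment
  const finalLsn = segment[segment.length - 1].offset.split("_")[0]
  return segment.map((message, index) => ({
    ...message,
    offset: `${finalLsn}_${index}`,
  }))
}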

KyleAMathews commented 2 months ago

I think the theory we talked about in the past is that we only flush full transactions to the log & clients only process data once they get to the end of the log, so losing intermediate states won't matter.

We'd also only ever compact the entire log.

magnetised commented 2 months ago

Can I suggest an alternative approach?

In my experiments, I allowed for a form of compaction by replacing a single snapshot followed by a single operation log with the ability to have multiple snapshots and multiple operation logs.

So rather than compacting the operation log, at some point in time you instead stop the current operation log (O1) and start a new snapshot (S2) and operation log (O2). It's possible to do this consistently using the xmin of the new snapshot to end O1 and start O2 at the same point.

So a client that's at the head can continue using the new operation log O2 to get updates as normal. New clients use the latest snapshot (S2) followed by the latest operation log (O2).

For a client that's behind, there are multiple options: either use whatever bit they need from O1 followed by tailing O2 or -- if they're too far behind -- drop their state and resync from S2 followed by O2.

We'd need some metadata about each shape (the list of snapshots and operation logs with their offsets, etc.) so the server can decide how to handle a client. We'd also need to think about the logic of when to re-snapshot, but that's something along the lines of the rules around compaction.

If we put in a queue so that lots of shapes don't start re-snapshotting at the same time, we can manage the load on pg.

We can also GC by dropping out-of-date snapshots and operation logs.

I think it's simpler than compaction because we avoid the consistency issues highlighted by Kevin and don't need separate compaction logic.
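
A TypeScript sketch of the per-shape metadata this implies and how the server might route a client (all names hypothetical; it assumes each log is described by the LSN range it covers):

interface LogSegment {
  snapshot: string // e.g. "S2"
  log: string // e.g. "O2"
  fromLsn: bigint // first LSN covered by this log
  toLsn: bigint | null // null means this is the live log
}

type ServePlan =
  | { mode: "resync"; snapshot: string; log: string } // new or too-far-behind client
  | { mode: "resume"; logs: string[] } // tail the remaining logs from the client's offset

function planFor(segments: LogSegment[], clientLsn: bigint | null): ServePlan {
  const latest = segments[segments.length - 1]
  if (clientLsn === null) {
    // brand new client: latest snapshot followed by the live log
    return { mode: "resync", snapshot: latest.snapshot, log: latest.log }
  }
  const lsn = clientLsn
  const idx = segments.findIndex(
    (s) => lsn >= s.fromLsn && (s.toLsn === null || lsn < s.toLsn)
  )
  if (idx === -1) {
    // the client's offset predates the oldest retained log: drop state and resync
    return { mode: "resync", snapshot: latest.snapshot, log: latest.log }
  }
  // otherwise serve the rest of the log they were on, then every newer log
  return { mode: "resume", logs: segments.slice(idx).map((s) => s.log) }
}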

thruflo commented 2 months ago

Very elegant. So we kind of allow PG to do our compaction for us.

kevin-dp commented 2 months ago

@magnetised If I'm understanding your proposal correctly, you're saying that we create the new snapshot and the new log but also keep the old snapshot and log around (instead of throwing away the old ones). That means we will need to keep a range-addressable index of LSNs to snapshots such that when somebody comes in with a given LSN we know which snapshot to serve (cc @msfstef @balegas this seems to be the index approach we discussed last week).

msfstef commented 2 months ago

@magnetised @kevin-dp This was something I briefly explored here, and @balegas is also looking into it - I think the main issue for me was assigning an LSN to the snapshots so that people following the existing log can resume from them.

I suppose if you throw out the old log and start collecting the new one based on the xmin of the new snapshot, that would work, but I'm not sure you would be able to guarantee that the old log and the new log "connect"/"overlap" unless you keep maintaining both for a bit.

Ideally, if you could assign the snapshots a global order/LSN, then you only need to maintain a single operation log, to which all snapshots can "merge", and you only need to trim it at the end when some of it needs to get collected. Compaction is just "use the latest available snapshot and resume the log from there".
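
A TypeScript sketch of that variant, assuming a hypothetical Snapshot record tagged with the position in the single log it is consistent with:

interface Snapshot {
  id: string
  lsn: bigint // position in the shared operation log the snapshot is consistent with
}

// New clients read the most recent snapshot, then resume the shared log from its LSN.
// Assumes at least one snapshot exists.
function initialSync(snapshots: Snapshot[]): { snapshot: Snapshot; resumeFrom: bigint } {
  const latest = snapshots.reduce((a, b) => (b.lsn > a.lsn ? b : a))
  return { snapshot: latest, resumeFrom: latest.lsn }
}

// Everything in the log before the oldest snapshot we still keep can be trimmed away.
function trimBefore(snapshots: Snapshot[]): bigint {
  return snapshots.reduce((a, b) => (b.lsn < a.lsn ? b : a)).lsn
}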

samwillis commented 1 month ago

Another alternative (similar to @magnetised, but with a permanent tombstone log):

When a client requests an offset in the past, before the last wipe+snapshot, we return the tombstone log from that point up to the new snapshot, then the snapshot itself and the new log.

There are two implementation options:

  1. Only save the deletes to the tombstone log; then we have to join the main log with the tombstone log when streaming out.
  2. Save the deletes to both the tombstone log and the main log, which significantly simplifies the implementation at a small cost in storage.

(An efficient tombstone log also potentially lets us support indefinite resume for shapes.)
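
A TypeScript sketch of option 2, with hypothetical storage names, to show why resuming inside the main log needs no special handling:

interface ShapeStorage {
  lastSnapshotOffset: bigint // offset of the most recent wipe+snapshot
  tombstoneLog: unknown[] // delete messages only, kept permanently
  snapshot: unknown[] // rows as of the last wipe+snapshot
  mainLog: unknown[] // inserts/updates/deletes since the snapshot (deletes duplicated here)
  offsetToIndex(offset: bigint): number // hypothetical: map a client offset into mainLog
}

function serve(storage: ShapeStorage, clientOffset: bigint | null): unknown[] {
  if (clientOffset === null) {
    // brand new client: nothing to clean up, so no tombstones are needed
    return [...storage.snapshot, ...storage.mainLog]
  }
  if (clientOffset < storage.lastSnapshotOffset) {
    // the client predates the last wipe+snapshot: clear stale rows, then resync
    return [...storage.tombstoneLog, ...storage.snapshot, ...storage.mainLog]
  }
  // the client is inside the current log: deletes are already duplicated there,
  // so a plain resume is enough (this is what option 2 buys us)
  return storage.mainLog.slice(storage.offsetToIndex(clientOffset))
}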

kevin-dp commented 1 month ago

@samwillis So, instead of compacting the log you're proposing to keep a set of PKs that are tombstones. On an initial fetch, we would have to first stream all the tombstones, then stream the log. Then for subsequent requests with a given offset that is in the current log, we cannot simply resume from that offset under option 1, because there may be a new delete that is in the tombstone set but not in the log. So we would have to stream the entire tombstone set before resuming from the given offset. This seems suboptimal to me. We can avoid this by keeping track of which client has seen which tombstones, but that's also suboptimal. So for that reason, I'd say only option 2 is viable.

However, why keep a tombstone set + snapshot + log if we can just keep a single compacted log which comprises all rows and all tombstones?

magnetised commented 1 month ago

@kevin-dp

That means we will need to keep a range-addressable index of LSNs to snapshots such that when somebody comes in with a given LSN we know which snapshot to serve

Did you mean an index of LSNs to logs, i.e. "when somebody comes in with a given LSN we know which ~~snapshot~~ log to serve"? If somebody is joining with a known LSN offset, then we need to know which of the available logs to start serving them from. So we would need some metadata about each log, specifically its starting and ending LSN, in order to do that.

magnetised commented 1 month ago

@msfstef

I suppose if you throw out the old log and start collecting the new one based on the xmin of the new snapshot that would work but I'm not sure if you would be able to guarantee that the old log and the new log "connect"/"overlap" unless you keep maintaining both for a bit.

Yeah, at the point of starting a new snapshot you do something like:

  1. put the current log on pause, so it accumulates txns temporarily
  2. start a new log, also paused so it accumulates txns
  3. start the new snapshot
  4. receive the xmin from the snapshot
  5. unpause the old log telling it to stop at xmin*
  6. unpause the new log telling it to start at xmin*

* all my ideas include a free off-by-one error

So for a while you have two logs running, but they accept different txns.

The need for pausing depends on the implementation - e.g. 3 and 4 could happen "atomically" in the same sync function so there's no chance of receiving a txn between them, in which case you just need to make sure both logs start and end at the right point.

And maybe there's no reason to discard the logs, maybe they're not that huge. But there's definitely a point where it's going to be quicker for the client to drop everything and just start from the latest snapshot than pull in endless numbers of updates.

There are probably heuristics we can figure out that determine snapshot frequency and GC behaviour, which may just involve comparing file sizes.
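
A TypeScript sketch of the handoff in the numbered steps above, assuming hypothetical pause/resume primitives on a log writer (the exact xmin boundary is the "off-by-one" detail from the footnote to pin down):

interface LogWriter {
  pause(): void // buffer incoming txns without writing them
  resume(range: { stopBeforeXid?: bigint; startAtXid?: bigint }): void
}

async function rotateLogs(
  oldLog: LogWriter,
  newLog: LogWriter,
  takeSnapshot: () => Promise<{ xmin: bigint }>
): Promise<void> {
  oldLog.pause() // 1. old log accumulates txns
  newLog.pause() // 2. new log accumulates txns too
  const { xmin } = await takeSnapshot() // 3 + 4. start the snapshot and learn its xmin
  oldLog.resume({ stopBeforeXid: xmin }) // 5. old log ends just before the snapshot's xmin
  newLog.resume({ startAtXid: xmin }) // 6. new log starts at the snapshot's xmin
}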

kevin-dp commented 1 month ago

@kevin-dp

That means we will need to keep a range-addressable index of LSNs to snapshots such that when somebody comes in with a given LSN we know which snapshot to serve

Did you mean an index of LSNs to logs, i.e. "when somebody comes in with a given LSN we know which ~~snapshot~~ log to serve"? If somebody is joining with a known LSN offset, then we need to know which of the available logs to start serving them from. So we would need some metadata about each log, specifically its starting and ending LSN, in order to do that.

Exactly that. Apologies for saying "snapshot" when I meant "log".

kevin-dp commented 1 month ago

but there's definitely a point where it's going to be quicker for the client to drop everything and just start from the latest snapshot than pull in endless numbers of updates.

@magnetised Entertaining the idea of a single log that can be periodically compacted, this would depend on how often we compact the log. One extreme would be to compact after every operation, meaning users always get just the deltas. Doing that for every operation is of course expensive, thus we need to strike a good balance.

magnetised commented 1 month ago

Thinking about something @icehaunter said to me, that we would be "implementing a bad postgres": isn't compacting a series of changes into a final result exactly what pg is doing? Rather than re-implement this (badly), we could just query pg.

samwillis commented 1 month ago

@kevin-dp

Yes, however on an initial fetch (new shape for the client) you do not need to send the tombstone log, as they have no history to clean up. The tombstone log is only for clients making a request from an offset further in the past than the last snapshot (but not offset = -1).

With option 1 the idea was to interweave both the main log and the tombstone log on the server when returning a response from a given offset. This is much more complex than just duplicating the deletes in the main log and returning that as in option 2. I much prefer option 2.

However, why keep a tombstone set + snapshot + log if we can just keep a single compacted log which comprises all rows and all tombstones?

Because there is no compaction process at all, just using Postgres to generate a new snapshot, it is much, much simpler to implement. And the advantage over @magnetised's idea is that we don't have to keep the old log around for people further in the past.

samwillis commented 1 month ago

@KyleAMathews

I think we need to clarify what guarantees we want to provide. In your initial message:

We'll want the same guarantees [as Kafka] — https://docs.confluent.io/kafka/design/log_compaction.html#compaction-guarantees

However, Kafka compaction gives stronger guarantees than we have previously discussed.

In our discussions the plan was to only guarantee correctness when the client reaches an "up-to-date" message. If that is the case, we don't have to worry as much about reordering messages, and we can have a much simpler system that throws away all history and creates a new snapshot. The only history we need is deletes prior to a snapshot.

KyleAMathews commented 1 month ago

Very interesting. Compacting to just the deletes while creating a new snapshot for new people periodically.

So we have three logs?

  1. Delete log
  2. A log of all operations since the shape was first created for people to follow
  3. A snapshot log

This accounts for the following use cases:

  1. Fast initial load without many extra operations
  2. Existing users can subscribe to new operations and catch up when coming back from offline.

It is suboptimal for large, fast-changing shapes, as catching up then means downloading lots of operations or re-downloading the entire shape.

It still seems to me that just compacting one log is simplest and fastest for all use cases.

KyleAMathews commented 1 month ago

So thinking about this again — why don't we just start by tossing the shape when it gets too many duplicate keys? That'll be an easier stepping stone than all the complex options we've been discussing. We'll just need to periodically scan the shape log and count what % of keys are duplicates, and if that's greater than e.g. 50%, delete the shape.

This obviously isn't as efficient as any of the many other solutions we've discussed but it's 90% easier to implement and still solves the problem just fine for 95% of use cases.

KyleAMathews commented 1 month ago

Premature optimizations are the root of all evil and all that.

kevin-dp commented 1 month ago

Reading back through all of this and my original RFC on Slab (I invite you to read it if you haven't already), I think the confusion/disagreement comes from the problem statement.

When I wrote the RFC, the primary problem statement I had in mind was that the shape log grows unboundedly and will eventually become too big to keep around. Therefore, I proposed a very simple algorithm for removing redundant information from the log. This will do the job of keeping the shape log bounded by the number of rows there are in the table. As a side effect, it can also speed up initial sync, since users will only get 1 change per row.

What I realise now is that the other algorithms that have been proposed in this thread aim at solving a different problem: speeding up initial sync. For instance, @magnetised's proposal boils down to keeping the shape log and periodically taking a snapshot such that new users can sync from the latest snapshot and need to catch up with fewer operations. While this speeds up initial sync, it does not solve the memory problem, as it still keeps the entire log.

So, I think we need to agree on what problem we are trying to solve here. If it is keeping memory/disk usage within reasonable bounds, then the simple compaction algorithm proposed on Slab should do well. If it is about speeding up initial sync and we're fine keeping the entire log around forever, then periodically taking a new snapshot (and associating it with the right offset) is probably best. (cc @balegas @KyleAMathews @thruflo @magnetised @samwillis)

KyleAMathews commented 1 month ago

A compacted log would still be very fast to load — even if there are several thousand delete tombstones. We probably want a heuristic for detecting when those have grown too large as well, so we just cycle the shape altogether. But the main value of compaction is to keep initial sync time reasonable and to let subscribers to large shapes avoid re-syncing all the time.

balegas commented 1 month ago

The two competing approaches for compaction are: (1) traverse the existing log to remove old entries; (2) requery the database and replace the initial query result.

Both solutions allow faster initial sync and keep the log size bounded (even if that was not discussed for approach (2)).

Both solutions are strong and weak in different situations, and a mature system would allow configuring the compaction strategy per table.

However, I think we should go with (1) because it is friendlier to a mix of shape patterns.

magnetised commented 1 month ago

(2) adds unpredictable extra load into it

Actually, it's predictable extra load, in that you can schedule the new snapshot to run when you know demand is low and at a frequency that suits the data size.

balegas commented 1 month ago

It depends on the workload, requires a heuristic and a scheduler, and query result sizes still vary. It's not the load on Electric that's my main concern, it's the load on pg.

KyleAMathews commented 3 weeks ago

An interesting optimization we could potentially do (not on the initial pass for compaction) is to sort rows by activity during compaction. A small % of rows tends to get the most updates, so put those at the front of the log and the less active ones at the back. That way we can often do a shorter compaction of just the front 20% of the log vs. needing to re-compact the entire log each time. This is most relevant for really big logs, of course.
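
A TypeScript sketch of that ordering step (hypothetical types; opCounts would be the per-key operation counts gathered while compacting):

interface CompactedEntry {
  key: string
  value: Record<string, unknown>
}

// Sort the compacted entries so the most frequently updated keys end up at the
// front of the log, letting a later pass often stop after the "hot" front portion.
function sortByActivity(
  compacted: CompactedEntry[],
  opCounts: Map<string, number>
): CompactedEntry[] {
  return [...compacted].sort(
    (a, b) => (opCounts.get(b.key) ?? 0) - (opCounts.get(a.key) ?? 0)
  )
}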