palantir / atlasdb

Transactional Distributed Database Layer
https://palantir.github.io/atlasdb/
Apache License 2.0

Store data in SweepableTimestamps more efficiently #7193

Open jeremyk-91 opened 1 month ago

jeremyk-91 commented 1 month ago

Sweepable timestamps is a table used to track which rows and columns of the sweepable cells table actually contain interesting data, since the queue may be sparse if there has been a fast forward or timelock instability. To quote the ADR:

> This is an auxiliary table for locating the next row of the sweepableCells table to read since the timestamp partitions can be sparse, and therefore requiring many lookups to locate a nonempty row. Each non-dedicated row of sweepableCells is represented by a single cell in this table.

The last statement is not true in practice. The current implementation actually stores each partition as a single (Atlas-level) cell - if one reviews the schema, one will notice that the granularity the description claims is impossible.

Row components:
- shard: a VAR_LONG containing the shard for which the row has entries.
- timestamp_partition: a VAR_LONG corresponding to the coarse timestamp partition of all entries in the row.
- sweep_conservative: a boolean (encoded as a BLOB) specifying whether the row contains entries for thorough (0x00) or conservative (0x01) sweep.

Column components:
- timestamp_modulus: the fine timestamp partition of a row in SweepableCells that falls into the coarse partition specified in the timestamp_partition row component.

Value: unused empty byte array.
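For illustration, here is a rough sketch of how a table with this shape would be declared in AtlasDB's TableDefinition schema DSL. This is an approximation reconstructed from the description above, not the actual definition in TargetedSweepSchema:

```java
import com.palantir.atlasdb.table.description.TableDefinition;
import com.palantir.atlasdb.table.description.ValueType;

// Approximate reconstruction of the sweepableTimestamps shape described above;
// a sketch for illustration, not the real schema definition.
TableDefinition sweepableTimestamps = new TableDefinition() {{
    rowName();
        rowComponent("shard", ValueType.VAR_LONG);
        rowComponent("timestamp_partition", ValueType.VAR_LONG);
        rowComponent("sweep_conservative", ValueType.BLOB);
    dynamicColumns();
        columnComponent("timestamp_modulus", ValueType.VAR_LONG);
        value(ValueType.BLOB); // unused empty byte array
}};
```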

This schema does match the implementation. However, it is inefficient: suppose we are dealing with fine partition 1 (in coarse partition 0) of shard 0 for the thorough sweep strategy. If a transaction at timestamp 50,000 writes to a thorough table, we write to the cell (row 0 || 0 || 0x00, column 1) in sweepable timestamps, but with an Atlas timestamp of 50,000. In other words, as far as Cassandra is concerned, the "key" we use is 0 || 0 || 0x00 || 1 || 50000. Then, if a transaction at timestamp 50,001 writes, we would use 0 || 0 || 0x00 || 1 || 50001, and so on.
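To make the arithmetic concrete, here is a minimal sketch of the partitioning; the granularity constants are assumptions mirroring what SweepQueueUtils uses (50,000 timestamps per fine partition, 10,000,000 per coarse partition):

```java
// Minimal sketch; the constants are assumptions mirroring SweepQueueUtils' granularities.
public final class PartitionSketch {
    private static final long TS_FINE_GRANULARITY = 50_000L;
    private static final long TS_COARSE_GRANULARITY = 10_000_000L;

    static long finePartition(long ts) {
        return ts / TS_FINE_GRANULARITY;
    }

    static long coarsePartition(long ts) {
        return ts / TS_COARSE_GRANULARITY;
    }

    public static void main(String[] args) {
        // Both transactions map to the same Atlas cell: coarse partition 0, fine partition 1...
        System.out.println(coarsePartition(50_000) + ", " + finePartition(50_000)); // 0, 1
        System.out.println(coarsePartition(50_001) + ", " + finePartition(50_001)); // 0, 1
        // ...but each write carries its own Atlas timestamp, so Cassandra ends up with a distinct
        // (row || column || timestamp) entry per transaction: ... || 1 || 50000, ... || 1 || 50001, etc.
    }
}
```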

Given the read patterns of this table, this is unnecessary. The queues themselves are read with getRowsColumnRange at SweepQueueUtils.READ_TS = Long.MAX_VALUE. We correctly use a batch size of 1 for SweepableTimestamps, and the Atlas code does figure out that, to get the next page, it should just skip over the existing column.

However, this seems to do a lot of unnecessary work and bookkeeping (in the worst case 50,000x; figures from my internal test stacks still suggest thousands of writes to different cells here, where one might suffice). There is also a bit of fragility here: if someone accidentally changes the batch size used to read this table, they might be in for a surprise - a small value yields wasted work with no gain, and a large value might cause a surprising OOM or timeout.

Deletes from this table take place at a fresh timelock timestamp. This would be a semantic change in situations where we deleted an entry from sweepable timestamps before a subsequent writer at a higher Atlas timestamp came in (because that writer's sweepableTimestamps entry would now be written at a lower Atlas timestamp than before, and so could be shadowed by the delete). But that shouldn't be possible: the notional subsequent writer must have a timestamp below the sweep timestamp (since we're trying to delete from sweepable timestamps for a cell it is writing to), and that is by definition less than a fresh timelock timestamp.

The only reason I'm not immediately fixing this (e.g., by setting the write timestamp for sweepable timestamps to always be zero) is that I suspect this is bad for DbKVS (though @CVdV-au might be able to comment on that). Switching from "writes to a bunch of different cells" to "repeated writes to one cell" in a relational world, without some precaution around only doing the write if the row doesn't already exist, or similar, seems risky.
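For concreteness, a minimal sketch of that "fixed write timestamp" idea against the raw KeyValueService API; the class, method, and table reference here are placeholders, not the actual targeted sweep write path:

```java
import java.util.Map;

import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.api.TableReference;

// Hypothetical sketch: write the sweepableTimestamps marker at a constant Atlas timestamp (0)
// instead of the transaction's start timestamp, so repeated writers target one backing cell.
final class SweepableTimestampsWriterSketch {
    private final KeyValueService kvs;
    private final TableReference sweepableTimestamps;

    SweepableTimestampsWriterSketch(KeyValueService kvs, TableReference sweepableTimestamps) {
        this.kvs = kvs;
        this.sweepableTimestamps = sweepableTimestamps;
    }

    void markFinePartition(byte[] rowName, byte[] columnName) {
        Cell cell = Cell.create(rowName, columnName);
        kvs.put(sweepableTimestamps, Map.of(cell, new byte[0]), 0L); // value is an unused empty byte array
    }
}
```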

CVdV-au commented 1 month ago

Repeated inserts with the same primary key will trigger a PK violation. This can be problematic depending on where the relational txn boundary is - we don't want the PK violation in the sweep insertion to roll back other work.

Even if we split the sweep insert from the rest of the work, we still need to deal with the row already existing.

The way around that is to use upserts, which take row-level locks (escalating to table-level if the row can't be found) to serialise access.

Eg in Oracle:

MERGE INTO my_table t
USING (SELECT 1 AS id, 'Alice' AS name FROM dual) s
ON (t.id = s.id)
WHEN NOT MATCHED THEN
  INSERT (id, name) VALUES (s.id, s.name);

Postgres:

INSERT INTO employees (employee_id, employee_name)
VALUES (1, 'John Doe')
ON CONFLICT (employee_id) DO NOTHING;

jeremyk-91 commented 1 month ago

@CVdV-au Makes sense, thanks. Assuming we do put in the work for that, I'm trying to get a feel for whether and by how much the end state would be better: my intuition is that, storage-wise, this would have less churn since we'd do far fewer actual writes and deletes, but I don't know if there are runtime cost implications. Currently, every AtlasDB write transaction writes either 1 or 2 distinct cells in this table that are eventually deleted when sweep progresses past them; we'd change it so every AtlasDB write transaction does 1 or 2 of the aforementioned upserts to a cell that changes roughly every 3,000 transactions or so, with these again eventually deleted when sweep progresses past them.

CVdV-au commented 1 month ago

Fewer writes will likely trump the extra cost of the lock contention. Every writer needs to take a row-level exclusive lock, since the state of the row being upserted needs to be controlled, so this may introduce a point of serialisation that we don't suffer from in the current design. If that proves to be a problem (I doubt it will be), we can mitigate most of the badness by keeping a very small cache of rows that are known to have been committed. Before issuing the upsert, we can check that cache. Obviously, that won't be perfect since we've got multiple nodes attempting the same insert, but it will avoid most collisions. The complication in doing this is that the cache can only be populated after the relational DB transaction has successfully committed.
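A minimal sketch of that cache idea, with hypothetical names (nothing here is existing AtlasDB code): a small bounded cache of row keys known to be committed, checked before issuing the upsert and populated only after the relational transaction commits. Caffeine is just one convenient choice of bounded cache.

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

// Hypothetical sketch: remembers sweepableTimestamps row keys whose upserts are known to have
// committed, so most writers can skip issuing the upsert (and taking the row-level lock).
final class CommittedRowCache {
    private final Cache<String, Boolean> committedRows;

    CommittedRowCache(long maxSize) {
        this.committedRows = Caffeine.newBuilder().maximumSize(maxSize).build();
    }

    /** Returns true if we already know the row exists, so the upsert can be skipped. */
    boolean isKnownCommitted(String rowKey) {
        return committedRows.getIfPresent(rowKey) != null;
    }

    /** Only call this after the relational DB transaction containing the upsert has committed. */
    void markCommitted(String rowKey) {
        committedRows.put(rowKey, Boolean.TRUE);
    }
}
```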