kanidm / concread

Concurrently Readable Data Structures for Rust
Mozilla Public License 2.0

Feature request: atomic operations #71

Closed ckaran closed 2 years ago

ckaran commented 2 years ago

This feature request is still a little nebulous as I'm still trying to figure out how it really needs to work, so if it doesn't make complete sense yet, I apologize. Maybe we can figure it out completely together.

As an exercise, I'm sketching out something like an ECS by using a set of ARCache instances to hold columns of data, and UUIDs as keys going across each of the caches (so, very roughly, this will look like a table-oriented database, where columns are ARCache instances and rows are calculated at runtime via the UUIDs). The issue that I'm having is that when I want to do an update I need something like a compare & swap; either all of the writes commit at the same time, or none of them do. The easy way to do this is to wrap the whole thing in a RwLock, but I don't think that's the most efficient way, because different threads are probably operating on disjoint regions and won't affect, or be affected by, the parts I'm operating on. So what I'm really after is something like a write transaction that works across groups of trees and keys within those trees. The basic plan would be something like the following:

Roughly speaking, this would be like having a read/write lock on every element of the cache, where you acquire a write lock on every element of interest before you attempt to do any mutations. But if there is a better way of doing this, I'm all ears!
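
To make that concrete, here's a toy sketch of the all-or-nothing semantics I'm after, with plain HashMaps standing in for the ARCache columns. Every name below is invented for illustration; none of it is concread API, and it ignores concurrency entirely, it's only about the commit semantics:

use std::collections::HashMap;

/// Stand-in for a UUID row identifier.
type RowId = u128;

/// A hypothetical multi-column write transaction: writes are staged against
/// several "columns" and only become visible when commit() applies them all.
struct MultiTxn<'a> {
    positions: &'a mut HashMap<RowId, [f64; 3]>,
    velocities: &'a mut HashMap<RowId, [f64; 3]>,
    staged_positions: Vec<(RowId, [f64; 3])>,
    staged_velocities: Vec<(RowId, [f64; 3])>,
}

impl<'a> MultiTxn<'a> {
    /// Either every staged write lands, or (if the txn is dropped without
    /// calling this) none of them do; a reader never sees half of a row.
    fn commit(self) {
        for (row, p) in self.staged_positions {
            self.positions.insert(row, p);
        }
        for (row, v) in self.staged_velocities {
            self.velocities.insert(row, v);
        }
    }
}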

Firstyear commented 2 years ago

This is a really interesting problem. I'd model it a bit differently. We have a similar system in https://github.com/kanidm/kanidm with the index caching that occurs here https://github.com/kanidm/kanidm/blob/master/kanidmd/src/lib/be/idl_arc_sqlite.rs#L64 and https://github.com/kanidm/kanidm/blob/master/kanidmd/src/lib/be/idxkey.rs#L12

This turns the problem from many caches into one cache with compound keys of structs/tuples. So this idl cache actually holds entries from multiple "tables" underneath.
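
As a rough illustration of the compound key idea (the names here are invented; the real versions are in the idxkey.rs file linked above):

use std::sync::Arc;

/// A hypothetical compound cache key: the "table"/column name and the row id
/// travel together, so a single cache can hold entries for every table at once.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ColumnKey {
    table: Arc<str>, // which logical table/column this entry belongs to
    row: u128,       // stand-in for the UUID shared across the columns
}

One cache keyed by something like this replaces the per-column caches, so a single write transaction covers all of them.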

During a write operation, we can hold the write in memory and only flush it on commit. Then, in the write process, we can write down each dirty item to its respective table, so everything happens during the write txn, or none of it does. Which sounds like what you want with your transaction here. Have a look at https://github.com/kanidm/kanidm/blob/master/kanidmd/src/lib/be/idl_arc_sqlite.rs#L562 for how we do that.

So I'd like to ask you more about what you are building here, because you have to remember this is a cache, so eviction of items will happen over time. You'll need to store the entities in a lower database or system for persistence. Still, the offer to talk more about your design and help you out with it stands :) we can discuss here, or we can email or have a call if that would help you!

ckaran commented 2 years ago

That's pretty much exactly what I'm trying to accomplish! Let me give a more complete outline of the kinds of problems I face regularly.

I'm a roboticist, and for my research I write a lot of simulations. Most of my simulations can be parallelized, in some cases to a ridiculous degree (e.g., every robot is doing its own thing independently of every other robot). This is great as I have access to a machine that is able to run 256 hardware threads. The issue is that the machine is a shared machine, and the job controller is horrible; you request a certain amount of runtime beforehand, and when that time is up, it kills your process. There's no signaling and no warning, and it may be a month or more before your job comes up to run again.

To get around that, I periodically take snapshots of my simulations and write them to disk, with the idea that I can then restore from the latest snapshot at a later date. This sort-of works, and would likely be improved by using a proper database like sled. Unfortunately, sled doesn't (yet) have a typed interface, which means that to compare keys to one another in unusual¹ ways you need to pay the cost of deserialization so that the comparison can be done correctly. What's more, I don't need to durably store every single update, I just need to take periodic snapshots of stuff that changed since the last time I wrote out a snapshot.

This is where something like ARCache can come in handy. I can have a bunch of threads operating on properly typed objects that are held in a collection of ARCache instances. While they are running, a background snapshot thread is also running, atomically reading everything in the collection of caches at the same time. It can collect everything that's mutated and write it all out to the backing database, whatever that may be. If the whole thing is killed by the job controller, I can then reload from the database as I know that every write to the collection of ARCache instances was written as a transaction, so the database must hold a consistent snapshot of the universe.

But there's more! With the large number of hardware threads I have available, I've started experimenting with tentative execution; that is, a thread may read the current state, work to create an update (which can be expensive to calculate), and only if the initial conditions that affect that particular update haven't changed is the execution committed to the current state. This can be remarkably effective for some use cases, and since ARCache supports this type of use directly, it makes it easy to implement.

About the only thing that I need to ensure is that items aren't evicted before I have a chance to write them out. Not sure how to handle that at the moment...

¹ In my case one of my keys is time, represented as a rational number. As humans, we know how to correctly sort 1/2, 1/3, and 2/3, but if you turn those into binary strings and sort lexicographically as sled does, then you get the wrong sort order. I know that ARCache doesn't offer a sorted interface, and I'm not asking for it (after all, that's what BptreeMap is for); this was just an example of the kinds of issues I'm facing.

Firstyear commented 2 years ago

About the only thing that I need to ensure is that items aren't evicted before I have a chance to write them out. Not sure how to handle that at the moment...

That one I can help with!

In ARCache, when you are in a write transaction, you can mark items as "dirty", i.e. un-written. Before you commit() the transaction, there is an iterator over the dirty items ( https://docs.rs/concread/0.2.19/concread/arcache/struct.ARCacheWriteTxn.html#method.iter_mut_mark_clean ). You use this to find what you need to write out. In this case, I'd suggest you either make the items inside the ARCache an Arc so that you can cheaply copy them, or something similar. Then, in your dirty iter, you clone() your dirty changes into a "write back" struct / vec.

From there you can have a background job that periodically goes over the writeback queue, and writes each one down to disk. Think of this as an async WAL.

To visualise it a bit better ....

       ┌─┐                                                        
       │I│──1. Insert                                             
       └─┘          │                                             
                    │                                             
                    ▼                                             
             ┌────────────┐                                       
             │            │                                       
             │  ARCache   │                                       
             │            │                      ┌───────┐        
             │┌─┐┌─┐┌─┐┌─┐│──┬2. Iter Dirty───▶  │ TXN   │        
             ││I││I││I││I││  │                   │┌─┐┌─┐ │        
             │└─┘└─┘└─┘└─┘│  │                   ││I││I│ │        
             └────────────┘  │                   │└─┘└─┘ │        
                    ▲        │                   └───────┘        
                    │        │                       │            
                    └──4. commit                     │            
                                                3. Append         
                                                     │            
                                                     │            
                                                     ▼            

                 ┌───────────────────────────────────────────────┐
                 │ Queue ┌───────┐┌───────┐┌───────┐             │
                 │       │ TXN   ││ TXN   ││ TXN   │             │
    5.           │       │┌─┐┌─┐ ││┌─┐┌─┐ ││┌─┐┌─┐ │             │
Writeback ───────│       ││I││I│ │││I││I│ │││I││I│ │             │
  Thread         │       │└─┘└─┘ ││└─┘└─┘ ││└─┘└─┘ │             │
     │           │       └───────┘└───────┘└───────┘             │
     │           └───────────────────────────────────────────────┘
     │                                                            
     ▼                                                            
┌────────┐                                                        
│   DB   │                                                        
│        │                                                        
└────────┘                                                        

I think this kind of design is possibly what you are looking for :) it will ensure items aren't evicted before you've written them down, and it will keep everything ordered.
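
A minimal sketch of steps 2-5, with an mpsc channel playing the part of the queue. The key/value types and persist() are stand-ins, and I'm glossing over the exact shape of the items iter_mut_mark_clean hands back:

use std::sync::mpsc;
use std::thread;

// Stand-ins for whatever actually lives in the ARCache.
type Key = u64;
type Value = String;

/// One committed write txn's dirty items, cloned out just before commit().
struct TxnBatch {
    items: Vec<(Key, Value)>,
}

/// Stand-in for the durable write; a real system would use sled/sqlite/etc.
fn persist(batch: &TxnBatch) {
    for (k, v) in &batch.items {
        println!("write {k} -> {v}");
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<TxnBatch>();

    // 5. The writeback thread: an async WAL that drains the queue and
    //    persists each transaction in order.
    let writer = thread::spawn(move || {
        for batch in rx {
            persist(&batch);
        }
    });

    // 2./3. In the write path: clone the dirty items into a batch, append it
    //       to the queue, and then 4. commit() the ARCache write txn.
    let dirty: Vec<(Key, Value)> = vec![(1, "robot state".into())];
    tx.send(TxnBatch { items: dirty }).expect("writeback thread alive");

    drop(tx); // closing the channel lets the writeback thread exit cleanly
    writer.join().unwrap();
}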

¹ In my case one of my keys is time, represented as a rational number. As humans, we know how to correctly sort 1/2, 1/3, and 2/3, but if you turn those into binary strings and sort lexicographically as sled does, then you get the wrong sort order. I know that ARCache doesn't offer a sorted interface, and I'm not asking for it (after all, that's what BptreeMap is for); this was just an example of the kinds of issues I'm facing.

Yes, we can't offer sorting here, sorry :(. In this case, if the keys are small, you could keep them in a set that is sorted, or in a BptreeMap from concread, which uses the same transactions as ARCache so many threads can read from the key-set at the same time.
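
For the rational time keys specifically, here's a minimal sketch of that sorted key-set idea. std's BTreeMap stands in for concread's transactional BptreeMap, and the Ratio type is invented for illustration; its numeric Ord gives exactly the ordering a lexicographic byte comparison can't:

use std::cmp::Ordering;
use std::collections::BTreeMap;

/// A tiny positive rational. Assumes fractions are stored in lowest terms so
/// that the derived equality stays consistent with the numeric ordering below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Ratio {
    num: u64,
    den: u64, // assumed non-zero
}

impl Ord for Ratio {
    fn cmp(&self, other: &Self) -> Ordering {
        // Cross-multiply in u128 so the comparison can't overflow.
        (self.num as u128 * other.den as u128)
            .cmp(&(other.num as u128 * self.den as u128))
    }
}

impl PartialOrd for Ratio {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut timeline: BTreeMap<Ratio, &str> = BTreeMap::new();
    timeline.insert(Ratio { num: 1, den: 2 }, "one half");
    timeline.insert(Ratio { num: 1, den: 3 }, "one third");
    timeline.insert(Ratio { num: 2, den: 3 }, "two thirds");

    // Iterates in numeric order: 1/3, 1/2, 2/3 (a byte-wise sort of the
    // strings "1/2", "1/3", "2/3" would put 1/2 first, which is wrong).
    for (t, label) in &timeline {
        println!("{}/{} -> {label}", t.num, t.den);
    }
}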

ckaran commented 2 years ago

That one I can help with!

I did see that, but at the time I thought that dirty items might get evicted before they were marked clean. Not sure how I came to that conclusion though... :confused:

That said, while this method works it has a small drawback; if a particular subset of elements is changed by the various threads often, then you'll get a lot of transactions that you need to process. What I really want is a snapshot in time of the committed results, ignoring when something was overwritten multiple times in the past. And I think I may have a method of doing so, based on the concurrent B+ tree you described in https://github.com/kanidm/concread/blob/master/CACHE.md. I'm figuring this out on the fly in the hopes that we can collaboratively come up with a better data structure that fits all of the needs.

Every node in the concurrent B+ tree will get a new field, the version_number. This is a u64 that is a copy of an AtomicU64 (I'd prefer u128, but they keep adding them as experimental and then removing them a short time later... :man_shrugging:), which is a global object that can be mutated as needed. When you want to mutate the tree, you first do an atomic fetch_add(1, Ordering::AcqRel) on the global version number to get a new version number, which will be used as the version number of every node you create during this transaction. The rest of the addition is just like it goes in the normal concurrent B+ tree.

So what does this get you? You now can do a 'happens-before' ordering cheaply. Imagine that you took a snapshot at version 3 of the tree. A bunch of mutations were committed, and now we're up to version 5. You can now descend from the root of the tree and commit all nodes with version numbers in the range [4, 5]. Earlier version numbers were already committed, so they can be ignored. If there are concurrent writes happening, you won't reach them from the current tree, but since they will have a later version number, they'll get picked up during the next snapshot. If the version 5 transaction mutated every node in the tree, then no node with version 4 or below is reachable from it, so those older nodes are silently ignored. There are some additional tricks that can also be done, but that's the gist of the idea. Do you think this would work? Can it be added to concread easily?
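
Reduced to its bones, the mechanism I'm imagining looks something like this (none of it is concread's actual node layout, the names are just for illustration):

use std::sync::atomic::{AtomicU64, Ordering};

/// The global version counter shared by every write transaction.
static GLOBAL_VERSION: AtomicU64 = AtomicU64::new(0);

/// A stand-in node; keys, values, and children are elided.
struct Node {
    version: u64,
}

/// Every node created by a write txn is stamped with the number this returns.
fn begin_write_txn() -> u64 {
    // fetch_add returns the previous value, so add one to get this txn's number.
    GLOBAL_VERSION.fetch_add(1, Ordering::AcqRel) + 1
}

/// The 'happens-before' test: anything at or below the last snapshot's version
/// was already written out, so the snapshotter can skip that whole subtree.
fn needs_snapshot(node: &Node, last_snapshotted: u64) -> bool {
    node.version > last_snapshotted
}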

Firstyear commented 2 years ago

I did see that, but at the time I thought that dirty items might get evicted before they were marked clean. Not sure how I came to that conclusion though... 😕

Well, it is a cache, so you know, it does evict things ... it's an easy assumption to make. But in this case, the ARCache will NOT evict anything that was inserted or dirty in this write txn.

You now can do a 'happens-before' ordering cheaply.

We already actually have all of that version information inside the concread B+tree. We actually require it to know when we have to perform a COW operation. The only thing we'd need to do is expose that in an API for differencing between two points in time, i.e. between read txn A and B, what changed? We can't do it per-key, but we can do it per-node at the least.

The main issue I see here is "when do you remove a value from the B+tree knowing it's now stable and on disk?". I.e. imagine we are diffing between two RO txns, but the concurrent B+tree is still writing and updates some key X that would be removed between the two RO txns. How do we make sure not to remove this? There can only be one writer in concread, so a delete to say "yep, we've written this down" by its nature needs a write, which will prevent other writers etc. That's why the queue is a nice stop gap, because it allows each side to choose "when do I acknowledge new changes" and "I can always add new changes".

A similar approach could be the same queue suggestion I made, but before you write down the TXN struct to disk, your writer thread aggregates all the changes and dedups them into a tree. Then when you write down to disk, you just write the current tree/map state out, clear it, and start dequeuing changes again. This would let you batch transactions together into a single known state and solve a lot of what you're after.
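
A sketch of that aggregation step, reusing the queue from before (types are stand-ins again):

use std::collections::BTreeMap;
use std::sync::mpsc::Receiver;

type Key = u64;
type Value = String;

/// Drain whatever txn batches have queued up, keeping only the newest value
/// per key, so one durable write covers many in-memory transactions.
fn drain_and_dedup(rx: &Receiver<Vec<(Key, Value)>>) -> BTreeMap<Key, Value> {
    let mut merged = BTreeMap::new();
    // try_iter() yields everything currently queued without blocking.
    for batch in rx.try_iter() {
        for (k, v) in batch {
            merged.insert(k, v); // later batches overwrite earlier ones
        }
    }
    merged
}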

It's always worth considering: what's the performance problem you are trying to solve or unblock here? What resources do you have? How can you approach it in an async and concurrent manner? I feel like, for your example, you have a huge number of parallel readers of some nature; they can "write back" their results to a queue, something then dequeues that into the ARCache, and the content in the ARCache eventually needs to be made persistent. In that model, where you have so many parallel readers, by unblocking them to do more work (due to concread/txns) you are still likely better off than with the previous mutex/rwlock approaches, so you can afford to have one housekeeping thread/task that does the dirty write back / queue management / dedup work. It's also worth keeping it "simple", because it's easier to debug that way! The queue option is also nice because it keeps the same model you want on disk - snapshots of transaction/db state over time - your queue is just that at a fine-grained level, and the housekeeper can combine entries to aggregate txns.

Anyway, if you still want the "diff" capability in the B+tree, I think it's easy/possible to add, but remember, it can only tell you "something in this node changed", not "what in this node changed".

ckaran commented 2 years ago

Well, it is a cache, so you know, it does evict things ... it's an easy assumption to make. But in this case, the ARCache will NOT evict anything that was inserted or dirty in this write txn.

I appreciate your trying to give me a graceful way out of this, but to be honest, I have no excuse. I have an undergrad and masters in computer engineering; caches are kind of a big thing in chip design, so having my brain do this to me is embarrassing!

We already actually have all of that version information inside the concread B+tree. We actually require it to know when we have to perform a COW operation. The only thing we'd need to do is expose that in an API for differencing between two points in time, i.e. between read txn A and B, what changed? We can't do it per-key, but we can do it per-node at the least.

A simple iterator API is all you really need. Something like the following will work:

for (key, value) in tree.iter_versions(3..) {
    // Do something smart to write out the key and value in an ACID compliant manner.
    // Iterator may return key/value pairs that have not actually been mutated, but are 
    // just a part of a node in the tree whose version number is at least 3.
}

As an end user, it is my job to a) ensure that the tree stays alive long enough to write out the snapshot durably (I don't drop the tree until after the database transaction returns), and b) keep track of which snapshots I've already written out.

End users could do several different actions, depending on their use cases:

I actually started playing around writing up some code for this yesterday. See below for a snippet of what I'm thinking about.

The main issue I see here is "when do you remove a value from the B+tree knowing it's now stable and on disk?".

Yeah, this is where things get to be interesting. I assumed that the nodes would be kind of like the following:

use once_cell::sync::OnceCell;
use std::{fmt::Debug, sync::Arc};

/// Every node is in exactly one of these states.
#[derive(Debug)]
enum ConcurrentBPlusEntryState<T>
where
    T: Debug,
{
    /// The entry is officially empty.  This is similar to `Option::None`, and
    /// is required to make sure that the entries in the `Interior` variant can
    /// all be set, even when there's nothing sensible to set them to.
    Empty,

    /// A leaf node will point to the contents of the tree.  Once this is set,
    /// it can never be modified.  **WARNING!** Under absolutely **NO**
    /// circumstances can the contents of this variant be mutated after they
    /// have been set!  Doing so will mutate other trees behind their backs!
    Leaf(T),

    /// A collection of nodes.  Some of these may be empty, which is a waste of
    /// space, but it makes lookups faster to let them be empty than to try to
    /// compress them and deal with everything that goes with that.  
    /// **WARNING!** Under absolutely **NO** circumstances can the contents of
    /// this variant be mutated after they have been set!  Doing so will
    /// mutate other trees behind their backs!
    Interior([ConcurrentBPlusEntry<T>; 16]),
}

/// The contents of a node.
///
/// **WARNING!** Under absolutely **NO** circumstances can the contents of this
/// struct be mutated after they have been set!  Doing so will mutate other
/// trees behind their backs!
#[derive(Debug)]
struct ConcurrentBPlusEntryInterior<T>
where
    T: Debug,
{
    /// This comes from the concurrent B+ tree's atomic transaction number.
    /// You can use this to form a 'happens-before' scope, making it easy to
    /// decide which nodes you have already snapshotted, and can therefore skip
    /// over.
    transaction_number:  u64,

    /// The state of this node.  See the docs for ConcurrentBPlusEntryState for
    /// what all of this means.
    contents: ConcurrentBPlusEntryState<T>,
}

/// An actual node in the tree.
///
/// Once a node has been created and set, it **cannot** be mutated.  Doing so
/// would change other trees behind their backs and cause much gnashing of 
/// teeth, wailing, etc., etc., etc.  Don't do that.
#[derive(Debug)]
pub(crate) struct ConcurrentBPlusEntry<T: Debug>(
    Arc<OnceCell<ConcurrentBPlusEntryInterior<T>>>,
);

N.B.: OnceCell

The only trees that stay alive are those whose roots are held by something; otherwise their reference counts go to zero and they disappear. Conversely, if you're doing a snapshot you just need to hold the root of the tree. It can't be mutated nor can it be dropped (Arc prevents that), but it is both Send and Sync, so there can be an arbitrary number of readers.

To mutate the tree, you start with a root (any root) and create new ConcurrentBPlusEntry nodes as you see fit. Eventually you will have a new root (and therefore a new tree) which you can share with others. This tree can be any kind of mutation that you want, up to and including a tree that is now empty. The old tree's nodes are unaffected, so if you have the root of the old tree you can still do a snapshot, etc. on it.

The advantage of this style is that you can have a forest of trees. You can also have multiple concurrent writers; each writer will be creating their own copy of a tree (making it fully persistent). Deleting a node will have no effect on a concurrent thread that is doing a snapshot. If you need to snapshot the deletions as well, you just need the root of the tree that had the snapshot in it.

The biggest issues I have at the moment are the following:

The former issue is that creating a new tree for each mutation is costly. I would rather have some other type of node that is internal to the crate that is neither Send nor Sync which can be freely mutated, which can then be converted to a tree of ConcurrentBPlusEntry when the changes are all committed. This should be relatively easy to do, it just needs to be done.

I think that the latter (atomically publishing a new root once a set of changes is complete) can be done using something like the following (banged out on the fly, there are likely errors):

use std::fmt::Debug;
use std::sync::atomic::{AtomicU64, Ordering};

pub struct Tree<T>
where
    T: Debug,
{
    transaction_number: AtomicU64,
    root: ConcurrentBPlusEntry<T>,
}

impl<T> Tree<T>
where
    T: Debug,
{
    pub(crate) fn try_update(
        &mut self,
        new_root: ConcurrentBPlusEntry<T>,
    ) -> Result<(), Error> {
        // `transaction_number()` on the entry and the `Error` type are assumed
        // to exist elsewhere in the crate; they aren't shown here.
        //
        // compare_exchange (rather than compare_exchange_weak) since this isn't
        // in a retry loop; a spurious failure would needlessly force a re-merge.
        match self.transaction_number.compare_exchange(
            new_root.transaction_number() - 1,
            new_root.transaction_number(),
            Ordering::SeqCst,
            Ordering::Relaxed,
        ) {
            Ok(_) => {
                self.root = new_root;
                Ok(())
            },
            Err(_) => {
                Err(Error::new("Can't set the root, try merging again"))
            },
        }
    }
}

This 'works' but it isn't as elegant as I'd like. Ideally, we could have a full forest of roots so that the trees can be truly confluently persistent, but I haven't yet figured out how to do that in a lock-free manner... Any thoughts?

Firstyear commented 2 years ago

Well, it is a cache, so you know, it does evict things ... it's an easy assumption to make. But in this case, the ARCache will NOT evict anything that was inserted or dirty in this write txn.

I appreciate your trying to give me a graceful way out of this, but to be honest, I have no excuse. I have an undergrad and masters in computer engineering; caches are kind of a big thing in chip design, so having my brain do this to me is embarrassing!

Just to call out this one statement, it doesn't matter. You can know so much, have as many degrees as you want, and we still make mistakes or whatever. There is no shame here, only more to learn. So stop being so harsh on yourself.

PS: CPU caches/chips have a different way to handle dirty/coherency that is just dark magic, and also very different behaviour in this case.

Firstyear commented 2 years ago

@ckaran Hey mate I emailed you about scheduling a call/chat so we can go through this a bit easier, let me know if you didn't get it :)

ckaran commented 2 years ago

Just checked, and yes, I got it... but I only checked because of your message here. Like I said earlier, I'm VERY slow in checking my emails. My family is used to texting me that they emailed me because of how slow I am...

ckaran commented 2 years ago

Also, I just discovered the arc-swap crate, which does atomic operations on Arcs. That solves the confluently persistent part, making it possible to have something like git's HEAD pointer. I have some other work I have to catch up on in the next few days, but I'd like to bounce more ideas off of you, if that's OK with you. I think some of these could be used within concread.
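
For example, something along these lines is the shape of it (TreeRoot is just a placeholder for a real root node):

use std::sync::Arc;

use arc_swap::ArcSwap;

struct TreeRoot {
    version: u64,
}

fn main() {
    // A git-HEAD-like pointer to the newest committed tree.
    let head = ArcSwap::from_pointee(TreeRoot { version: 0 });

    // Readers grab a consistent root without blocking anyone.
    let snapshot = head.load();
    println!("snapshot taken at version {}", snapshot.version);

    // A writer publishes a new root; rcu() re-runs the closure if HEAD moved
    // underneath us, which is exactly the "merge and try again" loop.
    head.rcu(|old| Arc::new(TreeRoot { version: old.version + 1 }));

    println!("head is now at version {}", head.load().version);
}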

Firstyear commented 2 years ago

Sounds good. Email me about some of your ideas, it's easier there I think :)