restatedev / restate

Restate is the platform for building resilient applications that tolerate all infrastructure faults w/o the need for a PhD.
https://docs.restate.dev

object store metastore #2309

Open igalshilman opened 1 week ago

igalshilman commented 1 week ago

Object store backed Metadata store

This PR introduces a new type of metadata store backed by S3 (or any similar object store that supports conditional updates).

end result

running the following:

export AWS_REGION=eu-central-1
export AWS_ACCESS_KEY_ID=
export AWS_SECRET_ACCESS_KEY=
export AWS_SESSION_TOKEN=

export AWS_ENDPOINT=https://s3.eu-central-1.amazonaws.com

export AWS_PROVIDER_NAME=Static
export RESTATE_METADATA_STORE_CLIENT__TYPE="object-store"
export RESTATE_METADATA_STORE_CLIENT__BUCKET="XXXXXXXX"
export RESTATE_METADATA_STORE_CLIENT__CREDENTIALS__TYPE="aws-env"

cargo run --example three_nodes_and_metadata -- --nocapture

starts a 3-node cluster that uses an S3 bucket for metadata storage:

[nix-shell:~/work/restate]$ ./source.s3.sh 
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.73s
     Running `target/debug/examples/three_nodes_and_metadata --nocapture`
2024-11-27T14:38:34.545666Z  INFO restate_local_cluster_runner::cluster: Starting cluster test-cluster in /home/igal/work/restate/restate-data
2024-11-27T14:38:34.549226Z  INFO restate_local_cluster_runner::node: Started node metadata-node in /home/igal/work/restate/restate-data/metadata-node (pid 233735)
2024-11-27T14:38:35.825120Z  INFO three_nodes_and_metadata: admin node started: 2024-11-27T14:38:35.824212Z  INFO restate_node: Restate server is ready
2024-11-27T14:38:35.984871Z  INFO restate_local_cluster_runner::node: Node metadata-node admin check is healthy after 6 attempts
2024-11-27T14:38:35.989555Z  INFO restate_local_cluster_runner::node: Started node node-1 in /home/igal/work/restate/restate-data/node-1 (pid 233916)
2024-11-27T14:38:35.994266Z  INFO restate_local_cluster_runner::node: Started node node-2 in /home/igal/work/restate/restate-data/node-2 (pid 233917)
2024-11-27T14:38:35.998782Z  INFO restate_local_cluster_runner::node: Started node node-3 in /home/igal/work/restate/restate-data/node-3 (pid 233918)

listing the bucket shows the stored metadata keys:

[nix-shell:~]$ aws s3 ls s3://XXXX
2024-11-27 15:38:40       1221 bifrost_config
2024-11-27 15:38:38        950 nodes_config
2024-11-27 15:35:54        385 partition_table
2024-11-27 15:38:41        112 pp_epoch_0
2024-11-27 15:38:41        112 pp_epoch_1
2024-11-27 15:38:41        112 pp_epoch_2
2024-11-27 15:38:41        112 pp_epoch_3
2024-11-27 15:38:40        519 scheduling_plan

Additional configuration keys

[metadata-store-client]    
type = "object-store"    
bucket = "restate-metadata"    

[metadata-store-client.credentials]    
type = "aws-env"  
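The TOML keys above correspond to the environment variables used in the run script earlier. As far as these examples show, the convention is a `RESTATE_` prefix, dashes mapped to underscores, dots to double underscores, and uppercasing. A tiny illustrative helper (hypothetical, not part of the PR) makes the mapping concrete:

```rust
// Hypothetical helper illustrating the apparent TOML-key -> env-var mapping:
// prefix with RESTATE_, '-' -> '_', '.' -> "__", then uppercase.
fn to_env_var(toml_path: &str) -> String {
    let mut name = String::from("RESTATE_");
    name.push_str(
        &toml_path
            .replace('-', "_")
            .replace('.', "__")
            .to_uppercase(),
    );
    name
}

fn main() {
    // Matches the variables exported in the run script above.
    assert_eq!(
        to_env_var("metadata-store-client.type"),
        "RESTATE_METADATA_STORE_CLIENT__TYPE"
    );
    assert_eq!(
        to_env_var("metadata-store-client.credentials.type"),
        "RESTATE_METADATA_STORE_CLIENT__CREDENTIALS__TYPE"
    );
}
```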

How does this work?

This is essentially a client: each node runs an instance of it, and the clients use S3's optimistic concurrency control (conditional updates) to move the different keys forward.

Currently the only supported credential type is the AWS_* environment variables; a follow-up referenced in this conversation would be to plug in additional credential providers (unifying the work done by @pcholakov).
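The optimistic-concurrency scheme described above can be sketched with a minimal in-memory stand-in for a bucket that supports conditional updates (`CasStore` and its semantics here are a hypothetical illustration, not the PR's actual types): a write succeeds only if the version the writer observed is still current; otherwise the writer must re-read and retry.

```rust
use std::collections::HashMap;

// Hypothetical in-memory stand-in for a bucket with conditional updates.
// Each key stores (version, value); a put succeeds only if the caller's
// expected version matches, mimicking conditional PUT semantics.
struct CasStore {
    objects: HashMap<String, (u64, String)>,
}

impl CasStore {
    fn new() -> Self {
        CasStore { objects: HashMap::new() }
    }

    fn get(&self, key: &str) -> Option<(u64, String)> {
        self.objects.get(key).cloned()
    }

    /// Conditional put: `expected` is None for create-if-absent
    /// (If-None-Match style), or Some(version) for update (If-Match style).
    fn put(&mut self, key: &str, expected: Option<u64>, value: String) -> Result<u64, &'static str> {
        let current = self.objects.get(key).map(|(v, _)| *v);
        match (current, expected) {
            (None, None) => {
                self.objects.insert(key.to_string(), (1, value));
                Ok(1)
            }
            (Some(c), Some(v)) if c == v => {
                self.objects.insert(key.to_string(), (c + 1, value));
                Ok(c + 1)
            }
            // Stale or conflicting writers get a precondition failure
            // and must re-read before retrying.
            _ => Err("precondition failed"),
        }
    }
}

fn main() {
    let mut store = CasStore::new();
    // First writer creates the key.
    assert_eq!(store.put("nodes_config", None, "v1".into()), Ok(1));
    // A writer that still believes the key is absent is rejected.
    assert!(store.put("nodes_config", None, "stale".into()).is_err());
    // A reader observes version 1 and successfully updates to version 2.
    let (version, _) = store.get("nodes_config").unwrap();
    assert_eq!(store.put("nodes_config", Some(version), "v2".into()), Ok(2));
}
```

With every node running such a client, whichever writer's conditional PUT lands first wins, and the losers simply observe the new version on their next read.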

tillrohrmann commented 6 days ago

> I think we might have a better path to implement this, using Object Versioning. A quick search shows that this is supported across MinIO / Azure / GCP; here's the S3 documentation: https://docs.aws.amazon.com/AmazonS3/latest/userguide/Versioning.html.
>
> Here's a sketch of the conditional PUT path:
>
>     async fn put(
>         &self,
>         key: ByteString,
>         value: VersionedValue,
>         precondition: Precondition,
>     ) -> Result<(), WriteError> {
>         let key = object_store::path::Path::from(key.to_string());
>
>         match precondition {
>             Precondition::MatchesVersion(version) => {
>                 // If we have a cached version, we can also add an if-modified-since condition to the GET.
>                 // Unfortunately we can't set the version explicitly, and a HEAD request doesn't return enough
>                 // information to determine our own metadata value version.
>                 let get_result = self.object_store.get(&key).await.map_err(|e| {
>                     WriteError::Internal(format!("Failed to check precondition: {}", e))
>                 })?;
>
>                 if extract_object_version(&get_result.payload) != version {
>                     return Err(WriteError::FailedPrecondition(
>                         "Version mismatch".to_string(),
>                     ));
>                 }
>
>                 self.object_store
>                     .put_opts(
>                         &key,
>                         PutPayload::from_bytes(serialize_versioned_value(value)),
>                         PutOptions::from(PutMode::Update(UpdateVersion::from(&get_result))),
>                     )
>                     .await
>                     .map_err(|e| WriteError::Internal(format!("Failed to update value: {}", e)))?;
>
>                 Ok(())
>             }
>             _ => todo!(),
>         }
>     }
>
> Unfortunately we don't control the versions; in S3 they are a monotonic numbered sequence, but we don't get to set them directly; rather, S3 will set them for us. Our own API relies on explicit versions, so I'm assuming we'll just serialize them along with the rest of the payload as the object body.
>
> A normal GET request implicitly returns the latest version, no further tricks required. It's also possible to request a particular previous version, but that's not needed for our API. It might be useful for troubleshooting though! (And I'd seriously consider serializing the values as JSON for operational friendliness.)
>
> The only requirement for all of this to work is that we must use a bucket with object versioning enabled, which is not the default but is trivial to set. S3 and other stores also support automatic cleanup of old versions via object lifecycle policies, so we don't need to implement that ourselves.

Wouldn't the PutMode::Update require If-Match header support from S3? I thought that it currently only supports If-None-Match natively, and that for PutMode::Update one would have to use DynamoDB? Or is this different when using versioned buckets?
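Since the store assigns version IDs that we cannot choose, serializing our own version along with the payload (as suggested in the sketch above) could look like the following minimal sketch. The helper names echo the ones used in the sketch, but the 8-byte big-endian version prefix is purely an assumption for illustration, not the PR's actual wire format:

```rust
// Hypothetical sketch: embed Restate's logical version in the object body
// itself, since the object store assigns its own version IDs. The encoding
// here is an assumed layout: an 8-byte big-endian version prefix followed
// by the raw value bytes.

fn serialize_versioned_value(version: u64, value: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(8 + value.len());
    buf.extend_from_slice(&version.to_be_bytes());
    buf.extend_from_slice(value);
    buf
}

fn extract_object_version(body: &[u8]) -> Option<u64> {
    // Returns None for bodies too short to carry a version prefix.
    let prefix: [u8; 8] = body.get(..8)?.try_into().ok()?;
    Some(u64::from_be_bytes(prefix))
}

fn main() {
    let body = serialize_versioned_value(42, b"nodes_config payload");
    assert_eq!(extract_object_version(&body), Some(42));
    assert_eq!(&body[8..], b"nodes_config payload");
}
```

A GET then yields both the logical version (for precondition checks) and the value in a single round trip, which is exactly what the conditional PUT path above needs.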

pcholakov commented 4 days ago

@tillrohrmann you are right, I was wrong; this should work on MinIO and Cloudflare R2, but not with S3. I got excited when I saw how elegant the object versioning API looked, but it requires If-Match support from the underlying object store, which is a no-go.

ScmTble commented 3 days ago

> @tillrohrmann you are right, I was wrong; this should work on MinIO and Cloudflare R2, but not with S3. I got excited when I saw how elegant the object versioning API looked, but it requires If-Match support from the underlying object store, which is a no-go.

https://aws.amazon.com/about-aws/whats-new/2024/11/amazon-s3-functionality-conditional-writes/ It seems support has been added.

tillrohrmann commented 3 days ago

> https://aws.amazon.com/about-aws/whats-new/2024/11/amazon-s3-functionality-conditional-writes/ It seems support has been added.

Nice, this was quick 😄

pcholakov commented 2 days ago

Finally! 🎉😆