delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Document how to configure dynamodb lock client #1091

Open wjones127 opened 1 year ago

wjones127 commented 1 year ago

Description

Although we have an error message telling users to configure the Lock client if they want concurrent writes with S3, we don't have any documentation on how to do that. We should also provide general advice on concurrency, like not mixing different connectors in concurrent writers.
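For reference, a minimal sketch of what that configuration looks like from Python, based on the comments below (the S3 path and lock table name are placeholders, and the DynamoDB table is assumed to already exist):

import pandas as pd
from deltalake import write_deltalake

# Placeholder values: replace the bucket path and lock table name with your own.
storage_options = {
    "AWS_S3_LOCKING_PROVIDER": "dynamodb",
    "DYNAMO_LOCK_TABLE_NAME": "delta_log",
}

df = pd.DataFrame({"x": [1, 2, 3]})
write_deltalake("s3://my-bucket/my-table", df, mode="append", storage_options=storage_options)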

See conversation: https://delta-users.slack.com/archives/C013LCAEB98/p1674435354811639

Use Case

Related Issue(s)

We probably shouldn't do this until we improve the conflict resolution, though. https://github.com/delta-io/delta-rs/issues/593

wjones127 commented 1 year ago

@MrPowers this would probably be a good thing to blog about once the conflict resolution is improved. Concurrent writes are definitely something you can't do with plain Parquet tables. 😉

LucaSoato commented 1 year ago

Let me know if I can help with this; we'll need this feature. 🙂

MrPowers commented 1 year ago

@wjones127 - feel free to assign me to this issue. I will be happy to create the docs when #593 is finished.

hongbo-miao commented 1 year ago

Hi folks, is it possible to have a draft document first so that everyone can start to try it out and provide feedback? Or is there already a guide somewhere else? Thanks! 😃

yuhanz commented 10 months ago

I'm looking for the documentation on how to set up the LockClient in Python as well.

yuhanz commented 10 months ago

In crates/deltalake-core/src/test_utils.rs, it seems like you just need to set a few environment variables, pointing to a DynamoDB table via DYNAMO_LOCK_TABLE_NAME:

set_env_if_not_set(s3_storage_options::AWS_ACCESS_KEY_ID, "deltalake");
set_env_if_not_set(s3_storage_options::AWS_SECRET_ACCESS_KEY, "weloverust");
set_env_if_not_set("AWS_DEFAULT_REGION", "us-east-1");
set_env_if_not_set(s3_storage_options::AWS_REGION, "us-east-1");
set_env_if_not_set(s3_storage_options::AWS_S3_LOCKING_PROVIDER, "dynamodb");
set_env_if_not_set("DYNAMO_LOCK_TABLE_NAME", "test_table");
set_env_if_not_set("DYNAMO_LOCK_REFRESH_PERIOD_MILLIS", "100");
set_env_if_not_set("DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS", "100");

A different project documents the schema of the DynamoDB lock table: https://github.com/delta-io/kafka-delta-ingest#writing-to-s3

aws dynamodb create-table --table-name delta_rs_lock_table \
    --attribute-definitions \
        AttributeName=key,AttributeType=S \
    --key-schema \
        AttributeName=key,KeyType=HASH \
    --provisioned-throughput \
        ReadCapacityUnits=10,WriteCapacityUnits=10

(The same schema is documented in python/deltalake/writer.py as well)

However, the Python documentation (python/docs/source/usage.rst) explicitly says to specify the options in storage_options, so the environment variables may not be required. I am going to give this a try.

    >>> from deltalake import write_deltalake
    >>> df = pd.DataFrame({'x': [1, 2, 3]})
    >>> storage_options = {'AWS_S3_LOCKING_PROVIDER': 'dynamodb', 'DYNAMO_LOCK_TABLE_NAME': 'custom_table_name'}
    >>> write_deltalake('s3://path/to/table', df, storage_options=storage_options)
danielgafni commented 8 months ago

@yuhanz hey, did you find the correct solution for Python?

Edit: this worked with deltalake 0.15.1

yuhanz commented 8 months ago

@danielgafni: I went with storage_options, and it worked well with deltalake 0.13.0.

storage_options = {
    "AWS_DEFAULT_REGION": "us-east-1",
    "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
    "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY,
    # "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    'AWS_S3_LOCKING_PROVIDER': 'dynamodb',
    'DYNAMO_LOCK_TABLE_NAME': 'MyLockTable',
}
danielgafni commented 8 months ago

Thanks. I'm on 0.15.1. Just setting the environment variable "AWS_S3_LOCKING_PROVIDER" worked for me (with the default "delta_log" table name).
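For completeness, a rough sketch of that environment-variable approach (credentials are assumed to come from the ambient AWS configuration, and the S3 path is a placeholder):

import os

import pandas as pd
from deltalake import write_deltalake

# Enable the DynamoDB locking provider via the environment instead of storage_options.
# As noted above, the lock table name defaults to "delta_log" when not set explicitly.
os.environ["AWS_S3_LOCKING_PROVIDER"] = "dynamodb"

df = pd.DataFrame({"x": [1, 2, 3]})
write_deltalake("s3://my-bucket/my-table", df, mode="append")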

ale-rinaldi commented 8 months ago

I think it's also worth documenting the required permissions to work on a Delta Lake table stored on AWS S3.

In my case, I needed:

MusKaya commented 6 months ago

@wjones127 when using S3-compatible storage (other than AWS S3), one might have one access/secret key pair for the storage and another for DynamoDB. In this case, how can these two pairs be provided separately, so that one is used for storage and the other for DynamoDB?

ion-elgreco commented 6 months ago

@ale-rinaldi would you mind adding this info to our docs?

MusKaya commented 6 months ago

> @ale-rinaldi would you mind adding this info to our docs?

@ion-elgreco you are not referring to this, right? Right now we have a real use case for what I described above (using different credentials for S3 and DynamoDB), and I created #2287 for it. If it is already supported, it would be great to have the documentation clarify that. Otherwise, we need to accommodate a separate set of credentials for DynamoDB to unblock decoupling DynamoDB from S3.

ale-rinaldi commented 6 months ago

@ion-elgreco of course! I opened https://github.com/delta-io/delta-rs/pull/2393

kwodzicki commented 4 months ago

I'm experiencing some issues that may be related to this.

I set up a DynamoDB table using the following command:

aws dynamodb create-table \
    --table-name delta_rs_lock_table \
    --attribute-definitions AttributeName=key,AttributeType=S \
    --key-schema AttributeName=key,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST

And I'm running the following example:

import boto3
import pandas as pd

from deltalake import DeltaTable
from deltalake import writer

credentials = boto3.Session().get_credentials().get_frozen_credentials()

storage_options = {
    "AWS_ACCESS_KEY_ID": credentials.access_key,
    "AWS_SECRET_ACCESS_KEY": credentials.secret_key,
    "AWS_SESSION_TOKEN": credentials.token,
    "AWS_REGION": "us-east-1",
    "AWS_S3_LOCKING_PROVIDER": "dynamodb",
    "DYNAMO_LOCK_PARTITION_KEY_VALUE": "key",
    "DYNAMO_LOCK_TABLE_NAME": "delta_rs_lock_table",
}

df = pd.DataFrame(
    {"x": [1, 2, 3]},
)

output = f"s3://{bucket}/some_delta_lake"
writer.write_deltalake(output, df, storage_options=storage_options)

I receive the following error when running...

[2024-06-03T16:02:48Z ERROR deltalake_aws::logstore] dynamodb client failed to write log entry: GenericDynamoDb { source: Unhandled(Unhandled { source: ErrorMetadata { code: Some("ValidationException"), message: Some("One or more parameter values were invalid: Missing the key key in the item"), extras: Some({"aws_request_id": "******"}) }, meta: ErrorMetadata { code: Some("ValidationException"), message: Some("One or more parameter values were invalid: Missing the key key in the item"), extras: Some({"aws_request_id": "******"}) } }) }

Looking at the policies assigned to my AWS account, it seems that I have all the permissions/policies that have been discussed above.

Not sure what I am missing.

dhirschfeld commented 4 months ago

In the published documentation they specify the create-table command as:

aws dynamodb create-table \
    --table-name delta_log \
    --attribute-definitions AttributeName=tablePath,AttributeType=S AttributeName=fileName,AttributeType=S \
    --key-schema AttributeName=tablePath,KeyType=HASH AttributeName=fileName,KeyType=RANGE \
    --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
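
For anyone provisioning the table from Python instead of the AWS CLI, a rough boto3 equivalent of that command might look like this (region and capacity values are illustrative):

import boto3

dynamodb = boto3.client("dynamodb", region_name="us-east-1")

# Composite key: tablePath is the partition (HASH) key and fileName is the
# sort (RANGE) key, matching the schema in the published documentation.
dynamodb.create_table(
    TableName="delta_log",
    AttributeDefinitions=[
        {"AttributeName": "tablePath", "AttributeType": "S"},
        {"AttributeName": "fileName", "AttributeType": "S"},
    ],
    KeySchema=[
        {"AttributeName": "tablePath", "KeyType": "HASH"},
        {"AttributeName": "fileName", "KeyType": "RANGE"},
    ],
    ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
)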
kwodzicki commented 3 months ago

Thank you @dhirschfeld, this solved my issue.