delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0
1.98k stars · 364 forks

Unable to write to minio using python's write_deltalake #1250

Closed · emanueledomingo closed this issue 1 year ago

emanueledomingo commented 1 year ago

Environment

Delta-rs version: 1.64

Binding: Python

Environment: Python==3.10, deltalake==0.8.1, pyarrow==11.0.0


Bug

What happened:

thread '<unnamed>' panicked at 'not stream', /root/.cargo/registry/src/github.com-1ecc6299db9ec823/object_store-0.5.5/src/aws/credential.rs:175:14
---------------------------------------------------------------------------
PanicException                            Traceback (most recent call last)
Cell In[6], line 6
      3 pydict = {'n_legs': n_legs, 'animals': animals}
      4 animals = pa.Table.from_pydict(pydict)
----> 6 write_deltalake(
      7     's3://test-bucket/animals',
      8     animals,
      9     storage_options={
     10         "AWS_ACCESS_KEY_ID": "TESTACCESSKEY12345",
     11         "AWS_SECRET_ACCESS_KEY":"ABCSECRETKEY",
     12         "AWS_REGION": "anctartica",
     13         "ENDPOINT_URL": "localhost:9000"
     14     }
     15 )

in write_deltalake(table_or_uri, data, schema, partition_by, filesystem, mode, file_options, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group, name, description, configuration, overwrite_schema, storage_options, partition_filters)
    142     else:
    143         data, schema = delta_arrow_schema_from_pandas(data)
--> 145 table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options)
    147 if schema is None:
    148     if isinstance(data, RecordBatchReader):

in try_get_table_and_table_uri(table_or_uri, storage_options)
    378     raise ValueError("table_or_uri must be a str, Path or DeltaTable")
    380 if isinstance(table_or_uri, (str, Path)):
--> 381     table = try_get_deltatable(table_or_uri, storage_options)
    382     table_uri = str(table_or_uri)
    383 else:

in try_get_deltatable(table_uri, storage_options)
    390 def try_get_deltatable(
    391     table_uri: Union[str, Path], storage_options: Optional[Dict[str, str]]
    392 ) -> Optional[DeltaTable]:
    393     try:
--> 394         return DeltaTable(table_uri, storage_options=storage_options)
    395     except PyDeltaTableError as err:
    396         # TODO: There has got to be a better way...
    397         if "Not a Delta table" in str(err):

in DeltaTable.__init__(self, table_uri, version, storage_options, without_files)
    109 """
    110 Create the Delta Table from a path with an optional version.
    111 Multiple StorageBackends are currently supported: AWS S3, Azure Data Lake Storage Gen2, Google Cloud Storage (GCS) and local URI.
   (...)
    119                       DeltaTable will be loaded with a significant memory reduction.
    120 """
    121 self._storage_options = storage_options
--> 122 self._table = RawDeltaTable(
    123     str(table_uri),
    124     version=version,
    125     storage_options=storage_options,
    126     without_files=without_files,
    127 )
    128 self._metadata = Metadata(self._table)

PanicException: not stream

What you expected to happen: the table is successfully written.

How to reproduce it:

from deltalake.writer import write_deltalake
import pyarrow as pa

n_legs = pa.array([2, 4, 5, 100])
animals = pa.array(["Flamingo", "Horse", "Brittle stars", "Centipede"])
pydict = {'n_legs': n_legs, 'animals': animals}
animals = pa.Table.from_pydict(pydict)

write_deltalake(
    's3://test-bucket/animals',
    animals,
    storage_options={
        "AWS_ACCESS_KEY_ID": "TESTACCESSKEY12345",
        "AWS_SECRET_ACCESS_KEY":"ABCSECRETKEY",
        "AWS_REGION": "anctartica",
        "ENDPOINT_URL": "localhost:9000"
    }
)
rtyler commented 1 year ago

I have written to MinIO from Rust before. Can you try setting AWS_ENDPOINT_URL rather than ENDPOINT_URL?

emanueledomingo commented 1 year ago

Tried with

storage_options={
    "AWS_ACCESS_KEY_ID": "TESTACCESSKEY12345",
    "AWS_SECRET_ACCESS_KEY":"ABCSECRETKEY",
    "AWS_REGION": "anctartica",
    "AWS_ENDPOINT_URL": "localhost:9000"
}

Same result: thread '<unnamed>' panicked at 'not stream', /root/.cargo/registry/src/github.com-1ecc6299db9ec823/object_store-0.5.5/src/aws/credential.rs:175:14

mrjoe7 commented 1 year ago

Assuming the standard LocalStack TCP port 4566, the code should be:

write_deltalake(
    's3://test-bucket/animals',
    animals,
    storage_options={
        "AWS_ACCESS_KEY_ID": "TESTACCESSKEY12345",
        "AWS_SECRET_ACCESS_KEY":"ABCSECRETKEY",
        "AWS_REGION": "us-east-1",
        "AWS_ENDPOINT_URL": "http://localhost:4566",
        "AWS_STORAGE_ALLOW_HTTP": "TRUE"
    }
)

Also, do not set the region to an invalid value (in your case anctartica) or you will run into the following error:

 INFO --- [   asgi_gw_8] localstack.request.aws     : AWS s3.GetObject => 500 (InternalError)
ERROR --- [   asgi_gw_9] l.aws.handlers.logging     : exception during call chain: 'anctartica' is not a valid AWS region name for s3
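Note that the original failing call passed a bare "localhost:9000" as the endpoint, while the working one uses a full URL with an explicit http:// scheme. A quick way to catch that before calling write_deltalake is a small standard-library check (a minimal sketch; the helper name check_endpoint is hypothetical, not part of deltalake):

```python
from urllib.parse import urlparse

def check_endpoint(url: str) -> str:
    # object_store expects a full URL such as "http://localhost:9000";
    # a bare "host:port" pair parses with "host" as the URL scheme.
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(
            f"endpoint must include http:// or https://, got {url!r}"
        )
    return url

check_endpoint("http://localhost:9000")  # passes
# check_endpoint("localhost:9000")       # raises ValueError
```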
emanueledomingo commented 1 year ago

Using AWS_STORAGE_ALLOW_HTTP works. Thanks for clarifying.
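Putting the thread's fixes together for MinIO on its default port 9000 (a sketch using the placeholder bucket and credentials from the original report; it requires a running MinIO instance, so treat it as a config fragment rather than a standalone script):

```python
from deltalake.writer import write_deltalake
import pyarrow as pa

table = pa.table({
    "n_legs": [2, 4, 5, 100],
    "animals": ["Flamingo", "Horse", "Brittle stars", "Centipede"],
})

write_deltalake(
    "s3://test-bucket/animals",
    table,
    storage_options={
        "AWS_ACCESS_KEY_ID": "TESTACCESSKEY12345",
        "AWS_SECRET_ACCESS_KEY": "ABCSECRETKEY",
        "AWS_REGION": "us-east-1",                    # must be a valid region name
        "AWS_ENDPOINT_URL": "http://localhost:9000",  # scheme is required
        "AWS_STORAGE_ALLOW_HTTP": "TRUE",             # needed for non-TLS endpoints
    },
)
```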

adriangb commented 2 months ago

Could we add AWS_STORAGE_ALLOW_HTTP to the documentation?

ion-elgreco commented 2 months ago

@adriangb feel free to make a PR