
Scan does not work as expected #495

Closed ndrluis closed 3 weeks ago

ndrluis commented 1 month ago

I'm testing with the iceberg-rest image from Tabular as the catalog.

Here's the docker-compose.yml file:

version: '3.8'

services:
  rest:
    image: tabulario/iceberg-rest:0.10.0
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
    depends_on:
      - minio
    ports:
      - "8181:8181"
    networks:
      iceberg_net:

  minio:
    image: minio/minio:RELEASE.2024-03-07T00-43-48Z
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      iceberg_net:
        aliases:
          - warehouse.minio
    expose:
      - 9001
      - 9000
    ports:
      - "9000:9000"
      - "9001:9001"
    command: [ "server", "/data", "--console-address", ":9001" ]

  mc:
    depends_on:
      - minio
    image: minio/mc:RELEASE.2024-03-07T00-31-49Z
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
        until (/usr/bin/mc config host add minio http://minio:9000 admin password) do
          echo '...waiting...' && sleep 1;
        done;
        /usr/bin/mc mb minio/warehouse;
        /usr/bin/mc policy set public minio/warehouse;
        tail -f /dev/null
      "
    networks:
      iceberg_net:

networks:
  iceberg_net:

I created some data with PyIceberg:

from pyiceberg.catalog import load_catalog
import pyarrow as pa
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

catalog = load_catalog(
    "demo",
    **{
        "type": "rest",
        "uri": "http://localhost:8181",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "warehouse": "demo",
    },
)

catalog.create_namespace_if_not_exists("default")

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

tbl = catalog.create_table_if_not_exists("default.cities", schema=schema)

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)

tbl.append(df)

And queried it with PyIceberg to verify that the data is there:

from pyiceberg.catalog import load_catalog
from pyiceberg.table import Table

catalog = load_catalog(
    "demo",
    **{
        "type": "rest",
        "uri": "http://localhost:8181/",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "warehouse": "demo",
    },
)

tbl: Table = catalog.load_table("default.cities")

res = tbl.scan().to_arrow()

print(len(res))

It returns 4.

And then with the Rust implementation:

use std::collections::HashMap;

use futures::TryStreamExt;
use iceberg::{
    io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY},
    Catalog, TableIdent,
};
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};

#[tokio::main]
async fn main() {
    // Create catalog
    let config = RestCatalogConfig::builder()
        .uri("http://localhost:8181".to_string())
        .warehouse("demo".to_string())
        .props(HashMap::from([
            (S3_ENDPOINT.to_string(), "http://localhost:9000".to_string()),
            (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
            (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
            (S3_REGION.to_string(), "us-east-1".to_string()),
        ]))
        .build();

    let catalog = RestCatalog::new(config);

    let table = catalog
        .load_table(&TableIdent::from_strs(["default", "cities"]).unwrap())
        .await
        .unwrap();

    let scan = table.scan().select_all().build().unwrap();
    let batch_stream = scan.to_arrow().await.unwrap();

    dbg!(scan);

    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

    dbg!(batches.len());
}

It's returning nothing.

We have to define the S3 configuration ourselves because the Tabular image does not return the S3 credentials in the get-config response.

ndrluis commented 1 month ago

I performed another test using the Tabular catalog, attempting to scan the sandbox warehouse in the examples namespace, specifically targeting the nyc_taxi_yellow table, but it returned no results.

ndrluis commented 1 month ago

I found the problem. I don’t know how to solve it, but I will try.

The while let Some(Ok(task)) = tasks.next().await statement is hiding some errors. In my previous attempt, I was trying to run it without the S3 credentials and was not receiving the access denied error. This happens because tasks.next() yields the error, but the while let pattern silently discards it instead of exposing it to the user.

While testing with Tabular, I'm receiving a 403 error from S3. So, we have two issues to solve.

One is to expose the reading errors to the user, and the other is to understand why we are getting these access denied errors.
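
To illustrate the swallowing (a minimal, self-contained sketch with a hand-built stream, not the actual reader code):

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    // A stream that yields one Ok item followed by an Err, similar in shape
    // to the stream of file scan tasks.
    let mut tasks = stream::iter(vec![
        Ok::<_, String>("task-1"),
        Err("access denied".to_string()),
    ]);

    // while let Some(Ok(task)) stops as soon as the pattern fails to match,
    // so the Err item ends the loop silently and is never reported.
    while let Some(Ok(task)) = tasks.next().await {
        println!("processing {task}");
    }
    // At this point the "access denied" error has been dropped on the floor.
}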

ndrluis commented 1 month ago

For the Tabular example, I encountered an 'access denied' problem: the FileIO does not work with remote signing. For the MinIO example, the problem was solved once I added a match statement that returns the error yielded by tasks.next().
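
Roughly, the fix has this shape (a self-contained sketch, not the exact change):

use futures::{stream, StreamExt};

async fn collect_tasks() -> Result<Vec<&'static str>, String> {
    // Stand-in for the real task stream: one Ok item, then an Err.
    let mut tasks = stream::iter(vec![
        Ok::<_, String>("task-1"),
        Err("access denied".to_string()),
    ]);

    let mut collected = Vec::new();
    // Matching on the whole item (instead of only on Ok) propagates the error
    // to the caller rather than silently ending the loop.
    while let Some(task) = tasks.next().await {
        match task {
            Ok(task) => collected.push(task),
            Err(err) => return Err(err),
        }
    }
    Ok(collected)
}

#[tokio::main]
async fn main() {
    // Prints "scan failed: access denied" instead of an empty result.
    match collect_tasks().await {
        Ok(tasks) => println!("got {} tasks", tasks.len()),
        Err(err) => eprintln!("scan failed: {err}"),
    }
}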

ndrluis commented 1 month ago

To scan with remote signing, we need to implement this:

https://github.com/apache/iceberg/blob/main/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java

Xuanwo commented 1 month ago

Hi, does remote signing mean presign in S3?

Xuanwo commented 1 month ago

I'm guessing https://github.com/apache/iceberg-rust/pull/498 should close this issue. Would you like to verify it?

ndrluis commented 1 month ago

@Xuanwo

Hi, does remote signing mean presign in S3?

Yes and no. I'm not sure if this is the flow, because I haven't found any documentation; this is based on my understanding from reading the Python implementation.

It's a presign process, but it's not the client's responsibility to presign. The get-config response will return s3.signer.uri, and the load-table response will return s3.remote-signing-enabled as true along with some other S3 configurations. With that, we need to "presign" using the token returned by the load-table response.

This is the specification for the server responsible for the signing: s3-signer-open-api.yaml
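
For reference, this is roughly the request/response shape I'd expect from that spec (my reading of s3-signer-open-api.yaml; the field names and the v1/aws/s3/sign path are my assumptions, not something iceberg-rust exposes today):

use std::collections::HashMap;

use serde::{Deserialize, Serialize};

// Body POSTed to the signer endpoint (e.g. {s3.signer.uri}/v1/aws/s3/sign),
// authenticated with the token returned by the load-table response.
#[derive(Serialize)]
struct S3SignRequest {
    region: String,
    uri: String,
    method: String,
    headers: HashMap<String, Vec<String>>,
}

// The signer responds with the (possibly rewritten) URI and the headers to
// use for the S3 request, now including the SigV4 Authorization header.
#[derive(Deserialize)]
struct S3SignResponse {
    uri: String,
    headers: HashMap<String, Vec<String>>,
}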

I'm guessing https://github.com/apache/iceberg-rust/pull/498 should close this issue. Would you like to verify it?

I'm not comfortable closing this issue without a regression test that guarantees the expected behavior.

liurenjie1024 commented 1 month ago

I'm not comfortable closing this issue without a regression test that guarantees the expected behavior.

+1 on this. Currently we don't have regression tests on the whole reading process, which involves integrating with external systems such as Spark.

Xuanwo commented 1 month ago

It's a presign process, but it's not the client's responsibility to presign. The get-config response will return s3.signer.uri, and the load-table response will return s3.remote-signing-enabled as true along with some other S3 configurations. With that, we need to "presign" using the token returned by the load-table response.

Got it. So, we need to support presign in the REST catalog. Could you help by creating an issue for this? I'll review this section and draft a plan for its implementation.

Currently we don't have regression tests on the whole reading process, which involves integrating with external systems such as Spark.

I think we can start with very basic tests, like just scanning the whole table.

liurenjie1024 commented 1 month ago

I think we can start with very basic tests, like just scanning the whole table.

The reason I haven't started this yet is that I want to do it after the integration with DataFusion. @ZENOTME and I did integration tests in icelake before, and I have to say that without SQL engine support, it's painful to maintain those tests.

Xuanwo commented 1 month ago

@ZENOTME and I did integration tests in icelake before, and I have to say that without SQL engine support, it's painful to maintain those tests.

I agree that we need a SQL engine to make testing easier.

However, maintaining basic unit tests based on fs or memory should be straightforward, right? We don't need separate test modules; just implement them as unit tests in the REST catalog. For example, it could be as simple as...

// catalog / file io setup, balbalba
let table = balabala();

let scan = table.scan().select_all().build().unwrap();
let batch_stream = scan.to_arrow().await.unwrap();

dbg!(scan);

let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
ndrluis commented 1 month ago

Got it. So, we need to support presign in the REST catalog. Could you help by creating an issue for this? I'll review this section and draft a plan for its implementation.

Issue #504 created

liurenjie1024 commented 1 month ago

@ZENOTME and I did integration tests in icelake before, and I have to say that without SQL engine support, it's painful to maintain those tests.

I agree that we need a SQL engine to make testing easier.

However, maintaining basic unit tests based on fs or memory should be straightforward, right? We don't need separate test modules; just implement them as unit tests in the REST catalog. For example, it could be as simple as...

// catalog / file io setup, balbalba
let table = balabala();

let scan = table.scan().select_all().build().unwrap();
let batch_stream = scan.to_arrow().await.unwrap();

dbg!(scan);

let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

Correctly writing data into Iceberg is not supported yet, so we need external systems such as Spark to ingest data. Putting pre-generated Parquet files in the repo may be an approach, but that requires maintaining binaries in the repo.

sdd commented 1 month ago

Correctly writing data into Iceberg is not supported yet, so we need external systems such as Spark to ingest data. Putting pre-generated Parquet files in the repo may be an approach, but that requires maintaining binaries in the repo.

I've got some code in the perf-testing branch that might help. It downloads NYC taxi data and uses MinIO, the REST catalog, and a Spark container to create a table and insert the NYC taxi data into it.

https://github.com/apache/iceberg-rust/pull/497

sdd commented 1 month ago

I have fixed the issue where errors were not returned to the user in https://github.com/apache/iceberg-rust/pull/535.

Xuanwo commented 3 weeks ago

I believe this should have been fixed. Please feel free to open a new issue if it still exists.