
`org.apache.iceberg.exceptions.NoSuchTableException` / Flink / Rest Catalog #10821

Closed: Al-Moatasem closed this issue 3 months ago

Al-Moatasem commented 3 months ago

Apache Iceberg version

1.6.0 (latest release)

Query engine

Flink

Please describe the bug 🐞

Hello,

I am trying to write data from PyFlink (SQL API) to Iceberg on MinIO. The PyFlink script creates the metadata file `metadata/metadata.json` successfully; however, the data files are not created. The exception I get is `org.apache.iceberg.exceptions.NoSuchTableException: Table does not exist`.

The setup on my end:

Docker Compose

version: '3'
services:
  rest:
    image: tabulario/iceberg-rest:1.5.0
    container_name: iceberg-rest
    ports:
      - 8181:8181
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
    networks:
      flink_iceberg:

  minio:
    image: minio/minio:RELEASE.2024-05-10T01-41-38Z
    container_name: minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    ports:
      - 9001:9001
      - 9000:9000
    command: [ "server", "/data", "--console-address", ":9001" ]
    networks:
      flink_iceberg:
        aliases:
          - warehouse.minio

  mc:
    depends_on:
      - minio
    image: minio/mc:RELEASE.2024-05-09T17-04-24Z
    container_name: mc
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: |
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password)
      do
          echo '...waiting...' && sleep 1;
      done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "
    networks:
      flink_iceberg:

networks:
  flink_iceberg:
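
For reference, a quick way to confirm that the REST catalog is reachable from the host before running any Flink job (a minimal sketch, assuming the compose stack above is up; `GET /v1/config` and `GET /v1/namespaces` are standard Iceberg REST catalog endpoints):

import json
import urllib.request

# Sanity-check the REST catalog from the host (stdlib only).
BASE = "http://localhost:8181/v1"

for path in ("config", "namespaces"):
    # Both endpoints are defined by the Iceberg REST catalog spec.
    with urllib.request.urlopen(f"{BASE}/{path}") as resp:
        print(path, "->", json.loads(resp.read().decode("utf-8")))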

PyFlink Code

# requirements.txt
apache-flink==1.19.1
pyiceberg[pyarrow,duckdb,pandas,sql-sqlite]==0.6.1

from pyflink.table import EnvironmentSettings, TableEnvironment

table_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(environment_settings=table_settings)

t_env.get_config().set("parallelism.default", "1")

table_name = "admin_users"

t_env.execute_sql(
    """
    CREATE CATALOG rest_catalog
    WITH (
        'type'         = 'iceberg',
        'catalog-impl' = 'org.apache.iceberg.rest.RESTCatalog',
        'warehouse'    = 's3://warehouse',
        'uri'          = 'http://localhost:8181/'
    )
    """
)

t_env.execute_sql(stmt=f"CREATE DATABASE IF NOT EXISTS rest_catalog.db;")
t_env.execute_sql(stmt=f"USE rest_catalog.db;")

t_env.execute_sql(stmt=f"DROP TABLE IF EXISTS {table_name};")

t_env.execute_sql(
    f"""
    CREATE TABLE IF NOT EXISTS {table_name}
    (
        user_id STRING,
        username STRING,
        email STRING,
        city STRING,
        age INT
    ) WITH (
        'warehouse'              = 's3://warehouse',
        'io-impl'                = 'org.apache.iceberg.aws.s3.S3FileIO',
        's3.endpoint'            = 'http://localhost:9000',
        's3.access-key-id'       = 'admin',
        's3.secret-access-key'   = 'password',
        's3.region'              = 'us-east-1'
    )
    ;
    """
)

t_env.execute_sql(
    f"""
    INSERT INTO {table_name} ( user_id, username, email, city, age )
    VALUES
        ('123', 'A', 'a@example.com', 'C1', 43),
        ('456', 'B', 'b.example.com', 'C2', 25),
        ('789', 'C', 'c.example.com', 'C3', 71)
    ;
    """
)

t_env.execute_sql("SHOW CATALOGS").print()
t_env.execute_sql("SHOW DATABASES").print()
t_env.execute_sql("SHOW TABLES").print()
t_env.execute_sql(f"DESCRIBE {table_name}").print()

The output

WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
I0000 00:00:1722374060.892351   13672 config.cc:230] gRPC experiments enabled: call_status_override_on_cancellation, event_engine_client, event_engine_dns, event_engine_listener, http2_stats_fix, monitoring_experiment, pick_first_new, trace_record_callops, work_serializer_clears_time_cache
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|    rest_catalog |
+-----------------+
2 rows in set
+---------------+
| database name |
+---------------+
|            db |
+---------------+
1 row in set
+-------------+
|  table name |
+-------------+
| admin_users |
+-------------+
1 row in set
+----------+--------+------+-----+--------+-----------+
|     name |   type | null | key | extras | watermark |
+----------+--------+------+-----+--------+-----------+
|  user_id | STRING | TRUE |     |        |           |
| username | STRING | TRUE |     |        |           |
|    email | STRING | TRUE |     |        |           |
|     city | STRING | TRUE |     |        |           |
|      age |    INT | TRUE |     |        |           |
+----------+--------+------+-----+--------+-----------+
5 rows in set   

I tried to execute the same script without the warehouse settings, but got the same result (ref).
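
For reference, the table can also be read back through the REST catalog with pyiceberg (already in requirements.txt), independently of Flink; a minimal sketch, assuming the endpoints and credentials from the compose file:

from pyiceberg.catalog import load_catalog

# Connect to the same REST catalog from plain Python.
catalog = load_catalog(
    "rest",
    **{
        "type": "rest",
        "uri": "http://localhost:8181",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "s3.region": "us-east-1",
    },
)

table = catalog.load_table("db.admin_users")
# An empty DataFrame here would confirm that no rows were ever committed.
print(table.scan().to_pandas())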

The list of JAR files stored in the Flink lib directory:

aws-core-2.26.25.jar
aws-java-sdk-bundle-1.12.648.jar
aws-java-sdk-s3-1.12.765.jar
commons-configuration2-2.1.1.jar
flink-s3-fs-hadoop-1.19.1.jar
flink-sql-connector-kafka-3.2.0-1.19.jar
hadoop-auth-3.3.4.jar
hadoop-aws-3.3.4.jar
hadoop-common-3.3.4.jar
hadoop-hdfs-3.3.4.jar
hadoop-hdfs-client-3.3.4.jar
hadoop-mapreduce-client-core-3.3.4.jar
hadoop-shaded-guava-1.1.1.jar
iceberg-aws-1.6.0.jar
iceberg-flink-runtime-1.19-1.6.0.jar
iceberg-parquet-1.6.0.jar
s3-2.26.25.jar
stax2-api-4.2.1.jar
woodstox-core-5.3.0.jar   

The logs from the iceberg-rest container:

2024-07-30T21:23:31.941 ERROR [org.apache.iceberg.rest.RESTCatalogServlet] - Error processing REST request
org.apache.iceberg.exceptions.RESTException: Unhandled error: ErrorResponse(code=404, type=NoSuchTableException, message=Table does not exist: db.admin_users)
org.apache.iceberg.exceptions.NoSuchTableException: Table does not exist: db.admin_users
        at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:55)
        at org.apache.iceberg.rest.CatalogHandlers.loadTable(CatalogHandlers.java:269)
        at org.apache.iceberg.rest.RESTCatalogAdapter.handleRequest(RESTCatalogAdapter.java:372)      
        at org.apache.iceberg.rest.RESTServerCatalogAdapter.handleRequest(RESTServerCatalogAdapter.java:42)
        at org.apache.iceberg.rest.RESTCatalogAdapter.execute(RESTCatalogAdapter.java:527)
        at org.apache.iceberg.rest.RESTCatalogServlet.execute(RESTCatalogServlet.java:100)
        at org.apache.iceberg.rest.RESTCatalogServlet.doGet(RESTCatalogServlet.java:66)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
        at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
        at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:554)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
        at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:772)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:235)
        at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
        at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
        at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
        at org.eclipse.jetty.server.Server.handle(Server.java:516)
        at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
        at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
        at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
        at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)     
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)   
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)  
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
        at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
        at java.base/java.lang.Thread.run(Thread.java:840)
        at org.apache.iceberg.rest.RESTCatalogAdapter.execute(RESTCatalogAdapter.java:544)
        at org.apache.iceberg.rest.RESTCatalogServlet.execute(RESTCatalogServlet.java:100)
        at org.apache.iceberg.rest.RESTCatalogServlet.doGet(RESTCatalogServlet.java:66)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
        at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
        at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:554)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
        at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:772)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:235)
        at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
        at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
        at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
        at org.eclipse.jetty.server.Server.handle(Server.java:516)
        at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
        at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
        at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
        at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)     
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)  
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
        at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)      
        at java.base/java.lang.Thread.run(Thread.java:840)
2024-07-30T21:23:32.639 INFO  [org.apache.iceberg.BaseMetastoreCatalog] - Table properties set at catalog level through catalog properties: {}
2024-07-30T21:23:32.646 INFO  [org.apache.iceberg.BaseMetastoreCatalog] - Table properties enforced at catalog level through catalog properties: {}
2024-07-30T21:23:35.593 INFO  [org.apache.iceberg.BaseMetastoreTableOperations] - Successfully committed to table db.admin_users in 2863 ms
2024-07-30T21:23:35.605 INFO  [org.apache.iceberg.BaseMetastoreTableOperations] - Refreshing table metadata from new version: s3://warehouse/db/admin_users/metadata/00000-014dd243-3ddc-4f53-a600-6d492e079819.metadata.json
2024-07-30T21:23:35.937 INFO  [org.apache.iceberg.BaseMetastoreTableOperations] - Refreshing table metadata from new version: s3://warehouse/db/admin_users/metadata/00000-014dd243-3ddc-4f53-a600-6d492e079819.metadata.json
2024-07-30T21:23:35.969 INFO  [org.apache.iceberg.BaseMetastoreCatalog] - Table loaded by catalog: rest_backend.db.admin_users
2024-07-30T21:23:36.255 INFO  [org.apache.iceberg.BaseMetastoreTableOperations] - Refreshing table metadata from new version: s3://warehouse/db/admin_users/metadata/00000-014dd243-3ddc-4f53-a600-6d492e079819.metadata.json
2024-07-30T21:23:36.277 INFO  [org.apache.iceberg.BaseMetastoreCatalog] - Table loaded by catalog: rest_backend.db.admin_users
2024-07-30T21:23:41.944 INFO  [org.apache.iceberg.BaseMetastoreTableOperations] - Refreshing table metadata from new version: s3://warehouse/db/admin_users/metadata/00000-014dd243-3ddc-4f53-a600-6d492e079819.metadata.json
2024-07-30T21:23:41.994 INFO  [org.apache.iceberg.BaseMetastoreCatalog] - Table loaded by catalog: rest_backend.db.admin_users

[Screenshot: the objects created in MinIO]


nastra commented 3 months ago

@Al-Moatasem is it mainly about the NoSuchTableException in the log? That exception can happen when tableExists(..) is called, and it is generally not an issue. According to `2024-07-30T21:23:35.593 INFO [org.apache.iceberg.BaseMetastoreTableOperations] - Successfully committed to table db.admin_users in 2863 ms`, it seems this works as expected and commits to that table succeed.
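
(For illustration: the `DROP TABLE IF EXISTS` / `CREATE TABLE IF NOT EXISTS` statements make the client probe the catalog for the table first. Against a REST catalog that probe is a GET on the table endpoint, and for a missing table the server answers 404 and logs exactly this exception. A minimal sketch, using a hypothetical table name:)

import urllib.error
import urllib.request

# Probe a table that does not exist; "no_such_table" is hypothetical.
url = "http://localhost:8181/v1/namespaces/db/tables/no_such_table"
try:
    urllib.request.urlopen(url)
except urllib.error.HTTPError as e:
    # The 404 here is what the server logs as NoSuchTableException;
    # the client just treats it as "table absent" and moves on.
    print(e.code)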

Do you have a data folder at the same level as the metadata folder?
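
One way to check which data files (if any) the current snapshot references, without browsing MinIO by hand: a minimal pyiceberg sketch, using the same connection properties as in the issue:

from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "rest",
    **{
        "type": "rest",
        "uri": "http://localhost:8181",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    },
)

# Each FileScanTask points at one data file under .../admin_users/data/;
# an empty listing means the snapshot references no data files at all.
for task in catalog.load_table("db.admin_users").scan().plan_files():
    print(task.file.file_path)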

Al-Moatasem commented 3 months ago

@nastra

> Do you have a data folder at the same level as the metadata folder?

No, there is no data directory.