
org.apache.iceberg.exceptions.NoSuchTableException after creating table #11445

Open petartushev opened 2 weeks ago

petartushev commented 2 weeks ago

Query engine

I'm using Flink as my engine.

Question

I have a Java data-streaming application that uses Flink to process records from Kafka and store them in Iceberg tables, with MinIO as blob storage and a REST Iceberg catalog. I reused part of the Spark docker-compose from the Iceberg docs to set up the Iceberg and MinIO services, and added my own Kafka and Flink services. The compose file is defined as:

version: "3"

services:
    jobmanager:
        image: flink:1.19.1-java11
        expose:
            - "6121"
        ports:
            - "8081:8081"
        command: jobmanager
        environment:
            - JOB_MANAGER_RPC_ADDRESS=jobmanager

    taskmanager:
        image: flink:1.19.1-java11
        expose:
            - "6122"
            - "6123"
        depends_on:
            - jobmanager
        command: taskmanager
        links:
            - "jobmanager:jobmanager"
        environment:
            - JOB_MANAGER_RPC_ADDRESS=jobmanager

    zookeeper:
        image: wurstmeister/zookeeper:latest
        expose:
            - "2181"

    kafka:
        image: wurstmeister/kafka:2.13-2.8.1
        depends_on:
            - zookeeper
        ports:
            - "9092:9092"
        expose:
            - "9093"
        environment:
            KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
            KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE

    rest:
        image: tabulario/iceberg-rest
        container_name: iceberg-rest
        networks:
            iceberg_net:
        ports: 
            - 8181:8181
        environment:
            - AWS_ACCESS_KEY_ID=admin
            - AWS_SECRET_ACCESS_KEY=password
            - AWS_REGION=us-east-1
            - CATALOG_WAREHOUSE=s3://warehouse/
            - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
            - CATALOG_S3_ENDPOINT=http://minio:9000
    minio:
        image: "minio/minio"
        container_name: "minio"
        environment:
            - MINIO_ROOT_USER=admin
            - MINIO_ROOT_PASSWORD=password
            - MINIO_DOMAIN=minio
        networks:
            iceberg_net:
                aliases:
                    - warehouse.minio
        ports:
            - 9001:9001
            - 9000:9000
        volumes:
            - minio-data:/data
        command: ["server", "/data", "--console-address", ":9001"]

    mc:
        depends_on: 
            - minio
        image: minio/mc
        container_name: mc
        networks:
            iceberg_net:
        environment:
            - AWS_ACCESS_KEY_ID=admin
            - AWS_SECRET_ACCESS_KEY=password
            - AWS_REGION=us-east-1
        entrypoint: >
            /bin/sh -c "
            until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo 'Waiting...' && sleep 1; done;
            /usr/bin/mc rm -r --force minio/warehouse;
            /usr/bin/mc mb minio/warehouse;
            /usr/bin/mc policy set public minio/warehouse;
            tail -f /dev/null
            "

networks:
    iceberg_net:

volumes:
    minio-data:

My flink streaming application is defined as follows:

package org.example;

import com.google.gson.Gson;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.iceberg.Table;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.hadoop.conf.Configuration;
import pojo.Transaction;
import timestamp_utils.TransactionWatermarkStrategy;

import java.util.HashMap;
import java.util.Map;

public class ProcessTransactions {
    public static void main(String[] args) throws Exception {

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("transaction")
                .setGroupId("transactions_group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.getConfig().setAutoWatermarkInterval(2000L);
        env.enableCheckpointing(30000);  // e.g., 60000 for every minute
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        tableEnvironment.executeSql(
                "CREATE CATALOG bank_transactions_catalog WITH (" +
                        "  'type'='iceberg'," +
                        "  'catalog-impl'='org.apache.iceberg.rest.RESTCatalog'," +
                        "  'uri'='http://localhost:8181'," +
                        "  'warehouse'='s3://warehouse/'," +
                        "  'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'," +
                        "  's3.endpoint'='http://localhost:9000'," +
                        "  's3.access-key-id'='admin'," +
                        "  's3.secret-access-key'='password'," +
                        "  's3.path-style-access'='true'" +
                        ")"
        );
        DataStream<String> dataStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka source");

        DataStream<Transaction> transactionDataStream = dataStream.map(value ->
                        new Gson().fromJson(value, Transaction.class))
                .assignTimestampsAndWatermarks(new TransactionWatermarkStrategy());

        TableSchema tableSchema = TableSchema.builder()
                .field("transactionId", DataTypes.STRING())
                .field("sendingClientAccountNumber", DataTypes.STRING())
                .field("receivingClientAccountNumber", DataTypes.STRING())
                .field("amount", DataTypes.DOUBLE())
                .build();

        Map<String, String> catalogProperties = new HashMap<>();
        catalogProperties.put("uri", "http://localhost:8181");  // REST Catalog URI
        catalogProperties.put("warehouse", "s3://warehouse/");  // S3 warehouse location
        catalogProperties.put("io-impl", S3FileIO.class.getName());  // Use S3 File IO
        catalogProperties.put("s3.endpoint", "http://minio:9000");  // MinIO endpoint
        catalogProperties.put("s3.access-key", "admin");  // MinIO access key
        catalogProperties.put("s3.secret-key", "password");  // MinIO secret key
        catalogProperties.put("s3.path-style-access", "true");  // Required for MinIO

        Configuration hadoopConf = new Configuration();
        hadoopConf.set("fs.s3a.access.key", "admin");
        hadoopConf.set("fs.s3a.secret.key", "password");
        hadoopConf.set("fs.s3a.endpoint", "http://minio:9000");
        hadoopConf.set("fs.s3a.path.style.access", "true");

        CatalogLoader catalogLoader = CatalogLoader.custom(
                "bank_transactions_catalog",  // Name of the catalog
                catalogProperties,  // Catalog properties (e.g., S3 config)
                hadoopConf,  // Hadoop configuration
                "org.apache.iceberg.rest.RESTCatalog"  // Catalog implementation class (REST)
        );

        tableEnvironment.executeSql(
                "CREATE TABLE transactions (" +
                        "transactionId STRING NOT NULL, " +
                        "sendingClientAccountNumber STRING NOT NULL, " +
                        "receivingClientAccountNumber STRING NOT NULL, " +
                        "amount FLOAT NOT NULL " +
                        ")" +
                        "WITH (" +
                        " 'connector' = 'iceberg', " +
                        " 'catalog-impl' = 'org.apache.iceberg.rest.RESTCatalog', " +
                        " 'catalog-name' = 'bank_transactions_catalog', " +
                        " 'database-name' = 'financial_transactions', " +
                        " 'table-name' = 'transactions', " +
                        " 'format' = 'parquet', " +
                        " 'warehouse' = 's3://warehouse/', " +
                        " 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO', " +
                        " 'catalog-rest.endpoint' = 'http://localhost:8181', " +
                        " 's3.endpoint' = 'http://minio:9000', " +
                        " 's3.access-key-id' = 'admin', " +
                        " 's3.secret-access-key' = 'password', " +
                        " 's3.path-style-access' = 'true' " +
                        ")"
        );

        TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, TableIdentifier.of("financial_transactions", "transactions"));

        DataStream<Row> rowDataStream = transactionDataStream.map(
                transaction -> Row.of(transaction.getTransactionId(),
                        transaction.getSendingClientAccountNumber(),
                        transaction.getReceivingClientAccountNumber(),
                        transaction.getAmount())
        );

        FlinkSink.forRow(rowDataStream, tableSchema)
                .tableLoader(tableLoader)
                .overwrite(false)
                .append();

        try{
            env.execute(ProcessTransactions.class.getName());
        }
        catch (Exception e){
            e.printStackTrace();
        }

    }
}

If I run the application in debug mode and execute tableEnvironment.executeSql("SELECT * FROM transactions").collect(), I get the error:

Unable to create a source for reading table 'bank_transactions_catalog.financial_transactions.transactions'.

Table options are:

'catalog-impl'='org.apache.iceberg.rest.RESTCatalog'
'catalog-name'='bank_transactions_catalog'
'catalog-rest.endpoint'='http://localhost:8181'
'connector'='iceberg'
'database-name'='financial_transactions'
'format'='parquet'
'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
's3.access-key-id'='admin'
's3.endpoint'='http://minio:9000'
's3.path-style-access'='true'
's3.secret-access-key'='******'
'table-name'='transactions'
'warehouse'='s3://warehouse/'

with the cause being java.lang.NullPointerException: Invalid uri for http client: null. However, if the program is run without debug mode, it gives me the error:

Exception in thread "main" org.apache.iceberg.exceptions.NoSuchTableException: Table does not exist: financial_transactions.transactions

This is probably a newbie problem, but I have not been able to figure out how to debug it. Any help or general advice toward resolving this issue is welcome.

nastra commented 2 weeks ago

I believe 'catalog-rest.endpoint' = 'http://localhost:8181' should rather be 'uri' = 'http://localhost:8181'.
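
For reference, keeping everything else in your CREATE TABLE as-is, the changed DDL would look roughly like this (untested sketch; the only edit is swapping 'catalog-rest.endpoint' for 'uri'):

CREATE TABLE transactions (
    transactionId STRING NOT NULL,
    sendingClientAccountNumber STRING NOT NULL,
    receivingClientAccountNumber STRING NOT NULL,
    amount FLOAT NOT NULL
) WITH (
    'connector' = 'iceberg',
    'catalog-impl' = 'org.apache.iceberg.rest.RESTCatalog',
    'catalog-name' = 'bank_transactions_catalog',
    'database-name' = 'financial_transactions',
    'table-name' = 'transactions',
    'format' = 'parquet',
    'warehouse' = 's3://warehouse/',
    'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
    'uri' = 'http://localhost:8181',
    's3.endpoint' = 'http://minio:9000',
    's3.access-key-id' = 'admin',
    's3.secret-access-key' = 'password',
    's3.path-style-access' = 'true'
)

That would also match your CREATE CATALOG statement, which already passes the REST endpoint as 'uri'.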