
Flink: Not Writing #8916


a8356555 commented 11 months ago

Apache Iceberg version

1.4.0

Query engine

Flink

Please describe the bug 🐞

I'm using the following Dockerfile as my environment:

FROM alpine:3.17.0 AS builder

# Download required jars
WORKDIR /tmp/download/my-jars
RUN wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.16/1.4.0/iceberg-flink-runtime-1.16-1.4.0.jar
RUN wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.263/bundle-2.17.263.jar
RUN wget https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.17.263/url-connection-client-2.17.263.jar

FROM apache/flink:1.16.2-scala_2.12-java8 AS runtime

# Install Python 3.8 & git & PyFlink
RUN apt-get update && apt-get install -y software-properties-common &&  \
    add-apt-repository -y ppa:deadsnakes/ppa &&  \
    apt-get remove -y software-properties-common && apt-get autoremove -y && apt-get clean
RUN apt-get update && apt-get install -y python3.8 python3-pip python3.8-distutils git && apt-get clean
RUN python3.8 -m pip install --upgrade pip
RUN python3.8 -m pip install apache-flink==1.16.2 --no-cache-dir

# Install Hadoop & export Hadoop classpath
WORKDIR /tmp/download/my-hadoop
RUN wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz &&  \
    tar xzf hadoop-3.3.4.tar.gz &&  \
    mv hadoop-3.3.4 /opt/hadoop-3.3.4 &&  \
    rm hadoop-3.3.4.tar.gz
ENV HADOOP_HOME=/opt/hadoop-3.3.4
ENV HADOOP_CLASSPATH=/opt/hadoop-3.3.4/etc/hadoop:/opt/hadoop-3.3.4/share/hadoop/common/lib/*:/opt/hadoop-3.3.4/share/hadoop/common/*:/opt/hadoop-3.3.4/share/hadoop/hdfs:/opt/hadoop-3.3.4/share/hadoop/hdfs/lib/*:/opt/hadoop-3.3.4/share/hadoop/hdfs/*:/opt/hadoop-3.3.4/share/hadoop/mapreduce/*:/opt/hadoop-3.3.4/share/hadoop/yarn:/opt/hadoop-3.3.4/share/hadoop/yarn/lib/*:/opt/hadoop-3.3.4/share/hadoop/yarn/*

# Copy jars from builder stage
COPY --from=builder /tmp/download/my-jars/. /opt/flink/lib/.

Here is my PyFlink code (job.py):


import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.common import Types

ICEBERG_GLUE_WAREHOUSE = os.environ['ICEBERG_GLUE_WAREHOUSE']
ICEBERG_GLUE_DATABASE_NAME_SRC = os.environ['ICEBERG_GLUE_DATABASE_NAME_SRC']
ICEBERG_GLUE_DATABASE_NAME_DST = os.environ['ICEBERG_GLUE_DATABASE_NAME_DST']
ICEBERG_GLUE_TABLE_NAME_SRC = os.environ['ICEBERG_GLUE_TABLE_NAME_SRC']
ICEBERG_GLUE_TABLE_NAME_DST = os.environ['ICEBERG_GLUE_TABLE_NAME_DST']

env = StreamExecutionEnvironment.get_execution_environment()
env.disable_operator_chaining()
env.set_parallelism(1)
env.enable_checkpointing(60000) # 60s
checkpoint_config = env.get_checkpoint_config()
checkpoint_config.set_checkpoint_storage_dir('file:///tmp/checkpoint')

stenv = StreamTableEnvironment.create(env)
stenv.get_config().get_configuration().set_string('table.local-time-zone', 'UTC')

stenv.execute_sql(f'''
CREATE TEMPORARY TABLE `mytable` (
                `t`             TIMESTAMP,
                `table`         STRING,
                `op`            STRING,
                `before`        MAP<STRING, STRING>,
                `after`         MAP<STRING, STRING>,
                `_kc_tx`        TIMESTAMP,
                `_kc_source`    MAP<STRING, STRING>,
                `_kafka`        ROW<`topic` STRING, `partition` INT, `offset` BIGINT, `timestamp` TIMESTAMP>
            ) WITH (
                'connector' = 'iceberg',
                'warehouse' = '{ICEBERG_GLUE_WAREHOUSE}',
                'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
                'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',  
                'catalog-name' = 'mycatalog',
                'catalog-database' = '{ICEBERG_GLUE_DATABASE_NAME_SRC}',
                'catalog-table' = '{ICEBERG_GLUE_TABLE_NAME_SRC}'
              );
''')

stenv.execute_sql(f'''
CREATE CATALOG `mycatalog` 
        WITH (
            'type' = 'iceberg',
            'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
            'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',  
            'warehouse' = '{ICEBERG_GLUE_WAREHOUSE}'
        );
''')

stenv.execute_sql(f'''
CREATE TABLE IF NOT EXISTS `mycatalog`.`{ICEBERG_GLUE_DATABASE_NAME_DST}`.`{ICEBERG_GLUE_TABLE_NAME_DST}`
        (
            `t` TIMESTAMP,
            `table` STRING,
            `op` STRING,
            PRIMARY KEY (`table`) NOT ENFORCED
        ) 
        PARTITIONED BY (`table`)
        WITH (
            'format-version'='2',
            'write.upsert.enabled'='true'
        );
''')

type_info_datom = Types.ROW_NAMED(
                                    field_names=[
                                        't',
                                        'table',
                                        'op'
                                    ],
                                    field_types=[
                                        Types.SQL_TIMESTAMP(),
                                        Types.STRING(),
                                        Types.STRING()
                                    ]
                                )

sql = f'''
    select 
        `t`,
        `table`,
        `op`
    from `table_src` 
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='300s')*/
'''

cdc_datom_stream = stenv.to_append_stream(
    stenv.sql_query(sql),
    type_info_datom
)

stenv.create_temporary_view('event', cdc_datom_stream)

stenv.execute_sql(f'''
INSERT INTO `mycatalog`.`{ICEBERG_GLUE_DATABASE_NAME_DST}`.`{ICEBERG_GLUE_TABLE_NAME_DST}` /*+ OPTIONS('upsert-enabled'='true', 'write-parallelism'='1') */
    SELECT 
        `t`, `table` ,`op`
    FROM mytable;

''')

I'm running this code using

flink run -pyexec /usr/bin/python3.8 -pyclientexec /usr/bin/python3.8 -py job.py

However, the job never writes data into my Iceberg table in S3. Here is the dashboard:

[Screenshot of the Flink job dashboard, 2023-10-24 7:20 PM]

Why is the Iceberg StreamWriter stuck? There's no error in the JobManager's log.

parrik commented 5 months ago

@a8356555 I have a similar issue. How did you unblock yourself?

beingRealFrank commented 5 months ago

I am seeing a very similar issue when trying to write to S3 from PyFlink using a query like the following.

INSERT INTO iceberg.devdb.table_name
SELECT * FROM transformed_metrics;

I can write to S3 fine with a manual/static insert of a single row using PyFlink, but this streaming insert silently fails. Since there are no glaring issues in the logs, I'm unclear whether I'm just using the platform wrong or whether there's a bug.
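
In case it helps anyone hitting the same thing: my understanding is that in streaming mode the Iceberg Flink sink only commits data files when a checkpoint completes, so with checkpointing disabled an INSERT can appear to run while nothing ever lands in S3. A minimal PyFlink sketch of what I double-checked on my side (the interval and storage path are just examples):

import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Iceberg data files are committed on checkpoint completion, so checkpointing must be enabled.
env.enable_checkpointing(60000)  # every 60s
env.get_checkpoint_config().set_checkpoint_storage_dir('file:///tmp/checkpoint')

table_env = StreamTableEnvironment.create(env)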

parrik commented 5 months ago

https://github.com/apache/iceberg/pull/1515 fwiw

pvary commented 5 months ago

@parrik: The PR you have linked is for cases where checkpointing is not enabled. If I read the Python code correctly, that is not the case here. Still, the issue seems similar: likely the checkpoint is not triggered for the committer. Maybe the missing part is that notifyCheckpointComplete is never called. It would be good to see the DEBUG logs from the IcebergFilesCommitter to see what is happening.
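
For example, assuming the default log4j2 setup that ships with the Flink distribution, adding something like the following to conf/log4j.properties on the JobManager and TaskManagers should surface those logs (the logger key name is arbitrary; adjust to your own logging setup):

# Enable DEBUG logging for the Iceberg Flink committer (sketch)
logger.iceberg_committer.name = org.apache.iceberg.flink.sink.IcebergFilesCommitter
logger.iceberg_committer.level = DEBUG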