
Flink: OverflowError: value too large to convert to int32_t #8874

Open · a8356555 opened this issue 11 months ago

a8356555 commented 11 months ago

Apache Iceberg version

1.4.0 (latest release)

Query engine

Flink

Please describe the bug šŸž

2023-10-20 13:26:02,295 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new decompressor [.gz]
2023-10-20 13:26:02,419 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new decompressor [.gz]
2023-10-20 13:26:02,556 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new decompressor [.gz]
2023-10-20 13:26:02,727 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new decompressor [.gz]
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
    response = task()
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 340, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 580, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 618, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 995, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 194, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 104, in pyflink.fn_execution.beam.beam_operations_fast.IntermediateOutputProcessor.process_outputs
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 194, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 92, in pyflink.fn_execution.beam.beam_operations_fast.NetworkOutputProcessor.process_outputs
  File "pyflink/fn_execution/beam/beam_coder_impl_fast.pyx", line 101, in pyflink.fn_execution.beam.beam_coder_impl_fast.FlinkLengthPrefixCoderBeamWrapper.encode_to_stream
  File "pyflink/fn_execution/coder_impl_fast.pyx", line 273, in pyflink.fn_execution.coder_impl_fast.IterableCoderImpl.encode_to_stream
  File "pyflink/fn_execution/coder_impl_fast.pyx", line 401, in pyflink.fn_execution.coder_impl_fast.RowCoderImpl.encode_to_stream
  File "pyflink/fn_execution/coder_impl_fast.pyx", line 401, in pyflink.fn_execution.coder_impl_fast.RowCoderImpl.encode_to_stream
  File "pyflink/fn_execution/coder_impl_fast.pyx", line 528, in pyflink.fn_execution.coder_impl_fast.IntCoderImpl.encode_to_stream
OverflowError: value too large to convert to int32_t

After this error appeared, my job stopped; only the data/ folder had files written, and nothing was written to the metadata/ folder.

I'm using PyFlink to write data to an S3 bucket in Iceberg format.
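
Judging from the traceback, the OverflowError comes from PyFlink's IntCoderImpl, which encodes INT fields as signed 32-bit values, so any Python int outside [-2^31, 2^31 - 1] fails at encode time. For reference, a minimal plain-Python sketch of that limit (not the actual Cython coder; the epoch-milliseconds value is only an illustration of a number that no longer fits):

import struct

INT32_MIN, INT32_MAX = -(2 ** 31), 2 ** 31 - 1

def encode_int32(value: int) -> bytes:
    """Mimic a signed 32-bit INT coder: range-check, then pack big-endian."""
    if not (INT32_MIN <= value <= INT32_MAX):
        raise OverflowError("value too large to convert to int32_t")
    return struct.pack(">i", value)

encode_int32(1_697_448_718)          # epoch seconds still fit in int32
try:
    encode_int32(1_697_448_718_000)  # epoch milliseconds (illustrative) do not
except OverflowError as err:
    print(err)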

My INSERT INTO query:

stenv.execute_sql(f'''
    INSERT INTO mycatalog.mydb.mytable  /*+ OPTIONS('upsert-enabled'='true') */
    SELECT 
        *
    FROM event
''')

I have verified the event source table using the print connector:

stenv.execute_sql('''CREATE TABLE print_table (
                        *sql_schema_as_target_table*
                    ) WITH ('connector' = 'print')''')

stenv.execute_sql('''
    INSERT INTO print_table
    SELECT 
        *
    FROM event
''') 

# part of log:
# +I[2023-10-16T17:31:58, 1697448718, 2318, , WS, APP, Unknown, ZZ, 2023-10-16T09:31:58, false]
# +I[2023-10-16T11:24:18, 1697426658, 2469, 2D3B7326-944B-44C1-A2B2-923B44B0AD5, WS, APP, Unknown, ZZ, 2023-10-16T03:24:18, false]
# +I[2023-10-16T11:24:25, 1697426665, 3287, 2D3B7326-944B-44C1-A2B2-923B44B0AD5, WS, APP, Unknown, ZZ, 2023-10-16T03:24:25, false]
# +I[2023-10-16T11:53:01, 1697428381, 3287, 2D3B7326-944B-44C1-A2B2-923B44B0AD5, WS, APP, Unknown, ZZ, 2023-10-16T03:53:01, false]
# +I[2023-10-17T16:19:16, 1697530756, 2201, A164D39D-B432-4ECA-8F24-456D28A8AC68, WS, APP, Unknown, ZZ, 2023-10-17T08:19:16, false]
# 2023-10-19 04:17:17,433 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new decompressor [.gz]
# 2023-10-19 04:17:17,588 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new decompressor [.gz]
# 2023-10-19 04:17:17,751 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new decompressor [.gz]
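
The print connector only shows that values arrive; the overflow happens when a row is encoded against the declared column types. A small diagnostic sketch (the MAX query uses a placeholder column name, not my real schema) is to compare the declared types on both sides and check how large the suspect integer column actually gets:

# Compare declared types: any column typed INT whose values can exceed
# 2147483647 will fail in IntCoderImpl at encode time.
stenv.execute_sql("DESCRIBE event").print()
stenv.execute_sql("DESCRIBE mycatalog.mydb.mytable").print()

# Placeholder column name; cast to BIGINT so the check itself cannot overflow.
stenv.execute_sql('''
    SELECT MAX(CAST(`some_int_column` AS BIGINT)) AS max_some_int_column
    FROM event
''').print()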

Here is my CREATE CATALOG and CREATE TABLE code:

stenv.execute_sql('''
    CREATE CATALOG `mycatalog` 
    WITH (
        'type' = 'iceberg',
        'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
        'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',  
        'warehouse' = 's3://mywarehouse'
    )
''')
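
Optionally, the catalog registration can be sanity-checked right after this step; a small sketch using standard PyFlink/Flink SQL calls:

# Optional sanity check: confirm the Iceberg catalog is registered and reachable.
stenv.use_catalog("mycatalog")
stenv.execute_sql("SHOW DATABASES").print()
stenv.execute_sql("SHOW TABLES IN mycatalog.mydb").print()
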
stenv.execute_sql('''
    CREATE TABLE IF NOT EXISTS mycatalog.mydb.mytable (
        *sql_schema*,
    PRIMARY KEY(`id`,`user_id`) NOT ENFORCED
    ) 
    PARTITIONED BY (`user_id`)
    WITH(
            'format-version'='2',
            'write.upsert.enabled'='true',
            'write.parquet.compression-codec'='gzip',
            'write.metadata.delete-after-commit.enabled'='true',
            'write.delete.mode'='merge-on-read',
            'write.update.mode'='merge-on-read',
            'write.merge.mode'='merge-on-read',
            'write.parquet.row-group-size-bytes'='33554432',
            'write.target-file-size-bytes'='134217728'
    )
''')
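
If one of the columns turns out to be declared as INT while the source produces values above 2147483647, a possible workaround sketch is to declare that column as BIGINT in the Iceberg table and cast explicitly on insert (`event_ts` below is a hypothetical column name, not part of the real schema; `id` and `user_id` are the primary-key columns from above):

# Sketch only: widen a hypothetical out-of-range column to BIGINT and cast on insert.
stenv.execute_sql('''
    INSERT INTO mycatalog.mydb.mytable /*+ OPTIONS('upsert-enabled'='true') */
    SELECT
        `id`,
        `user_id`,
        CAST(`event_ts` AS BIGINT) AS `event_ts`
    FROM event
''')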

Does anyone have any idea?

I'm running my code in a Docker container.

Here is my Dockerfile:

FROM alpine:3.17.0 AS builder

WORKDIR /tmp/download/my-jars
RUN wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.16/1.4.0/iceberg-flink-runtime-1.16-1.4.0.jar
RUN wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.263/bundle-2.17.263.jar
RUN wget https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.17.263/url-connection-client-2.17.263.jar

FROM apache/flink:1.16.2-scala_2.12-java8 AS runtime

RUN apt-get update && apt-get install -y software-properties-common &&  \
    add-apt-repository -y ppa:deadsnakes/ppa &&  \
    apt-get remove -y software-properties-common && apt-get autoremove -y && apt-get clean
RUN apt-get update && apt-get install -y python3.8 python3-pip python3.8-distutils git && apt-get clean
RUN python3.8 -m pip install --upgrade pip
RUN python3.8 -m pip install apache-flink==1.16.2 --no-cache-dir

WORKDIR /tmp/download/my-hadoop
RUN wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz &&  \
    tar xzf hadoop-3.3.4.tar.gz &&  \
    mv hadoop-3.3.4 /opt/hadoop-3.3.4 &&  \
    rm hadoop-3.3.4.tar.gz
ENV HADOOP_HOME=/opt/hadoop-3.3.4
ENV HADOOP_CLASSPATH=/opt/hadoop-3.3.4/etc/hadoop:/opt/hadoop-3.3.4/share/hadoop/common/lib/*:/opt/hadoop-3.3.4/share/hadoop/common/*:/opt/hadoop-3.3.4/share/hadoop/hdfs:/opt/hadoop-3.3.4/share/hadoop/hdfs/lib/*:/opt/hadoop-3.3.4/share/hadoop/hdfs/*:/opt/hadoop-3.3.4/share/hadoop/mapreduce/*:/opt/hadoop-3.3.4/share/hadoop/yarn:/opt/hadoop-3.3.4/share/hadoop/yarn/lib/*:/opt/hadoop-3.3.4/share/hadoop/yarn/*

COPY --from=builder /tmp/download/my-jars/. /opt/flink/lib/.
github-actions[bot] commented 6 days ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.