apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: IcebergIO cannot write Timestamp columns #32680

Open DanielMorales9 opened 1 day ago

DanielMorales9 commented 1 day ago

What happened?

When trying to write timestamp data into an Iceberg table, I get the following exception:

Caused by: java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.time.OffsetDateTime (java.lang.Long and java.time.OffsetDateTime are in module java.base of loader 'bootstrap')
        at org.apache.iceberg.data.parquet.BaseParquetWriter$TimestamptzWriter.write(BaseParquetWriter.java:281)
        at org.apache.iceberg.parquet.ParquetValueWriters$OptionWriter.write(ParquetValueWriters.java:356)
        at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:589)
        at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:135)
        at org.apache.iceberg.io.DataWriter.write(DataWriter.java:71)
        at org.apache.beam.sdk.io.iceberg.RecordWriter.write(RecordWriter.java:105)
        at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.write(RecordWriterManager.java:144)
        at org.apache.beam.sdk.io.iceberg.RecordWriterManager.write(RecordWriterManager.java:234)
        at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.processElement(WriteUngroupedRowsToFiles.java:224)
        Suppressed: java.lang.IllegalArgumentException: Expected all data writers to be closed, but found 1 data writer(s) still open
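My reading of the stack trace (an assumption on my part, not verified against the Beam or Iceberg source): the Beam DATETIME value reaches Iceberg's Parquet writer as epoch millis (a Long), while the timestamptz writer expects a java.time.OffsetDateTime. A minimal sketch of the kind of conversion that seems to be missing, assuming the value is UTC epoch millis:

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Illustration only: turn epoch millis (what arrives, per the
// ClassCastException above) into the OffsetDateTime the writer expects.
long epochMillis = 1700000000000L; // hypothetical example value
OffsetDateTime ts = Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.UTC);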

Source code:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Instant;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HardcodedDataToIcebergPipeline {

    public static void main(String[] args) {
        // Define Beam pipeline options
        PipelineOptions options =
                PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);

        // Define Beam schema with a timestamp field
        Schema beamSchema = Schema.builder()
                .addStringField("id")
                .addDateTimeField("event_timestamp")
                .build();

        Map<String, Object> catalogConfig = new HashMap<String, Object>() {{
            put("warehouse", "<warehouse-dir>");
            put("uri", "<thrift-uri>");
            put("type", "iceberg");
            put("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
            put("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
        }};

        Map<String, Object> icebergConfig = new HashMap<String, Object>() {{
            put("catalog_name", appConfig.get("catalog"));
            put("catalog_properties", catalogConfig);
        }};
        String table = String.format("%s.%s", "myschema", "test");
        icebergConfig.put("table", table);

        // Create hardcoded data with timestamp
        List<Row> hardcodedData = Arrays.asList(
                Row.withSchema(beamSchema)
                        .addValues("record-1", Instant.now()) // Use Joda-Time Instant
                        .build(),
                Row.withSchema(beamSchema)
                        .addValues("record-2", Instant.now().minus(3600 * 1000)) // 1 hour ago
                        .build(),
                Row.withSchema(beamSchema)
                        .addValues("record-3", Instant.now().minus(24 * 3600 * 1000)) // 1 day ago
                        .build()
        );

        // Build the pipeline using the hardcoded data
        pipeline
                .apply("CreateHardcodedData", Create.of(hardcodedData).withRowSchema(beamSchema)) // Hardcoded 
                .apply("WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(icebergConfig));

        // Run the pipeline
        pipeline.run().waitUntilFinish();
    }
}

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)


ahmedabu98 commented 1 day ago

Thanks for the feedback @DanielMorales9. Opened a PR to add support for this: #32688

P.S. The Iceberg API works with the Java time library (java.time), not Joda. Once the PR is merged, you should be able to write timestamps if you change your schema to:

import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
...
        // Define Beam schema with a timestamp field
        Schema beamSchema = Schema.builder()
                .addStringField("id")
                .addLogicalTypeField("event_timestamp", SqlTypes.DATETIME)
                .build();

and your Row values to:

import java.time.LocalDateTime;
...
        List<Row> hardcodedData = Arrays.asList(
                Row.withSchema(beamSchema)
                        .addValues("record-1", LocalDateTime.now()) // Use Java-Time LocalDateTime
                        .build(),
                        ....
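Putting the two snippets together, a sketch of the full list under that change (mirroring your original three records; the minusHours/minusDays calls replace the millisecond arithmetic, and beamSchema is the logical-type schema above):

import java.time.LocalDateTime;
...
        // Sketch only: same three records as the original pipeline,
        // using java.time.LocalDateTime instead of Joda Instant.
        List<Row> hardcodedData = Arrays.asList(
                Row.withSchema(beamSchema)
                        .addValues("record-1", LocalDateTime.now())
                        .build(),
                Row.withSchema(beamSchema)
                        .addValues("record-2", LocalDateTime.now().minusHours(1)) // 1 hour ago
                        .build(),
                Row.withSchema(beamSchema)
                        .addValues("record-3", LocalDateTime.now().minusDays(1)) // 1 day ago
                        .build());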
ahmedabu98 commented 1 day ago

@DanielMorales9 Scratch that -- we can add support for both Joda and Java libraries when writing to Iceberg (so your existing code should still work).

However, when reading from Iceberg, I'm afraid we will have to stick with just the Java time library (meaning the output Row field type will be SqlTypes.DATETIME).
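If downstream code still expects Joda values after a read, here's a minimal sketch of converting back (my own illustration, not part of the PR; `row` stands for a Row coming out of the Iceberg read, and it assumes the wall-clock value represents UTC):

import java.time.LocalDateTime;
import java.time.ZoneOffset;
...
        // Read-side field comes back as java.time.LocalDateTime;
        // convert to a Joda Instant, assuming the value is UTC.
        LocalDateTime ldt = row.getLogicalTypeValue("event_timestamp", LocalDateTime.class);
        org.joda.time.Instant jodaTs =
                new org.joda.time.Instant(ldt.toInstant(ZoneOffset.UTC).toEpochMilli());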

DanielMorales9 commented 10 hours ago

Hi @ahmedabu98, thank you for your quick reply 🙏
I left a few comments on the PR.