delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, Rust, Ruby, and Python
https://delta.io
Apache License 2.0

[BUG] [FLINK] Delta log not updated with the parquet files created after the last checkpoint before Flink job completed #3057

Open galadrielwithlaptop opened 2 months ago

galadrielwithlaptop commented 2 months ago

Bug

Which Delta project/connector is this regarding?

Flink

Describe the problem

I have a Delta Flink program similar to this one: https://learn.microsoft.com/en-us/azure/hdinsight-aks/flink/use-flink-delta-connector. I enabled checkpointing with a 1 s interval; the job takes around 10 checkpoints, and the _delta_log gets updated 10 times. After the 10th checkpoint, one more parquet file is created and I can see one more checkpoint being invoked; then the job finishes. But somehow the delta log is never updated for that last file. Please check the logs below.

2024-05-06 21:04:17.884 [] Sink: Global Committer (1/1)#0 INFO  flink io.delta.standalone.internal.OptimisticTransactionImpl 48 [tableId=34e8a131,txnId=cac93905] Committed delta #9 to abfs://appmode@xy.dfs.core.windows.net/delta-session3/_delta_log
2024-05-06 21:04:17.884 [] Sink: Global Committer (1/1)#0 INFO  flink flink.sink.internal.committer.DeltaGlobalCommitter 450 Successfully committed transaction (appId='45470aec-412d-4f74-87b0-22aa50a8f468', checkpointId='10')
2024-05-06 21:04:17.886 [] Sink: Global Committer (1/1)#0 INFO  flink apache.flink.runtime.taskmanager.Task 1084 Sink: Global Committer (1/1)#0 (6e0af40e48232d9d6a2de4a6a20d497b_306d8342cb5b2ad8b53f1be57f65bee8_0_0) switched from RUNNING to FINISHED.

Is there some config I am missing?
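For instance, could it be Flink's final-checkpoint support? Below is a sketch of setting it explicitly, in case that is the missing piece (an assumption on my part; the option exists since Flink 1.14 and defaults to true from 1.15):

// Assumption to rule out, not a confirmed fix: allow checkpoints (and the
// committer work tied to them) to keep running after bounded tasks finish.
// Uses org.apache.flink.configuration.Configuration, not the Hadoop one.
Configuration flinkConf = new Configuration();
flinkConf.setString("execution.checkpointing.checkpoints-after-tasks-finish.enabled", "true");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);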

ABFS storage:

[screenshot]

[screenshot]

scottsand-db commented 2 months ago

Hi @galadrielwithlaptop - is there a simple coding example you can create that can reproduce this issue?

galadrielwithlaptop commented 2 months ago

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing is enabled with a 1 s interval, as described above
        env.enableCheckpointing(1000);

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the row type of the table (all columns as STRING)
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}
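
To confirm what actually landed in the table, the latest committed version can be read back with the delta-standalone DeltaLog API (a small sketch; the table path is the sink path from the example above, and delta-standalone is already on the classpath since the connector's logs reference io.delta.standalone.internal.DeltaLogImpl):

package contoso.example;

import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;
import org.apache.hadoop.conf.Configuration;

public class CheckDeltaVersion {
    public static void main(String[] args) {
        // Read back the sink table's latest committed version and file count
        DeltaLog deltaLog = DeltaLog.forTable(
                new Configuration(),
                "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output");
        Snapshot snapshot = deltaLog.update(); // force a snapshot refresh
        System.out.println("Latest committed version: " + snapshot.getVersion());
        System.out.println("Files in snapshot: " + snapshot.getAllFiles().size());
    }
}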

galadrielwithlaptop commented 2 months ago

You can keep only a few data points (say 100) and use a checkpoint interval of, say, 5 seconds (rough sketch below).

I am pretty sure that the TaskManagers are freed before the DeltaGlobalCommitter can complete.
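That reduced repro could look roughly like this (a hypothetical sketch: a generated bounded stream instead of the Delta source, a single STRING column, and a 5 s checkpoint interval; the sink setup matches the example above):

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaFinalCommitRepro {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // 5 s checkpoint interval, as suggested

        // ~100 data points, one STRING column
        RowType rowType = RowType.of(DataTypes.STRING().getLogicalType());
        DataStream<RowData> rows = env
                .fromSequence(0, 99)
                .map(i -> (RowData) GenericRowData.of(StringData.fromString("row-" + i)))
                .returns(RowData.class);

        rows.sinkTo(DeltaSink
                .forRowData(
                        new Path("abfss://container@account_name.dfs.core.windows.net/data/repro"),
                        new Configuration(),
                        rowType)
                .build());

        env.execute("delta-final-commit-repro");
    }
}

With this, the expectation is that the last parquet file written after the final successful checkpoint shows up in ABFS but never in _delta_log.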

galadrielwithlaptop commented 2 months ago

TaskManager (TM) logs:

2024-05-09 15:46:34.259 [] Source: delta-source -> Sink: Writer -> Sink: Committer (1/1)#0 INFO flink flink.sink.internal.committer.DeltaCommitter 103 Committing delta committable locally: appId=2ee0131e-257c-4466-8523-3b3df0333e88 checkpointId=13 deltaPendingFile=DeltaPendingFile(fileName=part-b7a731e2-610c-4035-b35d-ee5052c3cae7-12.snappy.parquet lastUpdateTime=1715269592562 fileSize=147467 recordCount=115818 partitionSpec={})
2024-05-09 15:46:34.291 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.DeltaLogImpl 48 Loading version 10 starting from checkpoint version 10.
2024-05-09 15:46:34.291 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.SnapshotImpl 48 [tableId=28bbe224-a5af-413d-ab4c-edee66f9d2b7] Created snapshot io.delta.standalone.internal.SnapshotImpl@1c9609eb
2024-05-09 15:46:34.332 [] Source: delta-source -> Sink: Writer -> Sink: Committer (1/1)#0 INFO flink connector.base.source.reader.SourceReaderBase 265 Closing Source Reader.
2024-05-09 15:46:34.336 [] Source: delta-source -> Sink: Writer -> Sink: Committer (1/1)#0 INFO flink apache.flink.runtime.taskmanager.Task 1084 Source: delta-source -> Sink: Writer -> Sink: Committer (1/1)#0 (1d84e090e73071fc3a4eb1dc41bb9833_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FINISHED.
2024-05-09 15:46:34.336 [] Source: delta-source -> Sink: Writer -> Sink: Committer (1/1)#0 INFO flink apache.flink.runtime.taskmanager.Task 829 Freeing task resources for Source: delta-source -> Sink: Writer -> Sink: Committer (1/1)#0 (1d84e090e73071fc3a4eb1dc41bb9833_cbc357ccb763df2852fee8c4fc7d55f2_0_0).
2024-05-09 15:46:34.336 [] flink-akka.actor.default-dispatcher-17 INFO flink apache.flink.runtime.taskexecutor.TaskExecutor 1970 Un-registering task and sending final execution state FINISHED to JobManager for task Source: delta-source -> Sink: Writer -> Sink: Committer (1/1)#0 1d84e090e73071fc3a4eb1dc41bb9833_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2024-05-09 15:46:34.415 [] Sink: Global Committer (1/1)#0 INFO flink org.apache.parquet.hadoop.InternalParquetRecordReader 190 RecordReader initialized will read a total of 14 records.
2024-05-09 15:46:34.415 [] Sink: Global Committer (1/1)#0 INFO flink org.apache.parquet.hadoop.InternalParquetRecordReader 124 at row 0. reading next block
2024-05-09 15:46:34.435 [] Sink: Global Committer (1/1)#0 INFO flink apache.hadoop.io.compress.CodecPool 184 Got brand-new decompressor [.snappy]
2024-05-09 15:46:34.437 [] Sink: Global Committer (1/1)#0 INFO flink org.apache.parquet.hadoop.InternalParquetRecordReader 133 block read in memory in 22 ms. row count = 14
2024-05-09 15:46:34.461 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.SnapshotImpl 48 Using the protocol from the protocolMetadataHint: Protocol(1,2)
2024-05-09 15:46:34.461 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.SnapshotImpl 48 Using the metadata from the protocolMetadataHint: Metadata(28bbe224-a5af-413d-ab4c-edee66f9d2b7,null,null,Format(parquet,Map()),{"type":"struct","fields":[{"name":"f0","type":"string","nullable":true,"metadata":{}},{"name":"f1","type":"integer","nullable":true,"metadata":{}},{"name":"f2","type":"boolean","nullable":true,"metadata":{}}]},List(),Map(),Some(1715269582326))
2024-05-09 15:46:34.462 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.DeltaLogImpl 48 Updated snapshot to io.delta.standalone.internal.SnapshotImpl@1c9609eb
2024-05-09 15:46:34.462 [] Sink: Global Committer (1/1)#0 INFO flink flink.sink.internal.committer.DeltaGlobalCommitter 461 0 files to be committed to the Delta table for appId=2ee0131e-257c-4466-8523-3b3df0333e88 checkpointId=12.
2024-05-09 15:46:34.463 [] Sink: Global Committer (1/1)#0 INFO flink flink.sink.internal.committer.DeltaGlobalCommitter 446 Attempting to commit transaction (appId='2ee0131e-257c-4466-8523-3b3df0333e88', checkpointId='12')
2024-05-09 15:46:34.464 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.OptimisticTransactionImpl 48 [tableId=28bbe224,txnId=21cbc92d] Attempting to commit version 11 with 3 actions with Serializable isolation level
2024-05-09 15:46:34.660 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.DeltaLogImpl 48 Loading version 11 starting from checkpoint version 10.
2024-05-09 15:46:34.660 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.SnapshotImpl 48 [tableId=28bbe224-a5af-413d-ab4c-edee66f9d2b7] Created snapshot io.delta.standalone.internal.SnapshotImpl@24561802
2024-05-09 15:46:34.754 [] Sink: Global Committer (1/1)#0 INFO flink org.apache.parquet.hadoop.InternalParquetRecordReader 190 RecordReader initialized will read a total of 14 records.
2024-05-09 15:46:34.754 [] Sink: Global Committer (1/1)#0 INFO flink org.apache.parquet.hadoop.InternalParquetRecordReader 124 at row 0. reading next block
2024-05-09 15:46:34.770 [] Sink: Global Committer (1/1)#0 INFO flink org.apache.parquet.hadoop.InternalParquetRecordReader 133 block read in memory in 16 ms. row count = 14
2024-05-09 15:46:34.775 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.SnapshotImpl 48 Using the protocol from the protocolMetadataHint: Protocol(1,2)
2024-05-09 15:46:34.775 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.SnapshotImpl 48 Using the metadata from the protocolMetadataHint: Metadata(28bbe224-a5af-413d-ab4c-edee66f9d2b7,null,null,Format(parquet,Map()),{"type":"struct","fields":[{"name":"f0","type":"string","nullable":true,"metadata":{}},{"name":"f1","type":"integer","nullable":true,"metadata":{}},{"name":"f2","type":"boolean","nullable":true,"metadata":{}}]},List(),Map(),Some(1715269582326))
2024-05-09 15:46:34.775 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.DeltaLogImpl 48 Updated snapshot to io.delta.standalone.internal.SnapshotImpl@24561802
2024-05-09 15:46:34.775 [] Sink: Global Committer (1/1)#0 INFO flink io.delta.standalone.internal.OptimisticTransactionImpl 48 [tableId=28bbe224,txnId=21cbc92d] Committed delta #11 to abfs://appmode@xx.dfs.core.windows.net/delta-session3/_delta_log
2024-05-09 15:46:34.775 [] Sink: Global Committer (1/1)#0 INFO flink flink.sink.internal.committer.DeltaGlobalCommitter 450 Successfully committed transaction (appId='2ee0131e-257c-4466-8523-3b3df0333e88', checkpointId='12')
2024-05-09 15:46:34.777 [] Sink: Global Committer (1/1)#0 INFO flink apache.flink.runtime.taskmanager.Task 1084 Sink: Global Committer (1/1)#0 (1d84e090e73071fc3a4eb1dc41bb9833_306d8342cb5b2ad8b53f1be57f65bee8_0_0) switched from RUNNING to FINISHED.
2024-05-09 15:46:34.777 [] Sink: Global Committer (1/1)#0 INFO flink apache.flink.runtime.taskmanager.Task 829 Freeing task resources for Sink: Global Committer (1/1)#0 (1d84e090e73071fc3a4eb1dc41bb9833_306d8342cb5b2ad8b53f1be57f65bee8_0_0).
2024-05-09 15:46:34.778 [] flink-akka.actor.default-dispatcher-15 INFO flink apache.flink.runtime.taskexecutor.TaskExecutor 1970 Un-registering task and sending final execution state FINISHED to JobManager for task Sink: Global Committer (1/1)#0 1d84e090e73071fc3a4eb1dc41bb9833_306d8342cb5b2ad8b53f1be57f65bee8_0_0.
2024-05-09 15:46:35.896 [] flink-akka.actor.default-dispatcher-15 INFO flink flink.runtime.taskexecutor.slot.TaskSlotTableImpl 439 Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=1.793gb (1925342788 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.534gb (1647648462 bytes), networkMemory=392.830mb (411912115 bytes)}, allocationId: 837710e86c63b1ede1d4018b8e363911, jobId: d7c6078f8d1229e4dee6a953c570bdf5).
2024-05-09 15:46:35.898 [] flink-akka.actor.default-dispatcher-15 INFO flink apache.flink.runtime.taskexecutor.DefaultJobLeaderService 170 Remove job d7c6078f8d1229e4dee6a953c570bdf5 from job leader monitoring.
2024-05-09 15:46:35.898 [] flink-akka.actor.default-dispatcher-15 INFO flink apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService 106 Stopping DefaultLeaderRetrievalService.
2024-05-09 15:46:35.898 [] flink-akka.actor.default-dispatcher-15 INFO flink apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver 91 Stopping KubernetesLeaderRetrievalDriver{configMapName='flink-6b6e31dcb2094950ab531094e84fd1b5-cluster-config-map'}.
2024-05-09 15:46:35.898 [] KubernetesClient-Informer-thread-1 INFO flink flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer 178 Stopped to watch for 6b6e31dcb2094950ab531094e84fd1b5/flink-6b6e31dcb2094950ab531094e84fd1b5-cluster-config-map, watching id:8d643051-f6a5-4e2c-a913-024c655e5487
2024-05-09 15:46:35.899 [] flink-akka.actor.default-dispatcher-15 INFO flink apache.flink.runtime.taskexecutor.TaskExecutor 1746 Close JobManager connection for job d7c6078f8d1229e4dee6a953c570bdf5.