apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[Feature] Allow to pack the UPDATE_BEFORE and UPDATE_AFTER message when streaming reading the audit table #4505

Closed Aitozi closed 1 week ago

Aitozi commented 1 week ago

Search before asking

Motivation

Flink splits a row update into two records, UPDATE_BEFORE (UB) and UPDATE_AFTER (UA). However, users may want to compare the UB and UA messages inside a UDF to find which columns changed. So I propose adding a mode to the audit table that delivers the UB and UA messages as a single row.

We could change each original column type to an array type holding both values.

for example:

    public void testAuditTablePackUbUa() throws Exception {
        sql(
                "CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED) with ('changelog-producer' = 'lookup', 'bucket' = '2', 'scan.pack.update-before-after.enabled' = 'true')");
        BlockingIterator<Row, Row> iterator =
                streamSqlBlockIter(
                        "SELECT * FROM T$audit_log /*+ OPTIONS('scan.mode' = 'latest') */");
        sql("INSERT INTO T VALUES (1, 2)");
        sql("INSERT INTO T VALUES (1, 3)");
        sql("INSERT INTO T VALUES (2, 2)");
        List<Row> rows = iterator.collect(3);
        System.out.println(rows);
        iterator.close();
    }

This produces:

[+I[+I, [1, null], [2, null]], +I[+U, [1, 1], [2, 3]], +I[+I, [2, null], [2, null]]]

Each column then becomes a two-element array: index 0 holds the update_before value and index 1 holds the update_after value (null for non-update rows such as +I).
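To illustrate how a consumer could use the packed rows, here is a minimal sketch of a helper that compares the before/after arrays to find the changed columns. The class and method names are hypothetical and not part of Paimon's or Flink's API; it only assumes the [before, after] array layout described above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical helper: given packed [update_before, update_after] arrays
// per column, return the names of the columns whose value changed.
public class DiffColumns {

    public static List<String> changedColumns(String[] names, Object[][] packed) {
        List<String> changed = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            Object before = packed[i][0]; // index 0: update_before value
            Object after = packed[i][1];  // index 1: update_after value
            if (!Objects.equals(before, after)) {
                changed.add(names[i]);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        // Mirrors the +U row from the example output: a = [1, 1], b = [2, 3]
        String[] names = {"a", "b"};
        Object[][] packed = {{1, 1}, {2, 3}};
        System.out.println(changedColumns(names, packed)); // prints [b]
    }
}
```

With the example output above, the +U row `[1, 1], [2, 3]` would yield `[b]`, since only column `b` changed from 2 to 3.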

Solution

No response

Anything else?

No response

Are you willing to submit a PR?