apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[cdc] Support taking a column of a MySQL table and processing it as a partition field for the Paimon table #1038

Closed gitfortian closed 1 year ago

gitfortian commented 1 year ago


Motivation

```shell
/bin/flink run \
    -c org.apache.paimon.flink.action.FlinkActions \
    /path/to/paimon-flink-**-0.4-SNAPSHOT.jar \
    mysql-sync-table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition-keys pt \
    --primary-keys pt,uid \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=source_db \
    --mysql-conf table-name='source_table_.*' \
    --catalog-conf metastore=hive \
    --catalog-conf uri=thrift://hive-metastore:9083 \
    --table-conf bucket=4 \
    --table-conf changelog-producer=input \
    --table-conf sink.parallelism=4
```

As shown above, only existing columns of the MySQL table can be selected as partition fields of the Paimon table. However, in production, time partitioning is generally adopted, and the time field is usually of datetime type, which needs to be processed into dt/hr partition fields.

### Solution

_No response_

### Anything else?

_No response_

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
yuzelin commented 1 year ago

My idea is:

  1. We can add a new argument for specifying computed column, for example:
    --partition-keys dt,hr
    --computed-column dt=date(ts)
    --computed-column hr=hour(ts)

    ts is an existing column; we can specify that dt is the date part of ts and hr is the hour part of ts.

  2. The newly computed column can be specified as partition key and will be auto created in Paimon table.
  3. Introduce two classes to handle computed columns:

     class ComputedColumn: stores the column's name, type, argument column names, and a ColumnValueComputer to compute the value from the given arguments.

     interface ColumnValueComputer: has a method Object computeValue(Object... input) to compute the value. We can implement several concrete computers.

  4. Pass the ComputedColumns to the EventParser. Every time extractRow is invoked, generate the computed column values.
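
A minimal Java sketch of the two proposed classes (the class and interface names follow the comment above, but the signatures, the DateComputer/HourComputer implementations, and the use of java.time.LocalDateTime are assumptions for illustration, not Paimon's actual API):

```java
import java.time.LocalDateTime;

/** Computes a derived value from one or more argument column values. */
interface ColumnValueComputer {
    Object computeValue(Object... input);
}

/** Hypothetical computer: extracts the date part (dt) from a datetime value. */
class DateComputer implements ColumnValueComputer {
    @Override
    public Object computeValue(Object... input) {
        // e.g. 2023-04-01T09:30 -> "2023-04-01"
        return ((LocalDateTime) input[0]).toLocalDate().toString();
    }
}

/** Hypothetical computer: extracts the hour part (hr) from a datetime value. */
class HourComputer implements ColumnValueComputer {
    @Override
    public Object computeValue(Object... input) {
        // e.g. 2023-04-01T09:30 -> "09"
        return String.format("%02d", ((LocalDateTime) input[0]).getHour());
    }
}

/** Stores the computed column's name, type, argument columns, and its computer. */
class ComputedColumn {
    final String name;
    final String type;
    final String[] argumentColumns;
    final ColumnValueComputer computer;

    ComputedColumn(String name, String type, String[] argumentColumns,
                   ColumnValueComputer computer) {
        this.name = name;
        this.type = type;
        this.argumentColumns = argumentColumns;
        this.computer = computer;
    }

    /** Delegates to the computer with the argument column values. */
    Object compute(Object... args) {
        return computer.computeValue(args);
    }
}
```

With this shape, `--computed-column dt=date(ts)` would parse into a ComputedColumn named dt whose argument column is ts and whose computer is the date extractor; the event parser then calls compute for each extracted row.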
yuzelin commented 1 year ago

Solved by #1109. If we need more processing functions, we can create other issues.