getindata / dbt-flink-adapter

Adapter for dbt that executes dbt pipelines on Apache Flink
Apache License 2.0

Add support for primary key definitions in source #32

Open gliter opened 1 year ago

gliter commented 1 year ago

We should be able to define a primary key for a source table (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#primary-key). This is needed, for example, for event-time temporal joins (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#event-time-temporal-join), where the versioned (right-hand) table must have both a primary key and an event-time attribute.
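For reference, this is roughly the Flink DDL the adapter would need to be able to emit for the `trx` table used in the example. It is a minimal sketch: only the column names come from the plan in the error message; the column types, watermark delay, connector, topic and broker address are assumptions. `upsert-kafka` is used here because, as far as I know, the plain `kafka` connector with an insert-only format such as `json` rejects a `PRIMARY KEY` declaration.

```sql
-- Sketch only: column types, watermark delay, connector, topic and broker
-- address are assumptions; column names are taken from the error's plan.
CREATE TABLE trx (
  event_timestamp TIMESTAMP(3),
  user_id STRING,
  `source` STRING,
  `target` STRING,
  amount DECIMAL(18, 2),
  deposit_balance_after_trx DECIMAL(18, 2),
  credit_balance_after_trx DECIMAL(18, 2),
  -- event-time attribute used by FOR SYSTEM_TIME AS OF (assumed 5 s delay)
  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND,
  -- Flink only supports NOT ENFORCED primary keys; this is what the
  -- event-time temporal join requires on the versioned table
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',  -- assumed changelog source keyed by user_id
  'topic' = 'trx',               -- hypothetical topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
```

How the primary key should be expressed in the dbt source definition itself is the open question of this issue; the DDL above only shows the target Flink syntax.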

Example:

select *
from clickstream
left join trx for SYSTEM_TIME as of clickstream.event_timestamp
on clickstream.user_id = trx.user_id;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is:
FlinkLogicalJoin(condition=[AND(=($1, $4), __INITIAL_TEMPORAL_JOIN_CONDITION($0, $3, __TEMPORAL_JOIN_LEFT_KEY($1), __TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[left])
  FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, clickstream, watermark=[event_timestamp]]], fields=[event_timestamp, user_id, event])
  FlinkLogicalSnapshot(period=[$cor1.event_timestamp])
    FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, trx, watermark=[event_timestamp]]], fields=[event_timestamp, user_id, source, target, amount, deposit_balance_after_trx, credit_balance_after_trx])
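Until sources can declare a primary key, the Flink documentation describes a workaround for append-only tables: a "versioned view" built with `ROW_NUMBER()` deduplication. Flink infers `user_id` as the view's primary key from the `PARTITION BY` and keeps `event_timestamp` as the time attribute because of the `ORDER BY ... DESC`. A sketch, assuming the `clickstream` and `trx` tables from the example; `versioned_trx` and `row_num` are hypothetical names.

```sql
-- Hypothetical versioned view: keeps the latest trx row per user_id,
-- which lets Flink infer user_id as the primary key.
CREATE VIEW versioned_trx AS
SELECT event_timestamp, user_id, `source`, `target`, amount,
       deposit_balance_after_trx, credit_balance_after_trx
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY user_id           -- inferred primary key
           ORDER BY event_timestamp DESC  -- keeps the event-time attribute
         ) AS row_num
  FROM trx
) AS t
WHERE row_num = 1;

-- The original query, joined against the versioned view instead of trx
SELECT *
FROM clickstream
LEFT JOIN versioned_trx FOR SYSTEM_TIME AS OF clickstream.event_timestamp
  ON clickstream.user_id = versioned_trx.user_id;
```

This avoids changing the source definition, but it adds a deduplication step to every query that needs the versioned table, which is why declaring the key on the source is preferable.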