getindata / dbt-flink-adapter

Adapter for dbt that executes dbt pipelines on Apache Flink
Apache License 2.0

Add watermark support #27

Closed gliter closed 1 year ago

gliter commented 1 year ago

In Flink, watermarks are defined in the DDL as part of the column list; see https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

An initial idea is to provide the watermark config in the source definition, like:

version: 2
sources:
  - name: kafka
    tables:
      - name: clickstream
        config:
          connector_properties:
            connector: 'kafka'
            'properties.bootstrap.servers': 'kafka:29092'
            'topic': 'clickstream'
            'scan.startup.mode': 'earliest-offset'
            'value.format': 'json'
            'properties.group.id': 'dbt'
            'value.json.encode.decimal-as-plain-number': 'true'
        columns:
          - name: event_timestamp
            data_type: TIMESTAMP(3)
          - name: user_id
            data_type: DECIMAL
          - name: balance
            data_type: DECIMAL
          - name: loan_balance
            data_type: DECIMAL
          - name: event
            data_type: STRING
        watermark:
          column: event_timestamp
          # strategies are defined at https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#watermark
          strategy: event_timestamp - INTERVAL '5' SECOND

Currently the adapter has no mechanism to provide a watermark definition like this.
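To make the proposal concrete, here is a minimal sketch of how a `watermark` entry could be turned into the `WATERMARK FOR ... AS ...` clause when the source table's DDL is generated. This is not the adapter's actual code: `render_create_table` and its arguments are hypothetical names that simply mirror the YAML keys above, and it assumes property values contain no single quotes.

```python
def render_create_table(table_name, columns, connector_properties, watermark=None):
    """Render a Flink CREATE TABLE statement from a source definition.

    Hypothetical helper: argument names mirror the YAML keys proposed above.
    """
    # Column definitions, one "name TYPE" entry per column.
    entries = [f"  {col['name']} {col['data_type']}" for col in columns]

    if watermark is not None:
        column_names = {col["name"] for col in columns}
        if watermark["column"] not in column_names:
            raise ValueError(
                f"watermark column {watermark['column']!r} is not declared in columns"
            )
        # The WATERMARK clause is part of the column list, after the columns.
        entries.append(
            f"  WATERMARK FOR {watermark['column']} AS {watermark['strategy']}"
        )

    # Connector properties become the WITH (...) clause.
    properties = [
        f"  '{key}' = '{value}'" for key, value in connector_properties.items()
    ]

    return (
        f"CREATE TABLE {table_name} (\n"
        + ",\n".join(entries)
        + "\n) WITH (\n"
        + ",\n".join(properties)
        + "\n);"
    )


ddl = render_create_table(
    "clickstream",
    columns=[
        {"name": "event_timestamp", "data_type": "TIMESTAMP(3)"},
        {"name": "user_id", "data_type": "DECIMAL"},
    ],
    connector_properties={"connector": "kafka", "topic": "clickstream"},
    watermark={
        "column": "event_timestamp",
        "strategy": "event_timestamp - INTERVAL '5' SECOND",
    },
)
print(ddl)
```

The strategy string is passed through verbatim, so any expression Flink accepts in a `WATERMARK` clause would work; the only check in this sketch is that the watermark column is one of the declared columns.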