risingwavelabs / dbt-risingwave

Apache License 2.0

feat: sink feature2 #41

Closed: MattiasMTS closed this 8 months ago

MattiasMTS commented 8 months ago

Opening this to showcase how we can achieve the same thing as #38 without having to write any Python code.


EDIT

Allow sinks to be configured in a more dbt-onic way, through model config instead of hand-written SQL. Below are some examples for an AWS Kinesis sink:

Example 1: declare config in model

{{
    config(
        materialized="sink",
        connector="kinesis",
        data_format="PLAIN",
        data_encode="JSON",
        connector_parameters={
            "primary_key": "pk,sk",
            "stream": var("kinesis_sink")[target.profile_name]["stream"],
            "aws.region": var("kinesis_sink")[target.profile_name]["region"],
            "aws.credentials.role.arn": var("kinesis_sink")[target.profile_name]["role_arn"],
            "aws.credentials.access_key_id": env_var("AWS_ACCESS_KEY_ID", ""),
            "aws.credentials.secret_access_key": env_var("AWS_SECRET_ACCESS_KEY", ""),
        },
        format_parameters={
            "force_append_only": true,
        },
    )
}}

-- do some wrangling here:
SELECT * FROM {{ ref("model1") }}
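
To make the mapping from config keys to SQL more concrete, here is a rough Python sketch of how such a config could be turned into a RisingWave `CREATE SINK` statement. It is not the adapter's actual macro: the function names `quote` and `render_create_sink` are hypothetical, and the assumption is that `connector_parameters` go into the `WITH (...)` clause while `format_parameters` go into the parentheses after `FORMAT ... ENCODE ...`.

```python
def quote(value):
    """Render a Python value as a single-quoted SQL string literal."""
    if isinstance(value, bool):
        # Booleans are written as lowercase 'true' / 'false'.
        value = "true" if value else "false"
    return "'" + str(value).replace("'", "''") + "'"


def render_create_sink(name, select_sql, connector, connector_parameters,
                       data_format, data_encode, format_parameters=None):
    """Hypothetical helper: build a CREATE SINK statement from sink config."""
    # The connector name comes first, followed by the connector parameters.
    with_items = [f"connector = {quote(connector)}"]
    with_items += [f"{k} = {quote(v)}" for k, v in connector_parameters.items()]

    stmt = f"CREATE SINK {name} AS {select_sql}\n"
    stmt += "WITH (\n    " + ",\n    ".join(with_items) + "\n)\n"
    stmt += f"FORMAT {data_format} ENCODE {data_encode}"

    # Format parameters are optional and attach to the FORMAT/ENCODE clause.
    if format_parameters:
        opts = ", ".join(f"{k} = {quote(v)}" for k, v in format_parameters.items())
        stmt += f" (\n    {opts}\n)"
    return stmt + ";"


# Illustrative values only; the stream name and region are made up.
print(render_create_sink(
    name="sink_model1",
    select_sql="SELECT * FROM model1",
    connector="kinesis",
    connector_parameters={
        "primary_key": "pk,sk",
        "stream": "kinesis-sink-demo",
        "aws.region": "us-east-1",
    },
    data_format="PLAIN",
    data_encode="JSON",
    format_parameters={"force_append_only": True},
))
```

The model's `SELECT` body becomes the sink query, so the wrangling stays in SQL and everything connector-specific stays in config.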

Example 2: declare config in schema yaml

  - name: sink_model1
    description: >
      Description for sink_model1.
    config: &sink_config
      enabled: "{{ var('sinks_enabled', false) }}"
      connector: "kinesis"
      data_format: "PLAIN"
      data_encode: "JSON"
      connector_parameters:
        "primary_key": "pk,sk"
        "aws.region": "{{ var('kinesis_sink')[target.profile_name]['region'] }}"
        "stream": "{{ var('kinesis_sink')[target.profile_name]['stream_name'] }}"
        "aws.credentials.role.arn": "{{ var('kinesis_sink')[target.profile_name]['role_arn'] }}"
        "aws.credentials.access_key_id": "{{ env_var('AWS_ACCESS_KEY_ID', '') }}"
        "aws.credentials.secret_access_key": "{{ env_var('AWS_SECRET_ACCESS_KEY', '') }}"
      format_parameters:
        force_append_only: true

Example 3: declare the sink as raw SQL. This option still exists: leave the connector parameter out of the config and write the full statement in the model body:

{{ config(materialized="sink") }}

CREATE SINK s1 FROM t WITH (
 connector = 'kinesis',
 stream = 'kinesis-sink-demo',
 aws.region = 'us-east-1',
 aws.credentials.access_key_id = 'your_access_key',
 aws.credentials.secret_access_key = 'your_secret_key'
)
FORMAT DEBEZIUM ENCODE JSON;
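
The choice between the configured and the raw form can be pictured as a single branch on whether `connector` is set. The Python sketch below only illustrates that decision and is not the adapter's implementation; `sink_statement` is a hypothetical name, and the configured branch is simplified (it leaves out the `FORMAT ... ENCODE ...` clause for brevity).

```python
def sink_statement(config, model_sql, sink_name):
    """Hypothetical helper: pick raw or config-driven SQL for a sink model."""
    connector = config.get("connector")
    if not connector:
        # Raw mode: the model body is already a complete CREATE SINK
        # statement, so it is passed through unchanged.
        return model_sql.strip()

    # Config-driven mode: wrap the model's SELECT in a generated statement.
    params = {"connector": connector, **config.get("connector_parameters", {})}
    with_clause = ", ".join(f"{key} = '{value}'" for key, value in params.items())
    return f"CREATE SINK {sink_name} AS {model_sql.strip()} WITH ({with_clause})"


# No connector in the config: the raw statement is returned as written.
raw = "CREATE SINK s1 FROM t WITH (connector = 'kinesis')"
print(sink_statement({"materialized": "sink"}, raw, "s1"))

# Connector present: the statement is generated around the SELECT.
print(sink_statement(
    {"materialized": "sink", "connector": "kinesis",
     "connector_parameters": {"stream": "kinesis-sink-demo"}},
    "SELECT * FROM t",
    "s2",
))
```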

MattiasMTS commented 8 months ago

LGTM! @MattiasMTS Could you please add a new example of how to configure the sink in this PR description? It will be added to the release note later as well.

Done!