risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.
https://go.risingwave.com/slack
Apache License 2.0

feat: support webhook sink #18606

Open lmatz opened 1 week ago

lmatz commented 1 week ago

Is your feature request related to a problem? Please describe.

A webhook sink lets users send data to a downstream endpoint via standard HTTP POST requests.
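As a rough illustration of the protocol, the Python sketch below builds, but does not send, the POST request a webhook sink might issue for a single row. It uses only the standard library. The function name `build_webhook_request`, the example endpoint, and the row values are hypothetical. RisingWave itself is written in Rust, and a real sink may batch rows, retry, or set headers differently.

```python
import json
import urllib.request


def build_webhook_request(endpoint: str, row: dict, headers: dict) -> urllib.request.Request:
    """Build (but do not send) the POST request a webhook sink might issue for one row.

    Hypothetical helper: the row dict is serialized as the JSON request body.
    """
    body = json.dumps(row).encode("utf-8")
    req = urllib.request.Request(endpoint, data=body, method="POST")
    req.add_header("Content-Type", "application/json")
    # User-supplied headers (e.g. Authorization) are added on top.
    for name, value in headers.items():
        req.add_header(name, value)
    return req


if __name__ == "__main__":
    req = build_webhook_request(
        "https://example.com/webhook",  # hypothetical endpoint
        {"user_id": "u42", "notification": "disk almost full"},
        {},
    )
    print(req.get_method(), req.full_url)
    # Actually sending it would be: urllib.request.urlopen(req, timeout=10)
```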

In practice, these downstream endpoints are mostly user-facing applications, e.g. Slack, email, and all kinds of CRM tools. Some of these destinations already appear among the sink connectors RW supports; popular ones are listed at https://www.getcensus.com/integrations?search=&integration-type=Destination&category=

For example, once the Slack incoming webhook is enabled (https://api.slack.com/messaging/webhooks), we hope users can do the following in RW:

CREATE TABLE slack (
    user_id varchar,
    notification varchar
) WITH (
    connector = 'webhook',
    endpoint = 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX'
);
  1. format: in theory, the payload can use any format. JSON is the most popular, but nothing prevents XML; that said, XML support looks unnecessary. We can either put it into the WITH clause as format = 'json', or reuse the existing format plain encode json clause. Reusing format plain encode json stretches its meaning a bit, but it does not seem to conflict with any other semantics of the webhook sink.

  2. request type: in theory, HTTP methods other than POST, e.g. PUT or DELETE, could be used. In practice, POST alone seems sufficient, i.e. the sink is append-only. If the sink's upstream is not append-only, we can discard all deletes and update-deletes.

  3. header: e.g. 'Content-Type: application/json' or 'Authorization: Basic dXNlcjEyMzpwYXNzMTIz', where the last part is the base64-encoded username:password string. (This is the most common authentication method, known as basic authentication; there are many others we can support on demand in the future.) Users can put anything into the headers, so I suggest we leave a field for users to fill in.
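Items 2 and 3 can be sketched together. A POST-only sink forwards inserts, and the new half of an update, while dropping deletes and update-deletes. A basic-auth header value is just `Basic ` followed by the base64 encoding of `username:password` (RFC 7617). The `Op` enum below is a hypothetical stand-in for the upstream change-log operations, not RisingWave's internal type. The helper names are also illustrative.

```python
import base64
from enum import Enum


class Op(Enum):
    # Hypothetical change-log operation kinds (not RisingWave's real type).
    INSERT = "insert"
    DELETE = "delete"
    UPDATE_DELETE = "update_delete"
    UPDATE_INSERT = "update_insert"


def basic_auth_header(username: str, password: str) -> str:
    """Return the value of an HTTP Basic Authorization header (RFC 7617)."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


def rows_to_post(changes):
    """Keep only the rows a POST-only (append-only) sink would send.

    Deletes and the old half of an update are dropped; the new half of an
    update is forwarded like an insert.
    """
    return [row for op, row in changes if op in (Op.INSERT, Op.UPDATE_INSERT)]


if __name__ == "__main__":
    print(basic_auth_header("user123", "pass123"))  # Basic dXNlcjEyMzpwYXNzMTIz
```

This also shows that the example token in item 3 decodes to `user123:pass123`.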

We expect users to define the sink as follows:

CREATE TABLE slack (
    user_id varchar,
    notification varchar
) WITH (
    connector = 'webhook',
    endpoint = 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX',
    header = '......'
) format plain encode json;

user_id and notification will be translated into two fields of the JSON payload in the HTTP request body.
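Here is a minimal sketch of the column-to-field mapping. It assumes each column name becomes a JSON key in column order and that SQL NULL becomes JSON `null`. Whether NULL columns should instead be omitted is an open design choice. `encode_row_as_json` is a hypothetical helper.

```python
import json


def encode_row_as_json(column_names, row_values) -> str:
    """Encode one row as a JSON object whose keys are the sink's column names.

    Python's json module maps None to null, so NULL columns become JSON null.
    """
    if len(column_names) != len(row_values):
        raise ValueError("row width does not match the schema")
    return json.dumps(dict(zip(column_names, row_values)))


if __name__ == "__main__":
    print(encode_row_as_json(["user_id", "notification"], ["u42", "disk almost full"]))
    # {"user_id": "u42", "notification": "disk almost full"}
```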

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

Quite a few popular modern stream processing systems support a webhook sink, either natively or via plugins.

tabVersion commented 1 week ago

header: such as 'content-type: application/json', Authorization: Basic dXNlcjEyMzpwYXNzMTIz where the last part is the base64-encoded username:password string. (This is just one most common authentication method, aka basic authentication, there are many others that we can support on demand in the future). Users can put anything into it. I suggest we leave a field for user to fill in.

Actually, the header is a hashmap of type string -> string. Shall we require users to supply it via a jsonb column or a map column?
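If the header comes from a jsonb column, the sink would have to check at runtime that each value really is a string -> string map. The sketch below assumes the column value arrives as JSON text. `parse_header_column` is a hypothetical helper.

```python
import json


def parse_header_column(raw: str) -> dict:
    """Parse a hypothetical jsonb header column into a str -> str mapping.

    JSON object keys are always strings, so only the values need checking.
    """
    value = json.loads(raw)
    if not isinstance(value, dict):
        raise ValueError("header column must be a JSON object")
    for name, v in value.items():
        if not isinstance(v, str):
            raise ValueError(f"header {name!r} must map to a string, got {type(v).__name__}")
    return value
```

A map column keyed and valued by varchar would enforce the same shape at the type level, making this runtime check unnecessary.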

Similar to the request in https://github.com/risingwavelabs/risingwave/issues/17850, @xiangjinwu and I plan to require such a column as the message header.

lmatz commented 1 week ago

shall we enforce the users to do it with a jsonb column or a map column?

It enables dynamically setting the header, which is more powerful. Not a bad idea.

I think we can provide both options: static definitions in the WITH clause for common cases, and a dynamic, per-row header column for advanced/premium usage.
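Providing both options raises the question of how static and per-row headers combine. One possible rule, sketched below as an assumption rather than a decided design, is that a per-row header overrides a static header of the same name. Because HTTP field names are case-insensitive, names are compared in lower case. `merge_headers` is hypothetical.

```python
def merge_headers(static: dict, dynamic: dict) -> dict:
    """Combine headers from the WITH clause with per-row headers.

    Assumed rule: a per-row value overrides a static one with the same
    (case-insensitive) name. The casing of the winning entry is kept.
    """
    merged = {}  # lower-cased name -> (original name, value)
    for source in (static, dynamic):
        for name, value in source.items():
            merged[name.lower()] = (name, value)
    return dict(merged.values())


if __name__ == "__main__":
    print(merge_headers(
        {"Content-Type": "application/json", "Authorization": "Basic abc"},
        {"authorization": "Bearer xyz"},
    ))
```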