vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

feat: sink from Vector to Risingwave #21308

Open lmatz opened 6 days ago

lmatz commented 6 days ago

Use Cases

Users can use Vector as a source for Risingwave (RW) to easily re-ingest or replay events into Risingwave on demand.

Risingwave is a streaming database that tries to be compatible with PostgreSQL as much as possible.

We have a working example in RW's repo: https://github.com/risingwavelabs/risingwave/tree/main/integration_tests/vector

Attempted Solutions

No response

Proposal

RW has forked Vector in https://github.com/risingwavelabs/vector.

The sink inserts data into RW with an "insert into" statement: https://github.com/risingwavelabs/vector/blob/f9c186f01b1a84ac402b6657e48d83e7af01b0c4/src/sinks/risingwave/service.rs#L133

Then it issues a flush command, a special command in RW that asks RW to commit the data just inserted: https://github.com/risingwavelabs/vector/blob/f9c186f01b1a84ac402b6657e48d83e7af01b0c4/src/sinks/risingwave/service.rs#L145 Once flush returns successfully, the data is durable: if RW fails afterwards, the data is still stored in RW after recovery.

We have verified this in RW's customers' production environment.
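The insert-then-flush sequence described above can be sketched as follows. This is only an illustration, not the code in the fork: the table name `vector_events`, the column name `payload`, and the `execute` callable (for example a DB-API cursor's `execute` method) are assumptions.

```python
from typing import Callable, Iterable


def write_batch(execute: Callable[[str, tuple], None], payloads: Iterable[bytes]) -> None:
    """Insert every payload, then ask Risingwave to commit them with FLUSH."""
    for payload in payloads:
        # Hypothetical table and column names, chosen for illustration only.
        execute("INSERT INTO vector_events (payload) VALUES (%s)", (payload,))
    # FLUSH is sent once, after all inserts of the batch have been issued.
    execute("FLUSH", ())
```

The batch would only be acknowledged to Vector after `write_batch` returns without raising, so that a failure before the flush leads to a retry rather than silent loss.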

References

No response

Version

No response

jorgehermo9 commented 6 days ago

If risingwave aims to be postgres compatible, can’t this be done with the postgres sink?

I’ve opened a PR for that https://github.com/vectordotdev/vector/pull/21248

Also, it seems that your sink encodes the payload as Bytes, so it does not support structured data in arbitrary table schemas, right? Using jsonb_populate_record (https://docs.risingwave.com/docs/current/sql-function-json/#jsonb_populate_record), as I did in the postgres sink, would make it easier to support structured events.

Does it make sense to have a completely separate sink if risingwave seems postgres-compatible, @jszwedko?

lmatz commented 6 days ago

Thank you @jorgehermo9 for the pointer! I agree that almost all the logic in #21248 can be and should be reused for RW.

By using the approach in PR #21248, in RW we can do the following:

dev=> CREATE TABLE IF NOT EXISTS t (host text, message text, payload jsonb);
CREATE_TABLE

dev=> insert into t SELECT * FROM jsonb_populate_record(
dev(> null::struct<host varchar, message varchar, payload jsonb>,
dev(> '{"host": "abc", "message": "bcd", "payload": {"d": 4, "e": "zzz"}}');
INSERT 0 1

dev=> select * from t;
 host | message |       payload        
------+---------+----------------------
 abc  | bcd     | {"d": 4, "e": "zzz"}
(1 row)

Unfortunately,

jsonb_populate_record(null::{table}, ...)

is not supported in RW yet. But we can also support it in RW if this is required by Vector's PostgreSQL sink.
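For reference, a sink that relies on the table-typed form might build its statement roughly like this. It is a sketch under assumptions: the helper name `build_insert` is hypothetical, the `%s` placeholder follows the psycopg parameter style, and this is not necessarily the exact statement PR #21248 issues.

```python
import json
from typing import Tuple


def build_insert(table: str, event: dict) -> Tuple[str, Tuple[str]]:
    """Return an INSERT that maps a JSON event onto the columns of `table`."""
    # The table name is interpolated directly into the SQL text, so it must
    # come from trusted configuration, never from event data.
    sql = (
        f"INSERT INTO {table} "
        f"SELECT * FROM jsonb_populate_record(NULL::{table}, %s::jsonb)"
    )
    # The event itself travels as a bound parameter, serialized as JSON.
    return sql, (json.dumps(event),)
```

The `NULL::{table}` cast is the part RW does not accept yet; the struct-typed form shown earlier is the current workaround.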

Another difference between RW and Postgres is that RW needs a flush to make data durable, whereas in PG we only wait for the insert statement to succeed. Without a flush, I wonder whether we could lose data if RW fails, which would break at-least-once delivery semantics and leave us with best-effort delivery only.

Instead of issuing an explicit flush command after every insert, we can also set the session variable described at https://docs.risingwave.com/docs/next/sql-set-rw-implicit-flush/ to make the flush automatic. Maybe we can set it before inserting, along a special code path for RW? (The default is false; in this case we want this session variable to be true.)
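Such a special code path could look roughly like the sketch below. The `is_risingwave` flag and the `execute` callable are hypothetical, and the variable name `RW_IMPLICIT_FLUSH` is taken from the linked documentation page, so the exact statement should be checked against the RW version in use.

```python
from typing import Callable


def prepare_session(execute: Callable[[str], None], is_risingwave: bool) -> None:
    """Run once per connection, before the first insert."""
    if is_risingwave:
        # Make every insert flush implicitly instead of sending FLUSH by hand.
        execute("SET RW_IMPLICIT_FLUSH TO true")
    # Plain Postgres needs no setup: a successful insert is already durable.
```

Because the variable is session-scoped, it would have to be set again on every new or reconnected connection in the pool.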

lmatz commented 2 days ago

https://github.com/risingwavelabs/risingwave/issues/18601

We will properly support PG's jsonb_populate_record syntax.