risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.

Tracking: Ad-hoc(batch) ingestion #18583

Open st1page opened 2 months ago

st1page commented 2 months ago

We will enhance the ad-hoc ingestion capability in subsequent releases. The expectation is that users will eventually be able to read data ad hoc whenever it is persisted on an external system.

Streaming storage

For streaming storage, predicate pushdown on the "offset" is required.
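
To illustrate why offset pushdown matters, here is a minimal Python sketch. It is not RisingWave code: the `Partition` class and both scan functions are hypothetical, and the sketch assumes a log whose records can be addressed directly by offset. With pushdown, the reader starts at the requested offset instead of fetching the whole partition and filtering afterwards.

```python
from dataclasses import dataclass, field


@dataclass
class Partition:
    """Hypothetical in-memory stand-in for one partition of a log."""
    records: list = field(default_factory=list)
    reads: int = 0  # counts how many records were fetched from "storage"

    def read(self, offset):
        self.reads += 1
        return self.records[offset]


def scan_without_pushdown(partition, min_offset):
    # Fetch every record first, then apply the offset predicate.
    rows = [(o, partition.read(o)) for o in range(len(partition.records))]
    return [(o, r) for o, r in rows if o >= min_offset]


def scan_with_pushdown(partition, min_offset):
    # Start reading at min_offset, so earlier records are never fetched.
    start = max(min_offset, 0)
    return [(o, partition.read(o)) for o in range(start, len(partition.records))]
```

Both functions return the same rows; the difference is only in how many records are fetched, which is what makes the pushdown necessary for ad-hoc queries over long logs.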

Lake

File source (object store)

Database

Currently we only support CREATE TABLE with a primary key on the CDC connector. To support this, we need to design and introduce new syntax for CREATE SOURCE with a CDC connector. In that case, the source can only be queried ad hoc.

Misc

kwannoel commented 2 months ago

Hi, I will help with this issue, starting with TVFs.

xxchan commented 1 month ago

Have we reached consensus to support TVFs? To me, their use cases overlap with those of Sources, so they seem unnecessary.

I’d like to see rationales and examples where they are more useful than sources before adding them.

st1page commented 1 month ago

Have we reached consensus to support TVFs? To me, their use cases overlap with those of Sources, so they seem unnecessary.

I’d like to see rationales and examples where they are more useful than sources before adding them.

xxchan commented 1 month ago

Thanks for the explanation!

Currently we only support the CDC table and cannot create a source on an external database's table.

Makes me wonder whether this is also related to other shared sources, e.g., Kafka?

We can refer to the grammar of DuckDB for the cases

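Compared with DuckDB

  • They don't have sources at all, so it might be a little different.
  • Their syntax contains an ATTACH, which looks like the CREATE CONNECTION we might have in the future. So maybe we should design that first.
ATTACH 'dbname=postgresscanner' AS postgres_db (TYPE POSTGRES);
SELECT * FROM postgres_query('postgres_db', 'SELECT * FROM cars LIMIT 3');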

st1page commented 1 month ago

Currently we only support the CDC table and cannot create a source on an external database's table.

Makes me wonder whether this is also related to other shared sources, e.g., Kafka?

The issue is not related to "shared"; it is because the CDC source contains multiple tables' changes. Actually, that is a "CONNECTION".

st1page commented 1 month ago

Compared with DuckDB

  • They don't have sources at all, so it might be a little different.
  • Their syntax contains an ATTACH, which looks like the CREATE CONNECTION we might have in the future. So maybe we should design that first.
ATTACH 'dbname=postgresscanner' AS postgres_db (TYPE POSTGRES);
SELECT * FROM postgres_query('postgres_db', 'SELECT * FROM cars LIMIT 3');

Agree with that. cc @chenzl25. Do we have a plan to simplify the syntax of the TVF with a connection?

chenzl25 commented 1 month ago

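After the connection is supported, in my mind connection can be used in TVF directly like:

  • read_parquet(s3_connection, 's3://bucket/path/xxxx.parquet')
  • read_csv(s3_connection, 's3://bucket/path/xxxx.csv')
  • read_json(s3_connection, 's3://bucket/path/xxxx.json')
  • iceberg_scan(iceberg_connection, 'database_name.table_name')
  • postgres_query(pg_connection, 'select * from t')
  • mysql_query(my_connection, 'select * from t')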

Connections contain the necessary information to allow TVF to query the external system. I think @tabVersion will support Connection in this Q.

tabVersion commented 1 month ago

After the connection is supported, in my mind connection can be used in TVF directly like:

  • read_parquet(s3_connection, 's3://bucket/path/xxxx.parquet')
  • read_csv(s3_connection, 's3://bucket/path/xxxx.csv')
  • read_json(s3_connection, 's3://bucket/path/xxxx.json')
  • iceberg_scan(iceberg_connection, 'database_name.table_name')
  • postgres_query(pg_connection, 'select * from t')
  • mysql_query(my_connection, 'select * from t')

Connections contain the necessary information to allow TVF to query the external system. I think @tabVersion will support Connection in this Q.

Yes for s3_connection and iceberg_connection. I still have some concerns about the -cdc connectors: the CDC source is conceptually a CONNECTION, since it contains nearly the same info as a CONNECTION.

CREATE SOURCE pg_mydb WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '8306',
    username = 'root',
    password = '123456',
    database.name = 'mydb',
    slot.name = 'mydb_slot',
    debezium.schema.history.internal.skip.unparseable.ddl = 'true'
);

tabVersion commented 1 month ago

@st1page Additionally, one case is ad-hoc ingestion from databases. Currently we only support the CDC table and cannot create a source on an external database's table. So a TVF is the only clearly defined method for ad-hoc ingestion from databases. We can refer to the grammar of DuckDB for these cases: duckdb.org/docs/extensions/postgres.html#the-postgres_query-table-function duckdb.org/docs/extensions/mysql#the-mysql_query-table-function

The main idea for CONNECTION is minimizing the user's effort when creating new sources/tables. It stores some props and applies to all sources/sinks/tables created from the CONNECTION.

Things get a little different here because MQs have looser ACL control than file systems, e.g., S3. So I'd propose we must define BUCKET in fs CONNECTIONs.

From my perspective, we can draw a line here.
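
As a rough illustration of how a CONNECTION could store props once and apply them to every source, sink, or table created from it, here is a minimal Python sketch. It is not RisingWave code: `resolve_props` is a hypothetical helper, the `table.name` key is only an illustrative per-object prop, and rejecting keys that the connection already defines is an assumption made for the example, not a decided behavior.

```python
def resolve_props(connection_props, object_props):
    """Combine props stored in a CONNECTION with an object's own WITH props."""
    # Assumption: an object may not redefine a key the connection already sets.
    overlap = sorted(connection_props.keys() & object_props.keys())
    if overlap:
        raise ValueError(f"already defined by the connection: {overlap}")
    return {**connection_props, **object_props}


# Hypothetical connection props, reusing values from the CREATE SOURCE example.
pg_connection = {"hostname": "127.0.0.1", "port": "8306", "username": "root"}
table_props = resolve_props(pg_connection, {"table.name": "public.cars"})
```

The user only writes the per-object props; everything else comes from the connection.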

wcy-fdu commented 1 month ago

So I'd propose we must define BUCKET in fs CONNECTIONs.

+1 for this. We need the bucket name to validate that RisingWave can read from the specific bucket or data directory. Here the bucket in fs_connection is like db_name in db_connection.
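
A minimal Python sketch of that check, assuming a hypothetical `FsConnection` record with `scheme` and `bucket` fields (none of these names come from RisingWave): a path is accepted only if its bucket matches the one declared on the connection.

```python
from dataclasses import dataclass
from urllib.parse import urlparse


@dataclass(frozen=True)
class FsConnection:
    """Hypothetical stand-in for a file-system CONNECTION."""
    scheme: str  # e.g. "s3"
    bucket: str  # plays the role db_name plays in a database connection


def path_allowed(conn, url):
    # For "s3://bucket/path/file", urlparse puts the bucket in netloc.
    parsed = urlparse(url)
    return parsed.scheme == conn.scheme and parsed.netloc == conn.bucket
```

For example, with `FsConnection("s3", "bucket")`, the path `s3://bucket/path/xxxx.parquet` passes the check, while a path in any other bucket is rejected before a read is attempted.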