GreptimeTeam / greptimedb

An open-source, cloud-native, distributed time-series database with PromQL/SQL/Python supported. Available on GreptimeCloud.
https://greptime.com/
Apache License 2.0

Kafka connector integration #3398

Open killme2008 opened 4 months ago

killme2008 commented 4 months ago

What type of enhancement is this?

API improvement, User experience, Other

What does the enhancement do?

Kafka is a popular open-source messaging queue used by many applications for message and data transmission in big-data environments. Numerous databases and data warehouses support it as a data-ingestion source.

For example, ClickHouse supports a Kafka table engine: https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka

 CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

Databend has an ingestor project https://github.com/databendcloud/bend-ingest-kafka

bend-ingest-kafka
  --kafka-bootstrap-servers="127.0.0.1:9092,127.0.0.2:9092" \
  --kafka-topic="Your Topic" \
  --kafka-consumer-group="Consumer Group" \
  --databend-dsn="http://root:root@127.0.0.1:8000" \
  --databend-table="db1.tbl" \
  --data-format="json" \
  --batch-size=100000 \
  --batch-max-interval=300s
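The `--batch-size` / `--batch-max-interval` flags above hint at the core loop any standalone ingester needs: buffer consumed messages and flush whenever either threshold trips. A minimal sketch of that batching logic in Python (names like `BatchBuffer` are illustrative, not part of any real ingester):

```python
import time


class BatchBuffer:
    """Accumulate rows and signal a flush when either the row-count or the
    max-interval threshold is reached (hypothetical helper, mirroring the
    --batch-size / --batch-max-interval flags above)."""

    def __init__(self, max_rows=100_000, max_interval_s=300.0, clock=time.monotonic):
        self.max_rows = max_rows
        self.max_interval_s = max_interval_s
        self.clock = clock          # injectable for testing
        self.rows = []
        self.started_at = None      # time the current batch was opened

    def add(self, row):
        if self.started_at is None:
            self.started_at = self.clock()
        self.rows.append(row)

    def should_flush(self):
        if not self.rows:
            return False
        if len(self.rows) >= self.max_rows:
            return True
        return self.clock() - self.started_at >= self.max_interval_s

    def drain(self):
        """Return the pending batch and reset the buffer."""
        batch, self.rows, self.started_at = self.rows, [], None
        return batch
```

In a real ingester, the Kafka consumer loop would call `add()` per message, check `should_flush()` each iteration, and write the drained batch to the database in a single request before committing offsets.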

Apache Doris supports creating a routine load to import data from Kafka: https://doris.apache.org/docs/dev/data-operate/import/import-scenes/kafka-load/

CREATE ROUTINE LOAD demo.my_first_routine_load_job ON test_1
COLUMNS TERMINATED BY ",",
PROPERTIES
(
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
)
FROM KAFKA
(
   "kafka_broker_list"= "broker1:9091,broker2:9091",
   "kafka_topic" = "my_topic",
   "property.security.protocol" = "ssl",
   "property.ssl.ca.location" = "FILE:ca.pem",
   "property.ssl.certificate.location" = "FILE:client.pem",
   "property.ssl.key.location" = "FILE:client.key",
   "property.ssl.key.password" = "abcdefg"
);

Since GreptimeDB is a time-series database (TSDB), I believe it should also support a Kafka connector. This could be implemented either as an independent project or as a table extension similar to ClickHouse's.

I am uncertain which approach is preferable, but personally, I lean towards the latter. I have opened this issue to initiate a discussion on the matter.
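For illustration, the table-extension approach might mirror the ClickHouse example above. The syntax below is purely hypothetical; GreptimeDB has no Kafka engine today, and only the `TIME INDEX` requirement is an existing GreptimeDB concept:

```sql
-- Hypothetical sketch, not an existing feature
CREATE TABLE kafka_ingest (
  ts TIMESTAMP TIME INDEX,  -- GreptimeDB requires a time index column
  level STRING,
  message STRING
) ENGINE = Kafka(
  'localhost:9092',  -- bootstrap servers
  'topic',           -- topic to consume
  'group1',          -- consumer group
  'json'             -- message format
);
```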

Implementation challenges

No response

tisonkun commented 4 months ago

as a table extension similar to ClickHouse

Currently, the CREATE TABLE clause in GreptimeDB doesn't support switching the engine, does it? From the docs, it seems we can only define the columns with a default engine.

sunng87 commented 4 months ago

I'm +1 for a standalone ingester for better scalability, especially in shared environments.