apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0
2.25k stars · 897 forks

[Feature] 1-1 mapping between paimon buckets and kafka partitions #3249

Open polyzos opened 4 months ago

polyzos commented 4 months ago

Search before asking

Motivation

When creating an Append for Message Queue table, as shown in the attached screenshot (Screenshot 2024-04-23 at 8.33.05 AM),

we can notice the following:

  1. Five buckets are specified, but a bucket is not created until data actually arrives in it
  2. If you check the Kafka partitions, partition 3 holds keys 2, 3, and 4
  3. Those keys nevertheless end up in different buckets
  4. Paimon performs a shuffle even though the parallelism is the same, because it does not do a 1-1 mapping

Because this table type provides Kafka-like message-queue functionality, some users are confused: they expect the same partitioning to apply, i.e., a 1-1 mapping between a Kafka partition and a Paimon bucket.

At the same time, I believe this is a really good enhancement: it should also allow removing the shuffle between the operators, thus improving performance.
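To see why co-partitioned keys need not be co-bucketed, here is a purely illustrative sketch. Both systems assign records by hashing a key and taking a modulus, but the hash functions and the moduli differ; the toy hash below is neither Kafka's murmur2 partitioner nor Paimon's actual bucket hash, it only demonstrates the effect.

```python
# Illustrative only: Kafka assigns a record to hash_k(key) % num_partitions,
# while Paimon assigns it to hash_p(bucket-key) % num_buckets. The hash
# functions and the moduli both differ, so a key's Kafka partition
# generally does not equal its Paimon bucket. This toy hash is NOT
# Kafka's murmur2 or Paimon's real hash.

def slot(key: bytes, n: int, seed: int) -> int:
    h = seed
    for b in key:
        h = (h * 33 + b) % 2**32
    return h % n

num_partitions, num_buckets = 3, 5
for key in (b"2", b"3", b"4"):
    kafka_partition = slot(key, num_partitions, seed=5381)
    paimon_bucket = slot(key, num_buckets, seed=0)
    print(key.decode(), kafka_partition, paimon_bucket)
# → 2 2 0
#   3 0 1
#   4 1 2
```

Even with identical hash functions, a differing partition count and bucket count would already scatter keys; with different hash functions the mappings are effectively independent.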

Solution

No response

Anything else?

No response

Are you willing to submit a PR?

eric666666 commented 3 months ago

If you want a 1-to-1 mapping, Paimon's bucket number should be at least the number of Kafka partitions, and the data should be shuffled by the Kafka partition id. I think Paimon can already implement your idea. Here is a demo; you can define the DDL like this. Kafka source table:

CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
  `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
  `partition_id` INT METADATA FROM 'partition' VIRTUAL,  -- from Kafka connector
  `offset` BIGINT METADATA VIRTUAL,  -- from Kafka connector
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);

Paimon sink table:

CREATE TABLE IF NOT EXISTS sink_paimon_table
WITH (
  'connector' = 'paimon',
  'bucket' = '3',                         -- bucket number should be at least the Kafka partition count
  'bucket-key' = 'partition_id',          -- bucket key must be the Kafka partition_id
  'merge-engine' = 'deduplicate',
  'primary-key' = 'partition_id,offset'   -- primary key must be partition_id,offset
)
LIKE KafkaTable (EXCLUDING ALL);

So the Kafka source table's data, when inserted into the Paimon table, will be shuffled by the Kafka partition_id. Since partition_id is an INT whose hash code equals itself, this pipeline maps each Kafka partition 1-to-1 to a Paimon bucket.
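The arithmetic behind this suggestion can be sketched as follows, under the idealized model that Paimon computes bucket = hash(bucket-key) % num_buckets and that an int hashes to itself (as with Java's Integer.hashCode()); the real bucket assignment may apply additional hashing to the serialized key.

```python
# Idealized model of the suggestion above: with bucket-key = partition_id
# and hash(int) == int, each Kafka partition maps to a distinct Paimon
# bucket as long as num_buckets >= num_partitions.

num_partitions = 3
num_buckets = 3  # must be >= num_partitions for the mapping to be injective

def bucket_for(partition_id: int) -> int:
    # Python's hash(n) == n for small ints, mirroring Integer.hashCode()
    return hash(partition_id) % num_buckets

mapping = {p: bucket_for(p) for p in range(num_partitions)}
print(mapping)  # {0: 0, 1: 1, 2: 2} -- no two partitions share a bucket
```

If num_buckets were smaller than num_partitions, the modulus would force at least two partitions into the same bucket, breaking the 1-1 property.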

polyzos commented 3 months ago

@eric666666 Thanks a lot for this. The problem here, though, is that I'm trying to use an Append for Message Queue table so I can offload logs from Kafka, but your example uses a primary key table.

I also tried your example, but when creating the sink_paimon_table (Paimon 0.7.0-incubating), I'm getting:

[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Table column [id, a, b, dt] should include all primary key constraint [partition_id, offset]

Let me know if I'm missing something.