streamnative / kop

Kafka-on-Pulsar - A protocol handler that brings native Kafka protocol to Apache Pulsar
https://streamnative.io/docs/kop
Apache License 2.0

[FEATURE] cursor replication with kop #1778

Open nareshv opened 1 year ago

nareshv commented 1 year ago

Is your feature request related to a problem? Please describe. One of the good things Pulsar solves is subscription cursor replication for native Pulsar applications. When KoP is used, the expectation is the same, but in practice things behave differently with Kafka consumer groups.

Describe the solution you'd like Kafka consumer group offsets should be replicated across clusters, similar to native subscription replication for Pulsar consumers.

Describe alternatives you've considered None

Additional context We have two clusters running KoP with namespace-level replication set up.

  1. Start the Kafka producer against cluster-scus on topic-1 and produce 10900 messages.
  2. Start the Kafka consumer against cluster-scus on topic-1; it stops at offset 10900 with lag 0.
  3. Stop the Kafka consumer on cluster-scus.
  4. Start the Kafka consumer against cluster-wus on topic-1; it starts at offset 500 with lag 10400.
  5. Stop the Kafka consumer on cluster-wus.
  6. Start the Kafka consumer against cluster-scus on topic-1 again; it resumes from offset 10900 with lag 0.
Consumer Logs
-- on SCUS ---

Kafka version: 2.8.0 Cluster ID: cluster-scus
Setting offset for partition topic-1-0 to the committed offset FetchPosition{offset=10900, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[x.x.x.x:9092 (id: 1 rack: null)], epoch=absent}}

-- on WUS ---

Kafka version: 2.8.0 Cluster ID: cluster-wus
Setting offset for partition topic-1-0 to the committed offset FetchPosition{offset=500, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[y.y.y.y:9092 (id: 2 rack: null)], epoch=absent}}

--- on SCUS ---
Kafka version: 2.8.0 Cluster ID: cluster-scus
Setting offset for partition topic-1-0 to the committed offset FetchPosition{offset=10900, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[x.x.x.x:9092 (id: 683016547 rack: null)], epoch=absent}}
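To compare the two clusters at a glance, the committed offsets can be pulled out of the consumer logs above and turned into lag. This is a minimal Python sketch, assuming the log-end offset of topic-1 is 10900 on both clusters (all messages were produced in step 1 and replicated); `committed_offset` and `lag` are hypothetical helpers, not part of any Kafka or KoP tool.

```python
import re

# Matches the committed offset in the Kafka consumer log lines quoted above,
# e.g. "... to the committed offset FetchPosition{offset=10900, ..."
COMMITTED_RE = re.compile(r"committed offset FetchPosition\{offset=(\d+)")


def committed_offset(log_line: str) -> int:
    """Extract the committed offset from a 'Setting offset for partition' log line."""
    match = COMMITTED_RE.search(log_line)
    if match is None:
        raise ValueError(f"no committed offset in: {log_line!r}")
    return int(match.group(1))


def lag(log_end_offset: int, committed: int) -> int:
    """Lag = messages in the partition that the group has not committed yet."""
    return log_end_offset - committed


if __name__ == "__main__":
    LOG_END_OFFSET = 10900  # assumption: 10900 messages produced in step 1
    runs = {
        "cluster-scus": "Setting offset for partition topic-1-0 to the committed offset "
                        "FetchPosition{offset=10900, offsetEpoch=Optional.empty}",
        "cluster-wus": "Setting offset for partition topic-1-0 to the committed offset "
                       "FetchPosition{offset=500, offsetEpoch=Optional.empty}",
    }
    for cluster, line in runs.items():
        offset = committed_offset(line)
        print(cluster, offset, lag(LOG_END_OFFSET, offset))
```

Run as a script, this prints `cluster-scus 10900 0` and `cluster-wus 500 10400`, the same lag values reported in steps 2 and 4.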
BewareMyPower commented 1 year ago

KoP does not use cursors to store committed offsets. Like Kafka, KoP stores them in the offset topic, which is public/__kafka/__consumer_offsets by default. If this topic is replicated successfully, the offsets should also be loaded successfully.

I think the reason is that KoP reads committed offsets directly from its in-memory cache, and it only loads that cache from the offset topic when it starts.

nareshv commented 1 year ago

Are there any configs to set up so that consumer offsets are replicated, or will the default Pulsar settings work? How can I inspect KoP's cache, and flush it if needed for a topic/consumer group?

BewareMyPower commented 1 year ago

TL;DR: current KoP does not work well with geo-replication; we might need to improve it later.

Are there any configs to set up so that consumer offsets are replicated, or will the default Pulsar settings work?

You only need to replicate the public/__kafka/__consumer_offsets topic.

How can I inspect KoP's cache, and flush it if needed for a topic/consumer group?

All Kafka admin operations just query KoP's cache.

To understand this issue, you might need to know some implementation details:

  1. KoP loads the metadata from the offset topic into the memory cache only when:
     1.1 KoP handles a request that queries group or offset metadata for the first time (e.g. when the consumer subscribes)
     1.2 Namespace bundle ownership changes (e.g. a broker goes down)
  2. When receiving certain requests (e.g. offset commit) from Kafka clients, KoP updates the memory cache and persists the metadata into the offset topic.
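The two rules above can be modelled in a few lines. This is a simplified Python sketch, not KoP's actual Java implementation: `GroupMetadataCache` and its methods are hypothetical, a plain list stands in for the offset topic, and a namespace bundle ownership change is reduced to an `on_ownership_change()` call.

```python
class GroupMetadataCache:
    """Hypothetical, simplified model of how KoP caches committed offsets."""

    def __init__(self, offset_topic: list) -> None:
        # Stand-in for public/__kafka/__consumer_offsets:
        # an append-only list of (group, partition, offset) records.
        self.offset_topic = offset_topic
        self.cache: dict = {}
        self.loaded = False

    def _load(self) -> None:
        # Replay the whole offset topic; the last record for each key wins.
        self.cache = {}
        for group, partition, offset in self.offset_topic:
            self.cache[(group, partition)] = offset
        self.loaded = True

    def fetch_offset(self, group: str, partition: str):
        # Rule 1.1: load only on the first group/offset metadata request.
        if not self.loaded:
            self._load()
        # Afterwards, answers come straight from memory.
        return self.cache.get((group, partition))

    def commit_offset(self, group: str, partition: str, offset: int) -> None:
        # Rule 2: update the cache and persist the record to the offset topic.
        self.cache[(group, partition)] = offset
        self.offset_topic.append((group, partition, offset))

    def on_ownership_change(self) -> None:
        # Rule 1.2: a bundle ownership change forces a reload on the next request.
        self.loaded = False
```

In this model, a record that reaches the offset topic through replication (appended directly to the list rather than via `commit_offset`) stays invisible to `fetch_offset` until `on_ownership_change()` is called, mirroring the behaviour described in this comment.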

Assuming you replicate the offset topic from cluster-A to cluster-B, if a Kafka consumer connects to KoP before the offset topic is fully replicated, KoP might not load the latest metadata, e.g.

Then the consumer connected to cluster-B will start consuming from offset 201, not 1001.
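This situation boils down to replaying only a prefix of the offset topic. A minimal Python sketch, assuming cluster-A's offset topic holds two commits for group-A (201, then 1001) and only the first has reached cluster-B when KoP loads it; the records and the `load_offsets` helper are hypothetical.

```python
def load_offsets(records):
    """Replay (group, partition, offset) commit records; the last one per key wins."""
    offsets = {}
    for group, partition, offset in records:
        offsets[(group, partition)] = offset
    return offsets


# Hypothetical commit history of the offset topic in cluster-A.
source_records = [
    ("group-A", "topic-1-0", 201),
    ("group-A", "topic-1-0", 1001),
]

# Pulsar geo-replication is asynchronous, so cluster-B may hold only a prefix.
replicated_so_far = source_records[:1]

print(load_offsets(source_records)[("group-A", "topic-1-0")])     # 1001
print(load_offsets(replicated_so_far)[("group-A", "topic-1-0")])  # 201
```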

What's worse, once the metadata loading has been triggered, the cached metadata won't be updated afterwards, e.g.

  1. A consumer in group-A connects to KoP.
  2. KoP loads the metadata from the offset topic and keeps the (group-A, 201) pair in memory.
  3. The consumer does nothing and exits.
  4. After a while, __consumer_offsets in cluster-B is fully replicated.
  5. The consumer in group-A connects to KoP again.
  6. KoP still returns offset 201 to the consumer, while the metadata in __consumer_offsets is 1001.
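The six steps can be replayed against a toy model. This Python sketch is illustrative only: `OffsetCache` and `run_scenario` are hypothetical stand-ins for KoP's in-memory cache and the consumer sessions, and the offsets 201 and 1001 are the values from the example above.

```python
class OffsetCache:
    """Hypothetical model of KoP's offset cache: loaded once, then served from memory."""

    def __init__(self, offset_topic: list) -> None:
        self.offset_topic = offset_topic  # stand-in for __consumer_offsets in cluster-B
        self.cache = None                 # None means "not loaded yet"

    def fetch_offset(self, group: str, partition: str):
        if self.cache is None:
            # First group/offset request: replay the topic, last record per key wins.
            self.cache = {}
            for g, p, offset in self.offset_topic:
                self.cache[(g, p)] = offset
        return self.cache.get((group, partition))


def run_scenario():
    key = ("group-A", "topic-1-0")
    # Cluster-B's offset topic is only partially replicated at first.
    offset_topic_b = [(*key, 201)]
    kop = OffsetCache(offset_topic_b)

    first = kop.fetch_offset(*key)       # steps 1-2: the load sees only 201
    # Step 3: the consumer exits without committing anything.
    offset_topic_b.append((*key, 1001))  # step 4: replication catches up
    second = kop.fetch_offset(*key)      # steps 5-6: answered from the stale cache
    latest_in_topic = offset_topic_b[-1][2]
    return first, second, latest_in_topic


print(run_scenario())  # (201, 201, 1001)
```

The second fetch returns 201 even though the topic now ends with 1001, because nothing in this model (or, per the steps above, in KoP) triggers a reload after the first one.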