risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.

delete Kafka consumer group when job is dropped #18416

Open xxchan opened 1 week ago

xxchan commented 1 week ago

A user complained that RisingWave created too many consumer groups and exceeded the consumer-group limit in Kafka. They have to manually clean up dead groups.

Although we now create only one consumer group per MV (a long time ago it was consumer-{current_time}, then rw-{fragment_id}-{actor_id}, and now {prefix:rw}-{fragment_id}), the count can still grow quickly: they have ~50 sources but multiple MVs on the same source, so every time they recreate MVs the number of consumer groups increases a lot.

There are several possible solutions:

  1. (This is what I suggested they do for now.) Use TABLE instead of SOURCE. Recreating MVs then only triggers MV backfill and doesn't consume from Kafka (see the SQL sketch after this list). Drawback: extra storage.
  2. Allow specifying a fully custom group id. (Currently we support a custom group id prefix, but still generate a suffix.) When they drop and recreate a job, they can then reuse the same group.

    Flink supports this, but it may be error-prone. If there are multiple MVs on the same source, the different MVs' Kafka consumers will report offsets to the same consumer group and make a mess of it. (Note: this only affects offset monitoring, not correctness.) Since multiple MVs on the same source is a natural use case in RisingWave, I don't suggest doing this. (If really necessary, we might add a config like unsafe_exact_group_id=xxx.)

  3. Delete the consumer group on drop. This is reasonable, and probably good for most users. Some possible problems:

    Users may want to retain the consumer group after the job is dropped? I guess this is unlikely, since the groups are dedicated to RisingWave.

    More importantly, deleting a consumer group requires additional authorization permissions. Note that today the groups are created implicitly by the Kafka consumers themselves, not explicitly via the admin API.

    We could introduce a config option delete_on_drop, and make sure a failed deletion won't block DROP (a possible syntax is sketched after the note below).
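For concreteness, a minimal SQL sketch of options 1 and 2. The schema is made up, and the group.id.prefix property name is from memory, so check the docs for the exact spelling:

```sql
-- Option 1: ingest into a TABLE, so the data is persisted in RisingWave.
-- MVs on top of it backfill from the table and never open their own Kafka
-- consumers, so recreating them adds no consumer groups.
CREATE TABLE orders (
    order_id BIGINT,
    amount DOUBLE PRECISION
) WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'broker:9092'
) FORMAT PLAIN ENCODE JSON;

CREATE MATERIALIZED VIEW order_totals AS
SELECT order_id, SUM(amount) AS total
FROM orders
GROUP BY order_id;

-- Option 2 (today's behavior): a custom group id *prefix* on a SOURCE;
-- RisingWave still appends a generated suffix per fragment.
CREATE SOURCE orders_src (
    order_id BIGINT,
    amount DOUBLE PRECISION
) WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'broker:9092',
    group.id.prefix = 'my-team-rw'
) FORMAT PLAIN ENCODE JSON;
```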

Note: a shared source (#16003) may look like option 1 (i.e., a materialized source), but it does source backfill, which also creates consumer groups.
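For option 3, the knob could be exposed as just another connector property. This is only a sketch of the proposal above; delete_on_drop is not an existing option, and its name and placement are not finalized:

```sql
-- Proposal sketch only (not implemented): best-effort consumer group
-- cleanup when a job consuming this source is dropped. A deletion failure
-- (e.g., missing permission to delete groups) must not block the DROP itself.
CREATE SOURCE orders_src (
    order_id BIGINT,
    amount DOUBLE PRECISION
) WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'broker:9092',
    delete_on_drop = 'true'
) FORMAT PLAIN ENCODE JSON;

CREATE MATERIALIZED VIEW order_totals AS
SELECT order_id, SUM(amount) AS total FROM orders_src GROUP BY order_id;

-- With delete_on_drop, dropping the MV would also issue a best-effort
-- deletion of the rw-{fragment_id} group(s) its source executors used.
DROP MATERIALIZED VIEW order_totals;
```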

pjpringlenom commented 1 week ago

Same story for Pulsar. But with Pulsar, inactive consumers will eventually exhaust the backlog quota, preventing new data from being written.