apache / pulsar-adapters

Apache Pulsar Adapters
https://pulsar.apache.org/

[fix] Pulsar spout fails to unsubscribe and close consumers on a shared subscription #56

Open rdhabalia opened 11 months ago


Fixes: https://github.com/apache/pulsar/issues/21451

Motivation

Apache Pulsar supports queue-style messaging through the Shared subscription type, which lets multiple connected consumers process unordered messages in parallel. Shared subscriptions are also common in data processing pipelines that need to forcefully unsubscribe once they have processed the messages on a topic. One example is the Pulsar-Storm adapter, where the Pulsar spout creates Pulsar consumers on a shared subscription for distributed processing and then unsubscribes from the topic. However, PulsarSpout always fails to unsubscribe from a shared subscription, and it also does not close the Pulsar consumers if more than one consumer is connected to the subscription. This leaks both the subscription and the consumers for that application.

Modifications

PR 21687 introduced force-unsubscribe, which makes it possible to unsubscribe from a shared subscription while other consumers are still connected. The Pulsar spout can use it to unsubscribe at the end of its cycle.
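
To make the failure mode and the fix concrete, here is a minimal in-memory sketch. It is a toy model, not the Pulsar client or broker code: the class `SharedSubscriptionModel`, its methods, and the consumer names are all hypothetical. It also assumes that a forced unsubscribe disconnects every connected consumer and deletes the subscription. In the real client the fix would presumably go through an `unsubscribe` variant that takes a force flag; check that against the client version in use.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Toy model of a shared subscription (hypothetical; not the Pulsar API). */
final class SharedSubscriptionModel {
    private final Set<String> connectedConsumers = new LinkedHashSet<>();
    private boolean deleted;

    /** Attaches a consumer to the subscription. */
    void connect(String consumerName) {
        if (deleted) {
            throw new IllegalStateException("subscription already deleted");
        }
        connectedConsumers.add(consumerName);
    }

    /**
     * Models an unsubscribe request issued by one consumer.
     * Without force, the request is rejected while other consumers are connected.
     * With force (assumed semantics), all consumers are disconnected and the
     * subscription is deleted.
     *
     * @return true if the subscription was removed, false if the request was rejected
     */
    boolean unsubscribe(String consumerName, boolean force) {
        if (!connectedConsumers.contains(consumerName)) {
            throw new IllegalArgumentException(consumerName + " is not connected");
        }
        boolean othersConnected = connectedConsumers.size() > 1;
        if (othersConnected && !force) {
            return false; // the case the spout hits with several tasks
        }
        connectedConsumers.clear();
        deleted = true;
        return true;
    }

    boolean isDeleted() {
        return deleted;
    }

    int consumerCount() {
        return connectedConsumers.size();
    }

    public static void main(String[] args) {
        SharedSubscriptionModel sub = new SharedSubscriptionModel();
        sub.connect("spout-task-0");
        sub.connect("spout-task-1");

        // Plain unsubscribe while a second consumer is still connected.
        System.out.println("plain unsubscribe accepted: "
                + sub.unsubscribe("spout-task-0", false));

        // Forced unsubscribe from the same consumer.
        System.out.println("forced unsubscribe accepted: "
                + sub.unsubscribe("spout-task-0", true));
        System.out.println("subscription deleted: " + sub.isDeleted());
    }
}
```

The model returns a boolean instead of throwing so that the two outcomes are easy to compare. A spout would additionally need to close its consumer whether or not the unsubscribe call succeeds, so that a rejected request does not also leak the consumer.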


Documentation