rabbitmq / rabbitmq-stream-java-client

RabbitMQ Stream Java Client
https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/

Can we add stream information to SubscriptionListener.SubscriptionContext? #540

Closed laststem closed 6 months ago

laststem commented 6 months ago

Is your feature request related to a problem? Please describe.

It would be nice if SubscriptionListener exposed stream information (stream name, consumer name, etc.) in addition to the offset.

Describe the solution you'd like

Add this information to SubscriptionListener.SubscriptionContext; it must be read-only.

Describe alternatives you've considered

No response

Additional context

No response

acogoluegnes commented 6 months ago

What would be the use case?

laststem commented 6 months ago

Like ConsumerUpdateListener.Context, it could give library users more information and improve usability.

For example, when the same subscription listener (MyStreamSubscriptionListener) is registered on several consumers, the implementation cannot tell which stream (my-stream or my-stream2) it is being called for.

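MyStreamSubscriptionListener myStreamSubscriptionListener = new MyStreamSubscriptionListener();

// The same listener instance is registered for two different streams
// (message handler and other settings omitted).
environment.consumerBuilder()
    .stream("my-stream")
    .subscriptionListener(myStreamSubscriptionListener)
    .build();
environment.consumerBuilder()
    .stream("my-stream2")
    .subscriptionListener(myStreamSubscriptionListener)
    .build();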

If SubscriptionListener.SubscriptionContext carried more information, MyStreamSubscriptionListener could distinguish the streams like this:

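import com.rabbitmq.stream.OffsetSpecification
import com.rabbitmq.stream.SubscriptionListener

class MyStreamSubscriptionListener : SubscriptionListener {

    // Offsets stored outside RabbitMQ, keyed by stream name (left empty here for brevity).
    private val externalOffsetSource = mapOf<String, Long>()

    override fun preSubscribe(subscriptionContext: SubscriptionListener.SubscriptionContext) {
        // stream() is the accessor requested in this issue.
        // Keep the default offset specification when no external offset is known for the stream.
        val externalOffset = externalOffsetSource[subscriptionContext.stream()] ?: return
        subscriptionContext.offsetSpecification(OffsetSpecification.offset(externalOffset))
    }
}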

I also have multiple stream consumers in a single application, but I can't tell which stream an offset belongs to from the log below (from ConsumersCoordinator.add). The log prints the offset but not the stream name.

LOGGER.info(
    "Requested offset specification {} not used in favor of stored offset found for reference {}",
    offsetSpecification,
    offsetTrackingReference);

Yes, I can attach a lambda subscriptionListener to each consumerBuilder() to distinguish the streams, and log the offset together with the stream name inside that lambda.
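A minimal sketch of that per-consumer workaround, to show what it costs compared with a shared listener. It is written against simplified stand-in types so that it runs on its own: `Context`, `Listener`, and `listenerFor` are hypothetical names for illustration, not the library's API. With the real client, the lambda returned here would be what gets passed to `subscriptionListener(...)` on each `consumerBuilder()`.

```java
import java.util.Map;
import java.util.function.Consumer;

// Simplified stand-ins for illustration only; these are NOT the library's types.
final class PerStreamListenerSketch {

    /** Hypothetical stand-in for a subscription context that only carries an offset. */
    static final class Context {
        private long offset;

        Context(long offset) {
            this.offset = offset;
        }

        long offset() {
            return offset;
        }

        void offset(long newOffset) {
            this.offset = newOffset;
        }
    }

    /** Hypothetical stand-in for a listener invoked before a subscription is created. */
    interface Listener {
        void preSubscribe(Context context);
    }

    /**
     * Builds one listener per stream. The stream name is captured by the lambda,
     * so the listener knows its stream even though the context does not expose it.
     */
    static Listener listenerFor(
            String stream, Map<String, Long> externalOffsets, Consumer<String> log) {
        return context -> {
            Long external = externalOffsets.get(stream);
            if (external == null) {
                // No external offset for this stream: leave the context untouched.
                log.accept("stream=" + stream + " keeps offset " + context.offset());
                return;
            }
            context.offset(external);
            log.accept("stream=" + stream + " uses external offset " + external);
        };
    }

    public static void main(String[] args) {
        Map<String, Long> externalOffsets = Map.of("my-stream", 42L);
        for (String stream : new String[] {"my-stream", "my-stream2"}) {
            Context context = new Context(0L);
            listenerFor(stream, externalOffsets, System.out::println).preSubscribe(context);
        }
    }
}
```

This works, but every consumer needs its own listener instance created with the stream name passed in by hand, which is the duplication a stream accessor on the context would remove.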

But, as with ConsumerUpdateListener.Context, exposing the stream in the context would give library users more information and improve usability. Thank you.

acogoluegnes commented 6 months ago

I added the stream name to the context; you can try it with the snapshots.