spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

consumer group specific offset seeking for AbstractConsumerSeekAware #2302

Closed. akemalsaglam closed this issue 1 month ago.

akemalsaglam commented 2 years ago

Expected Behavior

We want to be able to seek offsets for a specific consumer group by using AbstractConsumerSeekAware.

Current Behavior

With the implementation below, it is clear that we can seek offsets for all assigned partitions in a topic, regardless of different consumer group IDs.

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        super.onPartitionsAssigned(assignments, callback);
    }

    @Override
    public void seekToTimestamp(long time) {
        getSeekCallbacks().forEach((tp, callback) -> {
            callback.seekToTimestamp(tp.topic(), tp.partition(), time);
        });
    }

Context

In our use case, there might be more than one consumer group instance that is assigned the same partition of a topic. The example below describes our case:

  1. We have one topic, let's name it product.feed.fullexport, with 12 partitions.
  2. 10 different microservices with different group IDs listen to the same topic, and each has 12 concurrent consumers.
  3. When we seek offsets for one of the microservices using the ConsumerSeekCallback implementation above, the seek affects all assigned partitions and listening consumer instances, regardless of the expected group ID.

Is there any way to seek the offset of a partition for a specific group ID only?

artembilan commented 2 years ago

I don't see the behavior you are describing:

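import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;

@SpringBootApplication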
public class KafkaGh2302Application {

    public static void main(String[] args) {
        SpringApplication.run(KafkaGh2302Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("seekExample", 3, (short) 1);
    }

    @Component
    public static class Listener extends AbstractConsumerSeekAware {

        @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
        public void listen(String payload) {
            System.out.println("Listener received: " + payload);
        }

        public void seekToStart() {
            getSeekCallbacks().forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
        }

    }

    @Component
    public static class Listener2 extends AbstractConsumerSeekAware {

        @KafkaListener(id = "seekExample2", topics = "seekExample", concurrency = "3")
        public void listen(String payload) {
            System.out.println("Listener2 received: " + payload);
        }

        public void seekToStart() {
            getSeekCallbacks().forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
        }

    }

}

As you can see, I have two @KafkaListener classes with different ids, which essentially point to different consumer groups.

In the unit test I do:

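import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

@SpringBootTest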
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@DirtiesContext
class KafkaGh2302ApplicationTests {

    @Autowired
    KafkaGh2302Application.Listener listener;

    @Autowired
    KafkaTemplate<String, String> template;

    @Test
    void contextLoads() throws InterruptedException {

        for (int i = 0; i < 10; i++) {
            this.template.send("seekExample", i % 3, "some_key", "test#" + i);
        }

        Thread.sleep(1000);

        this.listener.seekToStart();

        Thread.sleep(10000);
    }

}

So, after sending some data, I just call seekToStart() on one of the listener services. The output looks like this:

Listener2 received: test#2
Listener received: test#1
Listener2 received: test#0
Listener2 received: test#5
Listener received: test#2
Listener received: test#4
Listener received: test#5
Listener received: test#7
Listener2 received: test#1
Listener2 received: test#8
Listener received: test#0
Listener2 received: test#3
Listener2 received: test#4
Listener received: test#8
Listener2 received: test#6
Listener received: test#3
Listener2 received: test#9
Listener2 received: test#7
Listener received: test#6
Listener received: test#9
Listener received: test#0
Listener received: test#3
Listener received: test#6
Listener received: test#9
Listener received: test#2
Listener received: test#5
Listener received: test#1
Listener received: test#4
Listener received: test#7
Listener received: test#8

This confirms that seeking really happens only in one consumer group and does not affect other groups on the same topic.

garyrussell commented 2 years ago

Correct; seeks only affect the current group.
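The behavior confirmed above can be pictured with a small plain-Java model. It assumes, as a simplification, that each consumer group keeps its own position per partition and that offset 0 is the beginning. `GroupPositions` is a hypothetical class, not a Kafka or Spring Kafka API. The group names reuse the listener ids from the example, since a `@KafkaListener` id is used as the group ID when no `groupId` is set.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: positions are tracked per (group, partition).
// It illustrates the idea only; it is not how Kafka or Spring Kafka store offsets.
class GroupPositions {

    // group id -> (partition -> next offset the group will read)
    private final Map<String, Map<Integer, Long>> positions = new HashMap<>();

    // Record that a group will next read the given offset from a partition.
    void advance(String groupId, int partition, long nextOffset) {
        positions.computeIfAbsent(groupId, g -> new HashMap<>()).put(partition, nextOffset);
    }

    // Rewind only this group's partitions to offset 0; other groups are left alone.
    void seekToBeginning(String groupId) {
        Map<Integer, Long> groupPositions = positions.get(groupId);
        if (groupPositions != null) {
            groupPositions.replaceAll((partition, offset) -> 0L);
        }
    }

    long position(String groupId, int partition) {
        return positions.getOrDefault(groupId, Map.of()).getOrDefault(partition, 0L);
    }
}
```

Rewinding `seekExample` leaves `seekExample2` at its current position, which is the per-group isolation shown by the output above.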

bky373 commented 3 months ago

@artembilan @garyrussell Hello, I've found that the current behavior can occur when using multiple listeners with different group IDs in the same class.

Current Behavior - described above

it is clear that we can seek offset for all assigned partitions in a topic regardless of different consumer group ids.

Test

I have modified the code provided above slightly.

...

- **Result**
  - Since callbacks are registered per thread in a single listener class that extends `AbstractConsumerSeekAware`, the seek is performed regardless of the consumer group ID (in this case, for both `Listener (id: seekExample)` and `Listener3 (id: seekExample3)`).
```text
...
Listener received: test#0
Listener3 received: test#3
Listener2 received: test#16
Listener2 received: test#19
Listener2 received: test#22
Listener received: test#29
Listener2 received: test#25
Listener2 received: test#28
Listener received: test#28
Listener received: test#21
Listener received: test#24
Listener received: test#27
Listener2 received: test#23
Listener2 received: test#26
Listener3 received: test#16
========
Listener received: test#0
Listener received: test#3
Listener received: test#24
Listener received: test#27
Listener received: test#2
Listener3 received: test#1
Listener received: test#5
Listener3 received: test#4
Listener received: test#8
Listener3 received: test#7
Listener received: test#11
Listener3 received: test#10
...
```

IMHO, if we want to seek offsets for a specific consumer group only, we can use the following methods:

If you have any other solutions for seeking offsets based on a specific consumer group ID, please let me know. I would appreciate hearing them. Thank you!

garyrussell commented 3 months ago

I am no longer involved with the project, but what you are suggesting is incorrect.

Each listener method is invoked by a different listener container and, therefore, on different threads.

So, if there is a problem, it is not related to any thread-based state.

sobychacko commented 3 months ago

@bky373 Thanks for reporting. As @garyrussell pointed out, this does not look related to thread state, but it does look like a bug (thanks, Gary, for chiming in!! :) ). We will look at this today. Do you have a sample application we can use to reproduce it? That would be easier. Otherwise, we can create one from the snippets you provided.

bky373 commented 3 months ago

@garyrussell

I am no longer involved with the project

Oh I didn't know that! Thanks for your comment!! 🙇

Each listener method is invoked by a different listener container and, therefore, on different threads.

You are totally right. The threads are different. I didn't mean to say that it's an issue with thread state; if my comment suggested that, I'm afraid I worded it poorly.

I just wanted to report that, in a class that has listeners with different consumer group IDs and extends AbstractConsumerSeekAware, it's difficult to seek offsets for a specific consumer group ID. ~(This might not be the problem).~

bky373 commented 3 months ago


@sobychacko

Sure! The code is simple, so I'll leave it here in the comments. Thank you for your time and reply!!

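import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;

@SpringBootApplication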
public class KafkaGh2302Application {

    public static void main(String[] args) {
        SpringApplication.run(KafkaGh2302Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("seekExample", 3, (short) 1);
    }

    @Component
    public static class Listener extends AbstractConsumerSeekAware {

        @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
        public void listen(String payload) {
            System.out.println("Listener received: " + payload);
        }

        @KafkaListener(id = "seekExample3", topics = "seekExample", concurrency = "3")
        public void listen3(String payload) {
            System.out.println("Listener3 received: " + payload);
        }

        public void seekToStart() {
            getSeekCallbacks().forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
        }
    }

    @Component
    public static class Listener2 extends AbstractConsumerSeekAware {

        @KafkaListener(id = "seekExample2", topics = "seekExample", concurrency = "3")
        public void listen(String payload) {
            System.out.println("Listener2 received: " + payload);
        }

        public void seekToStart() {
            getSeekCallbacks().forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
        }
    }
}
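
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

@SpringBootTest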
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@DirtiesContext
class KafkaGh2302ApplicationTest {

    @Autowired
    KafkaGh2302Application.Listener listener;

    @Autowired
    KafkaTemplate<String, String> template;

    @Test
    void contextLoads() throws InterruptedException {

        for (int i = 0; i < 50; i++) {
            this.template.send("seekExample", i % 3, "some_key", "test#" + i);
        }

        Thread.sleep(1000);
        System.out.println("====================================");
        this.listener.seekToStart();
        Thread.sleep(10000);
    }
}

sobychacko commented 3 months ago

I think the best course of action is to have a single consumer (KafkaListener) per class that extends AbstractConsumerSeekAware. If there are multiple listeners, the callbacks are applied against all the listeners in that particular class. We will see if we can come up with a solution, such as an API-level change, to better accommodate group IDs.
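One simplified way to picture the problem is the plain-Java model below. It assumes, and this is not Spring Kafka's actual internals, that one seek-aware object keeps a single map from partition to callback. When two groups in the same class are both assigned the same partitions, the later registration overwrites the earlier one, so `seekToStart()` rewinds whichever group registered last for each partition. That is consistent with the output above, where the replayed records after the separator appear to come from partitions 0 and 2 for `Listener` and partition 1 for `Listener3`. `SeekCallback` and `SharedSeekRegistry` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical callback: records which partitions its group was asked to rewind.
class SeekCallback {
    final String groupId;
    final List<Integer> rewound = new ArrayList<>();

    SeekCallback(String groupId) {
        this.groupId = groupId;
    }

    void seekToBeginning(int partition) {
        rewound.add(partition);
    }
}

// Simplified model of ONE seek-aware object shared by several listener containers.
// Callbacks are keyed by partition only, so the group identity is lost.
class SharedSeekRegistry {
    private final Map<Integer, SeekCallback> callbacks = new ConcurrentHashMap<>();

    // Called once per assigned partition when a container's consumer gets its assignment.
    void register(int partition, SeekCallback callback) {
        callbacks.put(partition, callback); // a second group overwrites the first
    }

    // Like the seekToStart() helper in the example: iterate every stored callback.
    void seekToStart() {
        callbacks.forEach((partition, callback) -> callback.seekToBeginning(partition));
    }
}
```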

garyrussell commented 3 months ago

@sobychacko FYI, the thread's associated group is available in KafkaUtils, if that helps.

https://github.com/spring-projects/spring-kafka/blob/4a5a8495fcb4169071822fa6699cc15aaf477fcd/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java#L110-L117
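`KafkaUtils.getConsumerGroupId()` returns the group bound to the current consumer thread. The plain-Java sketch below models that with a `ThreadLocal` in a hypothetical `GroupContext` class, not the Spring Kafka source, to show the consequence: the value is visible only on the thread that set it.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a thread-bound group id holder, modeled on the idea behind
// KafkaUtils.setConsumerGroupId / getConsumerGroupId (not the actual Spring Kafka code).
final class GroupContext {
    private static final ThreadLocal<String> GROUP_ID = new ThreadLocal<>();

    private GroupContext() {
    }

    static void set(String groupId) {
        GROUP_ID.set(groupId);
    }

    static String get() {
        return GROUP_ID.get();
    }

    static void clear() {
        GROUP_ID.remove();
    }

    // Reads the group id from a freshly started thread, i.e. NOT the thread that set it.
    static String readFromOtherThread() {
        AtomicReference<String> seen = new AtomicReference<>("unset");
        Thread other = new Thread(() -> seen.set(get()));
        other.start();
        try {
            other.join();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }
}
```

A plain `ThreadLocal` is not inherited by new threads, so the other thread sees `null`.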

bky373 commented 3 months ago

@sobychacko @garyrussell Thanks for your comments!

As you mentioned, the listener methods within the class apply the callbacks identically, regardless of consumer group ID. Since the intended behavior may vary between consumer groups, it seems necessary to execute callbacks differently for each group. (Of course, we can work around this for now by keeping our classes separate.)

I'll also keep looking for ways to do it.

I'm so grateful for your help!

sobychacko commented 3 months ago

We will try to make some changes to accommodate this before the GA.

sobychacko commented 3 months ago

@bky373 After looking at this further, we realized this is a bit more involved from the framework perspective, since it requires some breaking API changes. Because we are so close to the 3.2.0 GA release, we recommend your workaround for this and prior versions of Spring Kafka, i.e., stick with a single listener per class for this use case. We will table the proper fix for now and consider it for the next version of the framework, 3.3.0.

bky373 commented 3 months ago

@sobychacko

Thank you for taking the time to research and respond!

I'm curious to know what you think of the solution.

Either way, I'm hesitant to say much, as it would be a big change. I'd like to hear your thoughts and see whether there's anything I can contribute.

sobychacko commented 3 months ago

@bky373 We had an internal discussion on this with @artembilan yesterday. We need to make some changes similar to your line of thinking. Some API methods in ConsumerSeekAware need to be modified to take some new information about the consumer group. We believe the group ID information can be obtained through the ConsumerSeekCallback, but we need to look into it further. We can make these changes when we switch the main branch to 3.3.0-SNAPSHOT after the GA release. I have marked this issue for the 3.3.0-M1 milestone. If you want to work on a PR for this, you are certainly welcome to do so.

bky373 commented 3 months ago

@sobychacko

Yes, thank you!

Personally, I'd like to take this on and work on it a bit more. However, before I start working on the code, it would be great to have a discussion and get some feedback on the direction of the work. If that's okay with you, I'll do a little more research and get back to you after I've organized things!

Off the top of my head, as you said, if we can get the consumerGroupId from the ConsumerSeekCallback properly using KafkaUtils.getConsumerGroupId(), we can define the behavior per groupId when creating the callback. But that needs to be tested.
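The idea of deciding per group when the callback is created can be sketched in plain Java. Each hypothetical `GroupAwareCallback` records its group ID at creation, the hypothetical `GroupAwareRegistry` keeps every callback for a partition rather than only one, and a seek that takes a group ID invokes only the matching callbacks. This illustrates the idea under discussion; it is not a Spring Kafka API.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical callback that remembers the consumer group it was created for.
class GroupAwareCallback {
    final String groupId;
    final List<Integer> rewound = new CopyOnWriteArrayList<>();

    GroupAwareCallback(String groupId) {
        this.groupId = groupId;
    }

    void seekToBeginning(int partition) {
        rewound.add(partition);
    }
}

// Hypothetical registry that keeps ALL callbacks per partition (one per group)
// and lets the caller name the group whose partitions should be rewound.
class GroupAwareRegistry {
    private final Map<Integer, List<GroupAwareCallback>> callbacks = new ConcurrentHashMap<>();

    void register(int partition, GroupAwareCallback callback) {
        callbacks.computeIfAbsent(partition, p -> new CopyOnWriteArrayList<>()).add(callback);
    }

    // Only callbacks created for the given group are invoked.
    void seekToBeginning(String groupId) {
        callbacks.forEach((partition, list) -> list.stream()
                .filter(callback -> callback.groupId.equals(groupId))
                .forEach(callback -> callback.seekToBeginning(partition)));
    }
}
```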

sobychacko commented 3 months ago

@bky373 Feel free to work on it. Before you start coding, if you want us to confirm the design, please continue discussing it here, and we can review it. Thanks!

bky373 commented 2 months ago

@sobychacko

Hi, I apologize for reaching out after such a long time. I've revisited the issue and thought about potential solutions.

Before diving into the details, let me briefly summarize the problem since it has been a while.

Here are the approaches I've considered:

1. Passing consumerGroupId as a parameter from the outside.

2. Setting a seek-allowed flag per @KafkaListener (e.g. Boolean consumerSeekAllowed)

Thank you for reading through this long message. Feel free to share any thoughts or feedback. Thanks!

bky373 commented 2 months ago

@sobychacko Hi, if you have some time, could I get your feedback on the above? I would like to know what you have in mind. If a different approach is needed, I would like to consider that as well. Thanks!

sobychacko commented 2 months ago

Sure, @bky373. Sorry for the delay. We will get back to you soon on this.

sobychacko commented 2 months ago

@bky373 I like your second approach, as it is a minimally invasive set of changes and doesn't require any API changes in ConsumerSeekAware or related classes. On the other hand, with this approach, users must remember to disable seeks on the listeners in classes that extend AbstractConsumerSeekAware and contain multiple listeners.

@artembilan do you have any thoughts on changing KafkaListener like this vs making/breaking API level changes in ConsumerSeekAware?

Also, I wonder if there is a valid use case that might benefit others where they need to drive seeking offsets based on the group-id?

artembilan commented 2 months ago

Well, this new consumerSeekAllowed feels more like a workaround for what we cannot do right now. And it is a bit awkward: we implement ConsumerSeekAware, but then set consumerSeekAllowed = false 🤷 At the same time, we already have a more reasonable workaround: splitting. I'd say that is a more robust and logical workaround: in the end we develop microservices, so keeping the logic as simple as possible is the best approach. Without any paradox of choice.

Currently, it's just groupId, but there might be future requests to register callbacks based on other types of values.

So, just don't mix up many listeners in a single class. Why make your life so complicated, if we can simply just split and reuse logic via delegation to some other common service (if any)?

Sorry for the somewhat rude language, but if we go this way, I'd prefer groupId propagation. Or we can choose to fail fast if the same ConsumerSeekAware is used for many listeners.

sobychacko commented 2 months ago

Each approach has pros and cons. While adding a new flag is an easier solution, adding a top-level property like this to KafkaListener might not bode that well, given that there is a workaround. If we exclude the KafkaListener flag, I guess we have 2 options:

  1. API changes - Adding a new method that specifically takes a group ID so that only the listener with that ID is involved in the seek operation.
  2. Fail fast when you have multiple listeners in a class that implements ConsumerSeekAware. The exception thrown can suggest the users split the listeners into multiple classes.
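The fail-fast option could look roughly like the plain-Java sketch below. `FailFastSeekRegistry` is hypothetical. It assumes the group ID is known at registration time, and it rejects a second group instead of silently sharing callbacks. The error message suggests splitting the listeners into separate classes, as proposed above.

```java
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry for ONE seek-aware bean that rejects a second consumer group.
// A sketch of the "fail fast" option only; not an existing Spring Kafka class.
class FailFastSeekRegistry {
    private final Set<Integer> partitions = ConcurrentHashMap.newKeySet();
    private String ownerGroup;

    // Called when a listener container registers a seek callback for a partition.
    synchronized void register(String groupId, int partition) {
        Objects.requireNonNull(groupId, "groupId");
        if (ownerGroup == null) {
            ownerGroup = groupId; // the first group to register owns this bean
        }
        else if (!ownerGroup.equals(groupId)) {
            throw new IllegalStateException("This seek-aware bean is already used by group '" + ownerGroup
                    + "'; move the listener for group '" + groupId + "' to its own class");
        }
        partitions.add(partition);
    }

    synchronized String ownerGroup() {
        return ownerGroup;
    }

    Set<Integer> partitions() {
        return Set.copyOf(partitions);
    }
}
```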

garyrussell commented 2 months ago

There's another option: add a new (default) method to ConsumerSeekAware:

default boolean seekByGroupId() {
    return false;
}

Then, use KafkaUtils.getConsumerGroupId() when seeking, and when building callback tables.

https://github.com/spring-projects/spring-kafka/issues/2302#issuecomment-2096572637

No breaking API changes, no @KafkaListener changes.

But it would only work when seeking on the listener thread.
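Here is a plain-Java sketch of this proposal, under stated assumptions. `SeekAwareSketch` mirrors the proposed default method, `SketchSeekAware` is a hypothetical base class, and the `CURRENT_GROUP` `ThreadLocal` stands in for `KafkaUtils.getConsumerGroupId()`. The group bound to the calling thread is used both when building the callback table and when seeking. A seek from a thread with no bound group fails, which reflects the listener-thread caveat.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical mirror of the proposed opt-in: a default method that keeps today's behavior.
interface SeekAwareSketch {
    default boolean seekByGroupId() {
        return false;
    }
}

// Hypothetical seek-aware base class. When seekByGroupId() is true, its callback table is
// keyed by the group bound to the calling thread.
class SketchSeekAware implements SeekAwareSketch {
    // Stand-in for the container-managed, thread-bound group id.
    static final ThreadLocal<String> CURRENT_GROUP = new ThreadLocal<>();

    private static final String ANY_GROUP = "<any>";

    private final boolean byGroup;

    // table key (group id, or ANY_GROUP) -> partitions registered under that key
    private final Map<String, List<Integer>> partitionsByKey = new ConcurrentHashMap<>();

    SketchSeekAware(boolean byGroup) {
        this.byGroup = byGroup;
    }

    @Override
    public boolean seekByGroupId() {
        return byGroup;
    }

    // Called on the consumer thread when partitions are assigned (building the table).
    void onPartitionsAssigned(List<Integer> partitions) {
        partitionsByKey.computeIfAbsent(tableKey(), k -> new CopyOnWriteArrayList<>()).addAll(partitions);
    }

    // Partitions that a seek issued from the current thread would rewind.
    List<Integer> partitionsToRewind() {
        return List.copyOf(partitionsByKey.getOrDefault(tableKey(), List.of()));
    }

    private String tableKey() {
        if (!seekByGroupId()) {
            return ANY_GROUP; // existing behavior: one shared table for every group
        }
        String group = CURRENT_GROUP.get();
        if (group == null) {
            // Only works when seeking on a listener thread, where a group is bound.
            throw new IllegalStateException("No consumer group bound to this thread");
        }
        return group;
    }
}
```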

sobychacko commented 2 months ago

Thanks, @garyrussell, for that great insight. @bky373, we should look into the idea Gary suggested as the solution for this issue.

artembilan commented 2 months ago

Thanks, Gary!

I see the logic in ListenerConsumer.initialize():

            setupSeeks();
            KafkaUtils.setConsumerGroupId(this.consumerGroupId);

Which probably has to be swapped to make that groupId available for registration.
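The effect of the order can be shown with a small plain-Java model. It assumes, as the comment suggests, that callback registration reads the thread-bound group ID at the moment it runs. `InitOrderSketch` and its methods are hypothetical stand-ins for `setupSeeks()` and `KafkaUtils.setConsumerGroupId(...)`, not the container's real code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a container's initialize() step. The registration step
// captures whatever group id is bound to the thread at the moment it runs.
class InitOrderSketch {
    static final ThreadLocal<String> GROUP = new ThreadLocal<>();

    // Group ids seen by each registration (may contain null).
    final List<String> capturedGroups = new ArrayList<>();

    // Stand-in for setupSeeks(): registering seek callbacks reads the thread-bound group.
    private void setupSeeks() {
        capturedGroups.add(GROUP.get());
    }

    // Order shown above: register first, then bind the group id.
    void initializeRegisterFirst(String groupId) {
        setupSeeks();
        GROUP.set(groupId);
    }

    // Swapped order: bind the group id first so registration can see it.
    void initializeGroupFirst(String groupId) {
        GROUP.set(groupId);
        setupSeeks();
    }
}
```

With the original order the registration sees no group; with the swapped order it sees the container's group ID.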

bky373 commented 2 months ago

@sobychacko Thanks for your reply!

Also, I wonder if there is a valid use case that might benefit others where they need to drive seeking offsets based on the group-id?

I think this might be the case.

@Component
public class DeliveryListener extends AbstractConsumerSeekAware {

    ...

    @KafkaListener(groupId = "delivery-status-group", topics = "delivery-topic")
    void listenForStatusUpdates(String message) {
        // Update delivery status in DB
        updateService.update(message);
    }

    @KafkaListener(groupId = "delivery-notification-group", topics = "delivery-topic")
    void listenForNotifications(String message) {
        // Notify the customer
        notificationService.notify(message);
    }

    @KafkaListener(groupId = "delivery-analytics-group", topics = "delivery-topic")
    void listenForAnalytics(String message) {
        // Process the delivery message for analytics
        analyticsService.analyze(message);
    }
}

If the error only occurs in the delivery-status-group listener and we want to adjust the offset of that listener, we should make sure that we only adjust the delivery-status-group. Trying to seek by groupId might be a good idea in this case (currently, all listeners are affected).

In fact, in cases like the above, it's probably best to separate the classes, since the internal logic would otherwise get rather complicated (which is what I want most!). But keep in mind that such code can be written anyway, and users may write it without realising the side effects.

bky373 commented 2 months ago


@artembilan Yes, I completely agree with your comment. I hope there's no misunderstanding. I just think that if something can be written in code and users keep using it unaware of the unintended behaviour (or even of its side effects), we should prevent it with new guidance or a new implementation.

bky373 commented 2 months ago

@garyrussell Thanks again for the good suggestion! I'll try to find a solution using your suggestions.

sobychacko commented 2 months ago

@bky373 I think it is better to go with Gary's suggestion on this. Can you think about a design along the lines of what he suggested?

bky373 commented 2 months ago

@sobychacko Yes, I'll look into this and get back to you.

bky373 commented 2 months ago

b93ff92 Here's what I did. @sobychacko @artembilan

sobychacko commented 2 months ago

I think approach 1 is cleaner. I suggest making that a formal PR and submitting it so we can do more reviews.

bky373 commented 2 months ago

Alright! I'm going to create a PR.