spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

Rebalancing problem between Spring Kafka consumers with versions <=2.3.13.RELEASE and >=2.4.0.RELEASE (included 2.8.6) #2277

Closed ferblaca closed 2 years ago

ferblaca commented 2 years ago

I am having problems with Kafka consumers in applications that use different versions of Spring Kafka, specifically between 2.3.13.RELEASE and >=2.4.0.RELEASE (up to and including 2.8.6).

We noticed the problem when performing a blue/green deployment in which we upgrade consumer applications from Spring Kafka 2.3.13.RELEASE to a version in [2.4.0.RELEASE,2.8.6]. For a while both versions of the applications are deployed and active at the same time, and the applications on Spring Kafka [2.4.0.RELEASE,2.8.6] are not assigned any partitions under the following circumstances:

The result is that, for example, if the topic has 10 partitions, the application with Spring-Kafka version 2.3.13.RELEASE is assigned all 10 partitions, while the application with Spring-Kafka [2.4.0.RELEASE,2.8.6] is not assigned any partition.

No matter in which order the applications are deployed, the end result is always the same.

When the Spring-Kafka 2.3.13.RELEASE application stops, the [2.4.0.RELEASE,2.8.6] application is assigned all 10 partitions correctly.

For more detailed information on application logs, see the following link.

Would it be possible by configuration to avoid this behaviour?

I attach the apps that reproduce this described behavior: springKafkaIssue.zip

garyrussell commented 2 years ago

This has nothing to do with Spring; all rebalancing activity is handled by Kafka itself.

Recent versions of the kafka-clients changed the partition assignor; in your first log

partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]

In the second log:

 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]

The sticky assignor tries to keep partitions with the same instance.

ferblaca commented 2 years ago

@garyrussell I have tried forcing the RangeAssignor strategy for both versions:

partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor].

with the same result.
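
For reference, pinning a single assignor on both applications could look like the minimal sketch below. It uses plain property keys instead of the ConsumerConfig constants so that it compiles without kafka-clients on the classpath; the class name, bootstrap server and group id are placeholders, not values taken from the attached projects.

```java
import java.util.Properties;

final class RangeAssignorConfigSketch {

    // Builds consumer properties that list RangeAssignor as the only assignment strategy.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Same key that shows up in the logged consumer configuration.
        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("localhost:9092", "foo"));
    }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor in either application.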

On the other hand, I have also tested without Spring, using only kafka-clients at the versions involved and simulating the concurrency, and the problem did not reproduce there.

garyrussell commented 2 years ago

Although, according to their docs for changes in the 3.0.0 client:

The default partition.assignment.strategy is changed to "[RangeAssignor, CooperativeStickyAssignor]", which will use the RangeAssignor by default, but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list. Please check the client upgrade path guide here for more detail.

https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy

So it's not clear why the sticky assignor is being used.

garyrussell commented 2 years ago

I'll play with your example to see if I can see what's going on, but I don't see how it can be a Spring issue.

garyrussell commented 2 years ago

But, in general, as Tomaz replied on SO, Kafka just sees 20 consumers; it doesn't know that they are 2 sets of 10, so there is no guarantee that each instance will get 5; it's generally better to over-provision the number of partitions for this kind of situation.

ferblaca commented 2 years ago

Yes, but in all the tests I have performed, and there have been many, the partitions are always evenly distributed (5 partitions for each App), as long as the Spring Kafka versions of the applications are not those mentioned above.

The tests performed with only kafka-clients (without Spring), creating 10 concurrent consumers per App on kafka-clients versions 2.4.1 and 3.1.0, never reproduce the behavior described in the issue; in every case the partitions are distributed equally between the Apps.

It is strange behavior that I can't explain, and I have been able to reproduce it everywhere from a local environment to our K8s-managed environments.

garyrussell commented 2 years ago

It is strange: if I run 2 copies of consumer1 or 2 copies of consumer2, each gets 5 partitions. But if I run one of each, then consumer1 retains all 10.

However, I see the same behavior when there is no Spring involvement; I changed your mains as follows:

public static void main(String[] args) throws InterruptedException {
    // SpringApplication.run(DemoApplication.class, args);
    Map<String, Object> config = Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
            ConsumerConfig.GROUP_ID_CONFIG, "foo",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    SimpleAsyncTaskExecutor exec = new SimpleAsyncTaskExecutor();
    for (int i = 0; i < 10; i++) {
        exec.execute(() -> {
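            // Typed consumer instead of the raw type; both deserializers are StringDeserializer.
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);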
            consumer.subscribe(List.of("spring.kafka.test"), new ConsumerRebalanceListener() {

                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    log.info("Revoked: " + partitions);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    log.info("Assigned: " + partitions);
                }
            });
            while (true) {
                consumer.poll(Duration.ofSeconds(5));
            }
        });
    }
    Thread.sleep(60_000);
    System.exit(0);
}

With the same result; consumer1:

11:17:27.212 [SimpleAsyncTaskExecutor-6] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-3]
11:17:27.212 [SimpleAsyncTaskExecutor-4] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-8]
11:17:27.212 [SimpleAsyncTaskExecutor-5] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-5]
11:17:27.212 [SimpleAsyncTaskExecutor-9] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-2]
11:17:27.212 [SimpleAsyncTaskExecutor-10] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-1]
11:17:27.212 [SimpleAsyncTaskExecutor-7] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-9]
11:17:27.212 [SimpleAsyncTaskExecutor-8] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-4]
11:17:27.212 [SimpleAsyncTaskExecutor-3] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-0]
11:17:27.212 [SimpleAsyncTaskExecutor-2] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-6]
11:17:27.212 [SimpleAsyncTaskExecutor-1] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-7]
11:17:33.211 [SimpleAsyncTaskExecutor-2] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-6]
11:17:33.212 [SimpleAsyncTaskExecutor-4] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-8]
11:17:33.212 [SimpleAsyncTaskExecutor-7] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-9]
11:17:33.212 [SimpleAsyncTaskExecutor-6] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-3]
11:17:33.213 [SimpleAsyncTaskExecutor-9] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-2]
11:17:33.213 [SimpleAsyncTaskExecutor-1] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-7]
11:17:33.213 [SimpleAsyncTaskExecutor-8] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-4]
11:17:33.214 [SimpleAsyncTaskExecutor-5] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-5]
11:17:33.215 [SimpleAsyncTaskExecutor-3] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-0]
11:17:33.216 [SimpleAsyncTaskExecutor-10] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-1]
11:17:33.239 [SimpleAsyncTaskExecutor-5] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-5]
11:17:33.239 [SimpleAsyncTaskExecutor-10] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-1]
11:17:33.239 [SimpleAsyncTaskExecutor-8] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-4]
11:17:33.239 [SimpleAsyncTaskExecutor-9] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-2]
11:17:33.239 [SimpleAsyncTaskExecutor-6] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-3]
11:17:33.239 [SimpleAsyncTaskExecutor-7] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-9]
11:17:33.239 [SimpleAsyncTaskExecutor-1] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-7]
11:17:33.239 [SimpleAsyncTaskExecutor-3] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-0]
11:17:33.239 [SimpleAsyncTaskExecutor-4] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-8]
11:17:33.240 [SimpleAsyncTaskExecutor-2] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-6]

Consumer2:

11:17:33.242 [SimpleAsyncTaskExecutor-2] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []
11:17:33.242 [SimpleAsyncTaskExecutor-3] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []
11:17:33.242 [SimpleAsyncTaskExecutor-8] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []
11:17:33.242 [SimpleAsyncTaskExecutor-9] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []
11:17:33.242 [SimpleAsyncTaskExecutor-5] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []
11:17:33.242 [SimpleAsyncTaskExecutor-10] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []
11:17:33.242 [SimpleAsyncTaskExecutor-1] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []
11:17:33.242 [SimpleAsyncTaskExecutor-7] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []
11:17:33.242 [SimpleAsyncTaskExecutor-6] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []
11:17:33.242 [SimpleAsyncTaskExecutor-4] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []

So I still don't consider this to be a Spring issue.

garyrussell commented 2 years ago

I corrected my test to put the poll in a loop; but I still get the same results - edited the comment above.

garyrussell commented 2 years ago

I see this addition to the RangeAssignor javadocs:

 * Since the introduction of static membership, we could leverage <code>group.instance.id</code> to make the assignment behavior more sticky.
 * For the above example, after one rolling bounce, group coordinator will attempt to assign new <code>member.id</code> towards consumers,
 * for example <code>C0</code> -> <code>C3</code> <code>C1</code> -> <code>C2</code>.
 *
 * <p>The assignment could be completely shuffled to:
 * <ul>
 * <li><code>C3 (was C0): [t0p2, t1p2] (before was [t0p0, t0p1, t1p0, t1p1])</code>
 * <li><code>C2 (was C1): [t0p0, t0p1, t1p0, t1p1] (before was [t0p2, t1p2])</code>
 * </ul>
 *
 * The assignment change was caused by the change of <code>member.id</code> relative order, and
 * can be avoided by setting the group.instance.id.
 * Consumers will have individual instance ids <code>I1</code>, <code>I2</code>. As long as
 * 1. Number of members remain the same across generation
 * 2. Static members' identities persist across generation
 * 3. Subscription pattern doesn't change for any member
 *
 * <p>The assignment will always be:
 * <ul>
 * <li><code>I0: [t0p0, t0p1, t1p0, t1p1]</code>
 * <li><code>I1: [t0p2, t1p2]</code>
 * </ul>
 */

Specifically: "...to make the assignment behavior more sticky..."

I suspect that might be the problem; although the group.instance.id is null in both cases.

garyrussell commented 2 years ago

Another data point: if I add group.instance.id to each consumer, I get even weirder results.

for (int i = 0; i < 10; i++) {
    int j = i + 10; // = i in consumer2
    exec.execute(() -> {
        Map<String, Object> configs = new HashMap<>(config);
        configs.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "I" + j);
        KafkaConsumer consumer = new KafkaConsumer(configs);
                ...

Consumer 1

11:50:18.607 [SimpleAsyncTaskExecutor-1] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-0]
11:50:18.607 [SimpleAsyncTaskExecutor-4] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-3]
11:50:18.607 [SimpleAsyncTaskExecutor-8] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-7]
11:50:18.607 [SimpleAsyncTaskExecutor-10] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-9]
11:50:18.607 [SimpleAsyncTaskExecutor-7] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-6]
11:50:18.607 [SimpleAsyncTaskExecutor-9] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-8]
11:50:18.607 [SimpleAsyncTaskExecutor-6] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-5]
11:50:18.607 [SimpleAsyncTaskExecutor-5] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-4]
11:50:18.607 [SimpleAsyncTaskExecutor-3] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-2]
11:50:18.607 [SimpleAsyncTaskExecutor-2] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-1]
11:50:51.616 [SimpleAsyncTaskExecutor-9] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-8]
11:50:51.616 [SimpleAsyncTaskExecutor-6] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-5]
11:50:51.616 [SimpleAsyncTaskExecutor-5] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-4]
11:50:51.617 [SimpleAsyncTaskExecutor-1] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-0]
11:50:51.617 [SimpleAsyncTaskExecutor-10] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-9]
11:50:51.617 [SimpleAsyncTaskExecutor-4] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-3]
11:50:51.618 [SimpleAsyncTaskExecutor-2] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-1]
11:50:51.618 [SimpleAsyncTaskExecutor-7] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-6]
11:50:51.618 [SimpleAsyncTaskExecutor-3] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-2]
11:50:51.618 [SimpleAsyncTaskExecutor-8] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-7]
11:50:51.621 [SimpleAsyncTaskExecutor-9] INFO  c.s.kafka.sample.DemoApplication - Assigned: []
11:50:51.621 [SimpleAsyncTaskExecutor-4] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-5]
11:50:51.621 [SimpleAsyncTaskExecutor-1] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-2]
11:50:51.621 [SimpleAsyncTaskExecutor-2] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-3]
11:50:51.621 [SimpleAsyncTaskExecutor-5] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-6]
11:50:51.621 [SimpleAsyncTaskExecutor-8] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-9]
11:50:51.621 [SimpleAsyncTaskExecutor-7] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-8]
11:50:51.621 [SimpleAsyncTaskExecutor-3] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-4]
11:50:51.621 [SimpleAsyncTaskExecutor-10] INFO  c.s.kafka.sample.DemoApplication - Assigned: []
11:50:51.621 [SimpleAsyncTaskExecutor-6] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-7]

Consumer2

11:50:51.627 [SimpleAsyncTaskExecutor-2] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: [spring.kafka.test-1]
11:50:51.626 [SimpleAsyncTaskExecutor-3] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []
11:50:51.626 [SimpleAsyncTaskExecutor-6] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []
11:50:51.626 [SimpleAsyncTaskExecutor-5] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []
11:50:51.626 [SimpleAsyncTaskExecutor-4] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []
11:50:51.626 [SimpleAsyncTaskExecutor-8] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []
11:50:51.626 [SimpleAsyncTaskExecutor-9] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []
11:50:51.627 [SimpleAsyncTaskExecutor-1] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: [spring.kafka.test-0]
11:50:51.626 [SimpleAsyncTaskExecutor-7] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []
11:50:51.626 [SimpleAsyncTaskExecutor-10] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: []

I seem to (reliably) get 2 partitions on the new instance.

ferblaca commented 2 years ago

It is very curious that we have different results using only the Kafka client (without Spring).

For example with the following code:

package com.kafka.sample;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ApplicationMultiThread {

    private static final Logger log = LoggerFactory.getLogger(ApplicationMultiThread.class);
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "sscc.global.des.amgjplayg.product.public.v1";
    public static final int CORE_POOL_SIZE = 10;

    public static void main(String[] args) {
        // Consumer list
        List<Consumer<Long, String>> consumerList = createConsumerList(CORE_POOL_SIZE, BOOTSTRAP_SERVERS, TOPIC);
        // Executor
        ExecutorService executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        //submit task consumer
        consumerList.stream().forEach(consumer -> executorService.submit(new ConsumerTask(consumer)));
    }

    private static List<Consumer<Long, String>> createConsumerList(Integer consumers, String bootstrapServers, String topic) {
        List<Consumer<Long, String>> consumerList = new ArrayList<>(consumers);
        for (int i = 0; i < consumers; i++) {
            consumerList.add(createConsumer(bootstrapServers, topic));
        }
        return consumerList;
    }

    private static Consumer<Long, String> createConsumer(String bootstrapServers, String topic) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                "KafkaExampleConsumerGroup");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        // Create the consumer using props.
        final Consumer<Long, String> consumer =
                new KafkaConsumer<>(props);

        // Subscribe to the topic.
        consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("Assigned: " + partitions);
            }
        });

        return consumer;
    }

    static class ConsumerTask implements Runnable {
        private Consumer<Long, String> consumer;
        final int giveUp = 100000;
        int noRecordsCount = 0;

        public ConsumerTask(Consumer<Long, String> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {
            while (true) {
                final ConsumerRecords<Long, String> consumerRecords =
                        consumer.poll(java.time.Duration.ofMillis(1000)); // poll(long) is deprecated

                if (consumerRecords.count() == 0) {
                    noRecordsCount++;
                    if (noRecordsCount > giveUp) break;
                    else continue;
                }

                consumerRecords.forEach(record -> {
                    // SLF4J uses {} placeholders, not printf-style format specifiers.
                    log.info("Consumer Record:({}, {}, {}, {})",
                            record.key(), record.value(),
                            record.partition(), record.offset());
                });

                consumer.commitAsync();
            }
            consumer.close();
        }
    }
}

Consumer1

Assigned: [sscc.global.des.amgjplayg.product.public.v1-7]
Assigned: [sscc.global.des.amgjplayg.product.public.v1-3]
Assigned: [sscc.global.des.amgjplayg.product.public.v1-2]
Assigned: [sscc.global.des.amgjplayg.product.public.v1-9]
Assigned: [sscc.global.des.amgjplayg.product.public.v1-1]
Assigned: [sscc.global.des.amgjplayg.product.public.v1-0]
Assigned: [sscc.global.des.amgjplayg.product.public.v1-5]
Assigned: [sscc.global.des.amgjplayg.product.public.v1-4]
Assigned: [sscc.global.des.amgjplayg.product.public.v1-8]
Assigned: [sscc.global.des.amgjplayg.product.public.v1-6]
...
Revoked: [sscc.global.des.amgjplayg.product.public.v1-8]
Revoked: [sscc.global.des.amgjplayg.product.public.v1-1]
Revoked: [sscc.global.des.amgjplayg.product.public.v1-0]
Revoked: [sscc.global.des.amgjplayg.product.public.v1-7]
Revoked: [sscc.global.des.amgjplayg.product.public.v1-4]
Revoked: [sscc.global.des.amgjplayg.product.public.v1-5]
Revoked: [sscc.global.des.amgjplayg.product.public.v1-2]
Revoked: [sscc.global.des.amgjplayg.product.public.v1-6]
Revoked: [sscc.global.des.amgjplayg.product.public.v1-3]
Revoked: [sscc.global.des.amgjplayg.product.public.v1-9]
Assigned: []
Assigned: [sscc.global.des.amgjplayg.product.public.v1-5]
Assigned: [sscc.global.des.amgjplayg.product.public.v1-0]
Assigned: [sscc.global.des.amgjplayg.product.public.v1-2]
Assigned: []
Assigned: []
Assigned: [sscc.global.des.amgjplayg.product.public.v1-8]
Assigned: [sscc.global.des.amgjplayg.product.public.v1-7]
Assigned: []
Assigned: []

Consumer2

Assigned: []
Assigned: []
Assigned: []
Assigned: [sscc.global.des.amgjplayg.product.public.v1-1]
Assigned: []
Assigned: []
Assigned: [sscc.global.des.amgjplayg.product.public.v1-9]
Assigned: [sscc.global.des.amgjplayg.product.public.v1-4]
Assigned: [sscc.global.des.amgjplayg.product.public.v1-6]
Assigned: [sscc.global.des.amgjplayg.product.public.v1-3]

I have also tried your code above, using Spring's SimpleAsyncTaskExecutor, and I also see an equal distribution of the partitions...

Maybe the Kafka version of the broker also plays a role, in my case:

It seems more and more strange to me hehe

garyrussell commented 2 years ago

Interesting indeed; I just copied your app to the two projects.

Consumer1

Revoked: []
Revoked: []
Revoked: []
Revoked: []
Revoked: []
Revoked: []
Revoked: []
Revoked: []
Revoked: []
Revoked: []
Assigned: [spring.kafka.test-6]
Assigned: [spring.kafka.test-0]
Assigned: [spring.kafka.test-4]
Assigned: [spring.kafka.test-1]
Assigned: [spring.kafka.test-7]
Assigned: [spring.kafka.test-3]
Assigned: [spring.kafka.test-8]
Assigned: [spring.kafka.test-5]
Assigned: [spring.kafka.test-2]
Assigned: [spring.kafka.test-9]
Revoked: [spring.kafka.test-0]
Revoked: [spring.kafka.test-6]
Revoked: [spring.kafka.test-5]
Revoked: [spring.kafka.test-1]
Revoked: [spring.kafka.test-8]
Revoked: [spring.kafka.test-7]
Revoked: [spring.kafka.test-3]
Revoked: [spring.kafka.test-2]
Revoked: [spring.kafka.test-4]
Revoked: [spring.kafka.test-9]
Assigned: [spring.kafka.test-6]
Assigned: [spring.kafka.test-5]
Assigned: [spring.kafka.test-9]
Assigned: [spring.kafka.test-7]
Assigned: [spring.kafka.test-0]
Assigned: [spring.kafka.test-3]
Assigned: [spring.kafka.test-2]
Assigned: [spring.kafka.test-1]
Assigned: [spring.kafka.test-4]
Assigned: [spring.kafka.test-8]

Consumer2:

Assigned: []
Assigned: []
Assigned: []
Assigned: []
Assigned: []
Assigned: []
Assigned: []
Assigned: []
Assigned: []
Assigned: []

I am using Kafka 3.1.0.

ferblaca commented 2 years ago

@garyrussell Try running the programme with these two attached projects. If I am right, you will get the same results as I did: kafkaWithoutSpring.zip

I don't know why yet, but when I run the kafka-client programs inside the Spring Boot project I attached earlier, I get the behaviour you are describing.

However, with these projects, which have no Spring Boot dependencies, rebalancing between kafka-client versions 2.4.1 and 3.1.0 works correctly.

garyrussell commented 2 years ago

That is just bizarre - I see the same - but I don't see how Boot being present can affect the behavior; weird.

garyrussell commented 2 years ago

It's a problem with the partition assignor and its compatibility with the newer subscription (member id) names.

"Old" consumers have an id like this

consumer-1-76d637d1-e3f7-4fe7-abd8-51b9a4220ae3

whereas new consumer ids include the group:

consumer-foo-1-1414822b-24e7-434a-a439-2522d1a23fe9

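The RangeAssignor sorts the list of subscriptions, and since "consumer-1-..." sorts before "consumer-foo-..." (digits sort before letters), all the old subscriptions come first in the list. With 20 consumers and only 10 partitions, the first 10 entries in the sorted list each get one partition and the rest get none.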

If I change the group from foo to 000 (in my version), I get the exact opposite behavior:

Consumer1:

13:45:03.106 [SimpleAsyncTaskExecutor-6] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-3]
13:45:03.106 [SimpleAsyncTaskExecutor-2] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-5]
13:45:03.106 [SimpleAsyncTaskExecutor-4] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-7]
13:45:03.106 [SimpleAsyncTaskExecutor-5] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-9]
13:45:03.106 [SimpleAsyncTaskExecutor-7] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-2]
13:45:03.106 [SimpleAsyncTaskExecutor-8] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-8]
13:45:03.106 [SimpleAsyncTaskExecutor-1] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-6]
13:45:03.106 [SimpleAsyncTaskExecutor-10] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-4]
13:45:03.106 [SimpleAsyncTaskExecutor-3] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-1]
13:45:03.106 [SimpleAsyncTaskExecutor-9] INFO  c.s.kafka.sample.DemoApplication - Assigned: [spring.kafka.test-0]
13:45:09.099 [SimpleAsyncTaskExecutor-1] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-6]
13:45:09.099 [SimpleAsyncTaskExecutor-8] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-8]
13:45:09.099 [SimpleAsyncTaskExecutor-4] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-7]
13:45:09.099 [SimpleAsyncTaskExecutor-10] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-4]
13:45:09.099 [SimpleAsyncTaskExecutor-9] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-0]
13:45:09.100 [SimpleAsyncTaskExecutor-5] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-9]
13:45:09.100 [SimpleAsyncTaskExecutor-6] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-3]
13:45:09.100 [SimpleAsyncTaskExecutor-3] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-1]
13:45:09.100 [SimpleAsyncTaskExecutor-7] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-2]
13:45:09.101 [SimpleAsyncTaskExecutor-2] INFO  c.s.kafka.sample.DemoApplication - Revoked: [spring.kafka.test-5]
13:45:09.105 [SimpleAsyncTaskExecutor-10] INFO  c.s.kafka.sample.DemoApplication - Assigned: []
13:45:09.105 [SimpleAsyncTaskExecutor-9] INFO  c.s.kafka.sample.DemoApplication - Assigned: []
13:45:09.105 [SimpleAsyncTaskExecutor-6] INFO  c.s.kafka.sample.DemoApplication - Assigned: []
13:45:09.105 [SimpleAsyncTaskExecutor-3] INFO  c.s.kafka.sample.DemoApplication - Assigned: []
13:45:09.105 [SimpleAsyncTaskExecutor-8] INFO  c.s.kafka.sample.DemoApplication - Assigned: []
13:45:09.105 [SimpleAsyncTaskExecutor-4] INFO  c.s.kafka.sample.DemoApplication - Assigned: []
13:45:09.105 [SimpleAsyncTaskExecutor-7] INFO  c.s.kafka.sample.DemoApplication - Assigned: []
13:45:09.105 [SimpleAsyncTaskExecutor-5] INFO  c.s.kafka.sample.DemoApplication - Assigned: []
13:45:09.105 [SimpleAsyncTaskExecutor-2] INFO  c.s.kafka.sample.DemoApplication - Assigned: []
13:45:09.106 [SimpleAsyncTaskExecutor-1] INFO  c.s.kafka.sample.DemoApplication - Assigned: []

Consumer2:

13:45:09.111 [SimpleAsyncTaskExecutor-4] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: [spring.kafka.test-8]
13:45:09.111 [SimpleAsyncTaskExecutor-3] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: [spring.kafka.test-7]
13:45:09.111 [SimpleAsyncTaskExecutor-5] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: [spring.kafka.test-0]
13:45:09.111 [SimpleAsyncTaskExecutor-8] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: [spring.kafka.test-3]
13:45:09.111 [SimpleAsyncTaskExecutor-10] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: [spring.kafka.test-5]
13:45:09.111 [SimpleAsyncTaskExecutor-1] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: [spring.kafka.test-2]
13:45:09.111 [SimpleAsyncTaskExecutor-2] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: [spring.kafka.test-1]
13:45:09.111 [SimpleAsyncTaskExecutor-6] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: [spring.kafka.test-6]
13:45:09.111 [SimpleAsyncTaskExecutor-7] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: [spring.kafka.test-9]
13:45:09.111 [SimpleAsyncTaskExecutor-9] INFO  c.s.kafka.sample.DemoApplication2 - Assigned: [spring.kafka.test-4]

consumer-000-1-03e4a5ff-03c2-4a00-aa42-18286e42e694 vs. consumer-1-17f5fce4-f876-466c-bfd7-e00e125fe4fe, so the new ones are sorted first.

This applies when the old instance is chosen to perform the assignment (always, in my case).

The reason your stand-alone versions "work" is because you are comparing the 2.4.1 clients with the 3.1.0; the new subscription names must have been added in 2.4.

If you change the "old" one to use 2.3.1 instead (the version in your older Boot app), the behavior is the same.
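
The ordering effect can be reproduced without Kafka at all. The sketch below is only an illustration under stated assumptions: the "-old"/"-new" suffixes stand in for the generated UUIDs, and the class and method names are made up. It sorts the two id styles lexicographically and returns the members that a range-style assignment would give a partition to when there are more consumers than partitions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class MemberIdOrderSketch {

    // Returns the members that receive a partition: with more consumers than
    // partitions, the first `partitions` entries of the sorted list get one each.
    static List<String> membersWithPartition(String group, int perApp, int partitions) {
        List<String> members = new ArrayList<>();
        for (int i = 1; i <= perApp; i++) {
            members.add("consumer-" + i + "-old");                // older id style, no group
            members.add("consumer-" + group + "-" + i + "-new");  // newer id style, includes the group
        }
        Collections.sort(members); // plain lexicographic String order
        return new ArrayList<>(members.subList(0, Math.min(partitions, members.size())));
    }

    public static void main(String[] args) {
        // Group "foo": 'f' sorts after every digit, so only "-old" members are listed.
        System.out.println("foo: " + membersWithPartition("foo", 10, 10));
        // Group "000": '0' sorts before '1'..'9', so only "-new" members are listed.
        System.out.println("000: " + membersWithPartition("000", 10, 10));
    }
}
```

With group "foo" every returned member carries the "-old" tag, and with group "000" every one carries "-new", which matches the two sets of logs above.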

garyrussell commented 2 years ago

This also explains the weird behavior when I set the group.instance.id, because the sorted consumer list is:

I0-647c7a67-89d4-49c0-b740-7853ddd62844 <<<< new instance gets the first two
I1-caeefe74-1f8e-43ca-9527-1383df45bbe9
I10-ede06173-85a4-409f-84c5-b4f481f7e7c1 <<<< old instance gets the rest
I11-e164b4c8-639e-4dac-b540-c5113beb17a3
I12-05bb0fce-a745-4bd7-b49b-e210ed0a3ba3
I13-6facaeed-6c75-4bc2-b239-4c8489326ac1
I14-f81999b0-8d2a-41e6-a54f-ac35f26e1b9d
I15-2c049581-d26c-496e-825d-5eda8c088fa1
I16-90cc0e09-0c42-45da-b809-f77a0dba0826
I17-0cc384c7-7961-4f5e-b90f-b8e17b101b53
I18-fd75f7bd-645e-41f3-8c4f-001ffa4bc065
I19-d4697697-8863-4c11-8eae-ed734f56d73d
I2-c2bb6d6a-df95-44fb-a2d6-62ebdedccf25
I3-a52b5dda-214f-42f4-ab26-10f8e1cdbfc4
I4-9c45eb01-9185-47ba-9cb0-cbc4c821442f
I5-f3ebdf24-1531-4e5a-ae99-47d665ca76ae
I6-f3377bd7-ebca-4136-b13f-8dc8cd38b7a3
I7-cb37735c-0733-4b10-a421-ed0210bee996
I8-45da8868-e8e6-403a-8531-c90de272a68a
I9-e5434c7c-815a-44c8-a8e7-c7dd77d76396

Recall that my "old" instance ids are 10-19 and "new" are 0-9.
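
The 2-versus-8 split follows from the same lexicographic ordering. As a rough sketch (the "-uuid" suffix is a stand-in for the generated part of the member id, and the class name is hypothetical), sorting I0..I19 as strings and taking the first 10 entries shows how many belong to the new instance:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class InstanceIdOrderSketch {

    // Counts how many of the first `partitions` sorted ids belong to the "new" app (I0..I9).
    static long newAppShare(int partitions) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            ids.add("I" + i + "-uuid"); // "-uuid" replaces the generated suffix
        }
        Collections.sort(ids); // I0, I1, I10, I11, ..., I19, I2, ...
        return ids.subList(0, partitions).stream()
                .filter(id -> Integer.parseInt(id.substring(1, id.indexOf('-'))) < 10)
                .count();
    }

    public static void main(String[] args) {
        System.out.println(newAppShare(10)); // prints 2 (I0 and I1)
    }
}
```

Only I0 and I1 land in the first ten positions; I10 through I17 take the other eight, so the old instance keeps eight partitions.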

garyrussell commented 2 years ago

I was able to solve it with a customized RangeAssignor to interleave the sorted list.

Collections.sort(consumersForTopic);
// begin custom
if (consumersForTopic.size() % 2 == 0) {
    for (int i = 1, j = consumersForTopic.size() / 2; i < consumersForTopic.size(); i += 2, j += 1) {
        consumersForTopic.add(i, consumersForTopic.remove(j));
    }
}
// end custom

The resulting member order:

consumer-1-0db129d1-f384-4f80-975f-b03b66df1322
consumer-foo-1-9395b090-c4fc-4c3d-9619-4805f710e38a
consumer-10-4c579c0b-e9ac-4690-9f61-1eb4fe92bb83
consumer-foo-10-c29400bb-eba2-46ff-bf1b-61b74a872b25
consumer-2-c4abc9c3-7e89-41a8-997c-5301b6750411
consumer-foo-2-c8bffa35-a522-4e2e-81ab-d817aa96f636
consumer-3-9000e4bf-5715-4fae-acb2-85ae48d34cd2
consumer-foo-3-aa1d9312-f417-4247-9749-dd23dbdd5840
consumer-4-27e023c6-9ba6-4c0d-90b7-4bbb563624ad
consumer-foo-4-4c412177-022f-44bf-82db-13cdcb55b3d9
consumer-5-593b8609-0f57-46e1-a85f-f438004dfed8
consumer-foo-5-3fdf7795-2f1c-4364-86d5-046956d2cdae
consumer-6-454ef633-6368-46e6-8599-ea19eea4abf5
consumer-foo-6-b533b386-079b-4ad0-b226-e91699bb0b97
consumer-7-3da45fe4-736c-4f1d-990d-1a8cee1baa44
consumer-foo-7-bb5ceede-5bc2-493d-bf52-407128ceb1f6
consumer-8-287e86b4-a61a-48a6-bb2c-db4ef1fcd0dc
consumer-foo-8-53c7ad2c-f033-468f-86a1-be8d65951b31
consumer-9-d1524b4d-6d99-4a0d-85c0-5e65862a6bfc
consumer-foo-9-e5892c52-5a2e-4297-8d63-240fb0290a2e

This would have to go in the old version, though, so it might not help you.

(I am not suggesting you actually do this, I just wanted to experiment).
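
For anyone who wants to try the interleaving step outside an assignor, here is a self-contained sketch of the same loop applied to a plain list. The class and method names and the "-old"/"-new" tags are illustrative assumptions; this is not a drop-in RangeAssignor replacement.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class InterleaveSketch {

    // Moves entries from the second half of an even-sized sorted list into the odd
    // positions, so entries from the first and second halves alternate.
    static List<String> interleave(List<String> sorted) {
        List<String> result = new ArrayList<>(sorted);
        if (result.size() % 2 == 0) {
            for (int i = 1, j = result.size() / 2; i < result.size(); i += 2, j += 1) {
                result.add(i, result.remove(j));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> members = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            members.add("consumer-" + i + "-old");
            members.add("consumer-foo-" + i + "-new");
        }
        Collections.sort(members);
        // The first 10 entries now alternate between old and new members.
        System.out.println(interleave(members).subList(0, 10));
    }
}
```

The interleaving only balances the two applications when both contribute the same number of consumers, which is why the loop is guarded by the even-size check.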

ferblaca commented 2 years ago

Thank you very much @garyrussell for the effort!

You're right, using the kafka-client version 2.3.1 (less than 2.4) vs 3.1.0 (without Spring-Kafka) does reproduce the problem.