spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Duplicate consumption on multithreaded scenario (concurrency > 1) #1060

Closed Dionakra closed 3 years ago

Dionakra commented 3 years ago

When testing the concurrency parameter in the consumer configuration of a Spring Cloud Stream microservice (with Kafka), I noticed that several messages are processed twice. This happens because the second thread joins a little later than the first one, causing a rebalance before T1 has committed its offsets, so T2 re-reads some messages from its newly assigned partitions.

I have the idempotence parameters set up, but they do not help with the concurrency parameter set to two, since a different producer may be created for each thread, so exactly-once semantics are not actually achieved.
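
(By "idempotence parameters" I mean the usual Kafka producer idempotence settings; something along these lines, shown only as an illustration and not my exact configuration:)

# Illustrative only: producer idempotence via the binder-wide Kafka client configuration map
spring.cloud.stream.kafka.binder.configuration.enable.idempotence=true
spring.cloud.stream.kafka.binder.configuration.acks=all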

Here is an example log:

[
  {
    "@timestamp": "2021-04-15T09:19:11.321+02:00",
    "@version": "1",
    "message": "my-consumer-group: partitions assigned: [MY_AWESOME_TOPIC-0, MY_AWESOME_TOPIC-1]",
    "logger_name": "org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$1",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'}.container-0-C-1",
    "level": "INFO",
    "level_value": 20000
  },
  {
    "whatever": "some message processing...."
  },
  {
    "@timestamp": "2021-04-15T09:19:21.226+02:00",
    "@version": "1",
    "message": "my-consumer-group: partitions assigned: [MY_AWESOME_TOPIC-0]",
    "logger_name": "org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$1",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'}.container-1-C-1",
    "level": "INFO",
    "level_value": 20000
  },
  {
    "@timestamp": "2021-04-15T09:19:21.227+02:00",
    "@version": "1",
    "message": "my-consumer-group: partitions assigned: [MY_AWESOME_TOPIC-1]",
    "logger_name": "org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$1",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'}.container-0-C-1",
    "level": "INFO",
    "level_value": 20000
  }
]

Any clue as to why this happens? It puzzles me a bit that there is a first assignment and, 10 seconds later, the second thread joins and triggers the rebalance, when T1 has already started processing. Shouldn't all N threads configured by the concurrency parameter start at the same time to avoid this?

garyrussell commented 3 years ago

I don't see the same behavior at all...

2021-04-15 09:32:52.599  INFO 90373 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$2  : kbgh1060: partitions assigned: [input-in-0-0]
2021-04-15 09:32:52.605  INFO 90373 --- [container-1-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$2  : kbgh1060: partitions assigned: [input-in-0-1]

What version are you using?

Dionakra commented 3 years ago

Hi Gary,

Spring Boot: 2.3.8.RELEASE
Spring Cloud Stream Binder Kafka: 3.0.10.RELEASE

I have been testing a bit more and found that the behaviour is not always the same, and I might have a clue. Here is a more detailed log:

[
  {
    "@timestamp": "2021-04-16T08:51:34.017+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-0, groupId=MY_CONSUMER_GROUP] Cluster ID: bElf9f2mRC2RHUd9tYx3gA",
    "logger_name": "org.apache.kafka.clients.Metadata",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:34.076+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-0, groupId=MY_CONSUMER_GROUP] Discovered group coordinator 127.0.0.1:9092 (id: 2147483645 rack: null)",
    "logger_name": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:34.089+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-0, groupId=MY_CONSUMER_GROUP] (Re-)joining group",
    "logger_name": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:34.189+02:00",
    "message": "Tomcat started on port(s): 8080 (http) with context path ''",
    "logger_name": "org.springframework.boot.web.embedded.tomcat.TomcatWebServer",
    "thread_name": "main"
  },
  {
    "@timestamp": "2021-04-16T08:51:34.284+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-1, groupId=MY_CONSUMER_GROUP] Cluster ID: bElf9f2mRC2RHUd9tYx3gA",
    "logger_name": "org.apache.kafka.clients.Metadata",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-1-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:34.284+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-1, groupId=MY_CONSUMER_GROUP] Discovered group coordinator 127.0.0.1:9092 (id: 2147483645 rack: null)",
    "logger_name": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-1-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:34.293+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-1, groupId=MY_CONSUMER_GROUP] (Re-)joining group",
    "logger_name": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-1-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:34.391+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-0, groupId=MY_CONSUMER_GROUP] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group",
    "logger_name": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:34.391+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-0, groupId=MY_CONSUMER_GROUP] (Re-)joining group",
    "logger_name": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:34.478+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-0, groupId=MY_CONSUMER_GROUP] Finished assignment for group at generation 691: {WHATEVER_CLIENT_ID-0-d77f2f19-c5ff-46ec-a8ed-8ddbb767d3d0=Assignment(partitions=[MY_AWESOME_TOPIC-0, MY_AWESOME_TOPIC-1, MY_AWESOME_TOPIC-2, MY_AWESOME_TOPIC-3, MY_AWESOME_TOPIC-4, MY_AWESOME_TOPIC-5, MY_AWESOME_TOPIC-6, MY_AWESOME_TOPIC-7, MY_AWESOME_TOPIC-8, MY_AWESOME_TOPIC-9, MY_AWESOME_TOPIC-10, MY_AWESOME_TOPIC-11, MY_AWESOME_TOPIC-12, MY_AWESOME_TOPIC-13, MY_AWESOME_TOPIC-14, MY_AWESOME_TOPIC-15, MY_AWESOME_TOPIC-16, MY_AWESOME_TOPIC-17, MY_AWESOME_TOPIC-18, MY_AWESOME_TOPIC-19])},",
    "logger_name": "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:34.532+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-0, groupId=MY_CONSUMER_GROUP] Successfully joined group with generation 691",
    "logger_name": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:34.585+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-0, groupId=MY_CONSUMER_GROUP] Adding newly assigned partitions: MY_AWESOME_TOPIC-0, MY_AWESOME_TOPIC-4, MY_AWESOME_TOPIC-3, MY_AWESOME_TOPIC-2, MY_AWESOME_TOPIC-1, MY_AWESOME_TOPIC-8, MY_AWESOME_TOPIC-7, MY_AWESOME_TOPIC-6, MY_AWESOME_TOPIC-5, MY_AWESOME_TOPIC-12, MY_AWESOME_TOPIC-11, MY_AWESOME_TOPIC-10, MY_AWESOME_TOPIC-9, MY_AWESOME_TOPIC-16, MY_AWESOME_TOPIC-15, MY_AWESOME_TOPIC-14, MY_AWESOME_TOPIC-13, MY_AWESOME_TOPIC-19, MY_AWESOME_TOPIC-18, MY_AWESOME_TOPIC-17",
    "logger_name": "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:34.603+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-0, groupId=MY_CONSUMER_GROUP] Setting offset for partition MY_AWESOME_TOPIC-0 to the committed offset FetchPosition{offset=474398, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[127.0.0.1:9092 (id: 2 rack: null)], epoch=21},},",
    "logger_name": "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:34.603+02:00",
    "message": "Same line as above with all partitions",
    "logger_name": "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:34.606+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-1, groupId=MY_CONSUMER_GROUP] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group",
    "logger_name": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-1-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:34.606+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-1, groupId=MY_CONSUMER_GROUP] (Re-)joining group",
    "logger_name": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-1-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:34.607+02:00",
    "message": "MY_CONSUMER_GROUP: partitions assigned: [MY_AWESOME_TOPIC-0, MY_AWESOME_TOPIC-4, MY_AWESOME_TOPIC-3, MY_AWESOME_TOPIC-2, MY_AWESOME_TOPIC-1, MY_AWESOME_TOPIC-8, MY_AWESOME_TOPIC-7, MY_AWESOME_TOPIC-6, MY_AWESOME_TOPIC-5, MY_AWESOME_TOPIC-12, MY_AWESOME_TOPIC-11, MY_AWESOME_TOPIC-10, MY_AWESOME_TOPIC-9, MY_AWESOME_TOPIC-16, MY_AWESOME_TOPIC-15, MY_AWESOME_TOPIC-14, MY_AWESOME_TOPIC-13, MY_AWESOME_TOPIC-19, MY_AWESOME_TOPIC-18, MY_AWESOME_TOPIC-17]",
    "logger_name": "org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$1",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:37.584+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-0, groupId=MY_CONSUMER_GROUP] Attempt to heartbeat failed since group is rebalancing",
    "logger_name": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:37.585+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-0, groupId=MY_CONSUMER_GROUP] Revoke previously assigned partitions MY_AWESOME_TOPIC-0, MY_AWESOME_TOPIC-4, MY_AWESOME_TOPIC-3, MY_AWESOME_TOPIC-2, MY_AWESOME_TOPIC-1, MY_AWESOME_TOPIC-8, MY_AWESOME_TOPIC-7, MY_AWESOME_TOPIC-6, MY_AWESOME_TOPIC-5, MY_AWESOME_TOPIC-12, MY_AWESOME_TOPIC-11, MY_AWESOME_TOPIC-10, MY_AWESOME_TOPIC-9, MY_AWESOME_TOPIC-16, MY_AWESOME_TOPIC-15, MY_AWESOME_TOPIC-14, MY_AWESOME_TOPIC-13, MY_AWESOME_TOPIC-19, MY_AWESOME_TOPIC-18, MY_AWESOME_TOPIC-17",
    "logger_name": "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:37.585+02:00",
    "message": "MY_CONSUMER_GROUP: partitions revoked: [MY_AWESOME_TOPIC-0, MY_AWESOME_TOPIC-4, MY_AWESOME_TOPIC-3, MY_AWESOME_TOPIC-2, MY_AWESOME_TOPIC-1, MY_AWESOME_TOPIC-8, MY_AWESOME_TOPIC-7, MY_AWESOME_TOPIC-6, MY_AWESOME_TOPIC-5, MY_AWESOME_TOPIC-12, MY_AWESOME_TOPIC-11, MY_AWESOME_TOPIC-10, MY_AWESOME_TOPIC-9, MY_AWESOME_TOPIC-16, MY_AWESOME_TOPIC-15, MY_AWESOME_TOPIC-14, MY_AWESOME_TOPIC-13, MY_AWESOME_TOPIC-19, MY_AWESOME_TOPIC-18, MY_AWESOME_TOPIC-17]",
    "logger_name": "org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$1",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:37.586+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-0, groupId=MY_CONSUMER_GROUP] (Re-)joining group",
    "logger_name": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:37.679+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-0, groupId=MY_CONSUMER_GROUP] Finished assignment for group at generation 692: {WHATEVER_CLIENT_ID-0-d77f2f19-c5ff-46ec-a8ed-8ddbb767d3d0=Assignment(partitions=[MY_AWESOME_TOPIC-0, MY_AWESOME_TOPIC-1, MY_AWESOME_TOPIC-2, MY_AWESOME_TOPIC-3, MY_AWESOME_TOPIC-4, MY_AWESOME_TOPIC-5, MY_AWESOME_TOPIC-6, MY_AWESOME_TOPIC-7, MY_AWESOME_TOPIC-8, MY_AWESOME_TOPIC-9]), WHATEVER_CLIENT_ID-1-712c577a-2634-4481-b1e2-f128a7b1d89e=Assignment(partitions=[MY_AWESOME_TOPIC-10, MY_AWESOME_TOPIC-11, MY_AWESOME_TOPIC-12, MY_AWESOME_TOPIC-13, MY_AWESOME_TOPIC-14, MY_AWESOME_TOPIC-15, MY_AWESOME_TOPIC-16, MY_AWESOME_TOPIC-17, MY_AWESOME_TOPIC-18, MY_AWESOME_TOPIC-19])},",
    "logger_name": "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:37.776+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-1, groupId=MY_CONSUMER_GROUP] Successfully joined group with generation 692",
    "logger_name": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-1-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:37.776+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-0, groupId=MY_CONSUMER_GROUP] Successfully joined group with generation 692",
    "logger_name": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:37.778+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-1, groupId=MY_CONSUMER_GROUP] Adding newly assigned partitions: MY_AWESOME_TOPIC-12, MY_AWESOME_TOPIC-11, MY_AWESOME_TOPIC-10, MY_AWESOME_TOPIC-16, MY_AWESOME_TOPIC-15, MY_AWESOME_TOPIC-14, MY_AWESOME_TOPIC-13, MY_AWESOME_TOPIC-19, MY_AWESOME_TOPIC-18, MY_AWESOME_TOPIC-17",
    "logger_name": "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-1-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:37.778+02:00",
    "message": "[Consumer clientId=WHATEVER_CLIENT_ID-0, groupId=MY_CONSUMER_GROUP] Adding newly assigned partitions: MY_AWESOME_TOPIC-9, MY_AWESOME_TOPIC-0, MY_AWESOME_TOPIC-4, MY_AWESOME_TOPIC-3, MY_AWESOME_TOPIC-2, MY_AWESOME_TOPIC-1, MY_AWESOME_TOPIC-8, MY_AWESOME_TOPIC-7, MY_AWESOME_TOPIC-6, MY_AWESOME_TOPIC-5",
    "logger_name": "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:37.786+02:00",
    "message": "MY_CONSUMER_GROUP: partitions assigned: [MY_AWESOME_TOPIC-9, MY_AWESOME_TOPIC-0, MY_AWESOME_TOPIC-4, MY_AWESOME_TOPIC-3, MY_AWESOME_TOPIC-2, MY_AWESOME_TOPIC-1, MY_AWESOME_TOPIC-8, MY_AWESOME_TOPIC-7, MY_AWESOME_TOPIC-6, MY_AWESOME_TOPIC-5]",
    "logger_name": "org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$1",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-0-C-1"
  },
  {
    "@timestamp": "2021-04-16T08:51:37.787+02:00",
    "message": "MY_CONSUMER_GROUP: partitions assigned: [MY_AWESOME_TOPIC-12, MY_AWESOME_TOPIC-11, MY_AWESOME_TOPIC-10, MY_AWESOME_TOPIC-16, MY_AWESOME_TOPIC-15, MY_AWESOME_TOPIC-14, MY_AWESOME_TOPIC-13, MY_AWESOME_TOPIC-19, MY_AWESOME_TOPIC-18, MY_AWESOME_TOPIC-17]",
    "logger_name": "org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$1",
    "thread_name": "KafkaConsumerDestination{consumerDestinationName='MY_AWESOME_TOPIC', partitions=0, dlqName='null'},.container-1-C-1"
  }
]

As you can see, there is a group join failure due to a missing member.id. Based on KIP-394, this happens when the client does not provide a member.id, so the broker assigns one to the client so that it can join the consumer group.

As the log shows, both threads ask for a member.id, and one of them gets its response a bit later than the other, so Thread-0 joins first and takes all 20 partitions for itself; when Thread-1 finally joins the group, that triggers a rebalance.

EDIT: The join failure due to a missing member.id seems to be normal; it is part of the handshake in which the broker assigns the id, and in Kafka 2.7 the message is moved to DEBUG.

So, could this be caused by the threads getting their responses at different times, creating a race condition? If so, is there any way to synchronize group membership between the threads?

garyrussell commented 3 years ago

You could try increasing group.initial.rebalance.delay.ms on the brokers. I have never seen this problem with the default 3 seconds.
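
(For reference, this is a broker-side setting, e.g. in server.properties; the value below is only an illustration, not a recommendation:)

# Broker setting: how long the group coordinator waits for additional members
# before completing the first rebalance of a newly created (empty) group.
# The default is 3000 ms; a larger value gives all container threads time to join.
group.initial.rebalance.delay.ms=10000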

BTW, I asked them to remove that log since it's a normal occurrence when a consumer first starts. https://issues.apache.org/jira/browse/KAFKA-10802

Dionakra commented 3 years ago

Thanks a lot Gary, I will look into that!

Just before closing the issue: I read a blog post by Confluent about handling rebalances in a multi-threaded scenario, in which they talk about waiting for in-flight requests to be ACK'd (or timed out) and committing the ACK'd offsets inside the onPartitionsRevoked method. Do you know if this library provides something like that out of the box, as it seems quite standard? If not, I will add the code featured in the blog post.
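
(What the blog post describes boils down to something like the following plain kafka-clients sketch; the class, field, and bookkeeping here use my own illustrative naming, it is not code copied from the post:)

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class RevokeCommitSketch {

    // Hypothetical bookkeeping: offsets of records that have been fully processed (ACK'd).
    private final Map<TopicPartition, OffsetAndMetadata> ackedOffsets = new ConcurrentHashMap<>();

    void subscribe(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(List.of("MY_AWESOME_TOPIC"), new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Blog-post pattern: before giving up the partitions, commit the offsets of
                // already-acknowledged records so the new owner does not re-read them.
                consumer.commitSync(ackedOffsets);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // no special handling needed for this pattern
            }
        });
    }
}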

garyrussell commented 3 years ago

Spring's behavior depends on the container AckMode. With AckMode.BATCH (the default), any pending offsets, for already processed records, are committed in onPartitionsRevoked; with AckMode.RECORD, commits are done immediately after processing each record, so there is nothing to do in onPartitionsRevoked since there is nothing pending.

This is not something the application needs to worry about.
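
(For completeness: if you did want to change the container's AckMode, a ListenerContainerCustomizer bean along these lines could be used; this is only a sketch, and RECORD is shown purely as an example.)

import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class AckModeConfig {

    // Sketch: have the binder-created listener containers commit after each record
    // instead of after each poll batch.
    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> ackModeCustomizer() {
        return (container, destinationName, group) ->
                container.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    }
}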

Dionakra commented 3 years ago

Thanks Gary!

So, with AckMode set to AckMode.BATCH, I guess that pending offsets (for records sent but not yet acknowledged) are not considered processed and are therefore not committed, hence those records being processed again on the new container/thread, causing possible duplicates. Am I right?

garyrussell commented 3 years ago

No; for a "normal" rebalance (new member joins), you should not see any duplicates. You can confirm this by running 2 copies of this example, with concurrency set to 1:

spring.cloud.stream.bindings.input-in-0.consumer.concurrency=1
spring.cloud.stream.bindings.input-in-0.group=kbgh1060
spring.cloud.stream.bindings.input-in-0.destination=kbgh1060

spring.cloud.stream.kafka.binder.min-partition-count=2
spring.cloud.stream.kafka.binder.auto-add-partitions=true

import java.io.IOException;
import java.util.Date;
import java.util.function.Consumer;
import java.util.stream.IntStream;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

@SpringBootApplication
public class Kbgh1060Application {

    public static void main(String[] args) {
        SpringApplication.run(Kbgh1060Application.class, args);
    }

    // Consumer for binding "input-in-0"; blocks on System.in so each record must be
    // released manually by pressing <Enter>, keeping the rest of the batch in flight.
    @Bean
    public Consumer<Message<String>> input() {
        return msg -> {
            System.out.println(msg.getPayload() + ":" + msg.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID)
                    + "@" + msg.getHeaders().get(KafkaHeaders.OFFSET));
            try {
                System.in.read();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        };
    }

    // Sends 10 records on startup, alternating between partitions 0 and 1.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("kbgh1060", i % 2, null,
                    (new Date() + "foo" + i).getBytes()));
        };
    }

}

Run the first copy and wait for the first message:

Mon Apr 19 10:20:20 EDT 2021foo1:1@0

Run the second copy and you will start seeing "Attempt to heartbeat failed since group is rebalancing" in the first copy.

Hit <Enter> 10 times in the first copy (to complete the batch) and only then will the rebalance occur. There is nothing to do in onPartitionsRevoked() because the offsets are already committed. You will then get 5 records in each copy (sent by the second copy). No duplicates.

You can get duplicates from a rebalance that occurs because your application takes too long to process a batch. You need to adjust max.poll.records and max.poll.interval.ms to avoid that.
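
(For example, through the binding-level consumer configuration map; the binding name matches the example above and the values are illustrative, not recommendations:)

# Illustrative values: fetch fewer records per poll and/or allow more time to process a batch
spring.cloud.stream.kafka.bindings.input-in-0.consumer.configuration.max.poll.records=100
spring.cloud.stream.kafka.bindings.input-in-0.consumer.configuration.max.poll.interval.ms=600000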

Dionakra commented 3 years ago

Thanks Gary, I will tune those parameters to avoid duplicates. I am closing the issue. Again, thanks a lot!