spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

childContainer of previous run is stopping ConcurrentContainer after a new start #3448

Open LokeshAlamuri opened 2 months ago

LokeshAlamuri commented 2 months ago

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.3

Describe the bug

If a ConcurrentContainer is stopped, its child containers should no longer be able to stop that ConcurrentContainer. However, there are scenarios where this can still happen.

Scenario:

Concurrency: 2

ConcurrentContainer: CMain. Child containers: C0, C1.

1) ConcurrentContainer started.

CMain -- running.
C0 -- running.
C1 -- running.

2) ConcurrentContainer stopped.

CMain -- not running.
C0 -- delinked. (message processing is happening)
C1 -- delinked.

3) ConcurrentContainer started again. This is permitted since stop was called beforehand; nothing is wrong here and it should be allowed, even though restarting this way is not good practice.

CMain -- running.
C2 -- running.
C3 -- running.

C0 -- delinked. (message processing is happening)

4) C0 throws an error while processing. This stops the newly started, running ConcurrentContainer!

CMain -- not running.
C2 -- delinked.
C3 -- delinked.

C0 -- delinked. 
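
Condensed, the sequence of public API calls that leads to step 4 looks like this (a sketch only; the consumer factory, container properties, and the latch-based listener are assumed to be configured as in the full test below):

void reproduceSketch(ConsumerFactory<Integer, String> cf, ContainerProperties containerProps) {
    ConcurrentMessageListenerContainer<Integer, String> container =
            new ConcurrentMessageListenerContainer<>(cf, containerProps);
    container.setConcurrency(2);
    container.setCommonErrorHandler(new CommonContainerStoppingErrorHandler());

    container.start();   // 1) CMain running, children C0 and C1 running
    container.stop();    // 2) CMain stopped; C0 delinked but still processing a record
    container.start();   // 3) allowed; new children C2 and C3 are created

    // 4) when the delinked C0 eventually throws, the error handler stops the
    //    newly started ConcurrentContainer, even though C0 is not part of this run
}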

To Reproduce

@Test
    public void testFencedContainerFailed() throws Exception {
        this.logger.info("Start testFencedContainerFailed");
        Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true",
                embeddedKafka);
        AtomicReference<Properties> overrides = new AtomicReference<>();
        DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props) {
            @Override
            protected Consumer<Integer, String> createKafkaConsumer(String groupId, String clientIdPrefix,
                                                                    String clientIdSuffixArg, Properties properties) {
                overrides.set(properties);
                return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties);
            }
        };
        ContainerProperties containerProps = new ContainerProperties(topic1);
        containerProps.setLogContainerConfig(true);
        containerProps.setClientId("client");
        containerProps.setAckMode(ContainerProperties.AckMode.RECORD);

        final CountDownLatch secondRunLatch = new CountDownLatch(5);
        final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
        final List<String> payloads = new ArrayList<>();
        final CountDownLatch processingLatch = new CountDownLatch(1);
        final CountDownLatch firstLatch = new CountDownLatch(1);

        AtomicBoolean first = new AtomicBoolean(true);

        containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
            if (first.getAndSet(false)) {
                try {
                    firstLatch.await(100, TimeUnit.SECONDS);
                    throw new NullPointerException();
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
            listenerThreadNames.add(Thread.currentThread().getName());
            payloads.add(message.value());
            secondRunLatch.countDown();
            processingLatch.countDown();
        });

        ConcurrentMessageListenerContainer<Integer, String> container =
                new ConcurrentMessageListenerContainer<>(cf, containerProps);
        container.setConcurrency(2);
        container.setBeanName("testAuto");
        container.setChangeConsumerThreadName(true);
        // error handler that stops the container when the listener throws an exception
        container.setCommonErrorHandler(new CommonContainerStoppingErrorHandler());

        BlockingQueue<KafkaEvent> events = new LinkedBlockingQueue<>();
        CountDownLatch concurrentContainerStopLatch = new CountDownLatch(1);
        CountDownLatch consumerStoppedEventLatch = new CountDownLatch(1);

        container.setApplicationEventPublisher(e -> {
            events.add((KafkaEvent) e);
            if (e instanceof ConcurrentContainerStoppedEvent) {
                concurrentContainerStopLatch.countDown();
            }
            if (e instanceof ConsumerStoppedEvent) {
                consumerStoppedEventLatch.countDown();
            }
        });

        CountDownLatch interceptedSecondRun = new CountDownLatch(5);
        container.setRecordInterceptor((record, consumer) -> {
            interceptedSecondRun.countDown();
            return record;
        });

        // first start: child containers C0 and C1 are created and begin consuming
        container.start();

        MessageListenerContainer childContainer0 = container.getContainers().get(0);
        MessageListenerContainer childContainer1 = container.getContainers().get(1);

        ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
        assertThat(container.getAssignedPartitions()).hasSize(2);
        Map<String, Collection<TopicPartition>> assignments = container.getAssignmentsByClientId();
        assertThat(assignments).hasSize(2);
        assertThat(assignments.get("client-0")).isNotNull();
        assertThat(assignments.get("client-1")).isNotNull();

        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(topic1);
        template.sendDefault(0, 0, "foo");
        template.sendDefault(1, 2, "bar");
        template.sendDefault(0, 0, "baz");
        template.sendDefault(1, 2, "qux");
        template.flush();

        assertThat(container.metrics()).isNotNull();
        assertThat(container.isInExpectedState()).isTrue();
        assertThat(childContainer0.isRunning()).isTrue();
        assertThat(childContainer1.isRunning()).isTrue();
        assertThat(container.isChildRunning()).isTrue();

        assertThat(processingLatch.await(60, TimeUnit.SECONDS)).isTrue();

        // stop the parent: the children are delinked, but the first consumer is
        // still blocked in the listener and keeps processing its record
        container.stop();

        assertThat(container.isChildRunning()).isTrue();
        assertThat(container.isRunning()).isFalse();
        assertThat(childContainer0.isRunning()).isFalse();
        assertThat(childContainer1.isRunning()).isFalse();

        assertThat(consumerStoppedEventLatch.await(30, TimeUnit.SECONDS)).isTrue();

        assertThat(container.isChildRunning()).isTrue();

        assertThat(listenerThreadNames).containsAnyOf("testAuto-0", "testAuto-1");

        assertThat(concurrentContainerStopLatch.await(30, TimeUnit.SECONDS)).isFalse();

        template.sendDefault(0, 0, "FOO");
        template.sendDefault(1, 2, "BAR");
        template.sendDefault(0, 0, "BAZ");
        template.sendDefault(1, 2, "QUX");
        template.flush();

        // restarting is permitted since stop() was called previously (step 3 above)
        container.start();

        assertThat(container.isRunning()).isTrue();
        assertThat(container.getContainers().stream().allMatch(containerL -> containerL.isRunning()))
                .isTrue();

        // release the delinked consumer from the previous run; its listener now throws a NullPointerException
        firstLatch.countDown();

        // the running ConcurrentContainer has now been stopped by the delinked child from the previous run
        assertThat(concurrentContainerStopLatch.await(30, TimeUnit.SECONDS)).isTrue();

        assertThat(container.isRunning()).isFalse();
        assertThat(container.getContainers().stream().anyMatch(containerL -> containerL.isRunning()))
                .isFalse();

        this.logger.info("Stop testFencedContainerFailed");
    }

Please advise whether this is a valid scenario.

artembilan commented 2 months ago

What are your 1, 2, 3, 4? Different scenarios, or steps, or states of the same scenario? Isn't this a result of all the changes you have introduced recently?

C0 throws an error while processing. This stops the newly started, running ConcurrentContainer!

This indeed must not happen. A failure in one child container must not affect all the others.

Does it happen in previous versions as well, before your recent changes?

LokeshAlamuri commented 2 months ago

Isn't this a result of all the changes you have introduced recently?

I have not introduced any bugs; the current bug is a different one and it exists in previous versions as well. I have provided a JUnit test to reproduce this scenario. Please review and advise whether this is a valid scenario that needs to be fixed.

LokeshAlamuri commented 2 months ago

What are your 1, 2, 3, 4? Different scenarios, or steps, or states of the same scenario? Isn't this a result of all the changes you have introduced recently?

These are the steps to follow to get an overview of the bug; they are not different scenarios. Please let me know if I should provide more information about this issue.

This bug is not caused by my changes.

artembilan commented 2 months ago

No problem!

I will need more time to investigate this. We may ask if @sobychacko has some cycles to look into it more quickly. Thanks

LokeshAlamuri commented 2 months ago

If you are OK with it, I will try to fix the issue and provide a PR. I have an idea of how to approach it.
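
For illustration only, one possible direction (the class, field, and method names below are hypothetical and do not reflect the actual spring-kafka internals) would be for the parent to ignore stop requests coming from children that are no longer part of the current run:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of a guard in the parent container; illustrative only.
public class GuardedConcurrentContainer {

    private final Object lifecycleMonitor = new Object();

    // children belonging to the current run only; repopulated on every start()
    private final List<Object> currentChildren = new CopyOnWriteArrayList<>();

    // Called when a child (for example via a container-stopping error handler)
    // asks the parent to stop. The request is honored only if the child still
    // belongs to the current run, so a delinked child from a previous run
    // cannot stop a newly started run.
    public void stopRequestedByChild(Object child, Runnable doStop) {
        synchronized (this.lifecycleMonitor) {
            if (this.currentChildren.contains(child)) {
                doStop.run();
            }
        }
    }
}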