streamnative / pulsar-archived

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org
Apache License 2.0
72 stars 25 forks source link

ISSUE-14086: Cannot receive all messages from a partition via internal consumer handling that partition #3679

Open sijie opened 2 years ago

sijie commented 2 years ago

Original Issue: apache/pulsar#14086


Describe the bug: I use the internal consumer passed to the consumer event listener callback becameActive(Consumer<?> consumer, int i) to consume messages from partition i. The number of messages I receive differs significantly from the number of messages produced.

To Reproduce Steps to reproduce the behavior: I leave my test program below.

Test steps:

  1. Start pulsar container
  2. Create partitioned topic with number of partitions equal to 1
  3. Subscribe to partitioned topic with failover subscription type and consumer event listener
  4. Wait until the Pulsar broker notifies the consumer event listener that an internal consumer has become active on partition 0
  5. Start receiving messages via the given consumer on another thread; this thread keeps running until the last message arrives
  6. Create a partitioned producer, then produce messages to the topic

Expected behavior: After the last input message has been received, the number of received messages should be identical to the number of produced messages.

Desktop (please complete the following information):

Additional context

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.utility.DockerImageName;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
public class InternalConsumerTest {
    final static String TOPIC = "my-topic";

    static void setup(String restUrl) throws PulsarClientException, PulsarAdminException {
        log.info("Start setup");
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl(restUrl)
                .build();
        ) {
            admin.topics().createPartitionedTopic(TOPIC, 1);
        }
        log.info("Setup finished");
    }

    static void subscribeToTopic(PulsarClient client, EventListener eventListener, String topic, String subscription) {
        try {
            client.newConsumer(Schema.STRING)
                    .topic(topic)
                    .subscriptionName(subscription)
                    .consumerEventListener(eventListener)
                    .subscriptionType(SubscriptionType.Failover)
                    .subscribe();
        } catch (PulsarClientException e) {
            log.error("Failed to instantiate consumer on topic: {}", TOPIC, e);
        }
    }

    @Test
    void test() throws InterruptedException, PulsarAdminException, PulsarClientException {
        DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:2.9.1");
        try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE)
        ) {
            pulsar.start();
            Thread.sleep(TimeUnit.SECONDS.toMillis(2));
            setup(pulsar.getHttpServiceUrl());
            try (PulsarClient client = PulsarClient.builder()
                    .serviceUrl(pulsar.getPulsarBrokerUrl())
                    .build()) {
                Manager manager = new Manager();
                EventListener eventListener = new EventListener(manager);
                subscribeToTopic(client, eventListener, TOPIC, "sub-1");
                while (!manager.consumingStarted.get()) {
                    Thread.sleep(TimeUnit.SECONDS.toMillis(1));
                }
                // sleep another 1 sec after internal consumer started consuming
                Thread.sleep(TimeUnit.SECONDS.toMillis(1));
                // create partitioned producer then produce messages to topic
                Runnable producerTask = new Thread(producerTask(client));
                producerTask.run();
                // forever sleep
                while (true) {
                    Thread.sleep(TimeUnit.MINUTES.toMillis(1));
                }
            }
        }
    }

    static Runnable producerTask(PulsarClient client) {
        return () -> {
            try (Producer<String> producer = client.newProducer(Schema.STRING)
                    .topic(TOPIC)
                    .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                    .create()) {
                log.info("Start producing");
                String[] keys = {"1111", "2222", "3333"};
                int k = 0;
                for (int i = 0; i < 90; i++) {
                    if (k == 3) {
                        k = 0;
                    }
                    producer.newMessage()
                            .key(keys[k])
                            .value(String.valueOf(i))
                            .send();
                    k++;
                }

              producer.newMessage()
                      .key(keys[0])
                      .value(String.valueOf(-1))
                      .send();
                log.info("Producing finished");
            } catch (PulsarClientException e) {
                log.info("Error", e);
            }
        };
    }

    static Runnable consumerTask(Consumer<?> consumer, AtomicBoolean consumingStarted) {
        return () -> {
            String s = "";
            int n = 0;
            log.info("Start receiving message from topic {}", consumer.getTopic());
            consumingStarted.set(true);
            while (!s.equals("-1")) { // if message not equal to last input message
                try {
                    Message<?> msg = consumer.receive(1, TimeUnit.MINUTES);
                    if (msg != null) {
                        s = new String(msg.getData());
                        n++;
                        log.info("Received message: {}", s);
                        consumer.acknowledge(msg);
                    }
                } catch (PulsarClientException e) {
                    log.error("Error", e);
                }
            }
            log.info("Number of received messages: {}", n);
        };
    }

    static class EventListener implements ConsumerEventListener {
        private final Manager manager;

        EventListener(Manager manager) {
            this.manager = manager;
        }

        @Override
        public void becameActive(Consumer<?> consumer, int i) {
            log.info("Consumer {name: {}, hashcode: {}} became active on partition {}", consumer.getConsumerName(), consumer.hashCode(), i);
            manager.run(consumer, i);
        }

        @Override
        public void becameInactive(Consumer<?> consumer, int i) {
            log.info("Consumer {name: {}, hashcode: {}} became inactive on partition {}", consumer.getConsumerName(), consumer.hashCode(), i);
        }
    }

    static class Manager {
        Map<Integer, Thread> runningMap = new HashMap<>();
        AtomicBoolean consumingStarted = new AtomicBoolean(false);

        void run(Consumer<?> consumer, int i) {
            if (runningMap.containsKey(i)) {
                return;
            }
            Thread thread = new Thread(consumerTask(consumer, consumingStarted));
            thread.start();
            runningMap.put(i, thread);
        }
    }
}
github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.