apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

Producer can't continue sending messages after all brokers are restarted #15559

Open xuesongxs opened 2 years ago

xuesongxs commented 2 years ago

Describe the bug
Pulsar v2.8.1, cluster of 3 brokers. The producer can't continue sending messages after all brokers are restarted.

To Reproduce
Steps to reproduce the behavior:

  1. Create producer

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;

    public class PulsarProducerDemo3 {
        // Connect to the cluster brokers
        private static String localClusterUrl = "pulsar://127.0.0.1:6650,127.0.0.1:6651,127.0.0.1:6652";

        public static void main(String[] args) {
            // The try block wraps the whole loop, so the first exception from send() ends the loop
            try {
                Producer<String> producer = getProducer();

                Long start = System.currentTimeMillis();
                int i = 0;
                while (i < 50000) {
                    producer.send(i + "");
                    System.out.println("send msg:" + i + "");
                    i++;
                }
            } catch (Exception e) {
                System.err.println("send fail:" + e);
            }
        }

        public static Producer<String> getProducer() throws Exception {
            PulsarClient client;
            Map<String, Object> prop = new HashMap<>();
            prop.put("topicName", "persistent://public/default/test-string3");
            client = PulsarClient.builder().serviceUrl(localClusterUrl).build();
            Producer<String> producer = client.newProducer(Schema.STRING)
                    .loadConf(prop)
                    .create();
            return producer;
        }
    }
  2. Run producer
  3. After sending 100 messages, stop all brokers
  4. See producer's log
    [pulsar-timer-5-1] INFO org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/test-string3-partition-2] [pulsar-cluster-10-0] Reconnecting after connection was closed
    [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ConnectionPool - Failed to open connection to 127.0.0.1:6650 : org.apache.pulsar.shade.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:6650
    [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/test-string3-partition-2] [pulsar-cluster-10-0] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException: org.apache.pulsar.shade.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:6650
    [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/test-string3-partition-2] [pulsar-cluster-10-0] Could not get connection to broker: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException: org.apache.pulsar.shade.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:6650 -- Will try again in 1.496 s
    [pulsar-timer-5-1] INFO org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/test-string3-partition-2] [pulsar-cluster-10-0] Reconnecting after connection was closed
    [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ConnectionPool - Failed to open connection to 172.32.149.123:16650 : org.apache.pulsar.shade.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /172.32.149.123:16650
    [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/test-string3-partition-2] [pulsar-cluster-10-0] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException: org.apache.pulsar.shade.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:6651
    [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/test-string3-partition-2] [pulsar-cluster-10-0] Could not get connection to broker: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException: org.apache.pulsar.shade.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:6651 -- Will try again in 3.193 s
    [pulsar-timer-5-1] INFO org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/test-string3-partition-1] [pulsar-cluster-11-0] Reconnecting after connection was closed
    [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ConnectionPool - Failed to open connection to 127.0.0.1:6652 : org.apache.pulsar.shade.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:6652
    [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/test-string3-partition-1] [pulsar-cluster-11-0] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException: org.apache.pulsar.shade.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:6652
    [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/test-string3-partition-1] [pulsar-cluster-11-0] Could not get connection to broker: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException: org.apache.pulsar.shade.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:6652 -- Will try again in 2.97 s
  5. Start all brokers
  6. See producer's log
    [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x7fa30072, L:/127.0.0.1:50232 - R:/127.0.0.1:6650]] Connected to server
    [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://public/default/test-string3-partition-2] [pulsar-cluster-14-0] Creating producer on cnx [id: 0x7fa30072, L:/127.0.0.1:50232 - R:/127.0.0.1:16651]
    [pulsar-timer-5-1] INFO org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/test-string3-partition-0] [pulsar-cluster-14-1] Reconnecting after connection was closed
    [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xa49808e4, L:/127.0.0.1:58935 - R:/127.0.0.1:6651]] Connected to server
    [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://public/default/test-string3-partition-0] [pulsar-cluster-14-1] Creating producer on cnx [id: 0xa49808e4, L:/127.0.0.1:58935 - R:/127.0.0.1:6652]

    The producer is created successfully, but it can't continue sending messages after all brokers are restarted.

  7. Run jstack on the producer's PID

    Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.251-b08 mixed mode):

    "Attach Listener" #13 daemon prio=9 os_prio=0 tid=0x00007fc510001000 nid=0x4dc2 waiting on condition [0x0000000000000000]
       java.lang.Thread.State: RUNNABLE

    "DestroyJavaVM" #12 prio=5 os_prio=0 tid=0x00007fc53c009800 nid=0x4c40 waiting on condition [0x0000000000000000]
       java.lang.Thread.State: RUNNABLE

    "pulsar-external-listener-3-1" #11 prio=5 os_prio=0 tid=0x00007fc4f40b3800 nid=0x4c95 waiting on condition [0x00007fc5185f5000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)

    "pulsar-timer-5-1" #10 prio=5 os_prio=0 tid=0x00007fc4f4065800 nid=0x4c4c waiting on condition [0x00007fc519217000]
       java.lang.Thread.State: TIMED_WAITING (sleeping)
            at java.lang.Thread.sleep(Native Method)
            at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.waitForNextTick(HashedWheelTimer.java:566)
            at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:462)
            at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(Thread.java:748)

    "pulsar-client-io-1-1" #8 prio=5 os_prio=0 tid=0x00007fc53c40b800 nid=0x4c4b runnable [0x00007fc52c18a000]
       java.lang.Thread.State: RUNNABLE
            at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
            at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
            at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
            at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)

    "Service Thread" #7 daemon prio=9 os_prio=0 tid=0x00007fc53c0d4000 nid=0x4c49 runnable [0x0000000000000000]
       java.lang.Thread.State: RUNNABLE

    "C1 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007fc53c0b7800 nid=0x4c48 waiting on condition [0x0000000000000000]
       java.lang.Thread.State: RUNNABLE

    "C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007fc53c0b4800 nid=0x4c47 waiting on condition [0x0000000000000000]
       java.lang.Thread.State: RUNNABLE

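One way to read the thread dump in step 7: it lists no "main" thread, while "DestroyJavaVM" is present. On HotSpot that combination usually means main() has already returned and the JVM is only kept alive by the client's non-daemon threads. The sketch below scans jstack text for thread names to make that check. The class and method names (ThreadDumpCheck, threadNames, mainHasExited) are hypothetical, and the header pattern is an assumption based on the format of the dump above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ThreadDumpCheck {
    // Assumed header shape: a line starting with the quoted thread name followed by " #<id>".
    private static final Pattern HEADER =
            Pattern.compile("^\"([^\"]+)\" #\\d+", Pattern.MULTILINE);

    /** Collects the thread names found in jstack output. */
    static List<String> threadNames(String dump) {
        List<String> names = new ArrayList<>();
        Matcher m = HEADER.matcher(dump);
        while (m.find()) {
            names.add(m.group(1));
        }
        return names;
    }

    /** Heuristic: no "main" thread plus a "DestroyJavaVM" thread suggests main() has returned. */
    static boolean mainHasExited(String dump) {
        List<String> names = threadNames(dump);
        return !names.contains("main") && names.contains("DestroyJavaVM");
    }

    public static void main(String[] args) {
        // Two thread headers copied from the dump in this issue.
        String dump = String.join("\n",
                "\"DestroyJavaVM\" #12 prio=5 os_prio=0 tid=0x00007fc53c009800 nid=0x4c40 waiting on condition [0x0000000000000000]",
                "   java.lang.Thread.State: RUNNABLE",
                "",
                "\"pulsar-timer-5-1\" #10 prio=5 os_prio=0 tid=0x00007fc4f4065800 nid=0x4c4c waiting on condition [0x00007fc519217000]",
                "   java.lang.Thread.State: TIMED_WAITING (sleeping)");
        System.out.println("threads: " + threadNames(dump));
        System.out.println("main has exited: " + mainHasExited(dump));
    }
}
```

If the check reports that main has exited, the send loop is no longer running, so no reconnect on the client side can resume it.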


Expected behavior
The producer should continue sending messages after all brokers are restarted.

Technoboy- commented 2 years ago

In your demo, when the brokers restart, the client throws an exception, so it can't send any more messages. The try block wraps the whole while loop, so the first failed send() ends the loop.
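The demo's try/catch wraps the whole loop, so a single exception from send() ends the loop. A minimal sketch of the alternative is to catch the failure per message and retry with backoff, so the loop survives a broker outage. This assumes the outage surfaces as an exception thrown by the blocking send call. The names here (RetryingSendDemo, Sender, sendWithRetry) and the backoff values are hypothetical. Sender stands in for a call such as producer.send(msg), so the sketch runs without a Pulsar cluster.

```java
import java.util.concurrent.TimeUnit;

class RetryingSendDemo {
    /** Hypothetical stand-in for a blocking send such as producer.send(msg). */
    interface Sender {
        void send(String message) throws Exception;
    }

    /**
     * Sends one message, retrying with capped exponential backoff.
     * Returns the number of attempts used; rethrows the last failure
     * once maxAttempts is reached.
     */
    static int sendWithRetry(Sender sender, String message, int maxAttempts, long initialBackoffMs)
            throws Exception {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; ; attempt++) {
            try {
                sender.send(message);
                return attempt;
            } catch (Exception e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
                System.err.println("send fail (attempt " + attempt + "): " + e);
                TimeUnit.MILLISECONDS.sleep(backoffMs);
                // Double the wait each time, capped at 30 s (an arbitrary choice for this sketch).
                backoffMs = Math.min(backoffMs * 2, 30_000);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulated outage: the first two attempts to send message "1" fail.
        int[] failuresLeft = {2};
        Sender flaky = msg -> {
            if (msg.equals("1") && failuresLeft[0]-- > 0) {
                throw new Exception("Connection refused (simulated)");
            }
        };
        for (int i = 0; i < 3; i++) {
            int attempts = sendWithRetry(flaky, i + "", 5, 10);
            System.out.println("send msg:" + i + " attempts=" + attempts);
        }
    }
}
```

In the original demo, the equivalent change would be to move the try/catch inside the while loop around producer.send(...) and advance i only after a successful send. Whether retrying, skipping, or aborting is right depends on the application's delivery requirements.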

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days; marking it with the Stale label.