apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0
14.14k stars 3.57k forks source link

KeyShared mode bug #20813

Open jinxiaoyi opened 1 year ago

jinxiaoyi commented 1 year ago

Search before asking

Version

pulsar-clinet 2.10.4 pulsar-server 2.10.1

Minimal reproduce step

i use pulsar-manage create topic with 2 partitions i start java application with 5 topics(topic1,topic2,topic3,topic4,topic5) one topic use one consumer set listenerThreads=10 set topic1 mode=KeyShared topic1 bind consumer1 use messageListener i send 50 messages to topic1

What did you expect to see?

in key-shared mode 2 threads works because topic1 use one consumer with 2 partitions。 48 messages should wait

What did you see instead?

10 threads works at the same time。 40messages wait in shared mode 2 threads works at the same time。 48messages wait

Anything else?

No response

Are you willing to submit a PR?

jinxiaoyi commented 1 year ago

for (int i = 0; i < 200; i++) { stringProducer.newMessage().key(UUID.randomUUID().toString()).value("aaaa").send(); }

Technoboy- commented 1 year ago

for (int i = 0; i < 200; i++) { stringProducer.newMessage().key(UUID.randomUUID().toString()).value("aaaa").send(); }

Do you want to fix it ?

jinxiaoyi commented 1 year ago

for (int i = 0; i < 200; i++) { stringProducer.newMessage().key(UUID.randomUUID().toString()).value("aaaa").send(); }

Do you want to fix it ? in consumerBase.class private void triggerListener() { this.internalPinnedExecutor.execute(() -> { while(true) { try { Message msg = this.internalReceive(0L, TimeUnit.MILLISECONDS); if (msg != null) { if (SubscriptionType.Key_Shared == this.conf.getSubscriptionType()) { this.executorProvider.getExecutor(this.peekMessageKey(msg)).execute(() -> { this.callMessageListener(msg); }); } else { this.getExternalExecutor(msg).execute(() -> { this.callMessageListener(msg); }); } } else if (log.isDebugEnabled()) { log.debug("[{}] [{}] Message has been cleared from the queue", this.topic, this.subscription); }

                if (msg != null) {
                    continue;
                }
            } catch (PulsarClientException var3) {
                log.warn("[{}] [{}] Failed to dequeue the message for listener", new Object[]{this.topic, this.subscription, var3});
            }

            return;
        }
    });
}

may i delete if (SubscriptionType.Key_Shared == this.conf.getSubscriptionType()) ?

Technoboy- commented 1 year ago

for (int i = 0; i < 200; i++) { stringProducer.newMessage().key(UUID.randomUUID().toString()).value("aaaa").send(); }

Do you want to fix it ? in consumerBase.class private void triggerListener() { this.internalPinnedExecutor.execute(() -> { while(true) { try { Message msg = this.internalReceive(0L, TimeUnit.MILLISECONDS); if (msg != null) { if (SubscriptionType.Key_Shared == this.conf.getSubscriptionType()) { this.executorProvider.getExecutor(this.peekMessageKey(msg)).execute(() -> { this.callMessageListener(msg); }); } else { this.getExternalExecutor(msg).execute(() -> { this.callMessageListener(msg); }); } } else if (log.isDebugEnabled()) { log.debug("[{}] [{}] Message has been cleared from the queue", this.topic, this.subscription); }

                if (msg != null) {
                    continue;
                }
            } catch (PulsarClientException var3) {
                log.warn("[{}] [{}] Failed to dequeue the message for listener", new Object[]{this.topic, this.subscription, var3});
            }

            return;
        }
    });
}

may i delete if (SubscriptionType.Key_Shared == this.conf.getSubscriptionType()) ?

Seems not right

jinxiaoyi commented 1 year ago

for (int i = 0; i < 200; i++) { stringProducer.newMessage().key(UUID.randomUUID().toString()).value("aaaa").send(); }

Do you want to fix it ? in consumerBase.class private void triggerListener() { this.internalPinnedExecutor.execute(() -> { while(true) { try { Message msg = this.internalReceive(0L, TimeUnit.MILLISECONDS); if (msg != null) { if (SubscriptionType.Key_Shared == this.conf.getSubscriptionType()) { this.executorProvider.getExecutor(this.peekMessageKey(msg)).execute(() -> { this.callMessageListener(msg); }); } else { this.getExternalExecutor(msg).execute(() -> { this.callMessageListener(msg); }); } } else if (log.isDebugEnabled()) { log.debug("[{}] [{}] Message has been cleared from the queue", this.topic, this.subscription); }

                if (msg != null) {
                    continue;
                }
            } catch (PulsarClientException var3) {
                log.warn("[{}] [{}] Failed to dequeue the message for listener", new Object[]{this.topic, this.subscription, var3});
            }

            return;
        }
    });
}

may i delete if (SubscriptionType.Key_Shared == this.conf.getSubscriptionType()) ?

Seems not right

我额外发了一个邮件到你邮箱。 你能看看吗?

github-actions[bot] commented 1 year ago

The issue had no activity for 30 days, mark with Stale label.