Closed srouthu1 closed 3 years ago
in DeadLetterTopicTest
I add a testcase
@Test(groups = "quarantine")
public void testDeadLetterKeyTopic() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic";
final int maxRedeliveryCount = 1;
final int sendMessages = 1;
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
.ackTimeout(1, TimeUnit.SECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.create();
for (int i = 0; i < sendMessages; i++) {
producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
}
producer.close();
int totalReceived = 0;
do {
Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
totalReceived++;
} while (totalReceived < sendMessages * (maxRedeliveryCount + 10));
}
It seems can not reproduce this issue.
08:52:16.839 [pulsar-web-29-4] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [14/Aug/2021:08:52:16 +0800] "GET /admin/v2/persistent/my-property/my-ns/dead-letter-topic/partitions?checkAllowAutoCreation=true HTTP/1.1" 200 16 "-" "Pulsar-Java-v2.9.0-SNAPSHOT" 164
08:52:16.965 [metadata-store-8-1] INFO org.apache.pulsar.common.naming.NamespaceBundleFactory - Policy updated for namespace my-property/my-ns, refreshing the bundle cache.
08:52:17.026 [pulsar-4-4] INFO org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - 1 brokers being considered for assignment of my-property/my-ns/0x00000000_0xffffffff
08:52:17.032 [pulsar-4-4] INFO org.apache.pulsar.broker.namespace.OwnershipCache - Trying to acquire ownership of my-property/my-ns/0x00000000_0xffffffff
08:52:17.045 [metadata-store-8-1] INFO org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl - Acquired resource lock on /namespace/my-property/my-ns/0x00000000_0xffffffff
08:52:17.045 [metadata-store-8-1] INFO org.apache.pulsar.broker.namespace.OwnershipCache - Successfully acquired ownership of OwnedBundle(bundle=my-property/my-ns/0x00000000_0xffffffff, isActive=1)
08:52:17.062 [metadata-store-8-1] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [14/Aug/2021:08:52:16 +0800] "GET /lookup/v2/topic/persistent/my-property/my-ns/dead-letter-topic HTTP/1.1" 200 217 "-" "Pulsar-Java-v2.9.0-SNAPSHOT" 179
08:52:17.150 [pulsar-client-io-35-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xf527e621, L:/127.0.0.1:53148 - R:localhost/127.0.0.1:53141]] Connected to server
08:52:17.172 [pulsar-io-6-1] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /127.0.0.1:53148
08:52:17.178 [pulsar-client-io-35-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://my-property/my-ns/dead-letter-topic][my-subscription] Subscribing to topic on cnx [id: 0xf527e621, L:/127.0.0.1:53148 - R:localhost/127.0.0.1:53141], consumerId 0
08:52:17.182 [pulsar-io-6-1] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [0] On [SUBSCRIBE] Pulsar command
08:52:17.191 [pulsar-io-6-1] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:53148] Subscribing on topic persistent://my-property/my-ns/dead-letter-topic / my-subscription
08:52:17.279 [metadata-store-8-1] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Opening managed ledger my-property/my-ns/persistent/dead-letter-topic
08:52:17.288 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.MetaStoreImpl - Creating '/managed-ledgers/my-property/my-ns/persistent/dead-letter-topic'
08:52:17.289 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.pulsar.zookeeper.ZooKeeperCache - [MockZookeeper] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeCreated path:/managed-ledgers/my-property/my-ns/persistent/dead-letter-topic
08:52:17.305 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [my-property/my-ns/persistent/dead-letter-topic] Creating ledger, metadata: {component=[109, 97, 110, 97, 103, 101, 100, 45, 108, 101, 100, 103, 101, 114], pulsar/managed-ledger=[109, 121, 45, 112, 114, 111, 112, 101, 114, 116, 121, 47, 109, 121, 45, 110, 115, 47, 112, 101, 114, 115, 105, 115, 116, 101, 110, 116, 47, 100, 101, 97, 100, 45, 108, 101, 116, 116, 101, 114, 45, 116, 111, 112, 105, 99], application=[112, 117, 108, 115, 97, 114]} - metadata ops timeout : 60 seconds
08:52:17.311 [mock-pulsar-bk-OrderedExecutor-0-0] INFO org.apache.bookkeeper.client.PulsarMockBookKeeper - Creating ledger 3
08:52:17.360 [mock-pulsar-bk-OrderedExecutor-0-0] WARN org.apache.bookkeeper.proto.checksum.CRC32CDigestManager - Sse42Crc32C is not supported, will use a slower CRC32C implementation.
08:52:17.370 [mock-pulsar-bk-OrderedExecutor-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [my-property/my-ns/persistent/dead-letter-topic] Created ledger 3
08:52:17.396 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl - [my-property/my-ns/persistent/dead-letter-topic] Successfully initialize managed ledger
08:52:17.418 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.pulsar.broker.service.AbstractTopic - Disabling publish throttling for persistent://my-property/my-ns/dead-letter-topic
08:52:17.428 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://my-property/my-ns/dead-letter-topic] There are no replicated subscriptions on the topic
08:52:17.438 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.pulsar.broker.service.BrokerService - Created topic persistent://my-property/my-ns/dead-letter-topic - dedup is disabled
08:52:17.464 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/dead-letter-topic] Cursor my-subscription recovered to position 3:-1
08:52:17.471 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [my-property/my-ns/persistent/dead-letter-topic] Creating ledger, metadata: {component=[109, 97, 110, 97, 103, 101, 100, 45, 108, 101, 100, 103, 101, 114], pulsar/managed-ledger=[109, 121, 45, 112, 114, 111, 112, 101, 114, 116, 121, 47, 109, 121, 45, 110, 115, 47, 112, 101, 114, 115, 105, 115, 116, 101, 110, 116, 47, 100, 101, 97, 100, 45, 108, 101, 116, 116, 101, 114, 45, 116, 111, 112, 105, 99], pulsar/cursor=[109, 121, 45, 115, 117, 98, 115, 99, 114, 105, 112, 116, 105, 111, 110], application=[112, 117, 108, 115, 97, 114]} - metadata ops timeout : 60 seconds
08:52:17.472 [mock-pulsar-bk-OrderedExecutor-0-0] INFO org.apache.bookkeeper.client.PulsarMockBookKeeper - Creating ledger 4
08:52:17.492 [mock-pulsar-bk-OrderedExecutor-0-0] INFO org.apache.bookkeeper.mledger.impl.MetaStoreImpl - [my-property/my-ns/persistent/dead-letter-topic] [my-subscription] Updating cursor info ledgerId=4 mark-delete=3:-1
08:52:17.494 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/dead-letter-topic] Updated cursor my-subscription with ledger id 4 md-position=3:-1 rd-position=3:0
08:52:17.495 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [my-property/my-ns/persistent/dead-letter-topic] Opened new cursor: ManagedCursorImpl{ledger=my-property/my-ns/persistent/dead-letter-topic, name=my-subscription, ackPos=3:-1, readPos=3:0}
08:52:17.528 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/dead-letter-topic-my-subscription] Rewind from 3:0 to 3:0
08:52:17.532 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://my-property/my-ns/dead-letter-topic] There are no replicated subscriptions on the topic
08:52:17.532 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://my-property/my-ns/dead-letter-topic][my-subscription] Created new subscription for 0
08:52:17.532 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:53148] Created subscription on topic persistent://my-property/my-ns/dead-letter-topic / my-subscription
08:52:17.533 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [1] On [SUCCESS] Pulsar command
08:52:17.534 [pulsar-client-io-35-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://my-property/my-ns/dead-letter-topic][my-subscription] Subscribed to topic on localhost/127.0.0.1:53141 -- consumer: 0
08:52:17.576 [pulsar-web-29-7] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [14/Aug/2021:08:52:17 +0800] "GET /admin/v2/persistent/my-property/my-ns/dead-letter-topic-my-subscription-DLQ/partitions?checkAllowAutoCreation=true HTTP/1.1" 200 16 "-" "Pulsar-Java-v2.9.0-SNAPSHOT" 23
08:52:17.610 [pulsar-web-29-8] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [14/Aug/2021:08:52:17 +0800] "GET /lookup/v2/topic/persistent/my-property/my-ns/dead-letter-topic-my-subscription-DLQ HTTP/1.1" 200 217 "-" "Pulsar-Java-v2.9.0-SNAPSHOT" 15
08:52:17.641 [pulsar-client-io-44-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xf74125a5, L:/127.0.0.1:53150 - R:localhost/127.0.0.1:53141]] Connected to server
08:52:17.655 [pulsar-io-6-3] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /127.0.0.1:53150
08:52:17.657 [pulsar-client-io-44-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ][my-subscription] Subscribing to topic on cnx [id: 0xf74125a5, L:/127.0.0.1:53150 - R:localhost/127.0.0.1:53141], consumerId 0
08:52:17.661 [pulsar-io-6-3] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [2] On [SUBSCRIBE] Pulsar command
08:52:17.661 [pulsar-io-6-3] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:53150] Subscribing on topic persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ / my-subscription
08:52:17.667 [pulsar-io-6-3] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Opening managed ledger my-property/my-ns/persistent/dead-letter-topic-my-subscription-DLQ
08:52:17.669 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.bookkeeper.mledger.impl.MetaStoreImpl - Creating '/managed-ledgers/my-property/my-ns/persistent/dead-letter-topic-my-subscription-DLQ'
08:52:17.670 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.pulsar.zookeeper.ZooKeeperCache - [MockZookeeper] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeCreated path:/managed-ledgers/my-property/my-ns/persistent/dead-letter-topic-my-subscription-DLQ
08:52:17.672 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [my-property/my-ns/persistent/dead-letter-topic-my-subscription-DLQ] Creating ledger, metadata: {component=[109, 97, 110, 97, 103, 101, 100, 45, 108, 101, 100, 103, 101, 114], pulsar/managed-ledger=[109, 121, 45, 112, 114, 111, 112, 101, 114, 116, 121, 47, 109, 121, 45, 110, 115, 47, 112, 101, 114, 115, 105, 115, 116, 101, 110, 116, 47, 100, 101, 97, 100, 45, 108, 101, 116, 116, 101, 114, 45, 116, 111, 112, 105, 99, 45, 109, 121, 45, 115, 117, 98, 115, 99, 114, 105, 112, 116, 105, 111, 110, 45, 68, 76, 81], application=[112, 117, 108, 115, 97, 114]} - metadata ops timeout : 60 seconds
08:52:17.674 [mock-pulsar-bk-OrderedExecutor-0-0] INFO org.apache.bookkeeper.client.PulsarMockBookKeeper - Creating ledger 5
08:52:17.674 [mock-pulsar-bk-OrderedExecutor-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [my-property/my-ns/persistent/dead-letter-topic-my-subscription-DLQ] Created ledger 5
08:52:17.677 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl - [my-property/my-ns/persistent/dead-letter-topic-my-subscription-DLQ] Successfully initialize managed ledger
08:52:17.678 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.pulsar.broker.service.AbstractTopic - Disabling publish throttling for persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ
08:52:17.679 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ] There are no replicated subscriptions on the topic
08:52:17.681 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.pulsar.broker.service.BrokerService - Created topic persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ - dedup is disabled
08:52:17.683 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/dead-letter-topic-my-subscription-DLQ] Cursor my-subscription recovered to position 5:-1
08:52:17.685 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [my-property/my-ns/persistent/dead-letter-topic-my-subscription-DLQ] Creating ledger, metadata: {component=[109, 97, 110, 97, 103, 101, 100, 45, 108, 101, 100, 103, 101, 114], pulsar/managed-ledger=[109, 121, 45, 112, 114, 111, 112, 101, 114, 116, 121, 47, 109, 121, 45, 110, 115, 47, 112, 101, 114, 115, 105, 115, 116, 101, 110, 116, 47, 100, 101, 97, 100, 45, 108, 101, 116, 116, 101, 114, 45, 116, 111, 112, 105, 99, 45, 109, 121, 45, 115, 117, 98, 115, 99, 114, 105, 112, 116, 105, 111, 110, 45, 68, 76, 81], pulsar/cursor=[109, 121, 45, 115, 117, 98, 115, 99, 114, 105, 112, 116, 105, 111, 110], application=[112, 117, 108, 115, 97, 114]} - metadata ops timeout : 60 seconds
08:52:17.685 [mock-pulsar-bk-OrderedExecutor-0-0] INFO org.apache.bookkeeper.client.PulsarMockBookKeeper - Creating ledger 6
08:52:17.687 [mock-pulsar-bk-OrderedExecutor-0-0] INFO org.apache.bookkeeper.mledger.impl.MetaStoreImpl - [my-property/my-ns/persistent/dead-letter-topic-my-subscription-DLQ] [my-subscription] Updating cursor info ledgerId=6 mark-delete=5:-1
08:52:17.689 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/dead-letter-topic-my-subscription-DLQ] Updated cursor my-subscription with ledger id 6 md-position=5:-1 rd-position=5:0
08:52:17.689 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [my-property/my-ns/persistent/dead-letter-topic-my-subscription-DLQ] Opened new cursor: ManagedCursorImpl{ledger=my-property/my-ns/persistent/dead-letter-topic-my-subscription-DLQ, name=my-subscription, ackPos=5:-1, readPos=5:0}
08:52:17.711 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/dead-letter-topic-my-subscription-DLQ-my-subscription] Rewind from 5:0 to 5:0
08:52:17.713 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ] There are no replicated subscriptions on the topic
08:52:17.713 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ][my-subscription] Created new subscription for 0
08:52:17.713 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:53150] Created subscription on topic persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ / my-subscription
08:52:17.713 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [3] On [SUCCESS] Pulsar command
08:52:17.715 [pulsar-client-io-44-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ][my-subscription] Subscribed to topic on localhost/127.0.0.1:53141 -- consumer: 0
08:52:17.778 [pulsar-web-29-15] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [14/Aug/2021:08:52:17 +0800] "GET /admin/v2/persistent/my-property/my-ns/dead-letter-topic/partitions?checkAllowAutoCreation=true HTTP/1.1" 200 16 "-" "Pulsar-Java-v2.9.0-SNAPSHOT" 28
08:52:17.826 [pulsar-web-29-9] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [14/Aug/2021:08:52:17 +0800] "GET /lookup/v2/topic/persistent/my-property/my-ns/dead-letter-topic HTTP/1.1" 200 217 "-" "Pulsar-Java-v2.9.0-SNAPSHOT" 14
08:52:17.826 [pulsar-client-io-35-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://my-property/my-ns/dead-letter-topic] [null] Creating producer on cnx [id: 0xf527e621, L:/127.0.0.1:53148 - R:localhost/127.0.0.1:53141]
08:52:17.832 [pulsar-io-6-1] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [4] On [PRODUCER] Pulsar command
08:52:17.837 [pulsar-io-6-1] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:53148][persistent://my-property/my-ns/dead-letter-topic] Creating producer. producerId=0
08:52:17.850 [pulsar-io-6-1] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:53148] persistent://my-property/my-ns/dead-letter-topic configured with schema false
08:52:17.865 [pulsar-io-6-1] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:53148] Created new producer: Producer{topic=PersistentTopic{topic=persistent://my-property/my-ns/dead-letter-topic}, client=/127.0.0.1:53148, producerName=test-1-0, producerId=0}
08:52:17.867 [pulsar-io-6-1] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [5] On [PRODUCER_SUCCESS] Pulsar command
08:52:17.871 [pulsar-client-io-35-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://my-property/my-ns/dead-letter-topic] [test-1-0] Created producer on cnx [id: 0xf527e621, L:/127.0.0.1:53148 - R:localhost/127.0.0.1:53141]
08:52:17.888 [main] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
08:52:17.890 [pulsar-io-6-1] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [6] On [SEND] Pulsar command
08:52:17.897 [pulsar-io-6-1] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [7] On [SEND_RECEIPT] Pulsar command
08:52:17.901 [pulsar-io-6-1] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [8] On [CLOSE_PRODUCER] Pulsar command
08:52:17.903 [pulsar-io-6-1] INFO org.apache.pulsar.broker.service.ServerCnx - [PersistentTopic{topic=persistent://my-property/my-ns/dead-letter-topic}][test-1-0] Closing producer on cnx /127.0.0.1:53148. producerId=0
08:52:17.905 [pulsar-io-6-1] INFO org.apache.pulsar.broker.service.ServerCnx - [PersistentTopic{topic=persistent://my-property/my-ns/dead-letter-topic}][test-1-0] Closed producer on cnx /127.0.0.1:53148. producerId=0
08:52:17.905 [pulsar-io-6-1] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [9] On [SUCCESS] Pulsar command
08:52:17.906 [pulsar-client-io-35-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://my-property/my-ns/dead-letter-topic] [test-1-0] Closed Producer
08:52:17.906 [mock-pulsar-bk-OrderedExecutor-0-0] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - Send message to topic PersistentTopic{topic=persistent://my-property/my-ns/dead-letter-topic}, subscription my-subscription
08:52:17.911 [pulsar-io-6-1] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [10] On [MESSAGE] Pulsar command
08:52:17.913 [main] INFO org.apache.pulsar.client.api.DeadLetterTopicTest - consumer received message : 3:0:-1:0 Hello Pulsar [0]
08:52:19.872 [pulsar-timer-40-1] WARN org.apache.pulsar.client.impl.UnAckedMessageTracker - [ConsumerBase{subscription='my-subscription', consumerName='4f67d', topic='persistent://my-property/my-ns/dead-letter-topic'}] 1 messages have timed-out
08:52:19.910 [mock-pulsar-bk-OrderedExecutor-0-0] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - Send message to topic PersistentTopic{topic=persistent://my-property/my-ns/dead-letter-topic}, subscription my-subscription
08:52:19.910 [pulsar-io-6-1] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [11] On [MESSAGE] Pulsar command
08:52:19.912 [main] INFO org.apache.pulsar.client.api.DeadLetterTopicTest - consumer received message : 3:0:-1:0 Hello Pulsar [0]
08:52:21.889 [pulsar-timer-40-1] WARN org.apache.pulsar.client.impl.UnAckedMessageTracker - [ConsumerBase{subscription='my-subscription', consumerName='4f67d', topic='persistent://my-property/my-ns/dead-letter-topic'}] 1 messages have timed-out
08:52:21.918 [pulsar-web-29-14] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [14/Aug/2021:08:52:21 +0800] "GET /admin/v2/persistent/my-property/my-ns/dead-letter-topic-my-subscription-DLQ/partitions?checkAllowAutoCreation=true HTTP/1.1" 200 16 "-" "Pulsar-Java-v2.9.0-SNAPSHOT" 19
08:52:21.929 [pulsar-web-29-16] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [14/Aug/2021:08:52:21 +0800] "GET /lookup/v2/topic/persistent/my-property/my-ns/dead-letter-topic-my-subscription-DLQ HTTP/1.1" 200 217 "-" "Pulsar-Java-v2.9.0-SNAPSHOT" 7
08:52:21.933 [pulsar-client-io-35-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ] [null] Creating producer on cnx [id: 0xf527e621, L:/127.0.0.1:53148 - R:localhost/127.0.0.1:53141]
08:52:21.934 [pulsar-io-6-1] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [12] On [PRODUCER] Pulsar command
08:52:21.934 [pulsar-io-6-1] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:53148][persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ] Creating producer. producerId=1
08:52:21.935 [ForkJoinPool.commonPool-worker-2] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:53148] persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ configured with schema false
08:52:21.939 [ForkJoinPool.commonPool-worker-2] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:53148] Created new producer: Producer{topic=PersistentTopic{topic=persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ}, client=/127.0.0.1:53148, producerName=test-1-1, producerId=1}
08:52:21.939 [ForkJoinPool.commonPool-worker-2] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [13] On [PRODUCER_SUCCESS] Pulsar command
08:52:21.940 [pulsar-client-io-35-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ] [test-1-1] Created producer on cnx [id: 0xf527e621, L:/127.0.0.1:53148 - R:localhost/127.0.0.1:53141]
08:52:21.945 [pulsar-io-6-1] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [14] On [SEND] Pulsar command
08:52:21.947 [pulsar-io-6-1] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [15] On [SEND_RECEIPT] Pulsar command
08:52:21.950 [broker-topic-workers-OrderedScheduler-1-0] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - Send message to topic PersistentTopic{topic=persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ}, subscription my-subscription
08:52:21.951 [pulsar-io-6-3] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - [16] On [MESSAGE] Pulsar command
08:53:09.897 [pulsar-load-manager-3-1] INFO org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Writing local data to metadata store because maximum change Infinity% exceeded threshold 10%; time since last report written is 55.167 seconds
08:53:14.569 [pulsar-load-manager-3-1] INFO org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Only 1 broker available: no load shedding will be performed
08:54:09.886 [pulsar-load-manager-3-1] INFO org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Writing local data to metadata store because maximum change 100.0% exceeded threshold 10%; time since last report written is 59.982 seconds
08:54:14.567 [pulsar-load-manager-3-1] INFO org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Only 1 broker available: no load shedding will be performed
08:55:14.565 [pulsar-load-manager-3-1] INFO org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Only 1 broker available: no load shedding will be performed
08:56:14.565 [pulsar-load-manager-3-1] INFO org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Only 1 broker available: no load shedding will be performed
08:57:14.556 [pulsar-load-manager-3-1] INFO org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Only 1 broker available: no load shedding will be performed
from the log , I can see only receive 2 times, and the message was sent to deadletter
08:52:21.950 [broker-topic-workers-OrderedScheduler-1-0] INFO org.apache.pulsar.broker.intercept.CounterBrokerInterceptor - Send message to topic PersistentTopic{topic=persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ}, subscription my-subscription
@srouthu1 Have you tried a new Pulsar Client version? I noticed the client version is 2.6.0 which released almost 2 years ago.
Hi @codelipenghui @leizhiyuan I am able to reproduce this issue with multi-topic consumer of a key shared subscription, can you please check if you can reproduce this issue
@srouthu1 Which Pulsar version are you testing? /cc @Technoboy-
@codelipenghui I am using 2.8 version
@codelipenghui @srouthu1 When is the partitioned topic, broker (2.8, latest version), client version (2.6, 2.8) could reproduce this issue. We will fix this later. Please assign to me.
Expected behavior
I have created consumer with the following code. Consumer consumer = client.newConsumer() .topic("persistent://mytenant/myns/mytopic") .subscriptionName(userName+"-default") .subscriptionType(SubscriptionType.Key_Shared).keySharedPolicy(KeySharedPolicy.autoSplitHashRange()) .receiverQueueSize(10) .deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(3) .deadLetterTopic("persistent://mytenant/myns/mytopic-userName-default.DLQ") .build()) .ackTimeout(40,TimeUnit.SECONDS) .subscribe();
My my publisher sends messages with different keys.
My consumer receives the messages but it does not acknowledge the message because I want the messages to be moved to dead letter topic.
I am expecting the messages to redelivered to my consumer 3 times and then they should be published to dead letter topic.
Actual behavior
The messages are getting redelivered to my consumer unlimited times. They never moved to dead letter topic. If I change the subscription type to shared then the messages are moving to dead letter topic.
System configuration
Pulsar version: Tried with both 2.6.0 and 2.8.0 Pulsar Client: 2.6.0