Open sijie opened 3 years ago
Original Issue: apache/pulsar#12370
Pulsar client version: 2.7.3. The consumer's available permits go negative.
    Consumer<byte[]> consumer = null;
    try {
        consumer = pulsarClient.newConsumer()
                .topic("persistent://xx/xxxx/xxx-1ms")
                .ackTimeout(60, TimeUnit.SECONDS)
                .subscriptionName("vmbTest")
                // .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                .batchReceivePolicy(BatchReceivePolicy.builder()
                        .maxNumMessages(100)
                        .maxNumBytes(1024 * 1024)
                        .timeout(1000, TimeUnit.MILLISECONDS)
                        .build())
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        while (consumer.isConnected()) {
            Messages<byte[]> batchMessages = consumer.batchReceive();
            // Decode the payload bytes; String.valueOf(byte[]) would only print the array reference.
            List<String> messages = StreamSupport
                    .stream(batchMessages.spliterator(), false)
                    .map(msg -> new String(msg.getData(), StandardCharsets.UTF_8))
                    .collect(Collectors.toList());
            consumer.acknowledge(batchMessages);
        }
        consumer.close();
    } catch (PulsarClientException e) {
        if (consumer != null) {
            consumer.close();
        }
    }
When do the available permits increment again?
The issue had no activity for 30 days, mark with Stale label.
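One possible reading of the negative value, offered as an assumption rather than a confirmed diagnosis of this report: permits are counted per message, while a batched entry is dispatched whole, so a single entry can carry more messages than the consumer has permits left. The toy model below illustrates only that arithmetic. `PermitSketch` and its methods are hypothetical names and are not part of the Pulsar client or broker API; in this model, permits rise again only when the consumer grants more through a flow request.

```java
/** Toy model of per-consumer permit accounting. Hypothetical; not Pulsar code. */
public class PermitSketch {
    private int availablePermits;

    public PermitSketch(int initialPermits) {
        this.availablePermits = initialPermits;
    }

    /** Assumed broker-side rule: dispatch only while at least one permit remains. */
    public boolean canDispatch() {
        return availablePermits > 0;
    }

    /** Dispatch one entry whole; a batched entry may hold many messages. */
    public void dispatchEntry(int messagesInEntry) {
        if (!canDispatch()) {
            throw new IllegalStateException("no permits available");
        }
        // Permits are charged per message, so this can drop below zero.
        availablePermits -= messagesInEntry;
    }

    /** Assumed consumer-side flow request that grants additional permits. */
    public void flow(int permits) {
        availablePermits += permits;
    }

    public int availablePermits() {
        return availablePermits;
    }

    public static void main(String[] args) {
        PermitSketch consumer = new PermitSketch(10);
        consumer.dispatchEntry(8);   // 10 - 8 = 2 permits left
        consumer.dispatchEntry(50);  // a 50-message batch is sent whole: 2 - 50 = -48
        System.out.println(consumer.availablePermits()); // prints -48
        System.out.println(consumer.canDispatch());      // prints false
        consumer.flow(100);          // consumer grants more permits: -48 + 100 = 52
        System.out.println(consumer.availablePermits()); // prints 52
    }
}
```

If this reading applies, a negative count would be temporary for a consumer that keeps receiving and processing messages, and persistent only if the consumer stops granting permits.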