apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Bug] Pulsar keeps creating dead letter queue producers and exceeds the maximum limit #20635

Open mingmcb opened 1 year ago

mingmcb commented 1 year ago

Search before asking

Version

2.11

Minimal reproduce step

Use the Pulsar Java client library to create a consumer with a dead letter queue (DLQ) producer.

  1. After the application starts, stop a bookie.
  2. Produce some messages to the topic and trigger the consumer.
  3. Check the log. The client keeps creating dead letter queue producers and eventually hits the maximum limit.

What did you expect to see?

Extra producers should not be created when there is an issue on the Pulsar side.

What did you see instead?

Over 10,000 producers were created, eventually exceeding the limit.

Anything else?

See logs:

--
Starting Pulsar producer perf with config: {"topicName":"persistent://public/default/my-topic","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
--
Pulsar client config: {"serviceUrl":"pulsar://pulsar:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":true,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
[persistent://public/default/dlq] [null] Creating producer on cnx [id: 0x910a07bb, L:/10.204.68.122:45104 - R:pulsar/10.204.67.17:6650]
[persistent://public/default/dlq] [pulsar-dev-5-10032] Created producer on cnx [id: 0x910a07bb, L:/10.204.68.122:45104 - R:pulsar/10.204.67.17:6650]
[persistent://public/default/my-topic] failed to get schema : org.apache.pulsar.client.api.PulsarClientException: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.SchemaException: Bookie handle is not available - ledger=35399 - operation=Failed to read entry - entry=0","reqId":4059106653410978868, "remote":"pulsar/10.204.70.7:6650", "local":"/10.204.68.122:44372"}
Dead letter producer exception with topic: persistent://public/default/dlq
Starting Pulsar producer perf with config: {"topicName":"persistent://public/default/dlq","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
Pulsar client config: {"serviceUrl":"pulsar://pulsar:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":true,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
[persistent://public/default/dlq] [null] Creating producer on cnx [id: 0x910a07bb, L:/10.204.68.122:45104 - R:pulsar/10.204.67.17:6650]
[persistent://public/default/dlq] [pulsar-dev-5-10033] Created producer on cnx [id: 0x910a07bb, L:/10.204.68.122:45104 - R:pulsar/10.204.67.17:6650]
[persistent://public/default/my-topic] failed to get schema : org.apache.pulsar.client.api.PulsarClientException: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.SchemaException: Bookie handle is not available - ledger=35399 - operation=Failed to read entry - entry=0","reqId":4059106653410978872, "remote":"pulsar/10.204.70.7:6650", "local":"/10.204.68.122:44372"}
Dead letter producer exception with topic: persistent://public/default/dlq
Starting Pulsar producer perf with config: {"topicName":"persistent://public/default/dlq","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
Pulsar client config: {"serviceUrl":"pulsar://pulsar:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":true,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
[persistent://public/default/dlq] [null] Creating producer on cnx [id: 0x910a07bb, L:/10.204.68.122:45104 - R:pulsar/10.204.67.17:6650]
[persistent://public/default/dlq] [pulsar-dev-5-10034] Created producer on cnx [id: 0x910a07bb, L:/10.204.68.122:45104 - R:pulsar/10.204.67.17:6650]
[persistent://public/default/my-topic] failed to get schema : org.apache.pulsar.client.api.PulsarClientException: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.SchemaException: Bookie handle is not available - ledger=35399 - operation=Failed to read entry - entry=0","reqId":4059106653410978876, "remote":"pulsar/10.204.70.7:6650", "local":"/10.204.68.122:44372"}
Dead letter producer exception with topic: persistent://public/default/dlq
Starting Pulsar producer perf with config: {"topicName":"persistent://public/default/dlq","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
Pulsar client config: {"serviceUrl":"pulsar://pulsar:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":true,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
[persistent://public/default/dlq] [null] Creating producer on cnx [id: 0x910a07bb, L:/10.204.68.122:45104 - R:pulsar/10.204.67.17:6650]
[persistent://public/default/dlq] [pulsar-dev-5-10035] Created producer on cnx [id: 0x910a07bb, L:/10.204.68.122:45104 - R:pulsar/10.204.67.17:6650]
[persistent://public/default/my-topic] failed to get schema : org.apache.pulsar.client.api.PulsarClientException: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.SchemaException: Bookie handle is not available - ledger=35399 - operation=Failed to read entry - entry=0","reqId":4059106653410978880, "remote":"pulsar/10.204.70.7:6650", "local":"/10.204.68.122:44372"}
Dead letter producer exception with topic: persistent://public/default/dlq
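The log above repeats one cycle: a DLQ producer is created (`pulsar-dev-5-10032`, `-10033`, ...), the schema lookup for `my-topic` fails because a ledger entry cannot be read from a bookie, the client logs "Dead letter producer exception", and the next attempt creates yet another producer. The Java sketch below is a hypothetical model of that cycle, not Pulsar's actual `ConsumerImpl` logic. It assumes, as the "maximum limit" symptom suggests, that each abandoned producer stays registered on the broker, and it uses an illustrative cap of 100 in place of the real limit. `FakeBroker` and `run` are made-up names for this sketch.

```java
// Hypothetical model of the DLQ producer loop seen in the logs.
// NOT Pulsar's ConsumerImpl; the producer cap is illustrative only.
final class DlqProducerLoopModel {

    /** Stand-in for a broker that caps the number of producers on a topic. */
    static final class FakeBroker {
        final int maxProducersPerTopic;
        int openProducers = 0;

        FakeBroker(int maxProducersPerTopic) {
            this.maxProducersPerTopic = maxProducersPerTopic;
        }

        /** Registers a producer; fails once the cap is reached ("exceed the maximum limit"). */
        boolean createProducer() {
            if (openProducers >= maxProducersPerTopic) {
                return false;
            }
            openProducers++;
            return true;
        }

        /** Releases a broker-side producer slot. */
        void closeProducer() {
            openProducers--;
        }
    }

    /**
     * Simulates `attempts` DLQ deliveries in which the schema lookup always fails
     * (as when a bookie is unavailable). If closeOnFailure is false, each failed
     * producer is abandoned but stays registered on the broker.
     * Returns how many attempts could not even create a producer.
     */
    static int run(FakeBroker broker, int attempts, boolean closeOnFailure) {
        int rejected = 0;
        for (int i = 0; i < attempts; i++) {
            if (!broker.createProducer()) {
                rejected++;
                continue;
            }
            boolean schemaLookupSucceeded = false; // bookie down: the lookup always fails here
            if (!schemaLookupSucceeded && closeOnFailure) {
                broker.closeProducer(); // free the slot before the next attempt
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        FakeBroker leaky = new FakeBroker(100);
        int leakyRejected = run(leaky, 1_000, false);
        FakeBroker tidy = new FakeBroker(100);
        int tidyRejected = run(tidy, 1_000, true);
        System.out.println("leaky: open=" + leaky.openProducers + " rejected=" + leakyRejected);
        System.out.println("tidy:  open=" + tidy.openProducers + " rejected=" + tidyRejected);
    }
}
```

Running `main` prints `leaky: open=100 rejected=900` and `tidy:  open=0 rejected=0`. Closing on failure only bounds the count; the behavior the reporter expected, where no extra producers are created while Pulsar is unhealthy, would additionally need the producer to be reused or retried with backoff.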

Are you willing to submit a PR?

david-streamlio commented 1 year ago

A single bookie failure shouldn’t have that kind of impact unless it drops the number of active/writable bookies below the write/ack quorum. Can you share the number of bookies in your cluster and your write and ack quorum settings?

mingmcb commented 1 year ago

There are 4 bookies configured in each environment. We also use the following configuration, which requires 3 bookies to be up and running. In other words, only 1 bookie can be down without service interruption.

  # Number of bookies to use when creating a ledger
  managedLedgerDefaultEnsembleSize: "3"
  # Number of copies to store for each message
  managedLedgerDefaultWriteQuorum: "3"
  # Number of guaranteed copies (acks to wait before write is complete)
  managedLedgerDefaultAckQuorum: "3"
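The arithmetic in this comment can be checked with a small helper. BookKeeper requires ensemble size >= write quorum >= ack quorum, and creating a new ledger needs `ensembleSize` writable bookies, so with 4 bookies and an ensemble of 3, only one bookie can be down. With the ack quorum equal to the write quorum, every bookie holding an entry must acknowledge it, so losing one ensemble member also stalls writes until an ensemble change moves the ledger onto the spare bookie. `QuorumCheck` and its methods are hypothetical names for this sketch, not a Pulsar or BookKeeper API.

```java
// Hypothetical helper illustrating BookKeeper replication arithmetic; not a Pulsar API.
final class QuorumCheck {

    /** BookKeeper requires ensembleSize >= writeQuorum >= ackQuorum >= 1. */
    static void validate(int ensembleSize, int writeQuorum, int ackQuorum) {
        if (!(ensembleSize >= writeQuorum && writeQuorum >= ackQuorum && ackQuorum >= 1)) {
            throw new IllegalArgumentException(
                "need ensemble >= writeQuorum >= ackQuorum >= 1, got "
                    + ensembleSize + "/" + writeQuorum + "/" + ackQuorum);
        }
    }

    /**
     * Bookies that can be down while new ledgers can still be created:
     * creating a ledger needs `ensembleSize` writable bookies.
     */
    static int tolerableFailuresForNewLedgers(int bookies, int ensembleSize, int writeQuorum, int ackQuorum) {
        validate(ensembleSize, writeQuorum, ackQuorum);
        return Math.max(0, bookies - ensembleSize);
    }

    public static void main(String[] args) {
        // Values from the comment: 4 bookies, ensemble/write/ack quorum = 3/3/3.
        System.out.println(tolerableFailuresForNewLedgers(4, 3, 3, 3)); // prints 1
    }
}
```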
github-actions[bot] commented 1 year ago

The issue had no activity for 30 days, mark with Stale label.

tonisojandu commented 1 year ago

I think we are facing the same problem. This issue might also be related.

We face this issue from time to time when, due to engineering mistakes, there is a schema incompatibility between the messages received and the schema the consumer expects. We have deliberately disabled schema validation on the broker side. However, rather than simply sending those messages to the DLQ, the Pulsar client fails validation on its side and goes into a producer-creation loop.

Here is the example code to replicate the issue.

We looked at the client code, and the problem seems to be in the AutoProduceBytesSchema.encode(byte[] message) method: requireSchemaValidation is set to true in the constructor, even though broker-side validation is disabled.

We were able to "fix" this problem as follows. In ConsumerImpl.initDeadLetterProducerIfNeeded(), instead of:

((ProducerBuilderImpl<byte[]>) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))

we created the producer with:

((ProducerBuilderImpl<byte[]>) client.newProducer(Schema.BYTES))

And in ConsumerImpl.processPossibleToDLQ(MessageIdAdv messageId), instead of:

producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))

we sent the message with:

producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.BYTES))
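The diagnosis in this comment can be modeled in a few lines. Per the comment, `AUTO_PRODUCE_BYTES(schema)` forwards the raw bytes but, because `requireSchemaValidation` ends up `true`, first validates them against the consumer's reader schema, so a payload that does not match that schema can never be written to the DLQ. Wrapping `Schema.BYTES` instead turns the check into a no-op. This is a toy Java model, not Pulsar's `AutoProduceBytesSchema`: `ToySchema`, `INT32`, `autoProduceEncode`, and `tryDeadLetter` are hypothetical stand-ins, and the 4-byte check only mimics a fixed-width schema.

```java
import java.nio.charset.StandardCharsets;

// Toy model of the behavior described in the comment; all names are hypothetical.
final class DlqSchemaModel {

    /** Minimal schema abstraction: validates raw bytes, throwing on mismatch. */
    interface ToySchema {
        void validate(byte[] payload);
    }

    /** Accepts any payload, like a plain bytes schema. */
    static final ToySchema BYTES = payload -> { };

    /** Rejects payloads that are not exactly 4 bytes, mimicking a fixed-width int schema. */
    static final ToySchema INT32 = payload -> {
        if (payload.length != 4) {
            throw new IllegalStateException("expected 4 bytes, got " + payload.length);
        }
    };

    /**
     * Models "auto produce bytes": returns the raw bytes unchanged, validating them
     * against the wrapped schema only when requireSchemaValidation is true.
     */
    static byte[] autoProduceEncode(ToySchema wrapped, boolean requireSchemaValidation, byte[] payload) {
        if (requireSchemaValidation) {
            wrapped.validate(payload);
        }
        return payload;
    }

    /** Returns true if the payload could be handed to the DLQ producer. */
    static boolean tryDeadLetter(ToySchema wrapped, byte[] payload) {
        try {
            // The comment reports validation is always switched on, so model it as true.
            autoProduceEncode(wrapped, true, payload);
            return true;
        } catch (IllegalStateException e) {
            // In the reported behavior, the client then loops on producer creation.
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] bad = "not-an-int".getBytes(StandardCharsets.UTF_8);
        System.out.println(tryDeadLetter(INT32, bad)); // false: the reader schema rejects it
        System.out.println(tryDeadLetter(BYTES, bad)); // true: BYTES accepts anything
    }
}
```

The trade-off raised in the next paragraph shows up directly: with `BYTES`, the DLQ accepts any payload, but its registered schema no longer describes the original message type.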

I am not sure whether this is the proper way to fix it, as it leaves the DLQ with a binary schema in the registry. That would be fine for us, but I am not sure whether someone else relies on the DLQ having a more descriptive schema. Still, I think of the DLQ as a dumping ground that should be able to accept any type of message.

If this is an OK fix, I can create a pull request for it.

edit:

github-actions[bot] commented 1 year ago

The issue had no activity for 30 days, mark with Stale label.