streamnative / pulsar-archived

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org
Apache License 2.0

ISSUE-12747: Pulsar Sink connectors not getting messages when input rate to pulsar is high #3255

Open sijie opened 2 years ago

sijie commented 2 years ago

Original Issue: apache/pulsar#12747


Describe the bug

The Pulsar sink connector does not receive all messages when the input rate to the Pulsar topic is high. I used the built-in Cassandra connector and followed the instructions for trying it out. I push 100000 messages to Pulsar from a Java program, but not all of them show up in the sink. The topic receives every message, but the sink does not. If I send the same number of messages in small chunks, they all reach the sink.

To Reproduce

Steps to reproduce the behavior:

  1. Set up Pulsar (2.8.1) standalone and the Cassandra sink as described in the official docs.
  2. Use the Java code given below to insert messages into the Pulsar topic.
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PushToCassandra {
    public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
        /**
         * for i in {0..9}; do bin/pulsar-client produce -m "key-$i" -n 1 test_cassandra; done
         */
        ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
        clientConfigurationData.setServiceUrl("pulsar://localhost:6650");
        PulsarClient pulsarClient = new PulsarClientImpl(clientConfigurationData);
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("test_cassandra")
                .blockIfQueueFull(true)
                .enableBatching(true)
                .batchingMaxMessages(50_000)
                .maxPendingMessages(50_000)
                .batchingMaxBytes(5242880)
                .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
                .batcherBuilder(BatcherBuilder.DEFAULT)
                .create();
        try (producer) {
            Future<MessageId> future = null;
            for (int i = 0; i < 100000; i++) {
                future = producer.newMessage().value("key-" + i).sendAsync();
            }
            producer.flush();
            future.get();
        }
        producer.close();
        pulsarClient.close();
    }
}
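
The reproduction above only inspects the future of the last send. To rule out producer-side loss, every send can be counted instead. The following is a minimal sketch of that idea, not part of the original report: `SendAudit` and `sendAndCount` are hypothetical names, and the sender is passed in as a function so the example runs without a broker. With a real producer, the sender would presumably be `v -> producer.newMessage().value(v).sendAsync()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper: counts acknowledged and failed sends for every message,
// instead of checking only the last future.
final class SendAudit {

    /** Sends "key-0" .. "key-(n-1)" through sender and returns {acked, failed}. */
    public static <T> int[] sendAndCount(int n, Function<String, CompletableFuture<T>> sender) {
        AtomicInteger acked = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        List<CompletableFuture<Void>> pending = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            // handle() runs for both outcomes, so the combined future never fails.
            pending.add(sender.apply("key-" + i).<Void>handle((id, err) -> {
                if (err == null) {
                    acked.incrementAndGet();
                } else {
                    failed.incrementAndGet();
                }
                return null;
            }));
        }
        // Wait until every send has either been acknowledged or has failed.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return new int[] {acked.get(), failed.get()};
    }

    public static void main(String[] args) {
        // Stand-in sender that acknowledges immediately (assumption: no broker involved).
        int[] result = sendAndCount(100_000, v -> CompletableFuture.completedFuture(v));
        System.out.println("acked=" + result[0] + " failed=" + result[1]);
    }
}
```

If `acked` equals 100000 against the real topic, the producer side is accounted for and the gap lies between the topic and the sink.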

Expected behavior

The sink should receive all 100000 messages.

Metrics

availablePermits is also negative in the topic stats. Isn't that wrong too? Each time I check the sink stats again, numReadFromPulsar and numWrittenToSink have increased; they are now at 1400000. Pulsar seems to keep redelivering the 50k unackedMessages, and I'm not sure why they stay unacked!
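
One possible reading of these numbers, as a toy model rather than the broker's actual code: permits are counted per message, but a batch is dispatched as a whole, so a single large batch can push availablePermits below zero. The sketch below assumes an initial grant of 1000 permits (an assumed receiver queue size, not taken from this report) and batches of 50000 messages, matching `batchingMaxMessages` in the producer above. `PermitModel` and its methods are hypothetical names.

```java
// Toy model only: shows how per-message permit accounting combined with
// whole-batch dispatch can yield a negative permit count.
final class PermitModel {

    /** Dispatches whole batches while permits remain positive; returns the permits left. */
    public static int dispatch(int permits, int[] batchSizes) {
        for (int size : batchSizes) {
            if (permits <= 0) {
                break; // no permits left, stop dispatching
            }
            permits -= size; // the whole batch is charged at once
        }
        return permits;
    }

    /** Average number of times each produced message was read by the sink. */
    public static double averageDeliveries(long numRead, long produced) {
        return (double) numRead / produced;
    }

    public static void main(String[] args) {
        // Assumed grant of 1000 permits, two batches of 50000 messages each.
        System.out.println(dispatch(1000, new int[] {50_000, 50_000})); // prints -49000
        // Figures from the report: 1400000 reads for 100000 produced messages.
        System.out.println(averageDeliveries(1_400_000, 100_000)); // prints 14.0
    }
}
```

Under these assumptions a negative availablePermits would not by itself indicate a bug, while an average of 14 reads per message is consistent with repeated redelivery of unacknowledged messages.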

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.