apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0
14.27k stars 3.59k forks source link

[Pulsar Websocket] Consumer message can't be acknowleaged successfully after toByteArray & fromByteArray #12059

Open yakir-Yang opened 3 years ago

yakir-Yang commented 3 years ago

Case 1. After receiving the message from Pulsar consumer, just calling the consumer.acknowledge() immediately, this message can be acknowledged successfully. Here's the code

consumer.receiveAsync().thenAccept(msg -> {
    consumer.acknowledge(msg.getMessageId());
    ....
}

Case 2. If I calling the toByteArray & fromByteArrayWithTopic functions, the message just can't be acknowledged successfully.

consumer.receiveAsync().thenAccept(msg -> {
    MessageId msgId = MessageId.fromByteArrayWithTopic(msg.getMessageId().toByteArray(), topic.toString());
    consumer.acknowledgeAsync(msgId);
    ....
}

Case 3. If I calling the toByteArray & fromByteArray functions, the message just still can't be acknowledged successfully.

consumer.receiveAsync().thenAccept(msg -> {
    MessageId msgId = MessageId.fromByteArray(msg.getMessageId().toByteArray());
   consumer.acknowledgeAsync(msgId);
    ....
}

Case 4. Still failed

consumer.receiveAsync().thenAccept(msg -> {
    String messageId = Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray());
    MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(messageId), topic.toString());
    consumer.acknowledgeAsync(msgId);
    ....
}
yakir-Yang commented 3 years ago

@codelipenghui

yakir-Yang commented 3 years ago

Here's my simple code that can reproduce this problem quickly.

You can check the unacked messages through pulsar-admin tools

root@pulsar-node-102135:/pulsar# ./bin/pulsar-admin topics stats persistent://edge-sit-message-center/default/subscriber_1415926654114594816
{
  "count" : 0,
  ....
  "subscriptions" : {
   "kk-subscription" : {
      ......
      "unackedMessages" : 4842,
      "consumers" : [ {
        "unackedMessages" : 4842,
      } ],
      "isDurable" : false,
    }
  },
package com.mycompany.app;

import org.apache.commons.cli.*;
import org.apache.pulsar.client.api.*;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
//import org.json.JSONException;
//import org.json.JSONObject;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Map;

/**
 * Hello world!
 *
 */
public class App
{
    static CommandLine commandLine = null;
    static long pkts = 0;

    public static void main( String[] args ) {
        CommandLineParser commandLineParser = new DefaultParser();
        Options options = new Options();
        options.addOption("addr", true, "Pulsar Server 地址");
        options.addOption("topic", true, "主题名称");

        try {
            commandLine = commandLineParser.parse(options, args);
        } catch (ParseException e) {
            System.out.println("---- exception");
            System.out.println(e);
            return;
        }

        new PulsarThread().start();
        new MetricThread().start();

        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {}
        }
    }

    static private class PulsarThread extends Thread {
        public PulsarThread() {
        }

        public void run() {
            try {
                PulsarClient client = PulsarClient.builder().serviceUrl(commandLine.getOptionValue("addr")).build();

                Consumer<byte[]> consumer = client.newConsumer()
                        .topic(commandLine.getOptionValue("topic"))
                        .consumerName("javatest")
                        .subscriptionName("kk-subscription")
                        .ackTimeout(10, TimeUnit.SECONDS)
                        .subscriptionType(SubscriptionType.Key_Shared)
                        .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
                        .subscriptionMode(SubscriptionMode.NonDurable)
                        .subscribe();

                while (true) {
                    Message<byte[]> message = consumer.receive();
                    if (message == null) {
                        continue;
                    }

                    // consumer.acknowledge(message);

                    MessageId msgId = MessageId.fromByteArrayWithTopic(message.getMessageId().toByteArray(), "persistent://edge-sit-message-center/default/subscriber_1415926654114594816");
                    consumer.acknowledgeAsync(msgId);

                    pkts += 1;
                }
            } catch (Exception e) {

            }
        }
    }

    static private class MetricThread extends Thread {
        public void run() {
            long last = 0;
            while (true) {
                System.out.printf("consumer speed: %d pkt/s\n", pkts - last);
                last = pkts;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {}
            }
        }
    }
}
nodece commented 3 years ago

@yakir-Yang I tried to run the code provided by you, it works fine.

Set up the environment:

docker run -it \
  -p 6650:6650 \
  -p 8080:8080 --name pulsar-issues-test \
  apachepulsar/pulsar:2.8.0 \
  bin/pulsar standalone

The code so like:

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;

public class Main {
    public static void main(String[] args) throws Exception {
        String url = "http://localhost:8080";
        String topic = "persistent://public/default/issues-12059";
        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(url).build();
        PulsarClient client = PulsarClient.builder().serviceUrl(url).build();
        Producer<byte[]> producer = client.newProducer().topic(topic).create();
        Thread producerThread = new Thread(() -> {
            while (true) {
                try {
                    producer.send("hello-pulsar".getBytes(StandardCharsets.UTF_8));
                    Thread.sleep(5 * 1000);
                } catch (PulsarClientException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        producerThread.start();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic(topic)
                .consumerName("javatest")
                .subscriptionName("kk-subscription")
                .ackTimeout(10, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Key_Shared)
                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
                .subscriptionMode(SubscriptionMode.NonDurable)
                .subscribe();

        int n = 0;
        while (true) {
            Message<byte[]> message = consumer.receive();
            if (message == null) {
                continue;
            }

            MessageId msgId;

            switch (n % 4) {
                case 0:
                    msgId = message.getMessageId();
                    System.out.println("create MessageId from message.getMessageId()");
                    break;
                case 1:
                    System.out.println(
                            "create MessageId from MessageId.fromByteArray(message.getMessageId().toByteArray())");
                    msgId = MessageId.fromByteArray(message.getMessageId().toByteArray());
                    break;
                case 2:
                    System.out.println(
                            "create MessageId from MessageId.fromByteArrayWithTopic(message.getMessageId().toByteArray(), topic)");
                    msgId = MessageId.fromByteArrayWithTopic(message.getMessageId().toByteArray(), topic);
                    break;
                case 3:
                    System.out.println(
                            "create MessageId from MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(messageId), topic)");
                    String messageId = Base64.getEncoder().encodeToString(message.getMessageId().toByteArray());
                    msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(messageId), topic);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + n);
            }

            consumer.acknowledgeAsync(msgId);
            Thread.sleep(2 * 1000);
            admin.topics().getStats(topic).getSubscriptions().forEach((key, value) -> {
                long unackedMessages = value.getUnackedMessages();
                System.out.println("subscription: " + key + ", unackedMessages: " + unackedMessages);
                if (unackedMessages != 0) {
                    System.out.println("expect value.getUnackedMessages() is 0, but got " + unackedMessages);
                    System.exit(1);
                }
            });
            n++;
        }
    }
}

If you use other versions of pulsar, please tell me.

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.

sliwamichal commented 1 year ago

The environment with the same issue: Java: 17 Pulsar: 3.0.0 (2.11.0 also did not work) Pulsar-client: 3.0.0 (starting from 2.7.2 it stopped working)

The environment in which the issue does NOT exist: Java: 17 Pulsar: 3.0.0 Pulsar-client: 2.7.1

this change was introduced in 2.7.2 (https://github.com/apache/pulsar/pull/9855): https://github.com/apache/pulsar/pull/9855/files

szkoludasebastian commented 1 year ago

Have same issue. Previously we were using Pulsar Client with version 2.7.1. After updating it to 3.0.0 it stopped acknowledging messages. We have tried all versions after 2.7.1 till 3.0.0.

So the environment in which issue exists: Java 17 Pulsar: 3.0.0 Pulsar Client: 3.0.0

Below I attached simple project in which I reproduced this issue: pulsar-message-ack-issue.zip It contains two simple apps: one is responsible for sending messages to topic test-topic-read (it is placed in file Main2.java) second one is responsible for reading those messages from topic test-topic-read, sending them to topic test-topic-write and at the end acknowledging messages on test-topic-read (it is placed in file Main.java). It acknowledges messages with this code:

@SneakyThrows
private CompletableFuture<Void> acceptMessage(Message<byte[]> msg) {
    MessageId msgId = MessageId.fromByteArray(msg.getMessageId().toByteArray());
    return consumer.acknowledgeAsync(msgId);
//            return consumer.acknowledgeAsync(msg.getMessageId());
}

First line is causing the problem that messages are not acknowledged. When I will comment two first lines and uncomment the third line then everything works correctly.