apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

pulsar c++ topic compaction with message key does not seem to work #9317

Closed fayce66 closed 3 years ago

fayce66 commented 3 years ago

A C++ consumer configured to read a compacted topic still receives every message in the log instead of only the latest one for each message key.

  1. The C++ producer:
    std::string some_string;
    std::string some_topic_key;

    Client client(lookup_url_);
    Producer producer;
    ProducerConfiguration configuration;
    configuration.setCompressionType(compression_type_); // CompressionNone or CompressionZSTD
    auto result = client.createProducer(topic_, configuration, producer);
    if (result != ResultOk) {
        // print error
        return -1;
    }
    auto msg = MessageBuilder()
            .setContent(some_string.data(), some_string.size())
            .setPartitionKey(some_topic_key)
            .build();
    auto res = producer.send(msg);
  2. The C++ consumer:

    Client client(lookup_url_);
    Consumer consumer;
    ConsumerConfiguration configuration;
    configuration.setConsumerType(consumer_type_); // ConsumerExclusive or ConsumerFailover
    if (consumer_type_ == pulsar::ConsumerShared) {
        // cannot have topic compaction with shared subscription ?
        configuration.setReadCompacted(false);
    } else {
        // set topic compaction
        configuration.setReadCompacted(true);
    }
    auto result = client.subscribe(topic_, subscription_name_, configuration, consumer);
    if (result != ResultOk) {
        // print error
        return -1;
    }
    
    Message msg;
    while (true) {
        consumer.receive(msg);
        // do something with message
        consumer.acknowledge(msg);
    }

  3. Trigger topic compaction manually with pulsar-admin:
    $ bin/pulsar-admin topics compact "persistent://marianas/alphatrader/wing-calibration"

Environment: the client (producer and consumer) runs on an Ubuntu 20.04 desktop; pulsar-daemon runs on CentOS 6.10.

I checked the messages received by the pulsar-client and the topic keys are properly set:

----- got message -----
key:[fek.wing.ks102], properties:[], content:{"some string"}
----- got message -----
key:[fek.wing.ks103], properties:[], content:{"some string"}

BewareMyPower commented 3 years ago

@codelipenghui please assign to me

BewareMyPower commented 3 years ago

I tested this in my local environment and it works well. Here are my detailed steps:

  1. Clone the latest pulsar repository.
  2. Change SampleConsumer.cc and SampleProducer.cc under pulsar-client-cpp/examples to the following:
// SampleProducer.cc (ignore the license header...)
#include <pulsar/Client.h>
#include <lib/LogUtils.h>

DECLARE_LOG_OBJECT()
using namespace pulsar;

int main() {
    Client client("pulsar://localhost:6650");
    Producer producer;
    Result result = client.createProducer("xyz-topic", producer);
    if (result != ResultOk) {
        LOG_ERROR("Error creating producer: " << result);
        return -1;
    }

    // Send synchronously
    const std::string key = "my-key";
    for (int i = 0; i < 10; i++) {
        const auto msg = MessageBuilder().setContent("msg-" + std::to_string(i)).setPartitionKey(key).build();
        MessageId id;
        result = producer.send(msg, id);
        if (result == ResultOk) {
            LOG_INFO("Send " << msg << " to " << id);
        } else {
            LOG_ERROR("Failed to send " << msg << ": " << result);
            break;
        }
    }

    client.close();
    return 0;
}

// SampleConsumer.cc
#include <pulsar/Client.h>
#include <lib/LogUtils.h>

DECLARE_LOG_OBJECT()
using namespace pulsar;

int main(int argc, char* argv[]) {
    const std::string subName = (argc > 1) ? argv[1] : "my-sub";
    Client client("pulsar://localhost:6650");
    Consumer consumer;
    ConsumerConfiguration conf;
    conf.setReadCompacted(true);
    conf.setSubscriptionInitialPosition(InitialPositionEarliest);
    Result result = client.subscribe("xyz-topic", subName, conf, consumer);
    if (result != ResultOk) {
        LOG_ERROR("Failed to subscribe: " << result);
        return -1;
    }

    Message msg;
    while (true) {
        // Wait up to 1000 ms for the next message; stop once nothing more arrives.
        result = consumer.receive(msg, 1000);
        if (result == ResultTimeout) {
            break;
        }
        if (result != ResultOk) {
            LOG_ERROR("Failed to receive: " << result);
            return 1;
        }
        LOG_INFO("Receive: " << msg.getPartitionKey() << " => " << msg.getDataAsString() << " from "
                             << msg.getMessageId());
        consumer.acknowledge(msg);
    }

    client.close();
    return 0;
}

As you can see, SampleProducer sends 10 messages (msg-0 to msg-9) with the same key my-key to topic xyz-topic, and SampleConsumer tries to consume all messages from the earliest position.
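The subscription name and initial position matter for this test. The following toy model is only a sketch of the cursor behaviour, under the assumption that a durable subscription remembers what it has acknowledged and that the initial position is consulted only when the subscription is first created. `ToyBroker`, `ToyInitialPosition`, and `consumeAll` are hypothetical names for illustration, not part of the Pulsar C++ client.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Toy model only: hypothetical names, not the Pulsar client API.
enum class ToyInitialPosition { Latest, Earliest };

class ToyBroker {
public:
    void publish(const std::string& payload) { log_.push_back(payload); }

    // Connects to `subscription`, returns every message it has not yet
    // acknowledged, and acknowledges them all (advancing its cursor).
    std::vector<std::string> consumeAll(const std::string& subscription,
                                        ToyInitialPosition initialPosition) {
        auto it = cursors_.find(subscription);
        if (it == cursors_.end()) {
            // Assumption: the initial position only matters when the
            // subscription is created for the first time.
            const std::size_t start =
                (initialPosition == ToyInitialPosition::Earliest) ? 0 : log_.size();
            it = cursors_.emplace(subscription, start).first;
        }
        assert(it->second <= log_.size());
        std::vector<std::string> unread(
            log_.begin() + static_cast<std::ptrdiff_t>(it->second), log_.end());
        it->second = log_.size();  // everything returned is now acknowledged
        return unread;
    }

private:
    std::vector<std::string> log_;
    std::map<std::string, std::size_t> cursors_;  // subscription -> next index
};
```

In this model, a second `consumeAll("my-sub", ...)` returns nothing because everything was already acknowledged, which is why a fresh subscription name is needed to re-read the topic from the start.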

  3. Build Pulsar using mvn clean install -DskipTests -Pcore-modules.
  4. Run a test standalone service by running pulsar-client-cpp/pulsar-test-service-start.sh.
  5. Build the C++ client using cmake.
  6. Run ./examples/SampleProducer.
2021-01-27 16:21:28.153 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=0, publish_time=1611735688045, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,0,-1,0)
2021-01-27 16:21:28.158 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=1, publish_time=1611735688153, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,1,-1,0)
2021-01-27 16:21:28.164 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=2, publish_time=1611735688158, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,2,-1,0)
2021-01-27 16:21:28.168 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=3, publish_time=1611735688164, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,3,-1,0)
2021-01-27 16:21:28.175 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=4, publish_time=1611735688168, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,4,-1,0)
2021-01-27 16:21:28.180 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=5, publish_time=1611735688175, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,5,-1,0)
2021-01-27 16:21:28.185 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=6, publish_time=1611735688180, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,6,-1,0)
2021-01-27 16:21:28.191 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=7, publish_time=1611735688185, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,7,-1,0)
2021-01-27 16:21:28.195 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=8, publish_time=1611735688191, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,8,-1,0)
2021-01-27 16:21:28.200 INFO  [0x118964dc0] SampleProducer:41 | Send Message(prod=standalone-0-0, seq=9, publish_time=1611735688195, payload_size=5, msg_id=(-1,-1,-1,-1), props={}) to (0,9,-1,0)
  7. Run ./examples/SampleConsumer.
2021-01-27 16:21:31.886 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-0 from (0,0,-1,0)
2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-1 from (0,1,-1,0)
2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-2 from (0,2,-1,0)
2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-3 from (0,3,-1,0)
2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-4 from (0,4,-1,0)
2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-5 from (0,5,-1,0)
2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-6 from (0,6,-1,0)
2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-7 from (0,7,-1,0)
2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-8 from (0,8,-1,0)
2021-01-27 16:21:31.887 INFO  [0x102fd1dc0] SampleConsumer:49 | Receive: my-key => msg-9 from (0,9,-1,0)
  8. Go to the project directory and run bin/pulsar-admin topics compact xyz-topic to compact this topic.
  9. Go back to the C++ client build directory and run ./examples/SampleConsumer new-sub.
2021-01-27 16:21:46.097 INFO  [0x700005d52000] ConsumerImpl:216 | [persistent://public/default/xyz-topic, new-sub, 0] Created consumer on broker [127.0.0.1:54843 -> 127.0.0.1:6650] 
2021-01-27 16:21:46.114 INFO  [0x10ce3ddc0] SampleConsumer:49 | Receive: my-key => msg-9 from (0,9,-1,0)
2021-01-27 16:21:47.116 INFO  [0x10ce3ddc0] ClientImpl:480 | Closing Pulsar client
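The two consumer runs above differ only in whether compaction had been triggered in between. Here is a minimal sketch of that behaviour, assuming a consumer with readCompacted enabled sees the compacted snapshot followed by whatever was published after it, and sees the full log if no compaction has run yet. `ToyTopic` and its methods are hypothetical and do not use the Pulsar API.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <vector>

// Toy model only: hypothetical names, not the Pulsar API.
struct ToyMessage {
    std::string key;
    std::string payload;
};

class ToyTopic {
public:
    void publish(const std::string& key, const std::string& payload) {
        log_.push_back({key, payload});
    }

    // Models a manual compaction run: snapshot the latest message per key
    // for everything published so far, preserving log order.
    void compact() {
        std::set<std::string> seen;
        std::vector<ToyMessage> latest;
        for (auto it = log_.rbegin(); it != log_.rend(); ++it) {
            if (seen.insert(it->key).second) latest.push_back(*it);
        }
        compacted_.assign(latest.rbegin(), latest.rend());
        horizon_ = log_.size();
    }

    // Models a new subscription that reads from the earliest position.
    std::vector<ToyMessage> readFromEarliest(bool readCompacted) const {
        assert(horizon_ <= log_.size());
        // Without a compaction run there is no snapshot to read from.
        if (!readCompacted || horizon_ == 0) return log_;
        std::vector<ToyMessage> out = compacted_;
        out.insert(out.end(),
                   log_.begin() + static_cast<std::ptrdiff_t>(horizon_), log_.end());
        return out;
    }

private:
    std::vector<ToyMessage> log_;        // full message log
    std::vector<ToyMessage> compacted_;  // snapshot from the last compaction
    std::size_t horizon_ = 0;            // log size at the last compaction
};
```

With ten messages published under my-key, `readFromEarliest(true)` returns ten messages before `compact()` and one (msg-9) afterwards; a message published after `compact()` is appended to that view until the next compaction.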

fayce66 commented 3 years ago

It still does not work for me. I simplified my code to get closer to your example and even hard-coded the partition key, but the result is the same. I run the producer for 1 or 2 minutes to produce a few messages, then launch the consumer expecting only the last message, but I get:

PM|9827|9839|09:47:07.429047|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
PM|9827|9839|09:47:07.429175|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
PM|9827|9839|09:47:07.429206|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
PM|9827|9839|09:47:07.429228|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
PM|9827|9839|09:47:07.429256|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
PM|9827|9839|09:47:07.429277|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
PM|9827|9839|09:47:07.429296|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]
PM|9827|9839|09:47:07.429322|INFO|[pm.calib.sub]: new message from [stsd.local] on topic [persistent://xyz/foo/calib]: partition key:[test_key]

Also, if I stop the producer, let the consumer receive all the messages, and then restart only the consumer with the same subscription, expecting the last message for partition key 'test_key', I get no messages at all, just as I would with a subscription that does not read compacted. (By the way, you are using conf.setSubscriptionInitialPosition(InitialPositionEarliest); does that mean that, for a subscription on a compacted topic, this setting is required in order to receive the last message for my partition key?)

Did you use a newer version of pulsar than 2.7.0? This is the version that I am using now.

I noticed that you are using result = producer.send(msg, id); which does not exist in the Pulsar C++ client 2.7.0.

BewareMyPower commented 3 years ago

@fayce66 Sorry, that is a new API. With the older client, you can use:

        result = producer.send(msg);
        if (result == ResultOk) {
            LOG_INFO("Send " << msg << " to " << msg.getMessageId());
        }  // the else branch stays as in SampleProducer.cc above

In addition, I suspect this is not a client-side problem; rather, your compaction may have failed. You can run a Java consumer to see whether it works. Could you check your broker logs from when you ran the topic compaction? Here are my logs:

10:12:07.084 [pulsar-web-67-12] INFO  org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Trigger compaction on topic persistent://public/default/xyz-topic
10:12:07.095 [ForkJoinPool.commonPool-worker-4] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config:
// ...
10:12:07.103 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xebd955fb, L:/127.0.0.1:50650 - R:localhost/127.0.0.1:6650]] Connected to server
10:12:07.099 [ForkJoinPool.commonPool-worker-4] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
// ...
10:12:07.103 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xebd955fb, L:/127.0.0.1:50650 - R:localhost/127.0.0.1:6650]] Connected to server
10:12:07.106 [ForkJoinPool.commonPool-worker-4] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [28/一月/2021:10:12:07 +0800] "PUT /admin/v2/persistent/public/default/xyz-topic/compaction HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.8.0-SNAPSHOT" 24
10:12:07.106 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - New connection from /127.0.0.1:50650
10:12:07.108 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Subscribing to topic on cnx [id: 0xebd955fb, L:/127.0.0.1:50650 - R:localhost/127.0.0.1:6650], consumerId 0
10:12:07.108 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] Subscribing on topic persistent://public/default/xyz-topic / __compaction
10:12:07.108 [pulsar-io-50-12] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic] Cursor __compaction recovered to position 22:9
10:12:07.109 [pulsar-io-50-12] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/xyz-topic] Creating ledger, metadata: {component=[109, 97, 110, 97, 103, 101, 100, 45, 108, 101, 100, 103, 101, 114], pulsar/managed-ledger=[112, 117, 98, 108, 105, 99, 47, 100, 101, 102, 97, 117, 108, 116, 47, 112, 101, 114, 115, 105, 115, 116, 101, 110, 116, 47, 120, 121, 122, 45, 116, 111, 112, 105, 99], pulsar/cursor=[95, 95, 99, 111, 109, 112, 97, 99, 116, 105, 111, 110], application=[112, 117, 108, 115, 97, 114]} - metadata ops timeout : 60 seconds
10:12:07.112 [main-EventThread] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [127.0.0.1:3181] for ledger: 23
10:12:07.118 [BookKeeperClientWorker-OrderedExecutor-11-0] INFO  org.apache.bookkeeper.mledger.impl.MetaStoreImpl - [public/default/persistent/xyz-topic] [__compaction] Updating cursor info ledgerId=23 mark-delete=22:9
10:12:07.119 [bookkeeper-ml-workers-OrderedExecutor-3-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic] Updated cursor __compaction with ledger id 23 md-position=22:9 rd-position=22:10
10:12:07.119 [bookkeeper-ml-workers-OrderedExecutor-3-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/xyz-topic] Opened new cursor: ManagedCursorImpl{ledger=public/default/persistent/xyz-topic, name=__compaction, ackPos=22:9, readPos=22:10}
10:12:07.120 [bookkeeper-ml-workers-OrderedExecutor-3-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic-__compaction] Rewind from 22:10 to 22:10
10:12:07.120 [bookkeeper-ml-workers-OrderedExecutor-3-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/xyz-topic] There are no replicated subscriptions on the topic
10:12:07.120 [bookkeeper-ml-workers-OrderedExecutor-3-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/xyz-topic][__compaction] Created new subscription for 0
10:12:07.120 [bookkeeper-ml-workers-OrderedExecutor-3-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] Created subscription on topic persistent://public/default/xyz-topic / __compaction
10:12:07.121 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
10:12:07.123 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Seek subscription to message id -1:-1:-1
10:12:07.125 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=CompactorSubscription{topic=persistent://public/default/xyz-topic, name=__compaction}, consumerId=0, consumerName=3ee4e, address=/127.0.0.1:50650}
10:12:07.125 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://public/default/xyz-topic, name=__compaction}, consumerId=0, consumerName=3ee4e, address=/127.0.0.1:50650}
10:12:07.126 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://public/default/xyz-topic][__compaction] Successfully disconnected consumers from subscription, proceeding with cursor reset
10:12:07.126 [bookkeeper-ml-workers-OrderedExecutor-3-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic] Initiate reset position to 22:-1 on cursor __compaction
10:12:07.127 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:6650] Broker notification of Closed consumer: 0
10:12:07.127 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/xyz-topic] [__compaction] Closed connection [id: 0xebd955fb, L:/127.0.0.1:50650 - R:localhost/127.0.0.1:6650] -- Will try again in 0.1 s
10:12:07.130 [BookKeeperClientWorker-OrderedExecutor-11-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic] reset position to 22:-1 before current read position 22:10 on cursor __compaction
10:12:07.132 [BookKeeperClientWorker-OrderedExecutor-11-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] [persistent://public/default/xyz-topic][__compaction] Reset subscription to message id -1:-1
10:12:07.133 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Successfully reset subscription to message id -1:-1:-1
10:12:07.228 [pulsar-timer-99-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/xyz-topic] [__compaction] Reconnecting after timeout
10:12:07.230 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Subscribing to topic on cnx [id: 0xebd955fb, L:/127.0.0.1:50650 - R:localhost/127.0.0.1:6650], consumerId 0
10:12:07.230 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] Subscribing on topic persistent://public/default/xyz-topic / __compaction
10:12:07.231 [pulsar-io-50-12] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic-__compaction] Rewind from 22:-1 to 22:0
10:12:07.231 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/xyz-topic] There are no replicated subscriptions on the topic
10:12:07.231 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/xyz-topic][__compaction] Created new subscription for 0
10:12:07.231 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] Created subscription on topic persistent://public/default/xyz-topic / __compaction
10:12:07.231 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
10:12:07.239 [pulsar-external-listener-97-1] WARN  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic] [__compaction] Could not get connection while getLastMessageId -- Will try again in 100 ms
10:12:07.240 [pulsar-external-listener-97-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Get topic last message Id
10:12:07.243 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Successfully getLastMessageId 22:9
10:12:07.243 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Get topic last message Id
10:12:07.244 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Successfully getLastMessageId 22:9
10:12:07.245 [pulsar-client-io-96-1] INFO  org.apache.pulsar.compaction.TwoPhaseCompactor - Commencing phase one of compaction for persistent://public/default/xyz-topic, reading to 22:9:-1:0
10:12:07.256 [main-EventThread] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [127.0.0.1:3181] for ledger: 24
10:12:07.256 [main-EventThread] INFO  org.apache.pulsar.compaction.TwoPhaseCompactor - Commencing phase two of compaction for persistent://public/default/xyz-topic, from 22:0:-1:-1 to 22:9:-1:-1, compacting 1 keys to ledger 24
10:12:07.258 [main-EventThread] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Seek subscription to message id 22:0:-1:-1
10:12:07.259 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=CompactorSubscription{topic=persistent://public/default/xyz-topic, name=__compaction}, consumerId=0, consumerName=3ee4e, address=/127.0.0.1:50650}
10:12:07.259 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://public/default/xyz-topic, name=__compaction}, consumerId=0, consumerName=3ee4e, address=/127.0.0.1:50650}
10:12:07.259 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://public/default/xyz-topic][__compaction] Successfully disconnected consumers from subscription, proceeding with cursor reset
10:12:07.259 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:6650] Broker notification of Closed consumer: 0
10:12:07.259 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/xyz-topic] [__compaction] Closed connection [id: 0xebd955fb, L:/127.0.0.1:50650 - R:localhost/127.0.0.1:6650] -- Will try again in 0.1 s
10:12:07.259 [bookkeeper-ml-workers-OrderedExecutor-3-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic] Initiate reset position to 22:0 on cursor __compaction
10:12:07.261 [BookKeeperClientWorker-OrderedExecutor-11-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic] reset position to 22:0 before current read position 22:10 on cursor __compaction
10:12:07.261 [BookKeeperClientWorker-OrderedExecutor-11-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] [persistent://public/default/xyz-topic][__compaction] Reset subscription to message id 22:0
10:12:07.261 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Successfully reset subscription to message id 22:0:-1:-1
10:12:07.360 [pulsar-timer-99-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/xyz-topic] [__compaction] Reconnecting after timeout
10:12:07.361 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Subscribing to topic on cnx [id: 0xebd955fb, L:/127.0.0.1:50650 - R:localhost/127.0.0.1:6650], consumerId 0
10:12:07.361 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] Subscribing on topic persistent://public/default/xyz-topic / __compaction
10:12:07.362 [pulsar-io-50-12] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/xyz-topic-__compaction] Rewind from 22:0 to 22:0
10:12:07.362 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/xyz-topic] There are no replicated subscriptions on the topic
10:12:07.362 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/xyz-topic][__compaction] Created new subscription for 0
10:12:07.362 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] Created subscription on topic persistent://public/default/xyz-topic / __compaction
10:12:07.362 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
10:12:07.382 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] Closing consumer: consumerId=0
10:12:07.382 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://public/default/xyz-topic, name=__compaction}, consumerId=0, consumerName=3ee4e, address=/127.0.0.1:50650}
10:12:07.382 [pulsar-io-50-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:50650] Closed consumer, consumerId=0
10:12:07.383 [pulsar-client-io-96-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/xyz-topic] [__compaction] Closed consumer
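The TwoPhaseCompactor lines in this log suggest the overall shape of the algorithm. The sketch below assumes that phase one scans the topic and records the id of the latest message for each key, and that phase two re-reads the topic and copies only those messages to a new ledger. All names (`ToyMessageId`, `ToyEntry`, `phaseOne`, `phaseTwo`) are hypothetical; this is not the broker's implementation.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Toy model only: hypothetical types, not the broker's classes.
struct ToyMessageId {
    std::int64_t ledgerId;
    std::int64_t entryId;
};

inline bool operator==(const ToyMessageId& a, const ToyMessageId& b) {
    return a.ledgerId == b.ledgerId && a.entryId == b.entryId;
}

struct ToyEntry {
    ToyMessageId id;
    std::string key;
    std::string payload;
};

// Phase one (assumed): scan the topic and remember, for each key,
// the id of the last message seen with that key.
std::map<std::string, ToyMessageId> phaseOne(const std::vector<ToyEntry>& topic) {
    std::map<std::string, ToyMessageId> latest;
    for (const auto& entry : topic) latest[entry.key] = entry.id;
    return latest;
}

// Phase two (assumed): re-read the topic and copy only the entries that
// phase one marked as the latest for their key into a new "ledger".
std::vector<ToyEntry> phaseTwo(const std::vector<ToyEntry>& topic,
                               const std::map<std::string, ToyMessageId>& latest) {
    std::vector<ToyEntry> compactedLedger;
    for (const auto& entry : topic) {
        const auto it = latest.find(entry.key);
        assert(it != latest.end());  // phase one has seen every key
        if (it->second == entry.id) compactedLedger.push_back(entry);
    }
    return compactedLedger;
}
```

For ten messages sent with my-key at positions 22:0 to 22:9, this model keeps one key and writes only the entry at 22:9, consistent with the "compacting 1 keys" log line.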

fayce66 commented 3 years ago

I don't see any option to start bin/pulsar-client with topic compaction. Am I wrong? I tried to test it this way, but the help message does not say how to set readCompacted to true:

bin/pulsar-client --url "pulsar://marianas1:6650" consume "persistent://marianas/alphatrader/wing-calibration" -s "fek.sts" -n 0

17:01:45.245 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xdf0a36c1, L:/10.0.3.6:48808 - R:marianas1/10.108.12.141:6650]] Connected to server
17:01:45.809 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {
  "topicNames" : [ "persistent://marianas/alphatrader/wing-calibration" ],
  "topicsPattern" : null,
  "subscriptionName" : "fek.sts",
  "subscriptionType" : "Exclusive",
  "subscriptionMode" : "Durable",
  "receiverQueueSize" : 1000,
  "acknowledgementsGroupTimeMicros" : 100000,
  "negativeAckRedeliveryDelayMicros" : 60000000,
  "maxTotalReceiverQueueSizeAcrossPartitions" : 50000,
  "consumerName" : null,
  "ackTimeoutMillis" : 0,
  "tickDurationMillis" : 1000,
  "priorityLevel" : 0,
  "maxPendingChuckedMessage" : 10,
  "autoAckOldestChunkedMessageOnQueueFull" : false,
  "expireTimeOfIncompleteChunkedMessageMillis" : 60000,
  "cryptoFailureAction" : "FAIL",
  "properties" : { },
  "readCompacted" : false,
  "subscriptionInitialPosition" : "Latest",
  "patternAutoDiscoveryPeriod" : 60,
  "regexSubscriptionMode" : "PersistentOnly",
  "deadLetterPolicy" : null,
  "retryEnable" : false,
  "autoUpdatePartitions" : true,
  "autoUpdatePartitionsIntervalSeconds" : 60,
  "replicateSubscriptionState" : false,
  "resetIncludeHead" : false,
  "keySharedPolicy" : null,
  "batchIndexAckEnabled" : false
}
17:01:45.814 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
  "serviceUrl" : "pulsar://marianas1:6650",
  "authPluginClassName" : null,
  "operationTimeoutMs" : 30000,
  "statsIntervalSeconds" : 60,
  "numIoThreads" : 1,
  "numListenerThreads" : 1,
  "connectionsPerBroker" : 1,
  "useTcpNoDelay" : true,
  "useTls" : false,
  "tlsTrustCertsFilePath" : "",
  "tlsAllowInsecureConnection" : false,
  "tlsHostnameVerificationEnable" : false,
  "concurrentLookupRequest" : 5000,
  "maxLookupRequest" : 50000,
  "maxLookupRedirects" : 20,
  "maxNumberOfRejectedRequestPerConnection" : 50,
  "keepAliveIntervalSeconds" : 30,
  "connectionTimeoutMs" : 10000,
  "requestTimeoutMs" : 60000,
  "initialBackoffIntervalNanos" : 100000000,
  "maxBackoffIntervalNanos" : 60000000000,
  "listenerName" : null,
  "useKeyStoreTls" : false,
  "sslProvider" : null,
  "tlsTrustStoreType" : "JKS",
  "tlsTrustStorePath" : "",
  "tlsTrustStorePassword" : "",
  "tlsCiphers" : [ ],
  "tlsProtocols" : [ ],
  "proxyServiceUrl" : null,
  "proxyProtocol" : null,
  "enableTransaction" : false
}

BewareMyPower commented 3 years ago

My earlier description may have been a little confusing.

The logs I showed earlier were from the broker side, produced by the compaction command bin/pulsar-admin topics compact xyz-topic. The broker itself uses an internal ConsumerImpl to consume messages and then perform the compaction. I just wanted you to check whether your topic compaction was actually performed. The steps:

  1. Run Pulsar standalone in the foreground with bin/pulsar standalone, so that you can see the output directly.
  2. Open a new terminal and run a producer to send some messages to xyz-topic.
  3. Run bin/pulsar-admin topics compact xyz-topic and watch the standalone output to check whether the compaction was performed.
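Checking the standalone output in step 3 can also be done programmatically. This is a rough sketch that looks for the marker strings seen in the broker log earlier in this thread; the exact log wording is an assumption and may differ between Pulsar versions. `CompactionEvidence` and `scanBrokerLog` are hypothetical helper names.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper types for illustration; not part of Pulsar.
struct CompactionEvidence {
    bool triggered = false;  // the admin request reached the broker
    bool phaseOne = false;   // the compactor started scanning the topic
    bool phaseTwo = false;   // the compactor started writing the new ledger

    bool complete() const { return triggered && phaseOne && phaseTwo; }
};

// Scans captured broker log lines for compaction markers that mention `topic`.
// The marker strings are copied from the broker log shown in this thread.
CompactionEvidence scanBrokerLog(const std::vector<std::string>& lines,
                                 const std::string& topic) {
    assert(!topic.empty());
    CompactionEvidence evidence;
    for (const auto& line : lines) {
        if (line.find(topic) == std::string::npos) continue;
        if (line.find("Trigger compaction on topic") != std::string::npos)
            evidence.triggered = true;
        if (line.find("Commencing phase one of compaction") != std::string::npos)
            evidence.phaseOne = true;
        if (line.find("Commencing phase two of compaction") != std::string::npos)
            evidence.phaseTwo = true;
    }
    return evidence;
}
```

If `complete()` is false after triggering compaction, the broker log for that topic is the first place to look before suspecting the client.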

As for "You can run a Java consumer to see whether it works":

I just mean that, if you are sure the compaction was done on the broker side, you can run a Java client application (pulsar-client does not seem to provide an option to enable readCompacted) to check whether the Java client reads only 1 message while the C++ client reads all messages.

fayce66 commented 3 years ago

OK, I reran the pulsar-admin topics compact command and I can indeed see some __compaction messages in the log now. I restarted my client and it looks like it is working now. I did the same thing twice yesterday and it did not work, which is strange. FYI, I stop and restart the pulsar-daemon every morning. Are the configurations saved, or do I have to rerun the topics compact command every time I restart the broker?

Here is my log. I think the compaction is working now, but I will keep testing to make sure:

16:14:05.400 [pulsar-web-68-12] INFO  org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Trigger compaction on topic persistent://marianas/alphatrader/wing-calibration
16:14:05.421 [ForkJoinPool.commonPool-worker-29] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {
  "topicNames" : [ "persistent://marianas/alphatrader/wing-calibration" ],
  "topicsPattern" : null,
  "subscriptionName" : "__compaction",
  "subscriptionType" : "Exclusive",
  "subscriptionMode" : "Durable",
  "receiverQueueSize" : 1000,
  "acknowledgementsGroupTimeMicros" : 100000,
  "negativeAckRedeliveryDelayMicros" : 60000000,
  "maxTotalReceiverQueueSizeAcrossPartitions" : 50000,
  "consumerName" : null,
  "ackTimeoutMillis" : 0,
  "tickDurationMillis" : 1000,
  "priorityLevel" : 0,
  "maxPendingChuckedMessage" : 10,
  "autoAckOldestChunkedMessageOnQueueFull" : false,
  "expireTimeOfIncompleteChunkedMessageMillis" : 60000,
  "cryptoFailureAction" : "FAIL",
  "properties" : { },
  "readCompacted" : true,
  "subscriptionInitialPosition" : "Latest",
  "patternAutoDiscoveryPeriod" : 60,
  "regexSubscriptionMode" : "PersistentOnly",
  "deadLetterPolicy" : null,
  "retryEnable" : false,
  "autoUpdatePartitions" : true,
  "autoUpdatePartitionsIntervalSeconds" : 60,
  "replicateSubscriptionState" : false,
  "resetIncludeHead" : false,
  "keySharedPolicy" : null,
  "batchIndexAckEnabled" : false
}
16:14:05.425 [ForkJoinPool.commonPool-worker-29] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
  "serviceUrl" : "pulsar://localhost:6650",
  "authPluginClassName" : "org.apache.pulsar.client.impl.auth.AuthenticationDisabled",
  "operationTimeoutMs" : 30000,
  "statsIntervalSeconds" : 60,
  "numIoThreads" : 1,
  "numListenerThreads" : 1,
  "connectionsPerBroker" : 1,
  "useTcpNoDelay" : true,
  "useTls" : false,
  "tlsTrustCertsFilePath" : null,
  "tlsAllowInsecureConnection" : false,
  "tlsHostnameVerificationEnable" : false,
  "concurrentLookupRequest" : 5000,
  "maxLookupRequest" : 50000,
  "maxLookupRedirects" : 20,
  "maxNumberOfRejectedRequestPerConnection" : 50,
  "keepAliveIntervalSeconds" : 30,
  "connectionTimeoutMs" : 10000,
  "requestTimeoutMs" : 60000,
  "initialBackoffIntervalNanos" : 100000000,
  "maxBackoffIntervalNanos" : 60000000000,
  "listenerName" : null,
  "useKeyStoreTls" : false,
  "sslProvider" : null,
  "tlsTrustStoreType" : "JKS",
  "tlsTrustStorePath" : null,
  "tlsTrustStorePassword" : null,
  "tlsCiphers" : [ ],
  "tlsProtocols" : [ ],
  "proxyServiceUrl" : null,
  "proxyProtocol" : null,
  "enableTransaction" : false
}
16:14:05.432 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650]] Connected to server
16:14:05.432 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - New connection from /127.0.0.1:42032
16:14:05.436 [ForkJoinPool.commonPool-worker-29] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [28/Jan/2021:16:14:05 +0900] "PUT /admin/v2/persistent/marianas/alphatrader/wing-calibration/compaction HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.7.0" 38
16:14:05.437 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribing to topic on cnx [id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650], consumerId 0
16:14:05.437 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Subscribing on topic persistent://marianas/alphatrader/wing-calibration / __compaction
16:14:05.437 [pulsar-io-50-41] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration-__compaction] Rewind from 13911:132 to 13911:132
16:14:05.437 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration] There are no replicated subscriptions on the topic
16:14:05.437 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration][__compaction] Created new subscription for 0
16:14:05.437 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Created subscription on topic persistent://marianas/alphatrader/wing-calibration / __compaction
16:14:05.438 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
16:14:05.442 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Seek subscription to message id -1:-1:-1
16:14:05.443 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=CompactorSubscription{topic=persistent://marianas/alphatrader/wing-calibration, name=__compaction}, consumerId=0, consumerName=99cae, address=/127.0.0.1:42032}
16:14:05.443 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://marianas/alphatrader/wing-calibration, name=__compaction}, consumerId=0, consumerName=99cae, address=/127.0.0.1:42032}
16:14:05.444 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully disconnected consumers from subscription, proceeding with cursor reset
16:14:05.444 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:6650] Broker notification of Closed consumer: 0
16:14:05.444 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://marianas/alphatrader/wing-calibration] [__compaction] Closed connection [id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650] -- Will try again in 0.1 s
16:14:05.446 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration] Initiate reset position to 12028:-1 on cursor __compaction
16:14:05.448 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [marianas/alphatrader/persistent/wing-calibration] Creating ledger, metadata: {component=[109, 97, 110, 97, 103, 101, 100, 45, 108, 101, 100, 103, 101, 114], pulsar/managed-ledger=[109, 97, 114, 105, 97, 110, 97, 115, 47, 97, 108, 112, 104, 97, 116, 114, 97, 100, 101, 114, 47, 112, 101, 114, 115, 105, 115, 116, 101, 110, 116, 47, 119, 105, 110, 103, 45, 99, 97, 108, 105, 98, 114, 97, 116, 105, 111, 110], pulsar/cursor=[95, 95, 99, 111, 109, 112, 97, 99, 116, 105, 111, 110], application=[112, 117, 108, 115, 97, 114]} - metadata ops timeout : 60 seconds
16:14:05.465 [main-EventThread] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [127.0.0.1:3181] for ledger: 19142
16:14:05.475 [BookKeeperClientWorker-OrderedExecutor-14-0] INFO  org.apache.bookkeeper.mledger.impl.MetaStoreImpl - [marianas/alphatrader/persistent/wing-calibration] [__compaction] Updating cursor info ledgerId=19142 mark-delete=13911:131
16:14:05.481 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration] Updated cursor __compaction with ledger id 19142 md-position=13911:131 rd-position=13911:132
16:14:05.484 [BookKeeperClientWorker-OrderedExecutor-14-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration] reset position to 12028:-1 before current read position 13911:132 on cursor __compaction
16:14:05.487 [BookKeeperClientWorker-OrderedExecutor-14-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] [persistent://marianas/alphatrader/wing-calibration][__compaction] Reset subscription to message id -1:-1
16:14:05.487 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully reset subscription to message id -1:-1:-1
16:14:05.546 [pulsar-timer-104-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://marianas/alphatrader/wing-calibration] [__compaction] Reconnecting after timeout
16:14:05.548 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribing to topic on cnx [id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650], consumerId 0
16:14:05.548 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Subscribing on topic persistent://marianas/alphatrader/wing-calibration / __compaction
16:14:05.548 [pulsar-io-50-41] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration-__compaction] Rewind from 12028:-1 to 12028:0
16:14:05.549 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration] There are no replicated subscriptions on the topic
16:14:05.549 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration][__compaction] Created new subscription for 0
16:14:05.549 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Created subscription on topic persistent://marianas/alphatrader/wing-calibration / __compaction
16:14:05.549 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
16:14:05.590 [pulsar-external-listener-102-1] WARN  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration] [__compaction] Could not get connection while getLastMessageId -- Will try again in 100 ms
16:14:05.592 [pulsar-external-listener-102-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Get topic last message Id
16:14:05.614 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully getLastMessageId 18970:696
16:14:05.614 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Get topic last message Id
16:14:05.618 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully getLastMessageId 18970:696
16:14:05.618 [pulsar-client-io-101-1] INFO  org.apache.pulsar.compaction.TwoPhaseCompactor - Commencing phase one of compaction for persistent://marianas/alphatrader/wing-calibration, reading to 18970:696:-1
16:14:06.037 [io-write-scheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.statelib.impl.kv.RocksdbKVStore - Checkpoint local state store 000000000000000000/000000000000000000/000000000000000000 at revision -1
16:14:06.037 [io-checkpoint-scheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.RocksdbCheckpointTask - Create a local checkpoint of state store 000000000000000000/000000000000000000/000000000000000000 at /home/faycal/Tools/apache-pulsar-2.7.0/data/standalone/bookkeeper/ranges/data/ranges/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85
16:14:06.175 [main-EventThread] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [127.0.0.1:3181] for ledger: 19143
16:14:06.176 [main-EventThread] INFO  org.apache.pulsar.compaction.TwoPhaseCompactor - Commencing phase two of compaction for persistent://marianas/alphatrader/wing-calibration, from 13911:130:-1:-1 to 18970:696:-1:-1, compacting 2 keys to ledger 19143
16:14:06.178 [main-EventThread] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Seek subscription to message id 13911:130:-1:-1
16:14:06.179 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=CompactorSubscription{topic=persistent://marianas/alphatrader/wing-calibration, name=__compaction}, consumerId=0, consumerName=99cae, address=/127.0.0.1:42032}
16:14:06.179 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://marianas/alphatrader/wing-calibration, name=__compaction}, consumerId=0, consumerName=99cae, address=/127.0.0.1:42032}
16:14:06.179 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully disconnected consumers from subscription, proceeding with cursor reset
16:14:06.179 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration] Initiate reset position to 13911:130 on cursor __compaction
16:14:06.179 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:6650] Broker notification of Closed consumer: 0
16:14:06.179 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://marianas/alphatrader/wing-calibration] [__compaction] Closed connection [id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650] -- Will try again in 0.1 s
16:14:06.181 [BookKeeperClientWorker-OrderedExecutor-14-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration] reset position to 13911:130 before current read position 18970:697 on cursor __compaction
16:14:06.181 [BookKeeperClientWorker-OrderedExecutor-14-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] [persistent://marianas/alphatrader/wing-calibration][__compaction] Reset subscription to message id 13911:130
16:14:06.181 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Successfully reset subscription to message id 13911:130:-1:-1
16:14:06.222 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator for /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved version from -1 to 0.
16:14:06.226 [main-EventThread] INFO  org.apache.distributedlog.BKLogWriteHandler - Initiating Recovery For 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default> : []
16:14:06.227 [io-checkpoint-scheduler-OrderedScheduler-0-0] INFO  org.apache.distributedlog.BKLogWriteHandler - Initiating Recovery For 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default> : []
16:14:06.231 [DLM-/stream/storage-OrderedScheduler-0-0] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved to phase ALLOCATING : version = 0.
16:14:06.245 [DLM-/stream/storage-OrderedScheduler-3-0-EventThread] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [127.0.0.1:3181] for ledger: 19144
16:14:06.261 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator for /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved version from 0 to 1.
16:14:06.261 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved to phase ALLOCATED : version = 1.
16:14:06.261 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved to phase HANDING_OVER : version = 1.
16:14:06.261 [main-EventThread] INFO  org.apache.distributedlog.BKLogWriteHandler - No max ledger sequence number found while creating log segment 1 for 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default>.
16:14:06.269 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved to phase HANDED_OVER : version = 1.
16:14:06.269 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator for /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved version from 1 to 2.
16:14:06.269 [main-EventThread] INFO  org.apache.distributedlog.bk.SimpleLedgerAllocator - Ledger allocator /stream/storage/000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017/<default>/allocation moved to phase ALLOCATING : version = 2.
16:14:06.269 [main-EventThread] INFO  org.apache.distributedlog.logsegment.PerStreamLogSegmentCache - 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017 added log segment (inprogress_000000000000000001 : [LogSegmentId:19144, firstTxId:202, lastTxId:-999, version:VERSION_V5_SEQUENCE_ID, completionTime:0, recordCount:0, regionId:0, status:0, logSegmentSequenceNumber:1, lastEntryId:-1, lastSlotId:-1, inprogress:true, minActiveDLSN:DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}, startSequenceId:-1]) to cache.
16:14:06.277 [main-EventThread] INFO  org.apache.distributedlog.BKLogWriteHandler - Deleting log segments older than 1611558846272 for 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default> : []
16:14:06.278 [DLM-/stream/storage-OrderedScheduler-13-0] INFO  org.apache.distributedlog.BKLogSegmentWriter - Flushing before closing log segment 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default>:inprogress_000000000000000001
16:14:06.280 [pulsar-timer-104-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://marianas/alphatrader/wing-calibration] [__compaction] Reconnecting after timeout
16:14:06.280 [BookKeeperClientWorker-OrderedExecutor-0-0] INFO  org.apache.distributedlog.BKLogSegmentWriter - Closing BKPerStreamLogWriter (abort=false) for 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default>:inprogress_000000000000000001 : lastDLSN = DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0} outstandingTransmits = 1 writesPendingTransmit = 0
16:14:06.280 [BookKeeperClientWorker-OrderedExecutor-0-0] INFO  org.apache.distributedlog.BKLogSegmentWriter - Stream 000000000000000000/000000000000000000/000000000000000000/checkpoints/90356abe-2479-421d-a67b-fcd874fdcd85/MANIFEST-000017:<default>:inprogress_000000000000000001 aborted 0 writes
16:14:06.281 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribing to topic on cnx [id: 0x977756f3, L:/127.0.0.1:42032 - R:localhost/127.0.0.1:6650], consumerId 0
16:14:06.282 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Subscribing on topic persistent://marianas/alphatrader/wing-calibration / __compaction
16:14:06.282 [pulsar-io-50-41] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [marianas/alphatrader/persistent/wing-calibration-__compaction] Rewind from 13911:130 to 13911:130
16:14:06.282 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration] There are no replicated subscriptions on the topic
16:14:06.282 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://marianas/alphatrader/wing-calibration][__compaction] Created new subscription for 0
16:14:06.282 [pulsar-io-50-41] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42032] Created subscription on topic persistent://marianas/alphatrader/wing-calibration / __compaction
16:14:06.282 [pulsar-client-io-101-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://marianas/alphatrader/wing-calibration][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
fayce66 commented 3 years ago

OK, here is what I have noticed: if my producer produces more messages while my consumer is not running, then the first time I start the consumer it receives all the missed messages for the subscription. If I then stop and restart the consumer with the same subscription, it receives only the last message per key. Is that normal behavior? In your test, your producer does not produce more messages before the second subscription new-sub is started...

BewareMyPower commented 3 years ago

I just did another test:

  1. Produce 10 messages.
  2. Compact the topic.
  3. Run consumer with sub1, only 1 message was received:
2021-01-28 15:53:32.400 INFO  [0x1031aedc0] SampleConsumer:49 | Receive: my-key => msg-9 from (10,9,-1,0)
  4. Produce another 10 messages.
  5. Run consumer with sub2, 11 messages were received:
2021-01-28 15:54:06.022 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-9 from (10,9,-1,0)
2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-0 from (10,10,-1,0)
2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-1 from (10,11,-1,0)
2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-2 from (10,12,-1,0)
2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-3 from (10,13,-1,0)
2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-4 from (10,14,-1,0)
2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-5 from (10,15,-1,0)
2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-6 from (10,16,-1,0)
2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-7 from (10,17,-1,0)
2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-8 from (10,18,-1,0)
2021-01-28 15:54:06.025 INFO  [0x110a19dc0] SampleConsumer:49 | Receive: my-key => msg-9 from (10,19,-1,0)

I think the behavior is normal. Or did I just misunderstand?
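
The two runs above are consistent with a simple picture: a compaction run covers only the messages that exist when it runs, and a consumer reading with readCompacted gets the compacted view up to that point, followed by every message published afterwards. A minimal C++ sketch of that picture, as a toy in-memory model (`ToyTopic`, `compact`, and `readCompacted` are hypothetical names here, not Pulsar's implementation or client API):

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Toy model only: this is NOT Pulsar's implementation or client API.
struct Message {
    std::string key;
    std::string value;
};

class ToyTopic {
public:
    // Append a message to the end of the (uncompacted) log.
    void produce(const std::string& key, const std::string& value) {
        log_.push_back({key, value});
    }

    // Models one compaction run (manual or automatic): keep the newest
    // message per key among everything currently in the log, and remember
    // how far the run got (the "compaction horizon").
    void compact() {
        std::map<std::string, std::size_t> lastIndex;
        for (std::size_t i = 0; i < log_.size(); ++i) {
            lastIndex[log_[i].key] = i;
        }
        compacted_.clear();
        for (std::size_t i = 0; i < log_.size(); ++i) {
            if (lastIndex.at(log_[i].key) == i) {
                compacted_.push_back(log_[i]);  // preserves log order
            }
        }
        horizon_ = log_.size();
    }

    // Models a new subscription reading from the start with
    // readCompacted=true: compacted view first, then the raw tail
    // published after the last compaction run.
    std::vector<Message> readCompacted() const {
        std::vector<Message> out = compacted_;
        out.insert(out.end(),
                   log_.begin() + static_cast<std::ptrdiff_t>(horizon_),
                   log_.end());
        return out;
    }

private:
    std::vector<Message> log_;
    std::vector<Message> compacted_;
    std::size_t horizon_ = 0;
};
```

In this model, producing 10 messages with one key, calling `compact()`, and reading yields 1 message; producing 10 more and reading again yields 11 (the compacted one plus the uncompacted tail), which is the same pattern as the sub1 and sub2 logs. Calling `compact()` again brings the read back down to 1.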

fayce66 commented 3 years ago

What do you mean by 'compact the topic' after producing 10 messages? Do you mean you ran the pulsar-admin compact command again? If you started sub2 after the last 10 messages were produced, why does sub2 receive everything if there is topic compaction?

Maybe this is my mistake, but that is not how I understood topic compaction to work. My understanding is that it should always return the last message per key, even if more messages have been produced in the meantime; that is the whole point. The consumer is not interested in past messages, only in the latest values, like a last-value queue. It defeats the purpose of topic compaction if we have to receive all messages the first time and then only the last one the second time. What do you think? Can you try restarting consumer sub2 a second time without producing more messages? You should now receive only the last message.

The way it works now, it is as if compaction for a given subscription stops at the last message produced while the subscription was alive. When new messages are produced, the topic compaction seems to stay at the same message ID for that subscription, so the subscription gets the whole delta when restarted, and only then is the compaction updated again...

BewareMyPower commented 3 years ago

> What do you mean by 'compact the topic' after producing 10 messages? you mean you run the pulsar-admin compact command again?

Yeah, the command just triggers a manual compaction of the old messages. A manual compaction only compacts the messages that already exist; it does not re-compact all old messages every time a new message arrives, which would be inefficient. The main purpose of topic compaction is to reduce storage.

If you have a consumer that subscribes to a topic, then even if no compaction was done, the consumer will always read the latest message, because each time a message is acknowledged the consume position is persisted as a cursor. However, in scenarios like the one you described, once a newer message has been read, the old messages are no longer meaningful. We no longer need to store these messages, so we can compact the topic. These messages are then useless for a new subscription.

See http://pulsar.apache.org/docs/en/concepts-topic-compaction/. There are two ways to do topic compaction:

> Triggered automatically when the backlog reaches a certain size or can be triggered manually via the command line. See the Topic compaction cookbook

The command just triggers compaction manually. I think what you want is to run compaction for every message that arrives. That is unnecessary unless you never acknowledge any messages and restart consuming, or frequently use a new subscription.

My suggestion is to use a namespace policy or a topic-level policy for topic compaction.

fayce66 commented 3 years ago

OK, so basically when I restart my consumers, the best thing to do is to trigger topic compaction manually before restarting them so that I get only the last message per key, correct? If it at least works like this, then it is fine, but the documentation is confusing. I thought compaction was done automatically behind the scenes when new messages are produced.

> When you run compaction on a topic, Pulsar goes through a topic's backlog and removes messages that are obscured by later messages, i.e. it goes through the topic on a per-key basis and leaves only the most recent message associated with that key.

The doc should say "when you run manual compaction on a topic"; that would be clearer.

BewareMyPower commented 3 years ago

> OK, so basically when I restart my consumers, the best thing to do is to trigger topic compaction manually before I restarted them so that I get only the last message/key correct?

Yes.

> The doc should say "when you run manual compaction on a topic" that would be more clear.

I think "manual" is redundant, because automatic compaction does the same thing as manual compaction. IMO, it should be emphasized that compaction here is an action that can be triggered. However, topic compaction is also a feature that is applied to a topic, so "run compaction on a topic" might be misunderstood as applying this feature to a topic.
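
Since a readCompacted consumer can still receive several messages per key for anything published after the last compaction run, a consumer that only cares about the latest value can fold what it receives into a per-key map. A minimal sketch of that idea, assuming the key/payload pairs have already been extracted from received messages (`latestByKey` is a hypothetical helper, not part of the Pulsar client):

```cpp
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper, not part of the Pulsar client: reduce a stream of
// (key, payload) pairs, in arrival order, to the newest payload per key.
std::map<std::string, std::string> latestByKey(
    const std::vector<std::pair<std::string, std::string>>& received) {
    std::map<std::string, std::string> latest;
    for (const auto& kv : received) {
        latest[kv.first] = kv.second;  // later arrivals overwrite earlier ones
    }
    return latest;
}
```

In a real consumer loop the pairs would presumably come from `msg.getPartitionKey()` and `msg.getDataAsString()`; deciding when the backlog has been fully drained is left out of this sketch.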

fayce66 commented 3 years ago

Can we close this issue, since you explained that Pulsar's behavior regarding topic compaction is normal?

BewareMyPower commented 3 years ago

OK, I think you can close it. It looks like issues can only be closed by the author and committers.