streamnative / kop

Kafka-on-Pulsar - A protocol handler that brings native Kafka protocol to Apache Pulsar
https://streamnative.io/docs/kop
Apache License 2.0

[BUG] Topic cannot be deleted after producing transactional messages #1387

Open BewareMyPower opened 2 years ago

BewareMyPower commented 2 years ago

Describe the bug

To Reproduce

KoP configs:

messagingProtocols=kafka
protocolHandlerDirectory=./protocols
allowAutoTopicCreationType=partitioned
kafkaListeners=PLAINTEXT://127.0.0.1:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false
brokerDeleteInactivePartitionedTopicMetadataEnabled=true
brokerDeduplicationEnabled=true
kafkaTransactionCoordinatorEnabled=true
brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up

First, create the partitioned topic via pulsar-admin:

./bin/pulsar-admin topics create-partitioned-topic my-topic-5 -p 1

Then, run the following Kafka application:

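        import java.util.Properties;
        import org.apache.kafka.clients.producer.*;
        import org.apache.kafka.common.serialization.StringSerializer;
        public class TransactionalProducerRepro {
            public static void main(String[] args) {
                final var bootstrapServers = "localhost:9092";
                final var topic = "my-topic-5";
                final var props = new Properties();
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "X0");
                // try-with-resources closes the producer on exit, as Lombok's @Cleanup did
                try (final var producer = new KafkaProducer<String, String>(props)) {
                    producer.initTransactions();
                    producer.beginTransaction();
                    // Send "hello" with a null key to partition 0
                    producer.send(new ProducerRecord<>(topic, 0, null, "hello"));
                    producer.commitTransaction();
                }
            }
        }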

Then, try to delete the topic:

$ ./bin/pulsar-admin topics delete-partitioned-topic my-topic-5     
2022-07-06T16:45:48,350+0800 [AsyncHttpClient-7-1] WARN  org.apache.pulsar.client.admin.internal.BaseResource - [http://localhost:8080/admin/v2/persistent/public/default/my-topic-5/partitions?force=false&deleteSchema=false] Failed to perform http delete request: javax.ws.rs.ClientErrorException: HTTP 412 Precondition Failed
Topic has active producers/subscriptions

Reason: Topic has active producers/subscriptions

Expected behavior

The topic should be deleted.

Additional context

Here are the relevant broker logs:

2022-07-06T16:45:48,317+0800 [pulsar-web-47-2] ERROR org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Failed to delete topic persistent://public/default/my-topic-5-partition-0
org.apache.pulsar.broker.service.BrokerServiceException$TopicBusyException: Topic has 1 connected producers/consumers
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$32(PersistentTopic.java:1205) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_332]
    at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) ~[?:1.8.0_332]
    at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) ~[?:1.8.0_332]
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.delete(PersistentTopic.java:1142) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.delete(PersistentTopic.java:1074) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.delete(PersistentTopic.java:1069) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
    at org.apache.pulsar.broker.service.BrokerService.deleteTopic(BrokerService.java:1037) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
    at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalDeleteTopic(PersistentTopicsBase.java:1020) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
    at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalDeleteTopic(PersistentTopicsBase.java:1011) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
    at org.apache.pulsar.broker.admin.v2.PersistentTopics.deleteTopic(PersistentTopics.java:961) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]

Here are the internal stats of my-topic-5:

$ ./bin/pulsar-admin topics partitioned-stats-internal my-topic-5
{
  "metadata" : {
    "partitions" : 1
  },
  "partitions" : {
    "persistent://public/default/my-topic-5-partition-0" : {
      "entriesAddedCounter" : 2,
      "numberOfEntries" : 2,
      "totalSize" : 156,
      "currentLedgerEntries" : 2,
      "currentLedgerSize" : 156,
      "lastLedgerCreatedTimestamp" : "2022-07-06T16:44:42.352+08:00",
      "waitingCursorsCount" : 0,
      "pendingAddEntriesCount" : 0,
      "lastConfirmedEntry" : "236:1",
      "state" : "LedgerOpened",
      "ledgers" : [ {
        "ledgerId" : 236,
        "entries" : 0,
        "size" : 0,
        "offloaded" : false,
        "underReplicated" : false
      } ],
      "cursors" : {
        "pulsar.dedup" : {
          "markDeletePosition" : "236:-1",
          "readPosition" : "236:0",
          "waitingReadOp" : false,
          "pendingReadOps" : 0,
          "messagesConsumedCounter" : 0,
          "cursorLedger" : 237,
          "cursorLedgerLastEntry" : 0,
          "individuallyDeletedMessages" : "[]",
          "lastLedgerSwitchTimestamp" : "2022-07-06T16:44:42.363+08:00",
          "state" : "Open",
          "numberOfEntriesSinceFirstNotAckedMessage" : 1,
          "totalNonContiguousDeletedMessagesRange" : 0,
          "subscriptionHavePendingRead" : false,
          "subscriptionHavePendingReplayRead" : false,
          "properties" : { }
        }
      },
      "schemaLedgers" : [ ],
      "compactedLedger" : {
        "ledgerId" : -1,
        "entries" : -1,
        "size" : -1,
        "offloaded" : false,
        "underReplicated" : false
      }
    }
  }
}
BewareMyPower commented 2 years ago

Using an idempotent (non-transactional) producer instead works: the topic can be deleted afterward. It looks like this bug is not caused by Pulsar itself.

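        import java.util.Properties;
        import org.apache.kafka.clients.producer.*;
        import org.apache.kafka.common.serialization.StringSerializer;
        public class IdempotentProducerRepro {
            public static void main(String[] args) {
                final var bootstrapServers = "localhost:9092";
                final var topic = "my-topic-4";
                final var props = new Properties();
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                // Idempotence only: no transactional.id, so no transaction markers are written
                props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
                try (final var producer = new KafkaProducer<String, String>(props)) {
                    producer.send(new ProducerRecord<>(topic, 0, null, "hello"));
                }
            }
        }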

The stats are similar (no producers, only one non-durable pulsar.dedup cursor), but this topic can be deleted:

$ ./bin/pulsar-admin topics partitioned-stats-internal my-topic-4   
{
  "metadata" : {
    "partitions" : 1
  },
  "partitions" : {
    "persistent://public/default/my-topic-4-partition-0" : {
      "entriesAddedCounter" : 1,
      "numberOfEntries" : 1,
      "totalSize" : 51,
      "currentLedgerEntries" : 1,
      "currentLedgerSize" : 51,
      "lastLedgerCreatedTimestamp" : "2022-07-06T16:35:10.711+08:00",
      "waitingCursorsCount" : 0,
      "pendingAddEntriesCount" : 0,
      "lastConfirmedEntry" : "234:0",
      "state" : "LedgerOpened",
      "ledgers" : [ {
        "ledgerId" : 234,
        "entries" : 0,
        "size" : 0,
        "offloaded" : false,
        "underReplicated" : false
      } ],
      "cursors" : {
        "pulsar.dedup" : {
          "markDeletePosition" : "234:-1",
          "readPosition" : "234:0",
          "waitingReadOp" : false,
          "pendingReadOps" : 0,
          "messagesConsumedCounter" : 0,
          "cursorLedger" : 235,
          "cursorLedgerLastEntry" : 0,
          "individuallyDeletedMessages" : "[]",
          "lastLedgerSwitchTimestamp" : "2022-07-06T16:35:10.724+08:00",
          "state" : "Open",
          "numberOfEntriesSinceFirstNotAckedMessage" : 1,
          "totalNonContiguousDeletedMessagesRange" : 0,
          "subscriptionHavePendingRead" : false,
          "subscriptionHavePendingReplayRead" : false,
          "properties" : { }
        }
      },
      "schemaLedgers" : [ ],
      "compactedLedger" : {
        "ledgerId" : -1,
        "entries" : -1,
        "size" : -1,
        "offloaded" : false,
        "underReplicated" : false
      }
    }
  }
}
$ ./bin/pulsar-admin topics delete-partitioned-topic my-topic-4  
zhouxy0809 commented 2 years ago

You can delete the topic with the --force option even when it has connected producers or consumers.

BewareMyPower commented 2 years ago

Yes, but this issue is more about why the topic cannot be deleted. Deleting it with the -f option works, but if some producers are still connected, the topic could be re-created automatically, which might lead to unexpected results.

BTW, the cause of this issue is that an InternalProducer object, which extends the broker's Producer class, is still writing the transaction marker when the topic is deleted.
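A toy model may help picture this race. It is a minimal sketch that assumes only what the stack trace above shows: a non-forced delete fails with "Topic has N connected producers/consumers" while anything is attached. The `ToyTopic` class and its methods are hypothetical and are not the real `PersistentTopic` or KoP `InternalProducer` code; the internal marker writer is modeled as just another attached producer that detaches once the marker is written.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: names are hypothetical and do not mirror PersistentTopic or KoP classes.
public class ToyTopic {
    /** Stand-in for the broker's "topic busy" error in this toy model. */
    public static class TopicBusyException extends IllegalStateException {
        public TopicBusyException(String message) {
            super(message);
        }
    }

    private final Set<String> producers = new HashSet<>();
    private boolean deleted = false;

    public void addProducer(String name) {
        producers.add(name);
    }

    public void removeProducer(String name) {
        producers.remove(name);
    }

    public boolean isDeleted() {
        return deleted;
    }

    // A non-forced delete is rejected while any producer is attached,
    // whether it is a client producer or an internal marker writer.
    public void delete(boolean force) {
        if (!force && !producers.isEmpty()) {
            throw new TopicBusyException(
                    "Topic has " + producers.size() + " connected producers/consumers");
        }
        producers.clear();
        deleted = true;
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        // commitTransaction() has returned on the client, but the internal
        // producer is still writing the transaction marker.
        topic.addProducer("internal-marker-writer");
        try {
            topic.delete(false);
        } catch (TopicBusyException e) {
            System.out.println("delete rejected: " + e.getMessage());
        }
        // Once the marker is written, the internal producer detaches.
        topic.removeProducer("internal-marker-writer");
        topic.delete(false);
        System.out.println("deleted: " + topic.isDeleted());
    }
}
```

In this model `delete(true)` behaves like the -f option: it succeeds regardless of attached producers, which is exactly why it can hide the underlying race.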

zhouxy0809 commented 2 years ago

Thanks for your prompt reply and helpful suggestion. Has any commit fixed this issue so far?

BewareMyPower commented 2 years ago

See https://github.com/streamnative/kop/pull/1388. Actually, the topic could be deleted after waiting for a while. Because at the time producer.commitTransaction() returns, the related transaction marker might not be written. After the marker being written successfully, the topic could be deleted.