Motivation
In the current compact topic implementation, there are no time-based or capacity-based retention policies on the compacted ledger. Unless a user intervenes manually, the cardinality of the compacted topic can therefore only increase over time. The cluster has no automatic mechanism for managing the resource usage of compact topics, which also makes them unsuitable for use cases such as windowing or a limited-retention FIFO cache.
It should be noted that compact topics with retention are supported by Kafka (cleanup.policy = [delete, compact]), and are heavily used within KStreams to create look-up tables with short-lived records. Reference: KIP-71: Enable log compaction and deletion to co-exist
Goal
The scope of this proposal is to add a user-configurable, automatic retention capability to the compacted ledger component of compact topics. Once configured, the brokers would be responsible for expunging messages from the compacted ledger that violate the user-declared retention policy.
To adhere to the principle of least surprise, the behaviour of the retention mechanism should, wherever possible, be identical to that of a regular topic (or indeed the uncompacted ledger of the compact topic).
API Changes
org.apache.pulsar.client.admin.NamespacePolicies
RetentionPolicies getCompactedRetention(String namespace);
RetentionPolicies getCompactedRetention(String namespace, boolean applied);
CompletableFuture<RetentionPolicies> getCompactedRetentionAsync(String namespace);
CompletableFuture<RetentionPolicies> getCompactedRetentionAsync(String namespace, boolean applied);
RetentionPolicies removeCompactedRetention(String namespace);
CompletableFuture<RetentionPolicies> removeCompactedRetentionAsync(String namespace);
void setCompactedRetention(String namespace, RetentionPolicies policies);
CompletableFuture<Void> setCompactedRetentionAsync(String namespace, RetentionPolicies policies);
org.apache.pulsar.client.admin.TopicPolicies
RetentionPolicies getCompactedRetention(String topic);
RetentionPolicies getCompactedRetention(String topic, boolean applied);
CompletableFuture<RetentionPolicies> getCompactedRetentionAsync(String topic);
CompletableFuture<RetentionPolicies> getCompactedRetentionAsync(String topic, boolean applied);
RetentionPolicies removeCompactedRetention(String topic);
CompletableFuture<RetentionPolicies> removeCompactedRetentionAsync(String topic);
void setCompactedRetention(String topic, RetentionPolicies policies);
CompletableFuture<Void> setCompactedRetentionAsync(String topic, RetentionPolicies policies);
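The proposal does not spell out what the applied flag on the getters means. A minimal sketch, assuming it behaves like the flag on other topic-level policy getters: with applied = false only a policy set directly on the topic is returned, and with applied = true the lookup falls back to the namespace-level policy. CompactedRetentionStore, Policy and all method names below are hypothetical in-memory stand-ins, not Pulsar classes, and the namespace is passed explicitly rather than parsed from the topic name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical in-memory model of policy resolution; not Pulsar code. */
final class CompactedRetentionStore {

    /** Stand-in for RetentionPolicies, reduced to a time limit and a size limit. */
    static final class Policy {
        final int timeInMinutes;
        final long sizeInMB;

        Policy(int timeInMinutes, long sizeInMB) {
            this.timeInMinutes = timeInMinutes;
            this.sizeInMB = sizeInMB;
        }
    }

    private final Map<String, Policy> namespacePolicies = new HashMap<>();
    private final Map<String, Policy> topicPolicies = new HashMap<>();

    void setNamespacePolicy(String namespace, Policy policy) {
        namespacePolicies.put(namespace, policy);
    }

    void setTopicPolicy(String topic, Policy policy) {
        topicPolicies.put(topic, policy);
    }

    /** Removes the topic-level policy and returns it, or null if none was set. */
    Policy removeTopicPolicy(String topic) {
        return topicPolicies.remove(topic);
    }

    /**
     * applied = false: only the policy set directly on the topic.
     * applied = true: the topic policy if present, otherwise the namespace policy.
     */
    Optional<Policy> getTopicPolicy(String namespace, String topic, boolean applied) {
        Policy own = topicPolicies.get(topic);
        if (own != null || !applied) {
            return Optional.ofNullable(own);
        }
        return Optional.ofNullable(namespacePolicies.get(namespace));
    }
}
```

Under this assumption, removing a topic-level policy does not disable retention on the compacted ledger if the namespace still declares one; the topic simply reverts to the namespace policy.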
Implementation
Terminology
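For this discussion, we will use the term ‘marked message’ to refer to a message that a retention policy would target for deletion. In practice, this means that the message is older than the retention interval (RetentionPolicies.retentionTimeInMinutes) or falls outside the most recent portion of storage whose size equals the retention size (RetentionPolicies.retentionSizeInMB).
Admin client
PulsarAdmin admin = ...;
admin.topicPolicies().setCompactedRetention(
    "topic-name",
    new RetentionPolicies(retentionTimeInMinutes, retentionSizeInMB)
);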
Compactor
Uncompacted topic data comprises multiple ledgers. These ledgers are closed and new ones are created as messages are produced into the topic. Ledgers are immutable and it is not possible to remove marked messages in an existing ledger. However, we can eventually free storage resources when all messages in the ledger have been marked by the retention policy. Therefore we release storage on a per-ledger basis.
Compacted topic data comprises a single ledger, created by the compactor process when compaction begins and then closed when it ends. While a retention policy can identify marked messages, we cannot release storage in a similar incremental manner to uncompacted data.
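To make the notion of a marked message concrete for a single compacted ledger, here is a minimal sketch of how a time limit and a size limit could be evaluated over its entries. It assumes the entries are ordered oldest to newest and that a message is marked when it breaks either limit, as in the terminology above. MarkedMessages, Entry and retain are hypothetical names, not Pulsar classes, and special values such as an unlimited retention are not modelled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the marking rule; not Pulsar code. */
final class MarkedMessages {

    /** Minimal view of a compacted-ledger entry. */
    static final class Entry {
        final String key;
        final long publishTimeMs;
        final long sizeBytes;

        Entry(String key, long publishTimeMs, long sizeBytes) {
            this.key = key;
            this.publishTimeMs = publishTimeMs;
            this.sizeBytes = sizeBytes;
        }
    }

    /**
     * Returns the entries that are not marked, in their original order.
     * An entry is marked if it is older than retentionMs, or if it lies
     * outside the newest retentionBytes of the ledger.
     */
    static List<Entry> retain(List<Entry> entries, long nowMs,
                              long retentionMs, long retentionBytes) {
        boolean[] keep = new boolean[entries.size()];
        long tailBytes = 0;
        // Walk newest to oldest so the size budget covers the tail of the ledger.
        for (int i = entries.size() - 1; i >= 0; i--) {
            Entry e = entries.get(i);
            tailBytes += e.sizeBytes;
            boolean withinTime = nowMs - e.publishTimeMs <= retentionMs;
            boolean withinSize = tailBytes <= retentionBytes;
            keep[i] = withinTime && withinSize;
        }
        List<Entry> retained = new ArrayList<>();
        for (int i = 0; i < entries.size(); i++) {
            if (keep[i]) {
                retained.add(entries.get(i));
            }
        }
        return retained;
    }
}
```

Because a compacted ledger is a single immutable ledger, a result like this cannot be used to trim it in place; it only tells a writer which entries to carry over into a new ledger.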
Since a compacted ledger cannot be trimmed incrementally, the compactor (org.apache.pulsar.compaction.TwoPhaseCompactor) can instead exclude marked messages as it writes a new compacted ledger. This allows a retention window to be applied to the new ledger; the old ledger is then deleted, freeing its resources. An implication of this is that retention enforcement is wholly dependent on compactor execution.
Broker
The API residing in org.apache.pulsar.broker.admin will need to be extended to manage query/persist actions on topic and namespace compact retention policies.
Metadata persistence
In the case of topic-scoped compaction retention policies, we must add additional metadata entities/attributes to org.apache.pulsar.broker.service.TopicPoliciesService and associated classes so that the compact retention policy can be persisted and queried. We must make a similar set of changes to org.apache.pulsar.broker.resources.NamespaceResources for the namespace-scoped compaction retention policies.
Documentation
The following documentation artefacts will need to be updated:
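Alternatives
Users can iterate over messages manually and then send tombstones based on observed message timestamps and/or cumulative message size. These messages will then be removed from the compacted ledger during the next compaction.
Topic compaction implemented as a retention policy using the existing ledger set.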
Anything else?
There are two configurations that currently influence the logical compacted event log: the topic’s retention policy and the compactor’s time of execution. There are therefore some subtle interactions in the regions where these configurations overlap. What if a recent key update is removed from the uncompacted ledgers by a retention policy before the update is merged into the compacted ledger by a compaction event? If this behaviour is not desirable, compaction must run at an interval smaller than any possible retention window. This change introduces a third influence, the compacted ledger’s retention policy, which will make the overall log behaviour harder to reason about in these small, overlapping regions.
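The first interaction can be illustrated with a small timing model. This is only a sketch of the scenario as posed above, not verified broker behaviour: it assumes compaction runs at fixed multiples of an interval, that an update is picked up by the first compaction strictly after it is published, and that the update disappears from the uncompacted ledgers exactly when the retention window elapses. CompactionTiming and reachesCompactedLedger are hypothetical names.

```java
/** Hypothetical timing model of the retention/compaction overlap; not Pulsar code. */
final class CompactionTiming {

    /**
     * Returns true if an update published at publishMinute is still present in
     * the uncompacted ledgers when the next compaction runs.
     */
    static boolean reachesCompactedLedger(long publishMinute,
                                          long retentionMinutes,
                                          long compactionIntervalMinutes) {
        if (publishMinute < 0 || compactionIntervalMinutes <= 0) {
            throw new IllegalArgumentException("publish time must be >= 0 and interval > 0");
        }
        // First compaction strictly after the publish time.
        long nextCompaction =
                (publishMinute / compactionIntervalMinutes + 1) * compactionIntervalMinutes;
        long waitMinutes = nextCompaction - publishMinute;
        return waitMinutes <= retentionMinutes;
    }
}
```

In this model the longest possible wait equals the compaction interval, so an update can only be lost when the retention window is shorter than that interval, which matches the guidance above.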