apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[PIP-?] Optimization of massive delayed message scenarios #15500

Open JunFu0814 opened 2 years ago

JunFu0814 commented 2 years ago

Motivation

Business scenarios

Delayed messages are a common requirement in business messaging systems. For example, group-opening reminders in group-buying campaigns, automatic charges when a continuous monthly subscription expires, and coupon expiration reminders can all be implemented with delayed messages.

Current Implementation

Currently, Pulsar implements arbitrary delayed messages with an in-memory time wheel. A producer can send a mix of ordinary and delayed messages to a topic, and all of them are persisted to BookKeeper. Delayed messages are currently supported only with Shared subscriptions: each subscription identifies delayed messages and adds them to the time wheel. The default time granularity (tick) of the time wheel is 1 second. For each delayed message, the time wheel stores index information: the delivery timestamp, the ledgerId, and the entryId. When the delivery time arrives, the broker reads the message by its ledgerId and entryId and dispatches it to a consumer.
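To make the per-message index concrete, here is a minimal sketch, assuming the index is modeled as a priority queue ordered by delivery time (a stand-in for the time wheel, not Pulsar's actual tracker code). Each entry holds the three longs described above; on each tick, every entry whose delivery time has passed is polled and would then be read from BookKeeper and dispatched. The class and method names (`DelayedIndexSketch`, `add`, `pollDue`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical in-memory delayed-delivery index: one entry of three longs
// (deliverAt, ledgerId, entryId) per delayed message, ordered by delivery time.
public class DelayedIndexSketch {

    static final class IndexEntry {
        final long deliverAtMillis;
        final long ledgerId;
        final long entryId;

        IndexEntry(long deliverAtMillis, long ledgerId, long entryId) {
            this.deliverAtMillis = deliverAtMillis;
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    private final PriorityQueue<IndexEntry> queue =
            new PriorityQueue<>(Comparator.comparingLong((IndexEntry e) -> e.deliverAtMillis));

    // Called when the dispatcher sees a message whose delivery time is in the future.
    public void add(long deliverAtMillis, long ledgerId, long entryId) {
        queue.add(new IndexEntry(deliverAtMillis, ledgerId, entryId));
    }

    // Called on each tick: returns the positions that are due, in delivery order.
    public List<IndexEntry> pollDue(long nowMillis) {
        List<IndexEntry> due = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().deliverAtMillis <= nowMillis) {
            due.add(queue.poll());
        }
        return due;
    }

    public int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        DelayedIndexSketch index = new DelayedIndexSketch();
        index.add(3_000, 10, 2);
        index.add(1_000, 10, 0);
        index.add(2_000, 10, 1);
        System.out.println(index.pollDue(2_000)); // [10:0, 10:1]
        System.out.println(index.size());         // 1
    }
}
```

Every pending delayed message stays in this structure until it is due, which is the root of the memory and recovery problems listed below.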

Current Problems

  1. Memory is limited. Each message needs 24 B to store three long fields, so 10 million delayed messages on one broker require about 228 MB of off-heap memory.
  2. Ledgers take longer to be deleted. Delayed messages and ordinary messages are stored in the same ledgers, and a ledger that still contains unconsumed messages cannot be deleted. A single delayed message with a long delay (for example, one month) therefore keeps its ledger, and the storage it occupies, alive until that message is finally delivered.
  3. Recovery of delayed messages is expensive. If a topic has a large number of delayed messages, all of them must be re-indexed when the topic is transferred to another broker or the broker goes down.
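The 228 MB figure in item 1 follows from 24 B × 10,000,000 = 240,000,000 B, which is about 228.9 MiB (binary megabytes). The small calculation below, with a hypothetical helper name, shows that the index grows linearly with the number of pending delayed messages.

```java
import java.util.Locale;

public class DelayIndexMemory {
    // Each index entry holds three longs: deliverAt, ledgerId, entryId.
    static final int BYTES_PER_ENTRY = 3 * Long.BYTES; // 24 bytes

    static long indexBytes(long delayedMessages) {
        return delayedMessages * BYTES_PER_ENTRY;
    }

    public static void main(String[] args) {
        long bytes = indexBytes(10_000_000L);
        double mib = bytes / (1024.0 * 1024.0);
        // Locale.ROOT keeps the decimal separator a dot regardless of system locale.
        System.out.printf(Locale.ROOT, "%d bytes = %.1f MiB%n", bytes, mib); // 240000000 bytes = 228.9 MiB
    }
}
```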

Goal

Optimization ideas

Reduce the number of delayed messages that the time wheel has to index at any one time. Delay levels can be used to divide delayed messages into time ranges, so that the broker only needs to index the messages that are due soon. For example, messages with a long delay are first stored in an ordinary topic (the delay level topic), and a separate mechanism moves messages that are about to expire into the business topic over time, where they enter the time wheel. This greatly reduces the number of delayed messages held in the time wheel and thereby mitigates the problems described above.


Implementation details

  1. When the producer sends a message, it calculates from the delay time which partition of the delay level topic the message should be sent to. The delay level topic is an internal partitioned topic. For example, if a business needs delays of up to 1 year, a topic with 364 partitions can be created: partition-0 stores delayed messages with a delay >= 1 day and < 2 days, and partition-363 stores messages with a delay >= 364 days and < 365 days. If the delay is less than 1 day, the message is sent directly to the real business topic and is handled by the time wheel as it is today. The partitioning rule of the delay level topic can be derived from the partition strategy and the maximum delay time passed in by the producer.
  2. A set of scheduled tasks is required to move messages from the delay level topic to the business topic. This synchronization is effectively a full consume-and-produce cycle, so the messages are persisted again along the way. The broker could run these tasks, but to avoid overloading the broker, Pulsar Functions can be used to synchronize the delayed messages instead.
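The routing rule in item 1 can be sketched as follows, assuming a day-based delay level topic with a maximum delay of 365 days (364 partitions). Partition i holds delays in [(i + 1) days, (i + 2) days), and delays under one day bypass the delay level topic. The class name `DelayLevelRouter`, the `BUSINESS_TOPIC` sentinel, and the choice to reject delays at or beyond the maximum are all hypothetical; the proposal does not say how out-of-range delays are handled.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical producer-side routing for a delay level topic.
public class DelayLevelRouter {

    /** Returned when the message should go straight to the business topic. */
    public static final int BUSINESS_TOPIC = -1;

    private final long unitMillis;
    private final long maxDelayMillis;

    public DelayLevelRouter(long unitMillis, long maxDelayMillis) {
        this.unitMillis = unitMillis;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Partition i holds delays in [(i + 1) units, (i + 2) units). */
    public int route(long delayMillis) {
        if (delayMillis < 0 || delayMillis >= maxDelayMillis) {
            // Assumption: out-of-range delays are rejected; the proposal leaves this open.
            throw new IllegalArgumentException("delay out of range: " + delayMillis);
        }
        long units = delayMillis / unitMillis; // whole units, rounded down
        return units == 0 ? BUSINESS_TOPIC : (int) (units - 1);
    }

    public static void main(String[] args) {
        DelayLevelRouter router =
                new DelayLevelRouter(TimeUnit.DAYS.toMillis(1), TimeUnit.DAYS.toMillis(365));
        System.out.println(router.route(TimeUnit.HOURS.toMillis(5)));      // -1 (business topic)
        System.out.println(router.route(TimeUnit.HOURS.toMillis(36)));     // 0
        System.out.println(router.route(TimeUnit.DAYS.toMillis(364) + 1)); // 363
    }
}
```

Because partition boundaries are relative to the send time, messages in one partition do not share an absolute delivery time; the synchronization task in item 2 still has to check each message's own delivery time.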

Questions

  1. If the delay level topic is created automatically by the client, its data retention policy needs to be considered.
  2. Lifecycle management of the delay level topic: when the business topic is deleted, should the delay level topic be deleted at the same time? How should delayed messages that have not yet been delivered be handled?
  3. A user may specify an unreasonable partition strategy and maximum delay time on the client side, which could create too many partitions in the delay level topic.
  4. Should it be possible to expand the number of partitions of the delay level topic dynamically?

Proposed Changes

Client API


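    // Producer configuration fields for the delay level topic
    private boolean delayLevelTopicEnabled = false;

    private String delayLevelTopicName = null;

    private DelayLevelTopicPartitionStrategy partitionStrategy = new DelayLevelTopicPartitionStrategy();

public class DelayLevelTopicPartitionStrategy {

    // Default maximum delay: 30 days
    private long messageMaximumDelayTimeSeconds = 60 * 60 * 24 * 30;

    private PartitionBy partitionBy = PartitionBy.DAY;

    public enum PartitionBy {
        YEAR, MONTH, DAY, HOUR
    }

    public DelayLevelTopicPartitionStrategy() {
    }

    public DelayLevelTopicPartitionStrategy(long messageMaximumDelayTimeSeconds, PartitionBy partitionBy) {
        this.messageMaximumDelayTimeSeconds = messageMaximumDelayTimeSeconds;
        this.partitionBy = partitionBy;
    }
}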

    /**
     * This config determines whether delayed messages are routed through a delay level topic.
     * Enable this option for better performance in scenarios with a large number of long-delay messages.
     * Disabled by default.
     *
     * @param delayLevelTopicEnabled whether to enable the delay level topic
     * @return the producer builder instance
     */
    ProducerBuilder<T> enableDelayLevelTopic(boolean delayLevelTopicEnabled);

    /**
     * The name of the delay level topic can be specified.
     * If not specified, it will be generated based on the current topic name.
     *
     * @param delayLevelTopicName the name of the delay level topic
     * @return the producer builder instance
     */
    ProducerBuilder<T> delayLevelTopic(String delayLevelTopicName);

    /**
     * Partition strategy for the delay level topic.
     * The number of partitions of this topic is determined by
     * messageMaximumDelayTimeSeconds and partitionBy.
     *
     * @param strategy the partition strategy of the delay level topic
     * @return the producer builder instance
     */
    ProducerBuilder<T> delayLevelTopicPartitionStrategy(DelayLevelTopicPartitionStrategy strategy);

    // Example usage
    ProducerBuilder<byte[]> builder = pulsarClient.newProducer();
    builder.enableDelayLevelTopic(true)
            .delayLevelTopic("persistent://public/default/delay-level-topic-test")
            .delayLevelTopicPartitionStrategy(new DelayLevelTopicPartitionStrategy(
                    60 * 60 * 24 * 30, DelayLevelTopicPartitionStrategy.PartitionBy.DAY));
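The builder Javadoc says the partition count follows from `messageMaximumDelayTimeSeconds` and `partitionBy`. A minimal sketch of that calculation, assuming the rule from Implementation details (delays below one unit go straight to the business topic, so a maximum of N units needs N - 1 partitions), is shown below. The helper name and the unit lengths for MONTH (30 days) and YEAR (365 days) are assumptions; the proposal does not define them.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: partition count for a delay level topic.
// Assumption: MONTH = 30 days and YEAR = 365 days.
public class DelayLevelPartitionCount {

    enum PartitionBy {
        YEAR(TimeUnit.DAYS.toSeconds(365)),
        MONTH(TimeUnit.DAYS.toSeconds(30)),
        DAY(TimeUnit.DAYS.toSeconds(1)),
        HOUR(TimeUnit.HOURS.toSeconds(1));

        final long unitSeconds;

        PartitionBy(long unitSeconds) {
            this.unitSeconds = unitSeconds;
        }
    }

    static long partitionCount(long maxDelaySeconds, PartitionBy by) {
        // Number of units needed to cover the maximum delay, rounded up.
        long units = (maxDelaySeconds + by.unitSeconds - 1) / by.unitSeconds;
        // The first unit [0, 1) is served by the business topic itself.
        return Math.max(units - 1, 0);
    }

    public static void main(String[] args) {
        long thirtyDays = TimeUnit.DAYS.toSeconds(30);
        long oneYear = TimeUnit.DAYS.toSeconds(365);
        System.out.println(partitionCount(thirtyDays, PartitionBy.DAY)); // 29
        System.out.println(partitionCount(oneYear, PartitionBy.DAY));    // 364
        System.out.println(partitionCount(oneYear, PartitionBy.HOUR));   // 8759
    }
}
```

The last line illustrates question 3: an hourly strategy over a one-year maximum delay would already need thousands of partitions, so some upper bound on the computed count may be worth enforcing.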

Broker

TODO

Functions

TODO: periodically synchronize messages from the delay level topic to the business topic
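Pending the Functions design, the synchronization step can be illustrated with an in-memory simulation, assuming a one-day window: on each run, every message in the delay level topic whose delivery time falls within the next day is republished to the business topic with its original delivery time, so the existing delayed delivery keeps it until it is due. Lists stand in for topics and `it.remove()` stands in for acknowledging the source message; all names here are hypothetical. In a real client, republishing with the original timestamp would correspond to `TypedMessageBuilder.deliverAt(long)`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory simulation of the delay level topic synchronization task.
public class DelayLevelSyncSketch {

    static final class DelayedMessage {
        final String payload;
        final long deliverAtMillis;

        DelayedMessage(String payload, long deliverAtMillis) {
            this.payload = payload;
            this.deliverAtMillis = deliverAtMillis;
        }
    }

    static final long WINDOW_MILLIS = TimeUnit.DAYS.toMillis(1);

    /** Moves messages due within the window to the business topic; returns how many moved. */
    static int syncOnce(List<List<DelayedMessage>> delayLevelPartitions,
                        List<DelayedMessage> businessTopic,
                        long nowMillis) {
        int moved = 0;
        for (List<DelayedMessage> partition : delayLevelPartitions) {
            Iterator<DelayedMessage> it = partition.iterator();
            while (it.hasNext()) {
                DelayedMessage msg = it.next();
                if (msg.deliverAtMillis < nowMillis + WINDOW_MILLIS) {
                    businessTopic.add(msg); // republish, keeping the original deliverAt
                    it.remove();            // stands in for acknowledging the source message
                    moved++;
                }
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        long day = WINDOW_MILLIS;
        List<DelayedMessage> p0 = new ArrayList<>();
        p0.add(new DelayedMessage("a", day + day / 2));     // due in 1.5 days
        List<DelayedMessage> p1 = new ArrayList<>();
        p1.add(new DelayedMessage("b", 2 * day + day / 2)); // due in 2.5 days
        List<List<DelayedMessage>> partitions = new ArrayList<>();
        partitions.add(p0);
        partitions.add(p1);
        List<DelayedMessage> business = new ArrayList<>();

        System.out.println(syncOnce(partitions, business, 0));   // 0
        System.out.println(syncOnce(partitions, business, day)); // 1
        System.out.println(business.get(0).payload);             // a
    }
}
```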

hpvd commented 2 years ago

there is already another PIP with this PIP-number: PIP-156: Build and Run Pulsar Server on Java 17 https://github.com/apache/pulsar/issues/15207

please see https://github.com/apache/pulsar/wiki/Pulsar-Improvement-Proposal-%28PIP%29

hpvd commented 2 years ago

sorry for bothering again, just upcounting 1 doesn't do the trick: There is already PIP 157, see PIP-157: Bucketing topic metadata to allow more topics per namespace https://github.com/apache/pulsar/issues/15254 @lhotari could you help?

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.

netudima commented 1 year ago

it looks like this is addressed by https://github.com/apache/pulsar/issues/16763