apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

A scheme for client to realize effective-once #13860

Open 1559924775 opened 2 years ago

1559924775 commented 2 years ago

Motivation

When a Pulsar producer crashes, how can it resume publishing from where it left off? To achieve effectively-once semantics, the data source must first be replayable. For example, suppose we are reading records from a file and publishing a message for each record. If the application crashes and restarts, we want to resume publishing from the record after the last one that was successfully published before the crash. Pulsar provides deduplication based on the sequence ID. We can use the record's offset in the file as the sequence ID and, through that ID, recover the offset to read from after the crash:

ProducerConfiguration producerConf = new ProducerConfiguration();
producerConf.setProducerName("my-producer-name");
Producer producer = client.createProducer(topic, producerConf);

// Fictitious record reader class
RecordReader source = new RecordReader("/my/file/path");

// Resume from the last offset that was successfully published
long fileOffset = producer.getLastSequenceId();
if (fileOffset > 0) {
    source.seekToOffset(fileOffset);
}

while (source.hasNext()) {
    long currentOffset = source.currentOffset();
    Message msg = MessageBuilder.create()
            .setSequenceId(currentOffset)
            .setContent(source.next())
            .build();
    producer.send(msg);
}
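
To make the deduplication rule concrete, here is a minimal sketch of it in plain Java. It is a toy model, not Pulsar code: the class DedupSketch and its methods are hypothetical, and it assumes that a message is accepted only when its sequence ID is higher than the last one accepted from the same producer name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of sequence-ID deduplication (hypothetical, not Pulsar code). */
final class DedupSketch {
    // Highest sequence ID accepted so far, per producer name
    private final Map<String, Long> lastSequenceId = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    /** Accepts the message only if its sequence ID is higher than the last accepted one. */
    boolean publish(String producerName, long sequenceId, String payload) {
        long last = getLastSequenceId(producerName);
        if (sequenceId <= last) {
            return false; // duplicate: already stored, dropped
        }
        lastSequenceId.put(producerName, sequenceId);
        log.add(payload);
        return true;
    }

    /** Returns -1 when nothing has been accepted from this producer name. */
    long getLastSequenceId(String producerName) {
        return lastSequenceId.getOrDefault(producerName, -1L);
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        DedupSketch broker = new DedupSketch();
        String[] records = {"r0", "r1", "r2", "r3"};
        // First run: the application crashes after publishing two records
        broker.publish("my-producer-name", 0, records[0]);
        broker.publish("my-producer-name", 1, records[1]);
        // After restart: resume from the record after the last accepted one
        int resumeFrom = (int) broker.getLastSequenceId("my-producer-name") + 1;
        for (int i = resumeFrom; i < records.length; i++) {
            broker.publish("my-producer-name", i, records[i]);
        }
        System.out.println(broker.log());
    }
}
```

Because the sequence ID doubles as the file offset, the restarted application needs no state of its own to find the resume point.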

Now consider the case of multiple files. We need to read the records in these files and publish them evenly to a multi-partition topic. When the program crashes, we need to know which file and which record were published last, so both attributes must be mapped to the sequence ID. More generally, the more attributes a user needs to describe the location of a piece of data, the more attributes must be mapped to the sequence ID. Several difficulties make this hard to satisfy today:

1. Users need to map multiple data attributes to a sequence ID on every publish. This can be time-consuming and hurt performance, and it is difficult to map multiple attributes to a monotonically increasing 64-bit ID.

2. Currently, Producer.getLastSequenceId() only returns the highest successfully published sequence ID of each partition. With multiple partitions and asynchronous publishing, successful publication of the record at some file offset does not guarantee that all earlier records were also published. Suppose a record with a smaller file offset fails to be sent in one partition while a record with a larger file offset is sent successfully in another partition. If, after a restart, we resume publishing from the file offset given by getLastSequenceId(), data will be lost.
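
The second problem can be illustrated with a small model. The sketch below is not Pulsar code; it assumes, purely for illustration, that record offsets 0, 1, 2, ... are routed round-robin to partitions and that each partition acknowledges its records in order. ResumePointSketch and lostRecords are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (not Pulsar code): which records are lost for a given resume point. */
final class ResumePointSketch {

    /**
     * Records 0..resumeAfter are treated as published and skipped after restart.
     * Record o was routed to partition o % ackedPerPartition.length and was
     * really published only if o <= ackedPerPartition[partition].
     */
    static List<Long> lostRecords(long resumeAfter, long[] ackedPerPartition) {
        List<Long> lost = new ArrayList<>();
        for (long offset = 0; offset <= resumeAfter; offset++) {
            int partition = (int) (offset % ackedPerPartition.length);
            if (offset > ackedPerPartition[partition]) {
                lost.add(offset);
            }
        }
        return lost;
    }

    static long max(long[] values) {
        return Arrays.stream(values).max().getAsLong();
    }

    static long min(long[] values) {
        return Arrays.stream(values).min().getAsLong();
    }

    public static void main(String[] args) {
        // Partition 0 acknowledged only offset 0; partition 1 acknowledged up to offset 5
        long[] acked = {0, 5};
        System.out.println("resume after max: lost " + lostRecords(max(acked), acked));
        System.out.println("resume after min: lost " + lostRecords(min(acked), acked));
    }
}
```

Resuming after the largest acknowledged offset skips records 2 and 4, which partition 0 never acknowledged. Resuming after the smallest acknowledged offset loses nothing, at the cost of resending records that deduplication must then drop.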

Goal

To solve the problems above, we propose publishing with progress. The following changes will be made. We abstract a Progress interface for users to implement; in the example above, the user's Progress class would include filePath and fileOffset. Each piece of data published by the user carries a Progress object that identifies it. The new publishing interface is sendAsync(T msg, Progress progress). Inside the SDK, we maintain the mapping between the latest successfully published sequenceId and its Progress object, and periodically checkpoint this mapping for use in recovery. In this way, the user only needs to attach the Progress when sending and, when recovering, read the minimum publishing progress across the partitions from the checkpoint and continue sending from there.

API Changes

Let's introduce some concepts first:

Progress:

The Progress interface is implemented by the user. The fields defined in the implementing class let the user locate a unique piece of data and, from it, the next piece of data to send. Two examples illustrate the interface. In the first, a log collection service collects data from multiple log files and sends it to topics. Progress can be implemented as follows:

public class LogProgress implements Progress {
    long timestamp;
    long inode;
    long offset;

    @Override
    public int compareTo(Progress progress) {
        LogProgress other = (LogProgress) progress;
        if (inode == other.inode) {
            // Same file: order by offset within the file
            return Long.compare(offset, other.offset);
        }
        // Different files: order by timestamp
        return Long.compare(timestamp, other.timestamp);
    }

    // serialize() and deserialize() omitted for brevity
}

These three fields uniquely identify the location of a piece of data. After a restart, this information and the corresponding sequenceId can be read from the checkpoint (described below), and sending can continue.

The second example is a stream-computing scenario: subscribe to data from topic A and publish it to topic B. Progress can be implemented as follows:

public class SCProgress implements Progress {
    long timestamp;
    MessageId messageId; // subscribed from topic A

    @Override
    public int compareTo(Progress progress) {
        SCProgress other = (SCProgress) progress;
        if (timestamp != other.timestamp) {
            return Long.compare(timestamp, other.timestamp);
        }
        return messageId.compareTo(other.messageId);
    }

    // serialize() and deserialize() omitted for brevity
}

ProgressInfo:

It is not exposed to users. It contains a Progress and a sequenceId and is used to associate the two.

Progress persistence:

A checkpoint thread is started in the producer. It periodically saves the latest published Progress and sequenceId of each partition through writeProgressInfo of the ProgressInfoStore interface. By default, we provide persistence to local files, and users can implement the ProgressInfoStore interface to customize how the data is persisted. Alternatively, users can persist the data themselves using the binary ProgressInfo data returned by the producer's saveProgress() method.
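
One possible shape for the checkpoint thread is sketched below. This is only an illustration under stated assumptions: CheckpointSketch is a hypothetical class, the Supplier stands in for whatever produces the serialized progress, and the Consumer stands in for whatever persists it.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Periodically snapshots progress and hands it to a store (hypothetical sketch). */
final class CheckpointSketch implements AutoCloseable {
    private final Supplier<byte[]> snapshot; // stands in for serializing the producer's progress
    private final Consumer<byte[]> store;    // stands in for persisting the serialized progress
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "progress-checkpoint");
                t.setDaemon(true); // do not keep the JVM alive for checkpoints
                return t;
            });

    CheckpointSketch(Supplier<byte[]> snapshot, Consumer<byte[]> store) {
        this.snapshot = snapshot;
        this.store = store;
    }

    /** Starts checkpointing every intervalSeconds seconds. */
    void start(long intervalSeconds) {
        scheduler.scheduleAtFixedRate(this::checkpointOnce,
                intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }

    /** Takes one checkpoint; exposed so callers can also checkpoint on demand. */
    void checkpointOnce() {
        try {
            byte[] bytes = snapshot.get();
            if (bytes != null) {
                store.accept(bytes);
            }
        } catch (RuntimeException e) {
            // An uncaught exception would cancel all future runs of the periodic task
            System.err.println("checkpoint failed: " + e);
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Checkpointing only on a timer means the persisted progress can lag behind what the broker has accepted, so recovery has to tolerate resending already-published messages.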

Progress recovery:

On recovery, the progress information, including the last saved Progress and sequenceId of each partition, is loaded into memory from persistent storage, and the sequenceId is then used as the initial sequenceId of each partition. The user can implement the ProgressInfoStore interface to customize how the data is loaded, or pass the binary data directly to the producer's loadProgress(byte[] progressByte) method.

1. ProducerConfigurationData changes. Add the configuration item needProgress, which enables the publish-with-progress mode, and progressCheckpointIntervalSeconds, which sets the interval at which progress information is persisted. Progress information is persisted to a local file by default; users can implement ProgressInfoStore to customize persistence.

private boolean needProgress = false;
private long progressCheckpointIntervalSeconds = 60;

2. Add the Progress interface. It is implemented by the user, who adds the fields needed to represent the progress of the user's data and implements the compareTo, serialize, and deserialize methods.

package org.apache.pulsar.client.api;

import java.io.Serializable;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

/**
 * Represents the location of a message. It identifies a unique message and
 * must increase monotonically in publishing order.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Progress extends Comparable<Progress>, Serializable {

    /**
     * Serialize the progress object.
     * @return the binary data obtained by serializing the progress object
     */
    byte[] serialize();

    /**
     * Deserialize progressByte into a progress object.
     * @param progressByte the binary data produced by {@link #serialize()}
     * @return the object obtained by deserializing progressByte
     */
    Progress deserialize(byte[] progressByte);
}
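
As an illustration of what serialize and deserialize might look like for the filePath and fileOffset example, here is a minimal standalone sketch. FileProgressSketch is a hypothetical class that does not implement the proposed interface (so that it compiles on its own), and its deserialize is static, whereas the proposal declares an instance method. The byte layout is an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical progress type: a position inside one of several files. */
final class FileProgressSketch implements Comparable<FileProgressSketch> {
    final String filePath;
    final long fileOffset;

    FileProgressSketch(String filePath, long fileOffset) {
        this.filePath = filePath;
        this.fileOffset = fileOffset;
    }

    /** Assumed layout: [path length: int][path: UTF-8 bytes][offset: long]. */
    byte[] serialize() {
        byte[] path = filePath.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + path.length + Long.BYTES);
        buf.putInt(path.length).put(path).putLong(fileOffset);
        return buf.array();
    }

    /** Reads back the layout written by serialize(). */
    static FileProgressSketch deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] path = new byte[buf.getInt()];
        buf.get(path);
        return new FileProgressSketch(new String(path, StandardCharsets.UTF_8), buf.getLong());
    }

    /** Orders by file path first, then by offset within the file. */
    @Override
    public int compareTo(FileProgressSketch other) {
        int byPath = filePath.compareTo(other.filePath);
        return byPath != 0 ? byPath : Long.compare(fileOffset, other.fileOffset);
    }

    public static void main(String[] args) {
        FileProgressSketch original = new FileProgressSketch("/my/file/path", 42L);
        FileProgressSketch copy = deserialize(original.serialize());
        System.out.println(copy.filePath + " @ " + copy.fileOffset);
    }
}
```

An explicit byte layout keeps the checkpoint format independent of Java's built-in serialization, which matters if the class changes between releases.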

3. Add ProgressInfo. It has two members, Progress and sequenceId, and is the object stored at each checkpoint.

package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.api.Progress;

/**
 * A class that maintains the correspondence between progress and sequenceId
 */
public class ProgressInfo {

    private Progress progress;
    private long sequenceId;

    public Progress getProgress() {
        return progress;
    }

    public void setProgress(Progress progress) {
        this.progress = progress;
    }

    public long getSequenceId() {
        return sequenceId;
    }

    public void setSequenceId(long sequenceId) {
        this.sequenceId = sequenceId;
    }
}

4. Add ProgressManager, a class for managing progress-related operations. lastProgressInfoPublished holds the progress and sequenceId of the last successfully published message, and lastProgressInfoPushed holds the progress and sequenceId of the last message that has been sent but not yet acknowledged.

package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.api.Progress;

public class ProgressManager {

    /**
     * ProgressInfo of the last message that has been sent and acknowledged
     */
    public ProgressInfo lastProgressInfoPublished = new ProgressInfo();

    /**
     * ProgressInfo of the last message that has been sent but not yet acknowledged
     */
    public ProgressInfo lastProgressInfoPushed = new ProgressInfo();

    public void updateLastProgressInfoPublished(Progress progress, long sequenceId) {
        lastProgressInfoPublished.setProgress(progress);
        lastProgressInfoPublished.setSequenceId(sequenceId);
    }

    public void updateLastProgressInfoPushed(Progress progress, long sequenceId) {
        lastProgressInfoPushed.setProgress(progress);
        lastProgressInfoPushed.setSequenceId(sequenceId);
    }

    public ProgressInfo getLastProgressInfoPushed() {
        return lastProgressInfoPushed;
    }

    public ProgressInfo getLastProgressInfoPublished() {
        return lastProgressInfoPublished;
    }

    /**
     * Recover progressInfo from progressByte
     * @param progressByte the binary data obtained by saveProgressInfo
     * @return progressInfo
     */
    public ProgressInfo loadProgressInfo(byte[] progressByte) {
        // Placeholder: deserialize progressByte into a ProgressInfo here
        lastProgressInfoPublished = lastProgressInfoPushed = null;
        return lastProgressInfoPublished;
    }

    /**
     * Serialize the progressInfo into binary data
     * @return the serialized progressInfo
     */
    public byte[] saveProgressInfo() {
        // Placeholder: serialize lastProgressInfoPublished here
        return null;
    }

}
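
The difference between the pushed and published state can be sketched as follows. This is a hypothetical standalone model, not the proposed class: it uses a String as the progress value and assumes that acknowledgments for one partition arrive in send order and that one acknowledgment may cover several batched messages.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Tracks the last pushed and last published progress for one partition (hypothetical). */
final class ProgressTrackerSketch {

    /** Pairs a user progress value with the sequence ID it was sent under. */
    static final class Entry {
        final String progress;
        final long sequenceId;

        Entry(String progress, long sequenceId) {
            this.progress = progress;
            this.sequenceId = sequenceId;
        }
    }

    private final Deque<Entry> inFlight = new ArrayDeque<>();
    private Entry lastPushed;    // sent, not yet acknowledged
    private Entry lastPublished; // acknowledged

    /** Called when a message is handed to the network. */
    void onSend(String progress, long sequenceId) {
        Entry entry = new Entry(progress, sequenceId);
        inFlight.addLast(entry);
        lastPushed = entry;
    }

    /** Called when an acknowledgment covering everything up to sequenceId arrives. */
    void onAck(long sequenceId) {
        while (!inFlight.isEmpty() && inFlight.peekFirst().sequenceId <= sequenceId) {
            lastPublished = inFlight.pollFirst();
        }
    }

    Entry lastPushed() {
        return lastPushed;
    }

    Entry lastPublished() {
        return lastPublished;
    }
}
```

Only the published entry is safe to write to a checkpoint, because a pushed message may still fail.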

5. Add the ProgressInfoStore interface. The user implements it to persist the binary data obtained by calling producer.saveProgress(). We provide a default implementation, FileProgressInfoStore, which persists to local files.

package org.apache.pulsar.client.impl;

/**
 * Persists progressInfo
 */
public interface ProgressInfoStore {

    /**
     * Read the persisted progressInfo as binary data
     * @return the binary progressInfo
     */
    byte[] readProgressInfo();

    /**
     * Persist progressInfo
     * @param progressInfoByte the binary progressInfo to persist
     */
    void writeProgressInfo(byte[] progressInfoByte);
}
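
A file-backed store could look roughly like the sketch below. It is not the FileProgressInfoStore of the proposal, whose code is not shown here; FileStoreSketch is a hypothetical standalone class with the same two method names. It assumes that writing to a temporary file and then renaming it is an acceptable way to avoid leaving a half-written checkpoint after a crash.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Stores the checkpoint bytes in a single local file (hypothetical sketch). */
final class FileStoreSketch {
    private final Path file;

    FileStoreSketch(Path file) {
        this.file = file;
    }

    /** Returns the stored bytes, or null when no checkpoint exists yet. */
    byte[] readProgressInfo() {
        try {
            return Files.exists(file) ? Files.readAllBytes(file) : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes to a sibling temp file first, then moves it over the checkpoint file. */
    void writeProgressInfo(byte[] progressInfoByte) {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        try {
            Files.write(tmp, progressInfoByte);
            try {
                Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE);
            } catch (AtomicMoveNotSupportedException e) {
                // Fall back to a plain replace where atomic moves are unavailable
                Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Whether an atomic move replaces an existing target is platform-specific according to the Java documentation, so a production implementation would need to verify this on its target file systems.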

6. Add ProgressMessageImpl. It wraps a MessageImpl and adds the member variable progress.

public class ProgressMessageImpl<T> implements Message<T> {

    private final Message<T> msg;

    public Progress progress;

    public ProgressMessageImpl(Message<T> msg, Progress progress) {
        this.msg = msg;
        this.progress = progress;
    }

    public Progress getProgress() {
        return progress;
    }

    // The Message<T> methods delegate to msg (omitted here)
}

7. Producer interface changes.

/**
* Send a message asynchronously.
*
* <p>When the producer queue is full, by default this method will complete the future with an exception
* {@link PulsarClientException.ProducerQueueIsFullError}
*
* <p>See {@link ProducerBuilder#maxPendingMessages(int)} to configure the producer queue size and
* {@link ProducerBuilder#blockIfQueueFull(boolean)} to change the blocking behavior.
*
* <p>Use {@link #newMessage()} to specify more properties than just the value on the message to be sent.
*
* @param message
* a byte array with the payload of the message
* @param progress
* the attribute of the data used to locate the data
* @return a future that can be used to track when the message will have been safely persisted
*/
CompletableFuture<MessageId> sendAsync(T message, Progress progress);

/**
* recover the progressInfo of each partition from progressByte, and return the minimum progress
* @param progressByte
* progress information of each partition; this information can be generated by calling {@link #saveProgress()}
* @return the minimum progress
*/
Progress loadProgress(byte[] progressByte);

/**
* serialize the progressInfo of each partition into binary data
* @return binary data composed of the progress and sequenceId of each partition
*/
byte[] saveProgress();

/**
* recover the progressInfo of each partition from progressInfoStore, and return the minimum progress
* @return the minimum progress
*/
Progress loadProgressInfo();

/**
* store the progressInfo of each partition into progressInfoStore
*/
void saveProgressInfo();

/**
* compare the progress of each partition to return the minimum progress
* @return the minimum progress
*/
Progress getMinProgress();

8. PartitionedProducerImpl and ProducerImpl changes. Both add the new methods of the Producer interface. ProducerImpl adds a ProgressManager member. PartitionedProducerImpl iterates over the per-partition producers and calls their methods to implement its own loadProgress, saveProgress, and getMinProgress, which eventually call the corresponding methods of ProgressManager.

ProducerBase:

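@Override
public CompletableFuture<MessageId> sendAsync(T message, Progress progress) {
    if (!conf.isNeedProgress()) {
        // Publishing with progress must be enabled in the producer configuration
        return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
                "Publishing with progress is not enabled (needProgress is false)"));
    }
    try {
        return newMessage().value(message).progress(progress).sendAsync();
    } catch (SchemaSerializationException e) {
        return FutureUtil.failedFuture(e);
    }
}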

@Override
public Progress loadProgressInfo() {
    byte[] progressInfo = progressInfoStore.readProgressInfo();
    return this.loadProgress(progressInfo);
}

@Override
public void saveProgressInfo() {
    byte[] progressInfo = this.saveProgress();
    progressInfoStore.writeProgressInfo(progressInfo);
}

PartitionedProducerImpl:

@Override
public Progress loadProgress(byte[] progressByte) {
    if (!conf.isNeedProgress()) {
        return null;
    }
    producers.values().forEach(producer -> {
        // Placeholder: split this partition's slice out of progressByte
        byte[] partitionProgressBuf = null;
        producer.loadProgress(partitionProgressBuf);
    });
    return getMinProgress();
}

@Override
public byte[] saveProgress() {
    if (!conf.isNeedProgress()) {
        return null;
    }
    byte[] progressBuf = null;
    for (ProducerImpl<T> producer : producers.values()) {
        byte[] partitionProgressBuf = producer.saveProgress();
        // Placeholder: append partitionProgressBuf to progressBuf
    }
    return progressBuf;
}

@Override
public Progress getMinProgress() {
    Progress minProgress = null;
    for (ProducerImpl<T> producer : producers.values()) {
        Progress progress = producer.getMinProgress();
        if (progress == null) {
            // Partitions with no recorded progress are skipped
            continue;
        }
        // Keep the smallest progress seen so far
        if (minProgress == null || progress.compareTo(minProgress) < 0) {
            minProgress = progress;
        }
    }
    return minProgress;
}
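
The splitting and assembling of the per-partition progress bytes is left as a placeholder in the code above. One way to do it, shown here only as a sketch under assumptions, is a length-prefixed layout. PartitionProgressCodec and the layout itself are hypothetical and not part of the proposal.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

/** Packs per-partition progress bytes into one buffer and back (hypothetical layout). */
final class PartitionProgressCodec {

    /** Assumed layout: [count: int] then, per partition, [index: int][length: int][bytes]. */
    static byte[] pack(Map<Integer, byte[]> perPartition) {
        int size = Integer.BYTES;
        for (byte[] bytes : perPartition.values()) {
            size += 2 * Integer.BYTES + bytes.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(perPartition.size());
        for (Map.Entry<Integer, byte[]> entry : perPartition.entrySet()) {
            buf.putInt(entry.getKey()).putInt(entry.getValue().length).put(entry.getValue());
        }
        return buf.array();
    }

    /** Reverses pack(); the result is sorted by partition index. */
    static Map<Integer, byte[]> unpack(byte[] packed) {
        ByteBuffer buf = ByteBuffer.wrap(packed);
        int count = buf.getInt();
        Map<Integer, byte[]> perPartition = new TreeMap<>();
        for (int i = 0; i < count; i++) {
            int partition = buf.getInt();
            byte[] bytes = new byte[buf.getInt()];
            buf.get(bytes);
            perPartition.put(partition, bytes);
        }
        return perPartition;
    }
}
```

Storing the partition index with each slice lets recovery hand every slice to the right per-partition producer even if the iteration order of the producers differs between runs.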

ProducerImpl:

private ProgressManager progressManager;

@Override
public Progress loadProgress(byte[] progressByte) {
    if (conf.isNeedProgress() && progressManager != null) {
        ProgressInfo progressInfo = progressManager.loadProgressInfo(progressByte);
        // Continue the sequence from the checkpointed sequenceId
        this.lastSequenceIdPublished = progressInfo.getSequenceId();
        this.lastSequenceIdPushed = progressInfo.getSequenceId();
        this.msgIdGenerator = progressInfo.getSequenceId() + 1L;
        return progressInfo.getProgress();
    }
    return null;
}

@Override
public byte[] saveProgress() {
    if (conf.isNeedProgress() && progressManager != null) {
        return progressManager.saveProgressInfo();
    }
    return null;
}

@Override
public Progress getMinProgress() {
    // For a single partition, the minimum progress is the last published progress
    ProgressInfo publishedProgressInfo = progressManager.getLastProgressInfoPublished();
    if (publishedProgressInfo != null) {
        return publishedProgressInfo.getProgress();
    }
    return null;
}

9. TypedMessageBuilderImpl and TypedMessageBuilder interface changes. Add a progress member. Modify getMessage to return a ProgressMessageImpl object if publishing with progress is configured.

TypedMessageBuilder:

/**
 * Sets the progress of the message.
 * This progress can locate the message.
 * @param progress the progress of the message
 * @return the message builder instance
 */
TypedMessageBuilder<T> progress(Progress progress);

TypedMessageBuilderImpl:

public Message<T> getMessage() {
    beforeSend();
    MessageImpl<T> messageImpl = MessageImpl.create(msgMetadata, content, schema, producer != null ? producer.getTopic() : null);
    if (progress != null) {
        return new ProgressMessageImpl<>(messageImpl, progress);
    }
    return messageImpl;
}

10. Add ProgressSendCallback. It extends SendCallback with a progress member.

package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.api.Progress;

/**
 * SendCallback with progress
 */
public interface ProgressSendCallback extends SendCallback{

    /**
     * Set the progress of the message corresponding to this callback
     * @param progress
     */
    public void setProgress(Progress progress);

    /**
     * return the progress of the message corresponding to this callback
     * @return the progress of the message corresponding to this callback
     */
    public Progress getProgress();

}

11. OpSendMsg class changes. Add the lastCallback field so that the SendCallback of the last message in the entry is easy to obtain when the ack is received.

protected static final class OpSendMsg {
    SendCallback lastCallback;
    static OpSendMsg create(List<MessageImpl<?>> msgs, ByteBufPair cmd, long sequenceId,
                        SendCallback callback, SendCallback lastCallback) {
        OpSendMsg op = RECYCLER.get();
        op.msgs = msgs;
        op.cmd = cmd;
        op.callback = callback;
        op.sequenceId = sequenceId;
        op.createdAt = System.nanoTime();
        op.uncompressedSize = 0;
        op.lastCallback = lastCallback;
        for (int i = 0; i < msgs.size(); i++) {
            op.uncompressedSize += msgs.get(i).getUncompressedSize();
        }
        return op;
    }

    static OpSendMsg create(List<MessageImpl<?>> msgs, ByteBufPair cmd, long lowestSequenceId,
                            long highestSequenceId,  SendCallback callback, SendCallback lastCallback) {
        OpSendMsg op = RECYCLER.get();
        op.msgs = msgs;
        op.cmd = cmd;
        op.callback = callback;
        op.sequenceId = lowestSequenceId;
        op.highestSequenceId = highestSequenceId;
        op.createdAt = System.nanoTime();
        op.uncompressedSize = 0;
        op.lastCallback = lastCallback;
        for (int i = 0; i < msgs.size(); i++) {
            op.uncompressedSize += msgs.get(i).getUncompressedSize();
        }
        return op;
    }
}

12. Add ProgressPartitionMessageRouterImpl. Since each partition maintains its publishing progress independently, the same data must be routed to the same partition every time it is sent.

package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Progress;
import org.apache.pulsar.client.api.TopicMetadata;

import static org.apache.pulsar.client.util.MathUtils.signSafeMod;

public class ProgressPartitionMessageRouterImpl extends MessageRouterBase {
    private static final long serialVersionUID = 1L;

    public ProgressPartitionMessageRouterImpl(HashingScheme hashingScheme) {
        super(hashingScheme);
    }

    @Override
    public int choosePartition(Message<?> msg, TopicMetadata topicMetadata) {
        // If the message has a key, it supersedes progress-based routing
        if (msg.hasKey()) {
            return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
        }
        if (!(msg instanceof ProgressMessageImpl)) {
            throw new IllegalArgumentException("Message does not carry a Progress");
        }
        ProgressMessageImpl<?> progressMessage = (ProgressMessageImpl<?>) msg;
        Progress progress = progressMessage.getProgress();
        // hashCode() must be stable across restarts for routing to be repeatable
        return signSafeMod(progress.hashCode(), topicMetadata.numPartitions());
    }
}
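
Routing by progress.hashCode() is only repeatable if the hash depends on field values alone, because the default Object.hashCode() can differ between JVM runs. The sketch below shows one way to meet that requirement. StableRoutingSketch and FileProgress are hypothetical, and signSafeMod is a local helper written for this example rather than the Pulsar utility.

```java
import java.util.Objects;

/** Demonstrates repeatable routing from a value-based hash (hypothetical sketch). */
final class StableRoutingSketch {

    /** Progress whose hash depends only on field values, so it survives a restart. */
    static final class FileProgress {
        final String filePath;
        final long fileOffset;

        FileProgress(String filePath, long fileOffset) {
            this.filePath = filePath;
            this.fileOffset = fileOffset;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof FileProgress)) {
                return false;
            }
            FileProgress other = (FileProgress) o;
            return fileOffset == other.fileOffset && filePath.equals(other.filePath);
        }

        @Override
        public int hashCode() {
            // String and Long hashes are specified by the JDK, so this is deterministic
            return Objects.hash(filePath, fileOffset);
        }
    }

    /** Local helper: a modulo that never returns a negative partition index. */
    static int signSafeMod(long dividend, int divisor) {
        int mod = (int) (dividend % divisor);
        return mod < 0 ? mod + divisor : mod;
    }

    static int choosePartition(FileProgress progress, int numPartitions) {
        return signSafeMod(progress.hashCode(), numPartitions);
    }
}
```

Two FileProgress objects built from the same path and offset always map to the same partition, which is what lets per-partition deduplication work after recovery.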

Implementation


To make the principle and implementation details clearer, let's look at a specific example that covers publishing, a client crash, and restarting to continue publishing.

  1. In this example, the user's progress has a single field, and a, b, c, and d denote progress values. The topic has two partitions.
  2. Assume the batch size is 2 and start sending data.
  3. The data is sent successfully and the progress of each partition is updated; for example, partition1 is at (progress:b, sequenceId:2). Assume a checkpoint is triggered at this point. The persisted progress information is partition1:(progress:b, sequenceId:2), partition2:(progress:d, sequenceId:2).
  4. Several more messages, msg5~msg8, are published.
  5. The client crashes. At this point the client's persisted progress information is still partition1:(progress:b, sequenceId:2), partition2:(progress:d, sequenceId:2), while the lastSequenceId of both partitions on the broker is 4.
  6. The client executes loadProgress to recover the progress information from the checkpoint and load it into memory. It obtains the minimum publishing progress, b, locates the corresponding position in the source data, and resumes sending from there.
  7. msg3 is published again and routed to partition2. Since the progress c of msg3 is not greater than the progress d of partition2, it is deduplicated by the client.
  8. msg5 and msg6 are published again. Their progress is greater than the progress b of partition1, so the client does not deduplicate them, but because their sequenceId is not greater than 4 on the broker, they are deduplicated by the broker.
  9. msg7 and msg8 are published again. Their progress is greater than the progress d of partition2, so the client does not deduplicate them, but because their sequenceId is not greater than 4 on the broker, they are deduplicated by the broker.
  10. msg9 and msg10 are published. Neither the progress comparison nor the sequenceId comparison deduplicates them, so they are published successfully and the progress information is updated.
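
The replay in steps 6 to 10 can be reproduced with a small simulation. This is a toy model, not Pulsar code: progress values are represented by the message index (a = 1, b = 2, ...), partitions are numbered from 0, and the routing of message pairs to alternating partitions is an assumption chosen to match the walkthrough.

```java
import java.util.ArrayList;
import java.util.List;

/** Simulates client-side and broker-side deduplication after recovery (toy model). */
final class RecoveryWalkthroughSketch {
    static final int PARTITIONS = 2;

    /** Assumed routing: msg1-2 to partition 0, msg3-4 to partition 1, msg5-6 to partition 0, ... */
    static int route(int msgIndex) {
        return ((msgIndex - 1) / 2) % PARTITIONS;
    }

    static List<String> replay(int firstMsg, int lastMsg, int[] checkpointProgress,
                               long[] checkpointSeq, long[] brokerLastSeq) {
        int[] clientProgress = checkpointProgress.clone();
        long[] brokerSeq = brokerLastSeq.clone();
        long[] nextSeq = new long[PARTITIONS];
        for (int p = 0; p < PARTITIONS; p++) {
            nextSeq[p] = checkpointSeq[p] + 1; // continue after the checkpointed sequenceId
        }
        List<String> outcome = new ArrayList<>();
        for (int m = firstMsg; m <= lastMsg; m++) {
            int p = route(m);
            if (m <= clientProgress[p]) {
                // Progress not greater than the partition's recovered progress
                outcome.add("msg" + m + ":client-dedup");
                continue;
            }
            long seq = nextSeq[p]++;
            if (seq <= brokerSeq[p]) {
                // The broker has already stored this sequenceId
                outcome.add("msg" + m + ":broker-dedup");
            } else {
                brokerSeq[p] = seq;
                outcome.add("msg" + m + ":published");
            }
            clientProgress[p] = m;
        }
        return outcome;
    }

    public static void main(String[] args) {
        // Checkpoint: partition 0 at (b, 2), partition 1 at (d, 4 as an index, sequenceId 2)
        List<String> outcome = replay(3, 10, new int[] {2, 4}, new long[] {2, 2}, new long[] {4, 4});
        outcome.forEach(System.out::println);
    }
}
```

In this model msg3 and msg4 are dropped by the client, msg5 through msg8 are dropped by the broker, and msg9 and msg10 are published, matching the walkthrough.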

Demo

PulsarClient client = PulsarClient.builder()
    .serviceUrl(url)
    .authentication(AuthenticationFactory.token("eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1pbiJ9..."))
    .build();
Producer<byte[]> producer = client.newProducer()
    .topic(topic)
    .producerName("test")
    .create();
// Recover progressInfo from the progress store into memory and get the current minimum progress
Progress minProgress = producer.loadProgressInfo();
// Get the next message to be sent and the corresponding progress according to the minProgress
// ...
// progressAfterMinProgress1 is the next send position of minProgress
producer.send(msg1, progressAfterMinProgress1);
producer.send(msg2, progressAfterMinProgress2);
// ...

A PDF version is available here: A.plan.for.client.to.realize.effective-once.pdf

michaeljmarshall commented 2 years ago

@1559924775 - are you able to make this plain markdown? It'd be easier to review, searchable within github, and wouldn't require downloading a pdf. Thanks.

1559924775 commented 2 years ago

@michaeljmarshall OK.

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.
