streamnative / pulsar-archived

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org
Apache License 2.0

ISSUE-13490: PIP-137: Pulsar Client Shared State API #3480

Open sijie opened 2 years ago

sijie commented 2 years ago

Original Issue: apache/pulsar#13490


Motivation

Sometimes, in a distributed application or library that already uses Pulsar, you need to share some "state" across several instances of the application, for example:

Such cases are also very frequent while developing Pulsar IO Connectors or Pulsar Broker Protocol Handlers.

Currently you end up adding an extra component to the application, such as a database, or using the internal ZooKeeper or BookKeeper/DistributedLog components that support Pulsar. This is usually awkward both for developers and for system administrators.

We can provide a built-in mechanism in the Pulsar client API to support building such shared data structures.

In fact, since Pulsar 2.8.0 we have the Exclusive Producer, which allows you to use Pulsar as a consistent write-ahead log for replicated state machines.

We can provide an API to handle a shared distributed Java Object: each client can access the Object and mutate the State, ensuring consistency.

This is a sample implementation: https://github.com/eolivelli/pulsar-shared-state-manager

Goal

It is not a goal to implement a Pulsar-backed database system.

API Changes

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

public interface SharedStateManager<V, O> extends AutoCloseable {

    /**
     * Read from the current state.
     * @param reader a function that accesses the current state and returns a value
     * @param latest ensure that the value is the latest
     * @param <K> the returned data type
     * @return a handle to the result of the operation
     */
    <K> CompletableFuture<K> read(Function<V, K> reader, boolean latest);

    /**
     * Execute a mutation on the state.
     * The operationsGenerator generates a list of mutations (of type O) to be
     * written to the log; the change-log applier configured on the builder
     * is executed to mutate the state after each successful write
     * to the log. Finally the reader function can read from
     * the current state before releasing the write lock.
     * @param operationsGenerator generates a list of mutations
     * @param reader read from the state while inside the write lock
     * @param <K> the returned data type
     * @return a handle to the completion of the operation
     */
    <K> CompletableFuture<K> write(Function<V, List<O>> operationsGenerator,
                                   Function<V, K> reader);

    @Override
    void close();

    interface SharedStateManagerBuilder {

        <O> SharedStateManagerBuilder withOpSerializer(Function<O, byte[]> opSerializer);
        <O> SharedStateManagerBuilder withOpDeserializer(Function<byte[], O> opDeserializer);
        <V> SharedStateManagerBuilder withDatabaseInitializer(Supplier<V> databaseInitializer);
        <V, O> SharedStateManagerBuilder withChangeLogApplier(BiConsumer<V, O> changeLogApplier);
        // build() returns the configured manager, not another builder
        <V, O> SharedStateManager<V, O> build();
    }

}
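The four builder functions are easiest to understand in isolation. The following is a hedged, Pulsar-free sketch, not part of the proposal: a hypothetical `CounterReplica` whose State is an `AtomicLong` and whose operations are `Long` deltas. It assumes that every entry read from the topic is decoded with the op deserializer and passed to the change-log applier, which is why the applier must be deterministic: replicas that consume the same bytes converge on the same State.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, Pulsar-free illustration of the four builder functions.
class CounterReplica {
    // withOpSerializer / withOpDeserializer: operations must round-trip through bytes.
    static final Function<Long, byte[]> SERIALIZER =
            delta -> ByteBuffer.allocate(Long.BYTES).putLong(delta).array();
    static final Function<byte[], Long> DESERIALIZER =
            bytes -> ByteBuffer.wrap(bytes).getLong();
    // withDatabaseInitializer: the empty State every replica starts from.
    static final Supplier<AtomicLong> INITIALIZER = AtomicLong::new;
    // withChangeLogApplier: a deterministic mutation of the State.
    static final BiConsumer<AtomicLong, Long> APPLIER = AtomicLong::addAndGet;

    final AtomicLong state = INITIALIZER.get();

    // Called for every entry this replica reads from the (simulated) topic.
    void onEntry(byte[] entry) {
        APPLIER.accept(state, DESERIALIZER.apply(entry));
    }

    // Encodes operations the way they would be written to the topic.
    static List<byte[]> encode(List<Long> ops) {
        List<byte[]> entries = new ArrayList<>();
        for (Long op : ops) {
            entries.add(SERIALIZER.apply(op));
        }
        return entries;
    }

    public static void main(String[] args) {
        List<byte[]> topic = encode(List.of(5L, -2L, 10L));
        CounterReplica a = new CounterReplica();
        CounterReplica b = new CounterReplica();
        topic.forEach(a::onEntry);
        topic.forEach(b::onEntry);
        System.out.println(a.state.get() + " " + b.state.get()); // prints "13 13"
    }
}
```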

PulsarMap recipe, interface:

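import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

public interface PulsarMap<K, V> extends AutoCloseable {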

    /**
     * Get the value associated with a key.
     * @param key the key
     * @param latest ensure that the value is the latest
     * @return a handle to the result of the operation
     */
    default CompletableFuture<V> get(K key, boolean latest) {
        return getOrDefault(key, null, latest);
    }

    /**
     * Get the value associated with a key.
     * @param key the key
     * @param defaultValue the value to return if the key is not bound to any value
     * @param latest ensure that the value is the latest
     * @return a handle to the result of the operation
     */
    CompletableFuture<V> getOrDefault(K key, V defaultValue, boolean latest);

    /**
     * Scan the database.
     * @param filter a filter on the key
     * @param processor the function to process the data
     * @param latest ensure that the value observed is the latest
     * @return a handle to the result of the operation
     */
    CompletableFuture<?> scan(Function<K, Boolean> filter, BiConsumer<K, V> processor, boolean latest);

    /**
     * Update a binding; the operation may be executed multiple times, until it succeeds.
     * If the operation returns null, the binding is removed.
     * @param key the key
     * @param operation a function that modifies the value
     * @return a handle to the completion of the operation
     */
    CompletableFuture<V> update(K key, BiFunction<K, V, V> operation);

    /**
     * Update multiple bindings; the operation may be executed multiple times, until it succeeds.
     * For each key for which the operation returns null, the binding is removed.
     * @param filter a filter to skip processing some keys and reduce the usage of resources
     * @param operation a function that modifies the value
     * @return a handle to the completion of the operation
     */
    CompletableFuture<?> updateMultiple(Function<K, Boolean> filter, BiFunction<K, V, V> operation);

    /**
     * Delete all bindings.
     * @return a handle to the completion of the operation
     */
    CompletableFuture<?> clear();

    /**
     * List all keys.
     * @param latest ensure that we are up-to-date
     * @return a handle to the completion of the operation
     */
    default CompletableFuture<Collection<K>> listKeys(boolean latest) {
        List<K> result = new CopyOnWriteArrayList<>();
        return scan((k) -> true, (k,v) -> {
            result.add(k);
        }, latest).thenApply(___ -> result);
    }

    /**
     * Delete a binding
     * @param key the key
     * @return a handle to the completion of the operation
     */
    default CompletableFuture<V> delete(K key) {
        return update(key, (k, v) -> null);
    }

    /**
     * Update a binding only if the current value matches the expected value.
     * @param key the key
     * @param expectedValue the expected value, null means that the binding does not exist
     * @param value the new value
     * @return a handle to the completion of the operation
     */
    default CompletableFuture<V> replace(K key, V expectedValue, V value) {
        return update(key, (k, v) -> Objects.equals(v, expectedValue) ? value : v);
    }

    /**
     * Update a binding.
     * @param key the key
     * @param value the new value
     * @return a handle to the completion of the operation
     */
    default CompletableFuture<V> put(K key, V value) {
        return update(key, (k, v) -> value);
    }

    /**
     * Create a binding only if the key is not bound.
     * @param key the key
     * @param value the value to bind
     * @return a handle to the completion of the operation
     */
    default CompletableFuture<V> putIfAbsent(K key, V value) {
        return replace(key, null, value);
    }

}
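To make the default methods concrete, the hypothetical `LocalMapSemantics` class below replays them against a plain `HashMap` in a single process. It only models the semantics (no Pulsar, no retries, no futures); like the sample implementation, its `update` returns the value bound after the change. It shows that `replace` behaves as a compare-and-set and that `putIfAbsent` is `replace` with a null expected value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

// Hypothetical single-process model of the PulsarMap default methods.
class LocalMapSemantics<K, V> {
    private final Map<K, V> map = new HashMap<>();

    // Apply the operation to the current value; a null result removes the binding.
    V update(K key, BiFunction<K, V, V> operation) {
        V finalValue = operation.apply(key, map.get(key));
        if (finalValue == null) {
            map.remove(key);
        } else {
            map.put(key, finalValue);
        }
        return map.get(key); // value bound after the update
    }

    V put(K key, V value) {
        return update(key, (k, v) -> value);
    }

    V delete(K key) {
        return update(key, (k, v) -> null);
    }

    // Compare-and-set: swap only when the current value equals expectedValue.
    V replace(K key, V expectedValue, V value) {
        return update(key, (k, v) -> Objects.equals(v, expectedValue) ? value : v);
    }

    // A null expected value matches only an unbound key.
    V putIfAbsent(K key, V value) {
        return replace(key, null, value);
    }

    public static void main(String[] args) {
        LocalMapSemantics<String, Integer> m = new LocalMapSemantics<>();
        System.out.println(m.putIfAbsent("a", 1)); // 1: key was absent
        System.out.println(m.putIfAbsent("a", 2)); // 1: existing value kept
        System.out.println(m.replace("a", 5, 3));  // 1: expected 5, found 1
        System.out.println(m.replace("a", 1, 3));  // 3
        System.out.println(m.delete("a"));         // null
    }
}
```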

Implementation

The proposal is to add this SharedStateManager API as part of the Java Pulsar Client API:

This way the API and the implementation will be available to every Pulsar client user, including Pulsar IO Connectors and Pulsar Broker Protocol Handlers.

An alternative is to put it in the pulsar-adapters repository, but that would make the API harder to discover and would also require Pulsar IO Connectors and Broker Protocol Handlers to bundle copies of this new API into their .nar files.

The SharedStateManager holds in memory a reference to a Java object that represents the State. A non-partitioned Pulsar topic stores all the changes applied to that object.

To update the State, the local SharedStateManager performs these steps:

There are two ways to read the State:

If you want to ensure strong consistency, you perform the "read" operation together with a dummy write operation, so that it runs inside the implicit lock acquired by the Exclusive Producer.
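A minimal in-memory model of the write cycle and of the strong read, assuming a `ReentrantLock` stands in for the Exclusive Producer and an `ArrayList` stands in for the non-partitioned topic. `LogReplica` and `SharedLog` are hypothetical names for this sketch only, and the model is synchronous, whereas the proposed API returns `CompletableFuture`s.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, synchronous in-memory model; not Pulsar code.
class LogReplica<V, O> {
    // Stands in for the topic; the lock stands in for the Exclusive Producer.
    static final class SharedLog<T> {
        final List<T> entries = new ArrayList<>();
        final ReentrantLock exclusiveProducer = new ReentrantLock();
    }

    private final SharedLog<O> log;
    private final BiConsumer<V, O> applier;
    private final V state;
    private int applied = 0; // number of log entries already in the local State

    LogReplica(SharedLog<O> log, Supplier<V> initializer, BiConsumer<V, O> applier) {
        this.log = log;
        this.applier = applier;
        this.state = initializer.get();
    }

    // Apply the entries this replica has not seen yet (called only under the lock).
    private void catchUp() {
        while (applied < log.entries.size()) {
            applier.accept(state, log.entries.get(applied++));
        }
    }

    // Lock, catch up with the tail, generate operations from the up-to-date
    // State, append and apply each one, then read before unlocking.
    <K> K write(Function<V, List<O>> generator, Function<V, K> reader) {
        log.exclusiveProducer.lock();
        try {
            catchUp();
            for (O op : generator.apply(state)) {
                log.entries.add(op);       // "write to the topic"
                applier.accept(state, op); // apply after the successful write
                applied++;
            }
            return reader.apply(state);
        } finally {
            log.exclusiveProducer.unlock();
        }
    }

    // read(..., latest = false): answer from the local, possibly stale, State.
    <K> K readLocal(Function<V, K> reader) {
        return reader.apply(state);
    }

    // read(..., latest = true): a dummy write with no operations.
    <K> K readLatest(Function<V, K> reader) {
        return write(s -> List.of(), reader);
    }

    public static void main(String[] args) {
        SharedLog<Long> topic = new SharedLog<>();
        LogReplica<long[], Long> a = new LogReplica<>(topic, () -> new long[1], (s, d) -> s[0] += d);
        LogReplica<long[], Long> b = new LogReplica<>(topic, () -> new long[1], (s, d) -> s[0] += d);
        a.write(s -> List.of(3L, 4L), s -> s[0]);
        Long stale = b.readLocal(s -> s[0]);
        Long fresh = b.readLatest(s -> s[0]);
        System.out.println(stale); // prints 0: b has not caught up yet
        System.out.println(fresh); // prints 7
    }
}
```

Reusing `write` with an empty list of operations costs one lock acquisition per strong read, but it guarantees that the replica has applied everything up to the tail before the reader runs.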

At bootstrap we read the whole topic (from the beginning to the tail) to build the State. We do not want to require the client application to store the State locally.
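Bootstrap amounts to a left fold of the whole log over the initial State. A hedged sketch with a hypothetical `Bootstrap.replay` helper and a simplified `Op` class that mirrors the CLEAR/PUT/DELETE operations of the `MapOp` class used by the sample `PulsarMapImpl`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical sketch: bootstrap as a fold of the whole log over a fresh State.
class Bootstrap {
    static <V, O> V replay(Supplier<V> initializer, BiConsumer<V, O> applier, Iterable<O> log) {
        V state = initializer.get();
        for (O op : log) { // from the first entry to the tail
            applier.accept(state, op);
        }
        return state;
    }

    // Simplified stand-in for MapOp, with the same three operation types.
    static final class Op {
        static final int CLEAR = 0, PUT = 1, DELETE = 2;
        final int type;
        final String key;
        final Integer value;

        private Op(int type, String key, Integer value) {
            this.type = type;
            this.key = key;
            this.value = value;
        }

        static Op clear() { return new Op(CLEAR, null, null); }
        static Op put(String key, int value) { return new Op(PUT, key, value); }
        static Op delete(String key) { return new Op(DELETE, key, null); }
    }

    static void apply(Map<String, Integer> map, Op op) {
        switch (op.type) {
            case Op.CLEAR:
                map.clear();
                break;
            case Op.PUT:
                map.put(op.key, op.value);
                break;
            case Op.DELETE:
                map.remove(op.key);
                break;
            default:
                break; // unknown types are ignored, as applyOp does in PulsarMapImpl
        }
    }

    public static void main(String[] args) {
        List<Op> topic = List.of(Op.put("a", 1), Op.put("b", 2), Op.clear(),
                Op.put("c", 3), Op.delete("c"), Op.put("d", 4));
        Map<String, Integer> state =
                Bootstrap.<Map<String, Integer>, Op>replay(HashMap::new, Bootstrap::apply, topic);
        System.out.println(state); // prints {d=4}
    }
}
```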

This sample PulsarMap implementation shows how to use the SharedStateManager:

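import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

// PulsarSharedStateManager and SerDe are assumed to come from the sample repository;
// SerDe<T> is assumed to expose byte[] serialize(T) and T deserialize(byte[]).
@Slf4j
public class PulsarMapImpl<K, V> implements PulsarMap<K, V> {
    private static final ObjectMapper mapper = new ObjectMapper();
    private final PulsarSharedStateManager<Map<K, V>, MapOp<K, V>> stateManager;
    private final SerDe<K> keySerDe;
    private final SerDe<V> valueSerDe;

    public PulsarMapImpl(PulsarSharedStateManager.PulsarSharedStateManagerBuilder builder,
                         SerDe<K> keySerDe,
                         SerDe<V> valueSerDe) {
        this.keySerDe = keySerDe;
        this.valueSerDe = valueSerDe;
        this.stateManager = builder
                .withOpSerializer(this::serializeOp)
                .withOpDeserializer(this::deserializeOp)
                .withDatabaseInitializer(() -> new ConcurrentHashMap<K, V>())
                .<Map<K, V>, MapOp<K, V>>withChangeLogApplier(this::applyOp)
                .build();
    }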

    @Override
    public CompletableFuture<V> getOrDefault(K key, V defaultValue, boolean latest) {
        return stateManager.read(map -> map.getOrDefault(key, defaultValue), latest);
    }

    @Override
    public CompletableFuture<?> scan(Function<K, Boolean> filter, BiConsumer<K, V> processor, boolean latest) {
        return stateManager.read(map -> {
            map.forEach((k,v) -> {
                if (filter.apply(k)) {
                    processor.accept(k, v);
                }
            });
            return null;
        }, latest);
    }

    @Override
    public CompletableFuture<V> update(K key, BiFunction<K, V, V> operation) {
        return stateManager.write(map -> {
            V currentValue = map.get(key);
            V finalValue = operation.apply(key, currentValue);
            if (finalValue == null) {
                return Collections.singletonList(MapOp.DELETE(key));
            } else {
                return Collections.singletonList(MapOp.PUT(key, finalValue));
            }
        }, map -> map.get(key));
    }

    @Override
    public CompletableFuture<?> updateMultiple(Function<K, Boolean> filter, BiFunction<K, V, V> operation) {
        return stateManager.write(map -> {
            List<MapOp<K, V>> updates = new ArrayList<>();
            map.forEach((key, currentValue) -> {
                if (!filter.apply(key)) {
                    return; // skip keys rejected by the filter
                }
                V finalValue = operation.apply(key, currentValue);
                if (finalValue == null) {
                    updates.add(MapOp.DELETE(key));
                } else {
                    updates.add(MapOp.PUT(key, finalValue));
                }
            });
            return updates;
        }, map -> null); // nothing to read back
    }

    @Override
    public CompletableFuture<?> clear() {
        // do not hand the live internal map back to the caller
        return stateManager.write(map -> Collections.singletonList(MapOp.CLEAR()),
                map -> null);
    }

    @AllArgsConstructor
    @Data
    private static class MapOp<K, V> {
        private static final int TYPE_CLEAR = 0;
        private static final int TYPE_PUT = 1;
        private static final int TYPE_DELETE = 2;
        private final int type;
        private final K key;
        private final V value;

        static <K, V> MapOp<K, V> CLEAR() {
            return new MapOp<>(TYPE_CLEAR, null, null);
        }
        static <K, V> MapOp<K, V> PUT(K key, V value) {
            return new MapOp<>(TYPE_PUT, key, value);
        }
        static <K, V> MapOp<K, V> DELETE(K key) {
            return new MapOp<>(TYPE_DELETE, key, null);
        }
    }

    // uses the class type parameters K, V instead of shadowing them
    private void applyOp(Map<K, V> map, MapOp<K, V> op) {
        switch (op.getType()) {
            case MapOp.TYPE_CLEAR:
                map.clear();
                break;
            case MapOp.TYPE_PUT:
                map.put(op.getKey(), op.getValue());
                break;
            case MapOp.TYPE_DELETE:
                map.remove(op.getKey());
                break;
            default:
                log.warn("Ignore MapOp {} on {}", op, this);
                break;
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    private static final class SerializedMapOp {
        private int type;
        private byte[] key;
        private byte[] value;
    }

    private byte[] serializeOp(MapOp<K,V> op) {
        try {
            SerializedMapOp ser = new SerializedMapOp(op.getType(),
                    op.getKey() != null ? keySerDe.serialize(op.getKey()) : null,
                    op.getValue() != null ? valueSerDe.serialize(op.getValue()) : null);
            return mapper.writeValueAsBytes(ser);
        } catch (IOException err) {
            throw new RuntimeException(err);
        }
    }

    private MapOp<K,V> deserializeOp(byte[] value) {
        try {
            SerializedMapOp ser =  mapper.readValue(value, SerializedMapOp.class);
            return new MapOp<K, V>(ser.getType(),
                    ser.getKey() != null ? keySerDe.deserialize(ser.getKey()) : null,
                    ser.getValue() != null ? valueSerDe.deserialize(ser.getValue()) : null);
        } catch (IOException err) {
            throw new RuntimeException(err);
        }
    }

    @Override
    public void close() {
        stateManager.close();
    }
}

Future work and other considerations

Depending on how the Shared State is implemented (this is up to the developer, i.e. the user of the new API), you may need to set infinite retention on the backing topic; otherwise you may lose some changes from the commit log.

Pulsar is very flexible, so initially we can let users configure the system properly themselves: the goal is to provide a basic API that makes it easy to build a Shared State Manager using the Exclusive Producer API together with the Reader API.

In the future we can implement more advanced features, such as checkpoints or leveraging compacted topics, but this can be done as follow-up work.
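Checkpoints are not specified by this proposal. As a hedged illustration only, assuming a checkpoint stores a copy of the State together with the number of log entries already folded into it, a new replica could start from the snapshot and replay just the remaining entries. The hypothetical `CheckpointSketch` below checks that this yields the same State as a full replay.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical checkpoint sketch; the proposal leaves checkpoints to future work.
class CheckpointSketch {
    // A copy of the State plus the number of log entries already folded into it.
    static final class Snapshot {
        final Map<String, Integer> state;
        final int offset;

        Snapshot(Map<String, Integer> state, int offset) {
            this.state = new HashMap<>(state); // defensive copy
            this.offset = offset;
        }
    }

    // Each entry adds a delta to a per-key counter; replay starts at index `from`.
    static Map<String, Integer> replayFrom(Map<String, Integer> start,
                                           List<Map.Entry<String, Integer>> log, int from) {
        Map<String, Integer> state = new HashMap<>(start);
        for (int i = from; i < log.size(); i++) {
            Map.Entry<String, Integer> e = log.get(i);
            state.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return state;
    }

    // Fold the first `upTo` entries into a snapshot.
    static Snapshot checkpoint(List<Map.Entry<String, Integer>> log, int upTo) {
        return new Snapshot(replayFrom(Map.of(), log.subList(0, upTo), 0), upTo);
    }

    // Start from the snapshot and replay only the entries after it.
    static Map<String, Integer> restore(Snapshot snapshot, List<Map.Entry<String, Integer>> log) {
        return replayFrom(snapshot.state, log, snapshot.offset);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> log = List.of(
                Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3),
                Map.entry("b", -2), Map.entry("c", 5));
        Snapshot cp = checkpoint(log, 3);
        Map<String, Integer> fromCheckpoint = restore(cp, log);
        Map<String, Integer> fullReplay = replayFrom(Map.of(), log, 0);
        System.out.println(fromCheckpoint.equals(fullReplay)); // prints true
    }
}
```

This relies on the entries before `offset` never changing, which is what an append-only topic provides.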

Rejected Alternatives

None

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.