apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

PIP-137: Pulsar Client Shared State API #13490

Open eolivelli opened 2 years ago

eolivelli commented 2 years ago

Motivation

Sometimes in a distributed application or library that already uses Pulsar you need to share some "state" across several instances of the application, for example:

Such cases are also very frequent while developing Pulsar IO Connectors or Pulsar Broker Protocol Handlers.

Currently you end up adding an additional component to the application, such as a database, or using the internal ZooKeeper or BookKeeper/DistributedLog components that support Pulsar. This is usually awkward both for developers and for system administrators.

We can provide a built-in mechanism in the Pulsar client API to support building such shared data structures.

In fact, since Pulsar 2.8.0 we have the Exclusive Producer, which allows you to use Pulsar as a consistent write-ahead log for replicated state machines.

We can provide an API to handle a shared distributed Java Object: each client can access the Object and mutate the State, ensuring consistency.

This is a sample implementation: https://github.com/eolivelli/pulsar-shared-state-manager

Goal

It is not a goal to implement a Pulsar-backed database system.

API Changes

public interface SharedStateManager<V, O> extends AutoCloseable {

    /**
     * Read from the current state.
     * @param reader a function that accesses current state and returns a value
     * @param latest ensure that the value is the latest
     * @return a handle to the result of the operation
     */
    <K> CompletableFuture<K> read(Function<V, K> reader, boolean latest);

    /**
     * Execute a mutation on the state.
     * The operationsGenerator generates a list of mutations to be
     * written to the log; the change log applier configured on the
     * builder is executed to mutate the state after each successful
     * write to the log. Finally the reader function can read from
     * the current state before releasing the write lock.
     * @param operationsGenerator generates a list of mutations
     * @param reader read from the state while inside the write lock
     * @param <K> the returned data type
     * @return a handle to the completion of the operation
     */
    <K> CompletableFuture<K> write(Function<V, List<O>> operationsGenerator,
                                     Function<V, K> reader);

    @Override
    void close();

    interface SharedStateManagerBuilder {

        <O> SharedStateManagerBuilder withOpSerializer(Function<O, byte[]> opSerializer);
        <O> SharedStateManagerBuilder withOpDeserializer(Function<byte[], O> opDeserializer);
        <V> SharedStateManagerBuilder withDatabaseInitializer(Supplier<V> databaseInitializer);
        <V, O> SharedStateManagerBuilder withChangeLogApplier(BiConsumer<V, O> changeLogApplier);
        <V, O> SharedStateManager<V, O> build();
    }

}
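The write contract described in the Javadoc above can be illustrated with a minimal in-memory sketch. This is not the proposed implementation: the class InMemorySharedState is hypothetical, a plain list stands in for the Pulsar topic, and the object monitor stands in for the lock held by the Exclusive Producer. It only shows the order of the steps: generate the operations from the current state, append each one to the log, apply it to the state, and finally run the reader while still holding the lock.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical in-memory stand-in: the list "log" plays the role of the topic,
// and "synchronized" plays the role of the Exclusive Producer lock.
class InMemorySharedState<V, O> {
    private final V state;
    private final BiConsumer<V, O> changeLogApplier;
    private final List<O> log = new ArrayList<>();

    InMemorySharedState(Supplier<V> databaseInitializer, BiConsumer<V, O> changeLogApplier) {
        this.state = databaseInitializer.get();
        this.changeLogApplier = changeLogApplier;
    }

    // "latest" is ignored here because there is a single local instance.
    synchronized <K> CompletableFuture<K> read(Function<V, K> reader, boolean latest) {
        return CompletableFuture.completedFuture(reader.apply(state));
    }

    synchronized <K> CompletableFuture<K> write(Function<V, List<O>> operationsGenerator,
                                                Function<V, K> reader) {
        List<O> ops = operationsGenerator.apply(state);
        for (O op : ops) {
            log.add(op);                        // write to the log first
            changeLogApplier.accept(state, op); // then mutate the state
        }
        // The reader runs before the lock is released.
        return CompletableFuture.completedFuture(reader.apply(state));
    }

    synchronized int logSize() {
        return log.size();
    }
}

class InMemorySharedStateDemo {
    public static void main(String[] args) {
        // The state is a counter held in a one-element array; operations are integer deltas.
        InMemorySharedState<int[], Integer> counter =
                new InMemorySharedState<>(() -> new int[1], (s, delta) -> s[0] += delta);
        int afterWrite = counter.write(s -> Arrays.asList(5, 7), s -> s[0]).join();
        int afterRead = counter.read(s -> s[0], false).join();
        System.out.println(afterWrite + " " + afterRead + " " + counter.logSize()); // 12 12 2
    }
}
```

In the real API the append would be an asynchronous send to the topic and could fail, which is why both methods return a CompletableFuture.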

PulsarMap recipe, interface:

public interface PulsarMap<K,V> extends AutoCloseable {

    /**
     * Get the value associated to a Key.
     * @param key the key
     * @param latest ensure that the value is the latest
     * @return a handle to the result of the operation
     */
    default CompletableFuture<V> get(K key, boolean latest) {
        return getOrDefault(key, null, latest);
    }

    /**
     * Get the value associated to a Key.
     * @param key the key
     * @param defaultValue a value in case that the key is not bound to any value
     * @param latest ensure that the value is the latest
     * @return a handle to the result of the operation
     */
    CompletableFuture<V> getOrDefault(K key, V defaultValue, boolean latest);

    /**
     * Scan the database
     * @param filter a filter on the key
     * @param processor the function to process the data
     * @param latest ensure that the value observed is the latest
     * @return a handle to the result of the operation
     */
    CompletableFuture<?> scan(Function<K, Boolean> filter, BiConsumer<K, V> processor, boolean latest);

    /**
     * Update a binding. The operation may be executed multiple times, until it succeeds.
     * If the operation returns null the value will be removed.
     * @param key the key
     * @param operation a function that modifies the value
     * @return a handle to the completion of the operation
     */
    CompletableFuture<V> update(K key, BiFunction<K, V, V> operation);

    /**
     * Update multiple bindings. The operation may be executed multiple times, until it succeeds.
     * For each key for which the operation returns null, the value will be removed.
     * @param filter a filter to skip processing some keys and reduce the usage of resources
     * @param operation a function that modifies the value
     * @return a handle to the completion of the operation
     */
    CompletableFuture<?> updateMultiple(Function<K, Boolean> filter, BiFunction<K, V, V> operation);

    /**
     * Delete all bindings.
     * @return a handle to the completion of the operation
     */
    CompletableFuture<?> clear();

    /**
     * List all keys.
     * @param latest ensure that we are up-to-date
     * @return a handle to the completion of the operation
     */
    default CompletableFuture<Collection<K>> listKeys(boolean latest) {
        List<K> result = new CopyOnWriteArrayList<>();
        return scan((k) -> true, (k,v) -> {
            result.add(k);
        }, latest).thenApply(___ -> result);
    }

    /**
     * Delete a binding
     * @param key the key
     * @return a handle to the completion of the operation
     */
    default CompletableFuture<V> delete(K key) {
        return update(key, (k, v) -> null);
    }

    /**
     * Update a binding only if the value matches the expected value
     * @param key the key
     * @param expectedValue the expected value, null means that the binding does not exist
     * @param value the new value
     * @return a handle to the completion of the operation
     */
    default CompletableFuture<V> replace(K key, V expectedValue, V value) {
        return update(key, (k, v) -> Objects.equals(v, expectedValue) ? value : v);
    }

    /**
     * Update a binding
     * @param key the key
     * @param value the new value
     * @return a handle to the completion of the operation
     */
    default CompletableFuture<V> put(K key, V value) {
        return update(key, (k, v) -> value);
    }

    /**
     * Update a binding only if the key is not bound.
     * @param key the key
     * @param value the value to bind
     * @return a handle to the completion of the operation
     */
    default CompletableFuture<V> putIfAbsent(K key, V value) {
        return replace(key, null, value);
    }

}
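All the default methods of PulsarMap are derived from update, so their semantics follow from its contract. The sketch below makes that concrete with a hypothetical, purely local LocalMap class (no Pulsar and no futures), assuming that update returns the value bound to the key after the operation, as the sample implementation does. Note that replace and putIfAbsent return the resulting value rather than a boolean, so the caller compares it with the value it tried to write in order to learn whether the update took effect.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

// Hypothetical local stand-in for the write side of PulsarMap.
class LocalMap<K, V> {
    private final Map<K, V> map = new HashMap<>();

    // Same contract as PulsarMap.update: a null result removes the binding,
    // and the value bound after the operation is returned.
    synchronized V update(K key, BiFunction<K, V, V> operation) {
        V next = operation.apply(key, map.get(key));
        if (next == null) {
            map.remove(key);
        } else {
            map.put(key, next);
        }
        return map.get(key);
    }

    V put(K key, V value) {
        return update(key, (k, v) -> value);
    }

    V delete(K key) {
        return update(key, (k, v) -> null);
    }

    V replace(K key, V expectedValue, V value) {
        return update(key, (k, v) -> Objects.equals(v, expectedValue) ? value : v);
    }

    V putIfAbsent(K key, V value) {
        return replace(key, null, value);
    }
}

class LocalMapDemo {
    public static void main(String[] args) {
        LocalMap<String, String> m = new LocalMap<>();
        System.out.println(m.putIfAbsent("leader", "node-1"));       // node-1
        System.out.println(m.putIfAbsent("leader", "node-2"));       // node-1 (already bound)
        System.out.println(m.replace("leader", "node-1", "node-3")); // node-3
        System.out.println(m.delete("leader"));                      // null
    }
}
```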

Implementation

The proposal is to add this SharedStateManager API as part of the Java Pulsar Client API:

This way the API and the implementation will be available to every Pulsar Client user and also for Pulsar IO Connectors and Pulsar Broker Protocol Handlers.

An alternative is to put it in the pulsar-adapters repository, but that would make the API harder to discover, and it would require Pulsar IO Connectors and Broker Protocol Handlers to bundle copies of this new API into their .nar files.

The SharedStateManager holds in memory a reference to a Java object that represents the State. A non-partitioned Pulsar topic stores all the changes made to that Java object.

In order to update the State the local SharedStateManager performs these steps:

When you are reading the State you have two options:

If you want to ensure strong consistency, you perform the "read" operation together with a dummy write operation, that is, inside the implicit lock acquired by the Exclusive Producer.

At bootstrap we read the topic fully (from the beginning to the tail) in order to build the State. We do not want to require the client application to store the State locally.
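The bootstrap replay can be sketched as follows. This is only an illustration under stated assumptions: a list of byte arrays stands in for the topic, the ChangeLogReplay class and the text encoding of the operations ("PUT key value", "DELETE key") are hypothetical, and the parameter names simply mirror the builder callbacks (databaseInitializer, opDeserializer, changeLogApplier). The real implementation would read the entries from the topic instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

class ChangeLogReplay {
    // Rebuild the state by applying every entry of the log, oldest first.
    static <V, O> V replay(List<byte[]> topic,
                           Supplier<V> databaseInitializer,
                           Function<byte[], O> opDeserializer,
                           BiConsumer<V, O> changeLogApplier) {
        V state = databaseInitializer.get();
        for (byte[] payload : topic) {
            changeLogApplier.accept(state, opDeserializer.apply(payload));
        }
        return state;
    }
}

class ChangeLogReplayDemo {
    // Hypothetical operation format: {"PUT", key, value} or {"DELETE", key}.
    static void apply(Map<String, String> map, String[] op) {
        if (op[0].equals("PUT")) {
            map.put(op[1], op[2]);
        } else if (op[0].equals("DELETE")) {
            map.remove(op[1]);
        }
    }

    public static void main(String[] args) {
        List<byte[]> topic = new ArrayList<>();
        for (String entry : new String[] {"PUT a 1", "PUT b 2", "DELETE a", "PUT b 3"}) {
            topic.add(entry.getBytes(StandardCharsets.UTF_8));
        }
        Map<String, String> state = ChangeLogReplay.<Map<String, String>, String[]>replay(
                topic,
                () -> new TreeMap<>(),
                bytes -> new String(bytes, StandardCharsets.UTF_8).split(" "),
                ChangeLogReplayDemo::apply);
        System.out.println(state); // {b=3}
    }
}
```

Because the whole log is replayed on every start, the bootstrap time grows with the number of changes ever written to the topic.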

This sample PulsarMap implementation describes how to use the SharedStateManager:

@Slf4j
public class PulsarMapImpl<K,V> implements PulsarMap<K,V> {
    private static final ObjectMapper mapper = new ObjectMapper();
    private final PulsarSharedStateManager<Map<K,V>, MapOp> stateManager;
    private final SerDe<K> keySerDe;
    private final SerDe<V> valueSerDe;

    public PulsarMapImpl(PulsarSharedStateManager.PulsarSharedStateManagerBuilder builder,
                         SerDe<K> keySerDe,
                         SerDe<V> valueSerDe
    ) {
        this.keySerDe = keySerDe;
        this.valueSerDe = valueSerDe;
        this.stateManager = builder
                .withOpSerializer(this::serializeOp)
                .withOpDeserializer(this::deserializeOp)
                .withDatabaseInitializer(() -> new ConcurrentHashMap<K,V>())
                .<Map<K,V>, MapOp>withChangeLogApplier(this::applyOp)
                .build();
    }

    @Override
    public CompletableFuture<V> getOrDefault(K key, V defaultValue, boolean latest) {
        return stateManager.read(map -> map.getOrDefault(key, defaultValue), latest);
    }

    @Override
    public CompletableFuture<?> scan(Function<K, Boolean> filter, BiConsumer<K, V> processor, boolean latest) {
        return stateManager.read(map -> {
            map.forEach((k,v) -> {
                if (filter.apply(k)) {
                    processor.accept(k, v);
                }
            });
            return null;
        }, latest);
    }

    @Override
    public CompletableFuture<V> update(K key, BiFunction<K, V, V> operation) {
        return stateManager.write(map -> {
            V currentValue = map.get(key);
            V finalValue = operation.apply(key, currentValue);
            if (finalValue == null) {
                return Collections.singletonList(MapOp.DELETE(key));
            } else {
                return Collections.singletonList(MapOp.PUT(key, finalValue));
            }
        }, map -> map.get(key));
    }

    @Override
    public CompletableFuture<?> updateMultiple(Function<K, Boolean> filter, BiFunction<K, V, V> operation) {
        return stateManager.write(map -> {
            List<MapOp> updates = new ArrayList<>();
            map.forEach((key, currentValue) -> {
                // Skip the keys rejected by the filter, as documented in PulsarMap#updateMultiple
                if (!filter.apply(key)) {
                    return;
                }
                V finalValue = operation.apply(key, currentValue);
                if (finalValue == null) {
                    updates.add(MapOp.DELETE(key));
                } else {
                    updates.add(MapOp.PUT(key, finalValue));
                }
            });
            return updates;
        }, null);
    }

    @Override
    public CompletableFuture<?> clear() {
        return stateManager.write(map -> {
            return Collections.singletonList(MapOp.CLEAR());
        }, Function.identity());
    }

    @AllArgsConstructor
    @Data
    private static class MapOp<K,V> {
        private final static int TYPE_CLEAR = 0;
        private final static int TYPE_PUT =  1;
        private final static int TYPE_DELETE =  2;
        private final int type;
        private final K key;
        private final V value;

        static <K,V> MapOp<K,V> CLEAR() {
            return new MapOp<>(TYPE_CLEAR, null, null);
        }
        static <K,V> MapOp<K,V> PUT(K key, V value) {
            return new MapOp<>(TYPE_PUT, key, value);
        }
        static <K,V> MapOp<K,V> DELETE(K key) {
            return new MapOp<>(TYPE_DELETE, key, null);
        }
    }

    public <K,V> void applyOp(Map<K, V> map, MapOp<K,V> op) {
        switch (op.getType()) {
            case MapOp.TYPE_CLEAR:
                map.clear();
                break;
            case MapOp.TYPE_PUT:
                map.put(op.getKey(), op.getValue());
                break;
            case MapOp.TYPE_DELETE:
                map.remove(op.getKey());
                break;
            default:
                log.warn("Ignore MapOp {} on {}", op, this);
                break;
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    private static final class SerializedMapOp {
        private int type;
        private byte[] key;
        private byte[] value;
    }

    private byte[] serializeOp(MapOp<K,V> op) {
        try {
            SerializedMapOp ser = new SerializedMapOp(op.getType(),
                    op.getKey() != null ? keySerDe.serialize(op.getKey()) : null,
                    op.getValue() != null ? valueSerDe.serialize(op.getValue()) : null);
            return mapper.writeValueAsBytes(ser);
        } catch (IOException err) {
            throw new RuntimeException(err);
        }
    }

    private MapOp<K,V> deserializeOp(byte[] value) {
        try {
            SerializedMapOp ser =  mapper.readValue(value, SerializedMapOp.class);
            return new MapOp<K, V>(ser.getType(),
                    ser.getKey() != null ? keySerDe.deserialize(ser.getKey()) : null,
                    ser.getValue() != null ? valueSerDe.deserialize(ser.getValue()) : null);
        } catch (IOException err) {
            throw new RuntimeException(err);
        }
    }

    @Override
    public void close() {
        stateManager.close();
    }
}

Future work and other considerations

Depending on the implementation of the Shared State (which is up to the developer, that is, the user of the new API), you need to set infinite retention on the supporting topic; otherwise you may lose some changes from the commit log.

Pulsar is very flexible, and initially we can let the user configure the system properly, because the goal is to provide a basic API for easily building a Shared State Manager using the Exclusive Producer API together with the Reader API.

In the future we can implement more advanced features, such as making checkpoints or leveraging compacted topics, but this can be done as follow-up work.

Rejected Alternatives

None

RobertIndie commented 2 years ago

This PIP number is a duplicate of https://github.com/apache/pulsar/issues/13408

eolivelli commented 2 years ago

@mattisonchao yes you could, but most of the code is already in the attached github repo.

eolivelli commented 2 years ago

We talked about this PIP in the community meeting.

The main concern from @merlimat is that exposing a "put" operation gives the false illusion that you have something like a Map, while actually every "write" operation will be "slow" because we need to acquire the Producer lock.

@merlimat suggested splitting this into two distinct interfaces: an API for readers and one for writers, possibly extending the work done on PIP-104 TableViews (https://github.com/apache/pulsar/issues/12356).

I am fine with extending PIP-104 in that direction (adding a writer side to the TableView), but I believe that the API proposed here achieves the goal of having a generic "Java object" as shared state, by letting the developer of the Shared State Object deal with the internal representation. Using the TableView API forces users to represent their Java object as a set of properties or as a single object with one fixed key. It is feasible, but not straightforward from my point of view. I really would like to mimic the Pravega.io StateSynchronizer support, which does not force you to see your topic as a "table".

@merlimat I will work in parallel on extending the TableView API to add writer support; I am sure it will bring great value to the users. But I still think that we should add this "Recipe" to the Pulsar Adapters repository.

@merlimat WDYT?

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.
