redisson / redisson

Redisson - Easy Redis Java client and Real-Time Data Platform. Valkey compatible. Sync/Async/RxJava/Reactive API. Over 50 Redis or Valkey based Java objects and services: Set, Multimap, SortedSet, Map, List, Queue, Deque, Semaphore, Lock, AtomicLong, Map Reduce, Bloom filter, Spring, Tomcat, Scheduler, JCache API, Hibernate, RPC, local cache...
https://redisson.pro
Apache License 2.0
23.25k stars 5.34k forks source link

Implement Watchdog for RPermitExpirableSemaphore #3046

Open mrniko opened 4 years ago

regbo commented 2 years ago

IDK how helpful this is, but here is how we implemented a solution for this issue. Basically, we added a polling future (using the threadly library) which auto-extends the permit until a settable callback future is completed.

Some of the code is from internal util libraries, but most of it should make sense. I am sure it would be more efficient to use a universal per-client watchdog.

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.redisson.api.RFuture;
import org.redisson.api.RPermitExpirableSemaphore;
import org.threadly.concurrent.future.FutureUtils;
import org.threadly.concurrent.future.ListenableFuture;
import org.threadly.concurrent.future.ListenableFutureTask;
import org.threadly.concurrent.future.SettableListenableFuture;

import com.lfp.data.redisson.client.RedissonUtils;
import com.lfp.joe.core.function.Throws.ThrowingSupplier;
import com.lfp.joe.threads.Threads;

import javassist.util.proxy.MethodHandler;
import javassist.util.proxy.Proxy;
import javassist.util.proxy.ProxyFactory;

public interface RPermitExpirableSemaphoreLFP extends RPermitExpirableSemaphore {

    /**
     * Adapts a plain {@link RPermitExpirableSemaphore} to this interface.
     * <p>
     * If the argument already implements {@code RPermitExpirableSemaphoreLFP} it is returned as-is.
     * Otherwise a javassist proxy implementing this interface is instantiated and given a handler
     * that forwards calls either to the proxy's own implementation or to the wrapped semaphore.
     *
     * @param semaphore the semaphore to wrap; must not be {@code null}
     * @return {@code semaphore} itself, or a proxy delegating to it
     * @throws NullPointerException if {@code semaphore} is {@code null}
     * @throws RuntimeException     if the proxy class cannot be instantiated
     */
    public static RPermitExpirableSemaphoreLFP from(@Nonnull RPermitExpirableSemaphore semaphore) {
        Objects.requireNonNull(semaphore);
        if (semaphore instanceof RPermitExpirableSemaphoreLFP)
            return (RPermitExpirableSemaphoreLFP) semaphore;
        RPermitExpirableSemaphoreLFP proxySemaphore;
        try {
            proxySemaphore = (RPermitExpirableSemaphoreLFP) Constants.PROXY_CONSTRUCTOR.newInstance();
        } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
                | InvocationTargetException e) {
            throw new RuntimeException(e);
        }
        ((Proxy) proxySemaphore).setHandler(new MethodHandler() {

            @Override
            public Object invoke(Object self, Method thisMethod, Method proceed, Object[] args) throws Throwable {
                try {
                    // a non-null forwarder means the proxy has its own implementation to run
                    // (presumably the default methods of this interface - confirm against javassist)
                    if (proceed != null)
                        return proceed.invoke(self, args);
                    // everything else is answered by the wrapped semaphore
                    return thisMethod.invoke(semaphore, args);
                } catch (InvocationTargetException e) {
                    // Method.invoke wraps whatever the target threw; rethrow the real exception so
                    // callers see the same failure they would get from calling the semaphore directly
                    var cause = e.getCause();
                    throw cause != null ? cause : e;
                }
            }
        });
        return proxySemaphore;
    }

    /**
     * 
     * acquire
     */

    /**
     * Acquires a permit asynchronously and keeps its lease extended until the returned settable
     * future is completed.
     *
     * @param leaseTime lease applied to the permit and to every renewal; {@code null} selects
     *                  {@code Constants.LEASE_TIME_DEFAULT}
     * @param executor  executor handed to the keep-alive machinery; {@code null} selects
     *                  {@code Constants.EXECUTOR_DEFAULT}
     * @return a future yielding the settable future tied to the permit; the {@code Optional}
     *         produced internally is unwrapped with {@code orElseThrow}
     */
    default <U> ListenableFuture<SettableListenableFuture<U>> acquireAsyncKeepAlive(@Nullable Duration leaseTime,
            @Nullable Executor executor) {
        final Duration effectiveLeaseTime = leaseTime != null ? leaseTime : Constants.LEASE_TIME_DEFAULT;
        final Executor effectiveExecutor = executor != null ? executor : Constants.EXECUTOR_DEFAULT;
        RFuture<String> permitIdFuture = acquireAsync(effectiveLeaseTime.toMillis(), TimeUnit.MILLISECONDS);
        ListenableFuture<Optional<SettableListenableFuture<U>>> keepAliveFuture = RPermitExpirableSemaphoreLFP
                .acquireAsyncKeepAlive(this, effectiveLeaseTime, effectiveExecutor, permitIdFuture);
        return keepAliveFuture.map(Optional::orElseThrow);
    }

    /**
     * Shorthand for {@link #acquireAsyncKeepAlive(Duration, Executor)} with both arguments
     * {@code null}, i.e. the default lease time and default executor.
     */
    default <U> ListenableFuture<SettableListenableFuture<U>> acquireAsyncKeepAlive() {
        return this.<U>acquireAsyncKeepAlive((Duration) null, (Executor) null);
    }

    /**
     * Shorthand for the three-argument {@code acquireAsyncKeepAliveSupply} with a {@code null}
     * lease time and a {@code null} executor.
     *
     * @param onAcquired supplier forwarded unchanged; may be {@code null}
     */
    default <U> ListenableFuture<U> acquireAsyncKeepAliveSupply(@Nullable ThrowingSupplier<? extends U, ?> onAcquired) {
        return this.<U>acquireAsyncKeepAliveSupply(onAcquired, (Duration) null, (Executor) null);
    }

    /**
     * Acquires a permit with keep-alive and evaluates a value-returning supplier.
     * <p>
     * The supplier is adapted with {@code flatSupply} and handed to
     * {@code acquireAsyncKeepAliveFlatSupply}.
     *
     * @param onAcquired supplier of the result; may be {@code null}
     * @param leaseTime  forwarded unchanged; may be {@code null}
     * @param executor   forwarded unchanged; may be {@code null}
     */
    default <U> ListenableFuture<U> acquireAsyncKeepAliveSupply(@Nullable ThrowingSupplier<? extends U, ?> onAcquired,
            @Nullable Duration leaseTime, @Nullable Executor executor) {
        ThrowingSupplier<? extends Future<? extends U>, ?> futureSupplier = flatSupply(onAcquired);
        return this.<U>acquireAsyncKeepAliveFlatSupply(futureSupplier, leaseTime, executor);
    }

    /**
     * Shorthand for the three-argument {@code acquireAsyncKeepAliveFlatSupply} with a
     * {@code null} lease time and a {@code null} executor.
     *
     * @param onAcquired supplier forwarded unchanged; may be {@code null}
     */
    default <U> ListenableFuture<U> acquireAsyncKeepAliveFlatSupply(
            @Nullable ThrowingSupplier<? extends Future<? extends U>, ?> onAcquired) {
        return this.<U>acquireAsyncKeepAliveFlatSupply(onAcquired, (Duration) null, (Executor) null);
    }

    /**
     * Acquires a permit with keep-alive, obtains a future from {@code onAcquired} and links it
     * with the keep-alive future in both directions.
     *
     * @param onAcquired supplier of the work future; resolved through {@code getNonNull}, so a
     *                   {@code null} supplier or a {@code null} future is tolerated
     * @param leaseTime  forwarded to {@code acquireAsyncKeepAlive}; may be {@code null}
     * @param executor   forwarded to {@code acquireAsyncKeepAlive}; may be {@code null}
     * @return a future flat-mapped onto the keep-alive future for the permit
     */
    default <U> ListenableFuture<U> acquireAsyncKeepAliveFlatSupply(
            @Nullable ThrowingSupplier<? extends Future<? extends U>, ?> onAcquired, @Nullable Duration leaseTime,
            @Nullable Executor executor) {
        ListenableFuture<SettableListenableFuture<U>> acquired = this.<U>acquireAsyncKeepAlive(leaseTime, executor);
        return acquired.flatMap(keepAliveFuture -> {
            // the supplier is only invoked once the permit has been obtained
            Future<? extends U> workFuture = getNonNull(onAcquired);
            linkCompletion(workFuture, keepAliveFuture);
            return keepAliveFuture;
        });
    }

    /**
     * 
     * try acquire
     */

    /**
     * Tries to acquire a permit asynchronously and, on success, keeps its lease extended until
     * the returned settable future is completed.
     *
     * @param waitTime  passed to {@code tryAcquireAsync} as the wait time; {@code null} selects
     *                  {@code Constants.WAIT_TIME_DEFAULT}
     * @param leaseTime lease applied to the permit and to every renewal; {@code null} selects
     *                  {@code Constants.LEASE_TIME_DEFAULT}
     * @param executor  executor handed to the keep-alive machinery; {@code null} selects
     *                  {@code Constants.EXECUTOR_DEFAULT}
     * @return a future yielding an empty {@code Optional} when the acquire yielded a
     *         {@code null} permit id, otherwise the settable future tied to the permit
     */
    default <U> ListenableFuture<Optional<SettableListenableFuture<U>>> tryAcquireAsyncKeepAlive(
            @Nullable Duration waitTime, @Nullable Duration leaseTime, @Nullable Executor executor) {
        final Duration effectiveWaitTime = waitTime != null ? waitTime : Constants.WAIT_TIME_DEFAULT;
        final Duration effectiveLeaseTime = leaseTime != null ? leaseTime : Constants.LEASE_TIME_DEFAULT;
        final Executor effectiveExecutor = executor != null ? executor : Constants.EXECUTOR_DEFAULT;
        RFuture<String> permitIdFuture = tryAcquireAsync(effectiveWaitTime.toMillis(),
                effectiveLeaseTime.toMillis(), TimeUnit.MILLISECONDS);
        return RPermitExpirableSemaphoreLFP.acquireAsyncKeepAlive(this, effectiveLeaseTime, effectiveExecutor,
                permitIdFuture);
    }

    /**
     * Shorthand for {@link #tryAcquireAsyncKeepAlive(Duration, Duration, Executor)} with all
     * arguments {@code null}, i.e. the default wait time, lease time and executor.
     */
    default <U> ListenableFuture<Optional<SettableListenableFuture<U>>> tryAcquireAsyncKeepAlive() {
        return this.<U>tryAcquireAsyncKeepAlive((Duration) null, (Duration) null, (Executor) null);
    }

    /**
     * Shorthand for the five-argument {@code tryAcquireAsyncKeepAliveSupply} with a {@code null}
     * wait time, lease time and executor.
     *
     * @param onAcquired    supplier forwarded unchanged; may be {@code null}
     * @param onNotAcquired supplier forwarded unchanged
     */
    default <U> ListenableFuture<U> tryAcquireAsyncKeepAliveSupply(
            @Nullable ThrowingSupplier<? extends U, ?> onAcquired, ThrowingSupplier<? extends U, ?> onNotAcquired) {
        return this.<U>tryAcquireAsyncKeepAliveSupply(onAcquired, onNotAcquired, (Duration) null, (Duration) null,
                (Executor) null);
    }

    /**
     * Tries to acquire a permit with keep-alive and evaluates one of two value-returning
     * suppliers.
     * <p>
     * Both suppliers are adapted with {@code flatSupply} and handed to
     * {@code tryAcquireAsyncKeepAliveFlatSupply}.
     *
     * @param onAcquired    supplier for the acquired case; may be {@code null}
     * @param onNotAcquired supplier for the not-acquired case
     * @param waitTime      forwarded unchanged; may be {@code null}
     * @param leaseTime     forwarded unchanged; may be {@code null}
     * @param executor      forwarded unchanged; may be {@code null}
     */
    default <U> ListenableFuture<U> tryAcquireAsyncKeepAliveSupply(
            @Nullable ThrowingSupplier<? extends U, ?> onAcquired, ThrowingSupplier<? extends U, ?> onNotAcquired,
            @Nullable Duration waitTime, @Nullable Duration leaseTime, @Nullable Executor executor) {
        ThrowingSupplier<? extends Future<? extends U>, ?> acquiredSupplier = flatSupply(onAcquired);
        ThrowingSupplier<? extends Future<? extends U>, ?> notAcquiredSupplier = flatSupply(onNotAcquired);
        return this.<U>tryAcquireAsyncKeepAliveFlatSupply(acquiredSupplier, notAcquiredSupplier, waitTime,
                leaseTime, executor);
    }

    /**
     * Shorthand for the five-argument {@code tryAcquireAsyncKeepAliveFlatSupply} with a
     * {@code null} wait time, lease time and executor.
     *
     * @param onAcquired    supplier forwarded unchanged; may be {@code null}
     * @param onNotAcquired supplier forwarded unchanged; may be {@code null}
     */
    default <U> ListenableFuture<U> tryAcquireAsyncKeepAliveFlatSupply(
            @Nullable ThrowingSupplier<? extends Future<? extends U>, ?> onAcquired,
            @Nullable ThrowingSupplier<? extends Future<? extends U>, ?> onNotAcquired) {
        return this.<U>tryAcquireAsyncKeepAliveFlatSupply(onAcquired, onNotAcquired, (Duration) null,
                (Duration) null, (Executor) null);
    }

    /**
     * Tries to acquire a permit with keep-alive and runs one of two future-returning suppliers.
     * <p>
     * When a permit was obtained, the future from {@code onAcquired} is linked with the
     * keep-alive future in both directions and the keep-alive future is returned. When no permit
     * was obtained, the future from {@code onNotAcquired} is returned as a listenable future.
     *
     * @param onAcquired    supplier for the acquired case; resolved through {@code getNonNull}
     * @param onNotAcquired supplier for the not-acquired case; resolved through {@code getNonNull}
     * @param waitTime      forwarded to {@code tryAcquireAsyncKeepAlive}; may be {@code null}
     * @param leaseTime     forwarded to {@code tryAcquireAsyncKeepAlive}; may be {@code null}
     * @param executor      forwarded to {@code tryAcquireAsyncKeepAlive}; may be {@code null}
     */
    default <U> ListenableFuture<U> tryAcquireAsyncKeepAliveFlatSupply(
            @Nullable ThrowingSupplier<? extends Future<? extends U>, ?> onAcquired,
            @Nullable ThrowingSupplier<? extends Future<? extends U>, ?> onNotAcquired, @Nullable Duration waitTime,
            @Nullable Duration leaseTime, @Nullable Executor executor) {
        ListenableFuture<Optional<SettableListenableFuture<U>>> attempt = this
                .<U>tryAcquireAsyncKeepAlive(waitTime, leaseTime, executor);
        return attempt.flatMap(maybeKeepAlive -> {
            if (maybeKeepAlive.isPresent()) {
                // permit obtained: tie the caller's work to the keep-alive future
                var keepAliveFuture = maybeKeepAlive.get();
                var workFuture = getNonNull(onAcquired);
                linkCompletion(workFuture, keepAliveFuture);
                return keepAliveFuture;
            }
            // no permit: only the fallback supplier runs
            return Threads.Futures.asListenable(getNonNull(onNotAcquired));
        });
    }

    /**
     * Converts a pending permit acquisition into a keep-alive future.
     *
     * @param semaphore      semaphore the permit belongs to; must not be {@code null}
     * @param leaseTime      lease used for renewals; must not be {@code null}
     * @param executor       executor used for conversion and renewals; must not be {@code null}
     * @param acquireRFuture pending acquisition yielding the permit id; must not be {@code null}
     * @return a future yielding an empty {@code Optional} when the permit id is {@code null},
     *         otherwise the settable future that keeps the permit alive
     * @throws NullPointerException if any argument is {@code null}
     */
    private static <U> ListenableFuture<Optional<SettableListenableFuture<U>>> acquireAsyncKeepAlive(
            @Nonnull RPermitExpirableSemaphore semaphore, @Nonnull Duration leaseTime, @Nonnull Executor executor,
            @Nonnull RFuture<String> acquireRFuture) {
        Objects.requireNonNull(semaphore);
        Objects.requireNonNull(leaseTime);
        Objects.requireNonNull(executor);
        Objects.requireNonNull(acquireRFuture);
        var permitIdFuture = RedissonUtils.asListenableFuture(acquireRFuture, executor);
        return permitIdFuture.map(permitId -> {
            Optional<SettableListenableFuture<U>> keepAlive;
            if (permitId != null)
                keepAlive = Optional.of(RPermitExpirableSemaphoreLFP.<U>acquireAsyncKeepAlive(semaphore, leaseTime,
                        executor, permitId));
            else
                // a null permit id is treated as "not acquired"
                keepAlive = Optional.empty();
            return keepAlive;
        });
    }

    /**
     * Keeps an already-acquired permit alive until the returned future is completed.
     * <p>
     * A polling loop repeatedly schedules {@code updateLeaseTimeAsync} shortly before the
     * current lease runs out. The loop continues while each update reports success and the
     * returned future is not done. Completing (or cancelling) the returned future cancels the
     * loop, and the permit is released once the loop has ended because of completion,
     * cancellation or an unexpected error.
     *
     * @param semaphore semaphore owning the permit
     * @param leaseTime lease length used for every renewal
     * @param executor  executor on which renewals are scheduled and run
     * @param permitId  id of the permit to keep alive
     * @return a settable future; callers complete it to signal that the permit is no longer
     *         needed. It is failed with {@link IllegalStateException} when a renewal does not
     *         succeed, or with the loop's failure when the loop ends exceptionally
     */
    private static <U> SettableListenableFuture<U> acquireAsyncKeepAlive(RPermitExpirableSemaphore semaphore,
            Duration leaseTime, Executor executor, String permitId) {
        // renew this long before expiry: a fraction of the lease, capped at BUFFER_MAX
        final Duration buffer;
        {
            var bufferMillis = Double.valueOf(leaseTime.toMillis() * Constants.BUFFER_PERCENTAGE).longValue();
            bufferMillis = Math.min(bufferMillis, Constants.BUFFER_MAX.toMillis());
            buffer = Duration.ofMillis(bufferMillis);
        }
        var asyncTask = new Callable<ListenableFuture<Boolean>>() {

            // wall-clock time at which the current lease is expected to end
            private long expiresAt = System.currentTimeMillis() + leaseTime.toMillis();

            @Override
            public ListenableFuture<Boolean> call() throws Exception {
                var nextExpiresAt = expiresAt + leaseTime.toMillis();
                Callable<ListenableFuture<Boolean>> task = () -> {
                    // ask for exactly enough lease to reach nextExpiresAt from "now"
                    var updateLeaseTimeMillis = nextExpiresAt - System.currentTimeMillis();
                    if (updateLeaseTimeMillis < 0)
                        // executor may not have scheduled in time
                        return FutureUtils.immediateResultFuture(false);
                    var updateLeaseTimeFuture = semaphore.updateLeaseTimeAsync(permitId, updateLeaseTimeMillis,
                            TimeUnit.MILLISECONDS);
                    return RedissonUtils.asListenableFuture(updateLeaseTimeFuture, executor).resultCallback(success -> {
                        // only advance the bookkeeping when the update was reported as applied
                        if (Boolean.TRUE.equals(success))
                            expiresAt = nextExpiresAt;
                    });
                };
                var lfutureTask = new ListenableFutureTask<ListenableFuture<Boolean>>(task, executor);
                var keepAliveDelayMillis = expiresAt - buffer.toMillis() - System.currentTimeMillis();
                if (keepAliveDelayMillis <= 0)
                    // already inside the buffer window: renew right away
                    executor.execute(lfutureTask);
                else
                    CompletableFuture.delayedExecutor(keepAliveDelayMillis, TimeUnit.MILLISECONDS, executor)
                            .execute(lfutureTask);
                return lfutureTask.flatMap(Function.identity());
            }
        };
        // NOTE(review): constructed with `false`, presumably so that late setResult/setFailure
        // calls on an already-completed future are ignored rather than throwing - confirm
        var resultFuture = new SettableListenableFuture<U>(false);
        // continue until result, cancelled or unexpected error
        var keepAlivePollingFuture = FutureUtils.executeWhile(asyncTask, success -> {
            return Boolean.TRUE.equals(success) && !resultFuture.isDone();
        });
        resultFuture.listener(() -> keepAlivePollingFuture.cancel(true));
        keepAlivePollingFuture.resultCallback(v -> {
            if (!Boolean.TRUE.equals(v)) {
                resultFuture.setFailure(new IllegalStateException("update scheduling failed"));
                return;
            }
            // The loop only ends with TRUE when it observed resultFuture as done. The polling
            // future then completed normally, so the cancel issued by the listener above has
            // no effect and the failure callback below never runs. Release the permit here,
            // otherwise it stays held until the freshly extended lease expires.
            // should be no op
            resultFuture.setResult(null);
            // best effort: resultFuture is already done, so a release failure has no consumer
            semaphore.releaseAsync(permitId);
        });
        keepAlivePollingFuture.failureCallback(t0 -> {
            // only need to release on cancel or unexpected error
            semaphore.releaseAsync(permitId).whenComplete((nil, t1) -> {
                if (t1 != null)
                    t0.addSuppressed(t1);
                resultFuture.setFailure(t0);
            });
        });
        return resultFuture;
    }

    /**
     * Adapts a value-returning supplier into a future-returning one.
     *
     * @param throwingSupplier supplier to adapt; may be {@code null}
     * @return {@code null} when the argument is {@code null}; otherwise a supplier that passes
     *         the original to {@code Threads.Futures.immediateFuture}
     */
    private static <U> ThrowingSupplier<? extends Future<? extends U>, ?> flatSupply(
            ThrowingSupplier<? extends U, ?> throwingSupplier) {
        if (throwingSupplier != null)
            return () -> Threads.Futures.immediateFuture(throwingSupplier);
        // absent suppliers stay absent; callers resolve them through getNonNull
        return null;
    }

    /**
     * Resolves a possibly-absent supplier into a future that is never {@code null}.
     *
     * @param supplier supplier of the future; may be {@code null}
     * @return the supplied future; an immediate failure future if the supplier throws; or an
     *         immediate {@code null}-result future if the supplier or its future is {@code null}
     */
    private static <U> Future<? extends U> getNonNull(ThrowingSupplier<? extends Future<? extends U>, ?> supplier) {
        Future<? extends U> supplied = null;
        if (supplier != null) {
            try {
                supplied = supplier.get();
            } catch (Throwable failure) {
                // surface the supplier's failure through the future instead of throwing
                return FutureUtils.immediateFailureFuture(failure);
            }
        }
        if (supplied != null)
            return supplied;
        return FutureUtils.immediateResultFuture(null);
    }

    /**
     * Links two futures by calling {@code Threads.Futures.forwardCompletion} in each direction.
     *
     * @param first  one side of the link; must not be {@code null}
     * @param second the other side of the link; must not be {@code null}
     * @throws NullPointerException if either future is {@code null}
     */
    private static <U> void linkCompletion(Future<? extends U> first, Future<? extends U> second) {
        Objects.requireNonNull(first);
        Objects.requireNonNull(second);
        // first -> second, then second -> first
        Threads.Futures.forwardCompletion(first, second);
        Threads.Futures.forwardCompletion(second, first);
    }

    /**
     * Holder for defaults and the cached proxy constructor. Declared as an enum without
     * constants so it cannot be instantiated.
     */
    static enum Constants {
        ;

        /** Fraction of the lease time used as the renewal buffer. */
        private static final float BUFFER_PERCENTAGE = .2f;
        /** Upper bound for the renewal buffer. */
        private static final Duration BUFFER_MAX = Duration.ofSeconds(3);
        /** Wait time used when callers pass {@code null}. */
        private static final Duration WAIT_TIME_DEFAULT = Duration.ZERO;
        /** Lease time used when callers pass {@code null}. */
        private static final Duration LEASE_TIME_DEFAULT = Duration.ofSeconds(5);
        /** Executor used when callers pass {@code null}. */
        private static final Executor EXECUTOR_DEFAULT = Threads.Pools.centralPool();
        /** No-arg constructor of the generated proxy class implementing this interface. */
        private static final Constructor<?> PROXY_CONSTRUCTOR = createProxyConstructor();

        /** Generates the proxy class once and looks up its public no-arg constructor. */
        private static Constructor<?> createProxyConstructor() {
            var factory = new ProxyFactory();
            factory.setUseCache(false);
            factory.setInterfaces(new Class<?>[] { RPermitExpirableSemaphoreLFP.class });
            try {
                return factory.createClass().getConstructor();
            } catch (NoSuchMethodException | SecurityException e) {
                throw new RuntimeException(e);
            }
        }
    }

}
zxdposter commented 5 months ago

@mrniko Hi, is there any progress on this?