hazelcast / hazelcast

Hazelcast is a unified real-time data platform combining stream processing with a fast data store, allowing customers to act instantly on data-in-motion for real-time insights.
https://www.hazelcast.com
Other
6.11k stars 1.83k forks source link

add support of CompletableFuture/ListenableFuture to ExecutorService #3622

Open vtsyryuk opened 10 years ago

vtsyryuk commented 10 years ago

basically i would like to have ability to code as follows:

final IExecutorService remoteExecutor = hazelcastInstance.getExecutorService("name");
final ICompletableFuture<SomeValue> future = remoteExecutor.submit(some_task);

ideally to have ability of using either CompletableFuture compatible with Java8+ or ListenableFuture (google.guava)

as of now, i am using the following workaround:

final IExecutorService remoteExecutor = hazelcastInstance.getExecutorService("name");
final ListeningExecutorService threadPoolExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
final ICompletableFuture<SomeValue> future = remoteExecutor.submit(some_task);
final ListenableFuture<SomeValue> task = JdkFutureAdapters.listenInPoolThread(future, threadPoolExecutor);
Futures.addCallback(task, new TaskCallback<SomeValue>(), threadPoolExecutor);
ajermakovics commented 10 years ago

Another option would be to use the remoteExecutor.submit(some_task, callback) API. If you wrap Guava's SettableFuture in an ExecutionCallback interface then you can use that future as a ListenableFuture.

vtsyryuk commented 10 years ago

@ajermakovics, thank you for your prompt response. Apparently, it's better idea what you have suggested, will it be smth like as follows?

import java.io.Serializable;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.hazelcast.core.ExecutionCallback;

public final class TaskExecutionCallback<T extends Serializable> implements ExecutionCallback<T>, Serializable {

    private static final long serialVersionUID = 1L;
    private final SettableFuture<T> future = SettableFuture.create();

    public ListenableFuture<T> getFuture() {
        return future;
    }

    @Override
    public void onResponse(T response) {
        future.set(response);
    }

    @Override
    public void onFailure(Throwable t) {
        future.setException(t);
    }
}
ajermakovics commented 10 years ago

Yes, along those lines. Though, depending on your use case, you could pass a SettableFuture in the constructor or make TaskExecutionCallback extend ForwardingListenableFuture.

vtsyryuk commented 10 years ago

Is the following version makes sense and summarizes the discussion?

import java.io.Serializable;

import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.hazelcast.core.ExecutionCallback;

public final class TaskExecutionCallback<T extends Serializable> 
                    extends ForwardingListenableFuture<T> 
                    implements ExecutionCallback<T>, Serializable {

    private static final long serialVersionUID = 1L;
    private final SettableFuture<T> future = SettableFuture.create();

    @Override
    public void onResponse(T response) {
        future.set(response);
    }

    @Override
    public void onFailure(Throwable t) {
        future.setException(t);
    }

    @Override
    protected ListenableFuture<T> delegate() {
        return future;
    }
}

A client code might be as follows:

final IExecutorService remoteExecutor = hazelcastInstance.getExecutorService("name");
final TaskExecutionCallback<Integer> taskCallback = new TaskExecutionCallback<Integer>();
remoteExecutor.submit(new Callable<Integer>() {

    @Override
    public Integer call() throws Exception {
        return 1;
    }
}, taskCallback);

Futures.transform(taskCallback, new Function<Integer, Integer>() {

    @Override
    public Integer apply(Integer input) {
        return input * 2;
    }
});

// taskCallback.cancel(true); <-- the way task can be cancelled

Conclusion

The only concern i have is that TaskExecutionCallback is not of type ExecutionCallback, it would have mixed responsibility/role (i.e. Future + ExecutionCallback) - not sure it's ok from the design perspective

also, will i be able to run transformation on remote service? e.g. (hazelcast node) as follows:

Futures.transform(taskCallback, new Function<Integer, Integer>() {

    @Override
    public Integer apply(Integer input) {
        return input * 2;
    }
}, *remoteExecutor*);
ajermakovics commented 10 years ago

If it extends ForwardingListenableFuture then you can use TaskExecutionCallback as a ListenableFuture. It achieves the same thing but might be slightly more convenient.

vtsyryuk commented 10 years ago

By the way, I also was wondering what is the reason ClientExecutionService is hidden in HazelcastClient?

mmedenjak commented 7 years ago

@vtsyryuk Hi! Unfortunately we can't change the return type until a new major version. We will probably introduce this change in 4.0. Until then you can use the ExecutionCallback as @ajermakovics suggested or use the durable executor which returns DurableExecutorServiceFuture (which extends ICompletableFuture).

azotcsit commented 1 year ago

We execute Hazelcast callables from a WebFlux-based application. Originally, we modified the above example to use SettableListenableFuture and it worked fine for us. After upgrade to Spring 3 (where SettableListenableFuture is deprecated in favor of CompletableFuture), we modified the code to use CompletableFuture. I'd like to share the code as it might be useful for somebody else.

Adapter:

import com.hazelcast.core.ExecutionCallback;
import java.util.concurrent.CompletableFuture;

public class ExecutionCallbackToCompletableFutureAdapter<T> implements ExecutionCallback<T> {

    private final CompletableFuture<T> completableFuture = new CompletableFuture<>();

    @Override
    public void onResponse(T response) {
      completableFuture.complete(response);
    }

    @Override
    public void onFailure(Throwable t) {
      completableFuture.completeExceptionally(t);
    }

    public CompletableFuture<T> getCompletableFuture() {
      return completableFuture;
    }
  }

A client code might be as follows:

IExecutorService executorService = hazelcastInstance.getExecutorService("my-executor-service");
Callable<Integer> callable = () -> 1;

ExecutionCallbackToCompletableFutureAdapter<Integer> callback = new ExecutionCallbackToCompletableFutureAdapter<>();
executorService.submit(callable, callback);
Mono<Integer> resultMono = Mono.fromCompletionStage(callback.getCompletableFuture());