This project provides a way to use the Completable Future API with Vert.x.
Using Completable Future with Vert.x may lead to some thread issues.
When you execute an asynchronous operation with Vert.x, the Handler<AsyncResult<T>>
is called on on the same thread as the method having enqueued the asynchronous operation. Thanks to this single-threaded aspect, Vert.x removes the need of synchronization and also it improves performances.
Completable Future uses a fork-join thread pool. So callbacks (dependent stages) are called in a thread of this pool, so it's not the caller thread.
This project provides the Completable Future API but enforces the Vert.x threading model:
xAsync
methods (without executor
), the callbacks are called on the Vert.x contextxAsync
methods with an Executor
parameter, this executor is used to execute the callback (does not enforce the Vert.x thread system)All methods taking a Vertx
instance as parameter exists with a Context
parameter.
// From a Vert.x instance
CompletableFuture<T> future = new VertxCompletableFuture(vertx);
// From a specific context
Context context = ...
CompletableFuture<T> future = new VertxCompletableFuture(context);
// From a completable future
CompletableFuture cf = ...
CompletableFuture<T> future = VertxCompletableFuture.from(context, cf);
// From a Vert.x future
Future<T> fut = ...
CompletableFuture<T> future = VertxCompletableFuture.from(context, fut);
You can also pass a Supplier
or a Runnable
:
// Run in context
CompletableFuture<String> future = VertxCompletableFuture.supplyAsync(vertx, () -> return "foo";);
// Run in Vert.x worker thread
CompletableFuture<String> future = VertxCompletableFuture.supplyBlockingAsync(vertx, () -> return "foo");
CompletableFuture<Void> future = VertxCompletableFuture.runAsync(vertx, () -> System.out.println(foo"));
CompletableFuture<Void> future = VertxCompletableFuture.runBlockingAsync(vertx, () -> System.out.println(foo"));
*BlockingAsync method uses a Vert.x worker thread and do not block the Vert.x Event Loop.
Once you have the VertxCompletableFuture
instance, you can use the CompletableFuture
API:
VertxCompletableFuture<Integer> future = new VertxCompletableFuture<>(vertx);
future
.thenApply(this::square)
.thenAccept(System.out::print)
.thenRun(System.out::println);
// Somewhere in your code, later...
future.complete(42);
You can compose, combine, join completable futures with:
The VertxCompletableFuture
class offers two composition operators:
allOf
- execute all the passed CompletableFuture
(not necessarily VertxCompletableFuture
) and calls dependant stages when all of the futures have been completed or one has failed anyOf
- execute all the passed CompletableFuture
(not necessarily VertxCompletableFuture
) and calls dependant stages when one of them has been completedUnlike CompletableFuture
, the dependent stages are called in the Vert.x context.
HttpClientOptions options = new HttpClientOptions().setDefaultPort(8080).setDefaultHost("localhost");
HttpClient client1 = vertx.createHttpClient(options);
HttpClient client2 = vertx.createHttpClient(options);
VertxCompletableFuture<Integer> requestA = new VertxCompletableFuture<>(vertx);
client1.get("/A").handler(resp -> {
resp.exceptionHandler(requestA::completeExceptionally)
.bodyHandler(buffer -> {
requestA.complete(Integer.parseInt(buffer.toString()));
});
}).exceptionHandler(requestA::completeExceptionally).end();
VertxCompletableFuture<Integer> requestB = new VertxCompletableFuture<>(vertx);
client2.get("/B").handler(resp -> {
resp.exceptionHandler(requestB::completeExceptionally)
.bodyHandler(buffer -> {
requestB.complete(Integer.parseInt(buffer.toString()));
});
}).exceptionHandler(requestB::completeExceptionally).end();
VertxCompletableFuture.allOf(requestA, requestB).thenApply(v -> requestA.join() + requestB.join())
.thenAccept(i -> {
tc.assertEquals(65, i);
async.complete();
});
}
You can transform a VertxCompletableFuture
to a Vert.x Future
with the toFuture
method.
You can also creates a new VertxCompletableFuture
from a Vert.x Future
using:
Future<Integer> vertxFuture = ...
VertxCompletableFuture<Integer> vcf = VertxCompletableFuture.from(vertx, vertxFuture);
vcf.thenAccept(i -> {...}).whenComplete((res, err) -> {...})