sleroy / spring-cqrs-arch

Extension for the Spring framework using CQRS principles. It's useful to implement quickly an webapp.
https://www.sylvainleroy.com
Apache License 2.0
26 stars 12 forks source link

Evaluate using rxjava to replace the asynchronous code. #3

Open sleroy opened 5 years ago

sleroy commented 5 years ago

Evaluate using rxjava to replace the asynchronous code.

https://github.com/ReactiveX/RxJava

sleroy commented 5 years ago

Futures

Futures were introduced in Java 5 (2004). They're basically placeholders for a result of an operation that hasn't finished yet. Once the operation finishes, the Future will contain that result. For example, an operation can be a Runnable or Callable instance that is submitted to an ExecutorService. The submitter of the operation can use the Future object to check whether the operation isDone(), or wait for it to finish using the blocking get() method.

Example:

/**

}

public static void main(String[] args) throws Exception{ ExecutorService exec = Executors.newSingleThreadExecutor(); Future f = exec.submit(new MyCallable());

System.out.println(f.isDone()); //False

System.out.println(f.get()); //Waits until the task is done, then prints 1

}

CompletableFutures

CompletableFutures were introduced in Java 8 (2014). They are in fact an evolution of regular Futures, inspired by Google's Listenable Futures, part of the Guava library. They are Futures that also allow you to string tasks together in a chain. You can use them to tell some worker thread to "go do some task X, and when you're done, go do this other thing using the result of X". Using CompletableFutures, you can do something with the result of the operation without actually blocking a thread to wait for the result. Here's a simple example:

/**

/**

public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newSingleThreadExecutor(); CompletableFuture f = CompletableFuture.supplyAsync(new MySupplier(), exec); System.out.println(f.isDone()); // False CompletableFuture f2 = f.thenApply(new PlusOne()); System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2 }

RxJava

RxJava is whole library for reactive programming created at Netflix. At a glance, it will appear to be similar to Java 8's streams. It is, except it's much more powerful.

Similarly to Futures, RxJava can be used to string together a bunch of synchronous or asynchronous actions to create a processing pipeline. Unlike Futures, which are single-use, RxJava works on streams of zero or more items. Including never-ending streams with an infinite number of items. It's also much more flexible and powerful thanks to an unbelievably rich set of operators.

Unlike Java 8's streams, RxJava also has a backpressure mechanism, which allows it to handle cases in which different parts of your processing pipeline operate in different threads, at different rates.

The downside of RxJava is that despite the solid documentation, it is a challenging library to learn due to the paradigm shift involved. Rx code can also be a nightmare to debug, especially if multiple threads are involved, and even worse - if backpressure is needed.

If you want to get into it, there's a whole page of various tutorials on the official website, plus the official documentation and Javadoc. You can also take a look at some of the videos such as this one which gives a brief intro into Rx and also talks about the differences between Rx and Futures.

Bonus: Java 9 Reactive Streams

Java 9's Reactive Streams aka Flow API are a set of Interfaces implemented by various reactive streams libraries such as RxJava 2, Akka Streams, and Vertx. They allow these reactive libraries to interconnect, while preserving the all important back-pressure.

sleroy commented 5 years ago

https://stackoverflow.com/questions/35329845/difference-between-completablefuture-future-and-rxjavas-observable/46572647

sleroy commented 5 years ago

Version 3.x (Javadoc)

single dependency: Reactive-Streams
continued support for Java 6+ & Android 2.3+
fixed API mistakes and many limits of RxJava 2
intended to be a replacement for RxJava 2 with relatively few binary incompatible changes
Java 8 lambda-friendly API
non-opinionated about source of concurrency (threads, pools, event loops, fibers, actors, etc.)
async or synchronous execution
virtual time and schedulers for parameterized concurrency
test and diagnostic support via test schedulers, test consumers and plugin hooks

Learn more about RxJava in general on the Wiki Home.