👉 cffu
(CompletableFuture Fu
🦝)是一个小小的CompletableFuture(CF)
辅助增强库,提升CF
使用体验并减少误用,在业务中更方便高效安全地使用CF
。
欢迎 👏 💖
cffu
的三种使用方式Cffu
类CompletableFutureUtils
工具类Kotlin
扩展方法cffu
功能介绍CF
的结果AllFastFail
/AnySuccess
/AllSuccess
/MostSuccess
)join
的方法Backport
支持Java 8
anyOf
方法allof/anyOf
方法CompletableFuture
类迁移到Cffu
类提供的功能有:
CF
的结果,而不是无返回结果的Void
(allOf
)allResultsFastFailOf
/ allResultsOf
/ mSupplyFastFailAsync
/ thenMApplyFastFailAsync
CF
的结果,而不是同一类型allTupleFastFailOf
/ allTupleOf
/ tupleMSupplyFastFailAsync
/ thenTupleMApplyFastFailAsync
action
,而不是要先包装成CompletableFuture
tupleMSupplyFastFailAsync
/ mSupplyMostSuccessAsync
/ thenTupleMApplyFastFailAsync
/ thenMRunFastFailAsync
catching
方法,而不是处理所有异常Throwable
(exceptionally
)AllFastFail
策略:当输入的多个CF
有失败时快速失败返回,而不再于事无补地等待所有CF
运行完成(allOf
)AnySuccess
策略:返回首个成功的CF
结果,而不是首个完成(但可能失败)的CF
(anyOf
)AllSuccess
策略:返回多个CF
中成功的结果,对于失败的CF
返回指定的缺省值MostSuccess
策略:指定时间内返回多个CF
中成功的结果,对于失败或还没有运行完成的CF
返回指定的缺省值All(Complete)
/Any(Complete)
策略:这2个是CompletableFuture
已有支持的策略CffuFactory#builder(executor)
方法join
的方法,join(timeout, unit)
方法cffuOrTimeout
/cffuCompleteOnTimeout
方法peek
处理方法whenComplete
方法会修改输入CF
的结果)CffuFactoryBuilder#forbidObtrudeMethods
方法IDE
能尽早提示出问题@NonNull
、@Nullable
、@CheckReturnValue
、@Contract
等join(timeout, unit)
/cffuOrTimeout
/peek
),还有completeExceptionallyAsync
方法CF
失败或还没有运行完成则返回指定的缺省值,getSuccessNow
方法CF
异常成业务异常,unwrapCfException
方法anyOf
方法:返回具体类型T
(类型安全),而不是返回Object
(CompletableFuture#anyOf
)allof
/anyOf
方法:输入更宽泛的CompletionStage
参数类型,而不是CompletableFuture
类(CompletableFuture#allOf/anyOf
)Backport
支持Java 8
,Java 9+
高版本的所有CF
新功能在Java 8
低版本直接可用,如
orTimeout
/completeOnTimeout
方法delayedExecutor
方法failedFuture
/ completedStage
/ failedStage
completeAsync
/ exceptionallyAsync
/ exceptionallyCompose
/ copy
Kotlin
更多cffu
的使用方式与功能说明详见 User Guide。
CompletableFuture
如何管理并发执行是个复杂易错的问题,业界有大量的工具、框架可以采用。
并发工具、框架的广度了解,可以看看如《七周七并发模型》、《Java虚拟机并发编程》、《Scala并发编程(第2版)》;更多关于并发主题的书籍参见书单。
其中CompletableFuture(CF)
有其优点:
Java
标准库内置
CompletableFuture
在2014年发布的Java 8
提供,有10年了CompletableFuture
的父接口Future
早在2004年发布的Java 5
中提供,有20年了Future
接口不支持 执行结果的异步获取与并发执行逻辑的编排,但也让广大Java
开发者熟悉了Future
这个典型的概念与工具Lock
)、CountDownLatch
、信号量(Semaphore
)、CyclicBarrier
和其它并发工具、框架一样,CompletableFuture
用于
值得更深入了解和应用。 💕
cffu
的三种使用方式cffu
支持三种使用方式:
Cffu
类
Java
语言时,推荐这种使用方式CompletableFuture
类的代码可以比较简单的迁移到Cffu
类,包含2步修改:CompletableFuture
改成Cffu
CompletableFuture
静态方法调用的地方,类名CompletableFuture
改成cffuFactory
实例CompletableFuture
类迁移到Cffu
类io.foldright:cffu
库CompletableFutureUtils
工具类
Cffu
类)、觉得这样增加了复杂性的话,cffu
库作为一个工具类来用CompletableFuture
使用的工具方法在业务项目中很常见,CompletableFutureUtils
提供了一系列实用可靠高效安全的工具方法cffu
功能没有提供(也没有想到实现方案) 😔io.foldright:cffu
库Kotlin
扩展方法
Kotlin
语言时,推荐这种使用方式io.foldright:cffu-kotlin
库在介绍功能点之前,可以先看看cffu
不同使用方式的示例。 🎪
Cffu
类public class CffuDemo {
private static final ExecutorService myBizThreadPool = Executors.newCachedThreadPool();
// Create a CffuFactory with configuration of the customized thread pool
private static final CffuFactory cffuFactory = CffuFactory.builder(myBizThreadPool).build();
public static void main(String[] args) throws Exception {
final Cffu<Integer> cf42 = cffuFactory
.supplyAsync(() -> 21) // Run in myBizThreadPool
.thenApply(n -> n * 2);
// Below tasks all run in myBizThreadPool
final Cffu<Integer> longTaskA = cf42.thenApplyAsync(n -> {
sleep(1001);
return n / 2;
});
final Cffu<Integer> longTaskB = cf42.thenApplyAsync(n -> {
sleep(1002);
return n / 2;
});
final Cffu<Integer> longTaskC = cf42.thenApplyAsync(n -> {
sleep(100);
return n * 2;
});
final Cffu<Integer> longFailedTask = cf42.thenApplyAsync(unused -> {
sleep(1000);
throw new RuntimeException("Bang!");
});
final Cffu<Integer> combined = longTaskA.thenCombine(longTaskB, Integer::sum)
.orTimeout(1500, TimeUnit.MILLISECONDS);
System.out.println("combined result: " + combined.get());
final Cffu<Integer> anySuccess = cffuFactory.anySuccessOf(longTaskC, longFailedTask);
System.out.println("any success result: " + anySuccess.get());
}
}
# 完整可运行的Demo代码参见
CffuDemo.java
。
CompletableFutureUtils
工具类public class CompletableFutureUtilsDemo {
private static final ExecutorService myBizThreadPool = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
final CompletableFuture<Integer> cf42 = CompletableFuture
.supplyAsync(() -> 21, myBizThreadPool) // Run in myBizThreadPool
.thenApply(n -> n * 2);
final CompletableFuture<Integer> longTaskA = cf42.thenApplyAsync(n -> {
sleep(1001);
return n / 2;
}, myBizThreadPool);
final CompletableFuture<Integer> longTaskB = cf42.thenApplyAsync(n -> {
sleep(1002);
return n / 2;
}, myBizThreadPool);
final CompletableFuture<Integer> longTaskC = cf42.thenApplyAsync(n -> {
sleep(100);
return n * 2;
}, myBizThreadPool);
final CompletableFuture<Integer> longFailedTask = cf42.thenApplyAsync(unused -> {
sleep(1000);
throw new RuntimeException("Bang!");
}, myBizThreadPool);
final CompletableFuture<Integer> combined = longTaskA.thenCombine(longTaskB, Integer::sum);
final CompletableFuture<Integer> combinedWithTimeout =
orTimeout(combined, 1500, TimeUnit.MILLISECONDS);
System.out.println("combined result: " + combinedWithTimeout.get());
final CompletableFuture<Integer> anySuccess = anySuccessOf(longTaskC, longFailedTask);
System.out.println("any success result: " + anySuccess.get());
}
}
# 完整可运行的Demo代码参见
CompletableFutureUtilsDemo.java
。
Kotlin
扩展方法private val myBizThreadPool: ExecutorService = Executors.newCachedThreadPool()
// Create a CffuFactory with configuration of the customized thread pool
private val cffuFactory: CffuFactory = CffuFactory.builder(myBizThreadPool).build()
fun main() {
val cf42 = cffuFactory
.supplyAsync { 21 } // Run in myBizThreadPool
.thenApply { it * 2 }
// Below tasks all run in myBizThreadPool
val longTaskA = cf42.thenApplyAsync { n: Int ->
sleep(1001)
n / 2
}
val longTaskB = cf42.thenApplyAsync { n: Int ->
sleep(1002)
n / 2
}
val longTaskC = cf42.thenApplyAsync { n: Int ->
sleep(100)
n * 2
}
val longFailedTask = cf42.thenApplyAsync<Int> { _ ->
sleep(1000)
throw RuntimeException("Bang!")
}
val combined = longTaskA.thenCombine(longTaskB, Integer::sum)
.orTimeout(1500, TimeUnit.MILLISECONDS)
println("combined result: ${combined.get()}")
val anySuccess: Cffu<Int> = listOf(longTaskC, longFailedTask).anySuccessOfCffu()
println("any success result: ${anySuccess.get()}")
}
# 完整可运行的Demo代码参见
CffuDemo.kt
。
cffu
功能介绍CF
的结果CompletableFuture
的allOf
方法没有返回结果,只是返回Void
。不方便获取所运行的多个CF
结果:
allOf
方法之后再通过入参CF
的读操作(如join
/get
)来获取结果
join
/get
)是阻塞的,增加了业务逻辑的死锁风险❗️
更多说明可以看看CompletableFuture原理与实践 - 4.2.2 线程池循环引用会导致死锁Action
并在Action
中设置外部的变量,需要注意多线程读写的线程安全问题 ⚠️
cffu
的allResultsFastFailOf
/allResultsOf
方法提供了返回多个CF
结果的功能,使用库的功能直接获取整体结果:
CF
(可以继续串接非阻塞的操作),自然减少了阻塞的读方法(如join
/get
)使用,尽量降低死锁风险示例代码如下:
public class AllResultsOfDemo {
public static final Executor myBizExecutor = Executors.newCachedThreadPool();
public static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();
public static void main(String[] args) throws Exception {
//////////////////////////////////////////////////
// CffuFactory#allResultsOf
//////////////////////////////////////////////////
Cffu<Integer> cffu1 = cffuFactory.completedFuture(21);
Cffu<Integer> cffu2 = cffuFactory.completedFuture(42);
Cffu<Void> all = cffuFactory.allOf(cffu1, cffu2);
// Result type is Void!
//
// the result can be got by input argument `cf1.get()`, but it's cumbersome.
// so we can see a lot of util methods to enhance `allOf` with result in our project.
Cffu<List<Integer>> allResults = cffuFactory.allResultsOf(cffu1, cffu2);
System.out.println(allResults.get());
//////////////////////////////////////////////////
// or CompletableFutureUtils#allResultsOf
//////////////////////////////////////////////////
CompletableFuture<Integer> cf1 = CompletableFuture.completedFuture(21);
CompletableFuture<Integer> cf2 = CompletableFuture.completedFuture(42);
CompletableFuture<Void> all2 = CompletableFuture.allOf(cf1, cf2);
// Result type is Void!
CompletableFuture<List<Integer>> allResults2 = allResultsOf(cf1, cf2);
System.out.println(allResults2.get());
}
}
# 完整可运行的Demo代码参见
AllResultsOfDemo.java
。
上面多个相同结果类型的CF
,cffu
还提供了返回多个不同类型CF
结果的方法,allTupleFastFailOf
/allTupleOf
方法。
示例代码如下:
public class AllTupleOfDemo {
public static final Executor myBizExecutor = Executors.newCachedThreadPool();
public static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();
public static void main(String[] args) throws Exception {
//////////////////////////////////////////////////
// allTupleFastFailOf / allTupleOf
//////////////////////////////////////////////////
Cffu<String> cffu1 = cffuFactory.completedFuture("21");
Cffu<Integer> cffu2 = cffuFactory.completedFuture(42);
Cffu<Tuple2<String, Integer>> allTuple = cffuFactory.allTupleFastFailOf(cffu1, cffu2);
System.out.println(allTuple.get());
//////////////////////////////////////////////////
// or CompletableFutureUtils.allTupleFastFailOf / allTupleOf
//////////////////////////////////////////////////
CompletableFuture<String> cf1 = CompletableFuture.completedFuture("21");
CompletableFuture<Integer> cf2 = CompletableFuture.completedFuture(42);
CompletableFuture<Tuple2<String, Integer>> allTuple2 = allTupleFastFailOf(cf1, cf2);
System.out.println(allTuple2.get());
}
}
# 完整可运行的Demo代码参见
AllTupleOfDemo.java
。
CompletableFuture
执行执行(即CompletableFuture
的*Async
方法),使用的缺省线程池是ForkJoinPool.commonPool()
。CPU
个线程,合适执行CPU
密集的任务;对于业务逻辑,往往有很多等待操作(如网络IO
、阻塞等待),并不是CPU
密集的。ForkJoinPool.commonPool()
是很危险的❗结果就是,
CompletableFuture
的*Async
方法时,几乎每次都要反复传入指定的业务线程池;这让CompletableFuture
的使用很繁琐易错 🤯RPC
回调),不合适或方便为业务提供线程池;这时使用Cffu
封装携带的线程池既方便又合理安全示例代码如下:
public class NoDefaultExecutorSettingForCompletableFuture {
public static final Executor myBizExecutor = Executors.newCachedThreadPool();
public static void main(String[] args) {
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(
() -> System.out.println("doing a long time work!"),
myBizExecutor);
CompletableFuture<Void> cf2 = CompletableFuture
.supplyAsync(
() -> {
System.out.println("doing another long time work!");
return 42;
},
myBizExecutor)
.thenAcceptAsync(
i -> System.out.println("doing third long time work!"),
myBizExecutor);
CompletableFuture.allOf(cf1, cf2).join();
}
}
# 完整可运行的Demo代码参见
NoDefaultExecutorSettingForCompletableFuture.java
。
Cffu
支持设置缺省的业务线程池,规避上面的繁琐与危险。示例代码如下:
public class DefaultExecutorSettingForCffu {
public static final Executor myBizExecutor = Executors.newCachedThreadPool();
public static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();
public static void main(String[] args) {
Cffu<Void> cf1 = cffuFactory.runAsync(() -> System.out.println("doing a long time work!"));
Cffu<Void> cf2 = cffuFactory.supplyAsync(() -> {
System.out.println("doing another long time work!");
return 42;
}).thenAcceptAsync(i -> System.out.println("doing third long time work!"));
cffuFactory.allOf(cf1, cf2).join();
}
}
# 完整可运行的Demo代码参见
DefaultExecutorSettingForCffu.java
。
AllFastFail
/AnySuccess
/AllSuccess
/MostSuccess
)CompletableFuture
的allOf
方法会等待所有输入CF
运行完成;即使有CF
失败了也要等待后续CF
都运行完成,再返回一个失败的CF
。
CF
失败了,则快速失败不再做于事无补的等待cffu
提供了相应的allResultsFastFailOf
等方法allOf
/allResultsFastFailOf
两者都是,只有当所有的输入CF
都成功时,才返回成功结果CompletableFuture
的anyOf
方法返回首个完成的CF
(不会等待后续没有完成的CF
,赛马模式);即使首个完成的CF
是失败的,也会返回这个失败的CF
结果。
CF
结果,而不是首个完成但失败的CF
cffu
提供了相应的anySuccessOf
等方法anySuccessOf
只有当所有的输入CF
都失败时,才返回失败结果CF
中成功的结果,对于失败的CF
返回指定的缺省值
cffu
提供了相应的allSuccessOf
等方法CF
中成功的结果,对于失败或还没有运行完成的CF
返回指定的缺省值
CF
,结果会写到分布式缓存中避免重复计算,下次就有了cffu
提供了相应的mostSuccessResultsOf
等方法📔 关于多个
CF
的并发执行策略,可以看看JavaScript
规范Promise Concurrency
;在JavaScript
中,Promise
即对应CompletableFuture
。
JavaScript Promise
提供了4个并发执行方法:
Promise.all()
:等待所有Promise
运行成功,只要有一个失败就立即返回失败(对应cffu
的allResultsFastFailOf
方法)Promise.allSettled()
:等待所有Promise
运行完成,不管成功失败(对应cffu
的allResultsOf
方法)Promise.any()
:赛马模式,立即返回首个成功的Promise
(对应cffu
的anySuccessOf
方法)Promise.race()
:赛马模式,立即返回首个完成的Promise
(对应cffu
的anyOf
方法)PS:
JavaScript Promise
的方法命名真考究~ 👍
cffu
新加2个方法后,对齐了JavaScript Promise
规范的并发方法~ 👏
示例代码如下:
public class ConcurrencyStrategyDemo {
public static final Executor myBizExecutor = Executors.newCachedThreadPool();
public static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();
public static void main(String[] args) throws Exception {
////////////////////////////////////////////////////////////////////////
// CffuFactory#allResultsFastFailOf
// CffuFactory#anySuccessOf
// CffuFactory#mostSuccessResultsOf
////////////////////////////////////////////////////////////////////////
final Cffu<Integer> successAfterLongTime = cffuFactory.supplyAsync(() -> {
sleep(3000); // sleep LONG time
return 42;
});
final Cffu<Integer> failed = cffuFactory.failedFuture(new RuntimeException("Bang!"));
Cffu<List<Integer>> fastFailed = cffuFactory.allResultsFastFailOf(successAfterLongTime, failed);
// fast failed without waiting successAfterLongTime
System.out.println(fastFailed.exceptionNow());
Cffu<Integer> anySuccess = cffuFactory.anySuccessOf(successAfterLongTime, failed);
System.out.println(anySuccess.get());
Cffu<List<Integer>> mostSuccess = cffuFactory.mostSuccessResultsOf(
0, 100, TimeUnit.MILLISECONDS, successAfterLongTime, failed);
System.out.println(mostSuccess.get());
////////////////////////////////////////////////////////////////////////
// or CompletableFutureUtils#allResultsFastFailOf
// CompletableFutureUtils#anySuccessOf
// CompletableFutureUtils#mostSuccessResultsOf
////////////////////////////////////////////////////////////////////////
final CompletableFuture<Integer> successAfterLongTimeCf = CompletableFuture.supplyAsync(() -> {
sleep(3000); // sleep LONG time
return 42;
});
final CompletableFuture<Integer> failedCf = failedFuture(new RuntimeException("Bang!"));
CompletableFuture<List<Integer>> fastFailed2 = allResultsFastFailOf(successAfterLongTimeCf, failedCf);
// fast failed without waiting successAfterLongTime
System.out.println(exceptionNow(fastFailed2));
CompletableFuture<Integer> anySuccess2 = anySuccessOf(successAfterLongTimeCf, failedCf);
System.out.println(anySuccess2.get());
CompletableFuture<List<Integer>> mostSuccess2 = mostSuccessResultsOf(
0, 100, TimeUnit.MILLISECONDS, successAfterLongTime, failed);
System.out.println(mostSuccess2.get());
}
}
# 完整可运行的Demo代码参见
ConcurrencyStrategyDemo.java
。
join
的方法cf.join()
会「不超时永远等待」,在业务中很危险❗️当意外出现长时间等待时,会导致:
join(timeout, unit)
方法即支持超时的join
的方法;就像cf.get(timeout, unit)
之于 cf.get()
。
这个新方法使用简单类似,不附代码示例。
Backport
支持Java 8
Java 9+
高版本的所有CF
新功能在Java 8
低版本直接可用。
其中重要的Backport功能有:
orTimeout
/completeOnTimeout
方法delayedExecutor
方法failedFuture
/ completedStage
/ failedStage
completeAsync
/ exceptionallyAsync
/ exceptionallyCompose
/ copy
这些backport
的方法是CompletableFuture
的已有功能,不附代码示例。
anyOf
方法CompletableFuture.anyOf
方法返回类型是Object
,丢失具体类型,不类型安全,使用时需要转型也不方便。
cffu
提供的anySuccessOf
/anyOf
方法,返回具体类型T
,而不是返回Object
。
这个方法使用简单类似,不附代码示例。
allof/anyOf
方法CompletableFuture#allof/anyOf
方法输入参数类型是CompletableFuture
,而输入更宽泛的CompletionStage
类型;对于CompletionStage
类型的输入,则需要调用CompletionStage#toCompletableFuture
方法做转换。
cffu
提供的allof
/anyOf
方法输入更宽泛的CompletionStage
参数类型,使用更方便。
方法使用简单类似,不附代码示例。
可以参见:
API
文档
cffu
:Cffu.java
、CffuFactory.java
CompletableFuture utils
:CompletableFutureUtils.java
、ListenableFutureUtils.java
Kotlin extensions
:CffuExtensions.kt
、CompletableFutureExtensions.kt
、ListenableFutureExtensions.kt
CompletableFuture
类迁移到Cffu
类为了使用cffu
增强功能,可以迁移已有直接使用CompletableFuture
类的代码到Cffu
类。包含2步修改:
CompletableFuture
类改成Cffu
类CompletableFuture
静态方法调用的地方,类名CompletableFuture
改成cffuFactory
实例之所以可以这样迁移,是因为:
CompletableFuture
类的所有实例方法都在Cffu
类,且有相同的方法签名与功能CompletableFuture
类的所有静态方法都在CffuFactory
类,且有相同的方法签名与功能Java API
文档: https://foldright.io/api-docs/cffu/Kotlin API
文档: https://foldright.io/api-docs/cffu-kotlin/代码示例:
可以在 central.sonatype.com 查看最新版本与可用版本列表。
cffu
库(包含Java CompletableFuture
的增强CompletableFutureUtils
):
Maven
projects:<dependency>
<groupId>io.foldright</groupId>
<artifactId>cffu</artifactId>
<version>1.0.0-Alpha22</version>
</dependency>
Gradle
projects:// Gradle Kotlin DSL
implementation("io.foldright:cffu:1.0.0-Alpha22")
// Gradle Groovy DSL
implementation 'io.foldright:cffu:1.0.0-Alpha22'
cffu Kotlin
支持库:
Maven
projects:<dependency>
<groupId>io.foldright</groupId>
<artifactId>cffu-kotlin</artifactId>
<version>1.0.0-Alpha22</version>
</dependency>
Gradle
projects:// Gradle Kotlin DSL
implementation("io.foldright:cffu-kotlin:1.0.0-Alpha22")
// Gradle Groovy DSL
implementation 'io.foldright:cffu-kotlin:1.0.0-Alpha22'
cffu bom
:
Maven
projects:<dependency>
<groupId>io.foldright</groupId>
<artifactId>cffu-bom</artifactId>
<version>1.0.0-Alpha22</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Gradle
projects:// Gradle Kotlin DSL
implementation(platform("io.foldright:cffu-bom:1.0.0-Alpha22"))
// Gradle Groovy DSL
implementation platform('io.foldright:cffu-bom:1.0.0-Alpha22')
📌 TransmittableThreadLocal(TTL)
的cffu executor wrapper SPI
实现:
Maven
projects:<dependency>
<groupId>io.foldright</groupId>
<artifactId>cffu-ttl-executor-wrapper</artifactId>
<version>1.0.0-Alpha22</version>
<scope>runtime</scope>
</dependency>
Gradle
projects:// Gradle Kotlin DSL
runtimeOnly("io.foldright:cffu-ttl-executor-wrapper:1.0.0-Alpha22")
// Gradle Groovy DSL
runtimeOnly 'io.foldright:cffu-ttl-executor-wrapper:1.0.0-Alpha22'
CompletableFuture
Guide
CompletableFuture
的使用方式CompletableFuture
cffu
是 CompletableFuture-Fu
的缩写;读作C Fu
,谐音Shifu/师傅
。
嗯嗯,想到了《功夫熊猫》里可爱的小浣熊师傅吧~ 🦝