funnycoding / blog

A Github Issue Blog
22 stars 0 forks source link

《Java并发编程实战》6.任务执行(下) #32

Open funnycoding opened 4 years ago

funnycoding commented 4 years ago

6.3 找出可利用的并行性

Executor 框架使确定执行策略更加容易,但如果要使用 Executor,必须将任务表述为一个 Runnable。在大多数服务器应用程序中都存在一个「明显的任务边界」:单个用户请求。

但有时候,任务边界并非是显而易见的,例如在很多「桌面应用程序中」。即使是 「服务器应用程序」,在单个客户请求中扔可能存在可发掘的「并行性」,例如 「数据库服务器」。

> 程序清单 6-9 错误的 `Timer` 行为: ```java // Timer 因抛出异常错误结束的情景 public class OutOfTime { public static void main(String[] args) throws InterruptedException { System.out.println("开始执行"); Timer timer = new Timer(); // 执行到这里时 Timer因为抛出了 未预料的异常,之后的代码也无法继续进行了 timer.schedule(new ThrowTask(),1); SECONDS.sleep(1); timer.schedule(new ThrowTask(),1); SECONDS.sleep(5); } static class ThrowTask extends TimerTask { @Override public void run() { throw new RuntimeException(); } } } ``` 在这一节中将开发一些「不同版本」 的组件,并且每个版本都实现了「不同程度」的并发性。 该示例组件实现浏览器程序中的 页面渲染(Page-Rendering)功能,它的作用是将 HTML 页面绘制到图像缓存中。为了简便,假设 HTML 页面只包含「标签文本」,以及预定义大小的「图片」和 URL。 #### 6.3.1 示例:串行的页面渲染器 最简单的方法就是对 `HTML` 文档进行「串行处理」。当遇到文本标签时,将其绘制到「图像缓存」中。 当遇到图像引用时,先通过网络获取它,然后再将其绘制到图像缓存中。 优点是这种方法非常简单:程序只需要将输入中的每个元素处理一次(甚至不需要缓存文档),**但这种方法会让用户等待很长的时间**,因为获取图片可能需要的时间很久,他们必须一直等待,直到显示所有文本。 另一种「串行」执行的方法更好一些,它先绘制文本元素,同时为图像预留出矩形的占位空间,在处理完了第一遍文本后,程序再开始下载图像,并将他们绘制到相应的占位空间中。 在 **程序清单 6-10** 的 `SingleThreadRenderer`中给出了上述这种方法的实现。 图像下载过程的大部分时间都是在 「等待 I/O」 操作执行完成。 在这期间 CPU 几乎没有任何工作。 因此,这种串行执行方法没有充分地利用 CPU,使得用户看到最终页面之前要等待过长的时间。 通过将问题「分解」为多个独立的任务「并发执行」,能够获得更高的 「CPU 利用率」和「响应灵敏度」。 > 程序清单 6-10 串行地渲染页面元素: ```java // 串行渲染页面元素 public abstract class SingleThreadRenderer { void renderPage(CharSequence source) { // 先渲染文字 renderText(source); // 定义页面图像引用 List imageData = new ArrayList<>(); // 通过source分析出其包含的图像信息 并将其添加到之前定义的 imageData 中 for (ImageInfo imageInfo : scanForImageInfo(source)) { imageData.add(imageInfo.downloadImage()); } // 渲染页面图片 for (ImageData data : imageData) { renderImage(data); } } // 代表页面中图像元素的数据类 interface ImageData { } // 承载图像信息的接口,其实现了下载图像的方法 interface ImageInfo { ImageData downloadImage(); } abstract List scanForImageInfo(CharSequence source); abstract void renderImage(ImageData data); abstract void renderText(CharSequence source); } ``` #### 6.3.2 携带结果的任务 Callable 与Future `Executor` 框架使用 `Runnable` 作为其「基本的任务表示形式」。 `Runnable` 是一种有很大局限性的抽象,虽然 `run` 能写入到日志文件或者将结果放入某个 「共享的数据结构」,但它**不能** 「返回一个值」 或 「抛出一个受检查的异常」。 许多任务实际上都是存在「延迟计算」这种情况的,例如「执行数据库查询」,「从网络上获取资源」,或者计算某个复杂的功能。 对于上述类型的任务 `Callable`是一种更好的抽象:它认为主入口点 `call` 方法将返回一个值,并可能「抛出一个异常」。(要使用 `Callable` 来表示一个无返回值的任务,可使用 `Callable`)。 ```java // java.util.concurrent/Callable.java @FunctionalInterface public interface Callable { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; } ``` 在 `Executor` 中包含了一些「辅助方法」能将其他类型的任务「封装」为一个`Callable`,例如 `Runnable` 和 `java.security.PrivilegedAction`。 `Runnable` 和 `Callable` 描述的都是「抽象的计算任务」。 这些任务通常是「有范围」的 —— 都有一个明确的起始点,并且最终会结束。 `Executor` 执行的任务有 4 个生命周期阶段:**「创建、提交、开始、完成」**。 由于有些任务可能执行时间很长,因此通常希望能够取消这些任务。 在 `Executor` 框架中,已提交但尚未开始的任务可以取消,但是对于那些**「已经开始」**的任务,只有当它们能**「响应」**中断操作时,才能取消。取消一个已经「完成」的任务不会有任何影响(第7章将进一步介绍取消操作)。 `Future` 表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及「获取任务的结果」和「取消任务」等。 在 **程序清单 6-11** 中给出了 `Callable` 和 `Future`。 `Future` 规范中包含的**「隐含意义」** 是:任务的生命周期「只能前进」,不能后退,就像 `ExecutorService` 的生命周期一样,当某个任务完成后,它就只能永远停留在「完成」状态上。 `get`方法的行为「取决于任务的状态」(尚未开始、正在运行、已完成)。 如果任务已经完成,那么 `get` 会立即返回或者抛出一个 `Exception` ,如果任务没有完成,那么 `get` 将阻塞并直到任务完成。如果任务抛出了异常,那么 `get` 将该异常封装为 `ExecutionException`并重新抛出。 如果任务被取消,那么 `get` 将抛出 `CancellationException`。如果 `get` 抛出了 `ExecutionException`,那么可以通过 `getCause` 来获得被封装的「初始异常」。 ![](https://xuyanxin-blog-bucket.oss-cn-beijing.aliyuncs.com/blog/20200414110111.png) 【看了一下源码,更加实际的体会上述可能抛出异常的多种情况】 > **程序清单 6-11** `Callable` 与 `Future` 接口 ```java @FunctionalInterface public interface Callable { V call() throws Exception; } public interface Future { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } ``` 可以通过许多方法创建一个 `Future` 来 「描述」 任务。 `ExecutorService` 中的所有 `submit` 方法都将返回一个 `Future`,从而将一个 `Runnable` 或 `Callable` 提交给 `Executor`,并得到一个 `Future` 用来获得任务的执行结果或者取消任务。 还可以显式地位某个指定的 `Runnable` 或 `Callable` 实例化一个 `FutureTask`。(由于 `FutureTask` 实现了`Runnable` ,因此可以将它提交给 `Executor` 来执行,或者直接调用它的 `run` 方法) 从 `Java6` 开始,`ExecutorService` 的实现可以改写为 `AbstractExecutorService` 中的 `newTaskFor`方法,从而根据已提交的 `Runnable` 或 `Callable` 来控制 `Future`的「实例化过程」。 在默认实现中仅创建了一个新的 `FutureTask` 如下所示: > 程序清单 6-12 `ThreadPoolExecutor` 中的 `newTaskFor` **的默认实现**: ```java protected RunnableFuture newTaskFor(Callable callable) { return new FutureTask(callable); } ``` 实际上 `newTaskFor` 这个方法不再 `ThreadPoolExecutor.java` 中,而是在 ``AbstractExecutorService.java` 中,但是 `ThreadPoolExecutor` 继承 该类,说是默认实现也没问题。】 将 `Runnable` 或 `Callbale` 提交到 `Executor` 的过程中,包含了一个 「安全发布」的过程(参见3.5) —— 将 `Runnable` 或 `Callable` 从「提交线程」 发布到最终 「执行任务的线程」。 类似地,在设置 `Future` 结果的过程中也包含了一个安全发布,将这个结果从计算它的线程发布到任何通过 `get` 获得它的线程。 #### 6.3.3 示例: 使用 Future 实现页面的渲染器 为了使页面渲染器实现更高的「并发性」,我们将渲染过程分解为「两个任务」: - 渲染所有文本 (CPU密集型) - 下载所有的图像 (I/O 密集型) 这样将 CPU密集型 和 I/O 密集型的任务分解开,即使在单 CPU 系统上也能提升性能。 `Callable` 和 `Future` 有助于表示这些协同任务之间的交互,在 **程序清单 6-13** 的 `Future Renderer` 中创建了一个 `Callable` 来下载所有的图像,并将其提交到一个 `ExecutorService`。 这将返回一个 「描述任务执行情况」的 `Future`。 当主任务需要图像时,它会等待 `Future.get` 的调用结果。 如果幸运的话,当开始请求时所有图像就已经下载完成了,即使没有,下载图像的任务也已经提前开始了。 > 程序清单 6-13 使用 `Future` 等待图像下载: ```java // 使用 Future 等待下载图像任务的完成 public abstract class FutureRenderer { private final ExecutorService executor = Executors.newCachedThreadPool(); void renderPage(CharSequence source) { final List imageInfos = scanForImageInfo(source); // 创建一个 Callable 开启一个线程专门下载图片 Callable> task = () -> { final List result = new ArrayList<>(); for (ImageInfo imageInfo : imageInfos) { result.add(imageInfo.downloadImage()); } return result; }; // 提交这个 Callable 到 Executor 中,获得一个返回的 Future final Future> future = executor.submit(task); // 渲染文字 renderText(source); try { // 开始获取下载图片的结果 final List imageData = future.get(); // 渲染图片 imageData.forEach(this::renderImage); } catch (InterruptedException e) { // 重新声明线程的中断状态 Thread.currentThread().interrupt(); // 此时这个 Future 的结果已经不需要了,所以关闭这个任务 future.cancel(true); } catch (ExecutionException e) { // 抛出异常 throw LaunderThrowable.launderThrowable(e); } } interface ImageData { } interface ImageInfo { ImageData downloadImage(); } abstract void renderText(CharSequence source); abstract List scanForImageInfo(CharSequence s); abstract void renderImage(ImageData imageData); } ``` `get` 方法拥有 「状态依赖」 的内在特性,因而调用者不需要知道任务的状态,此外在任务 「提交」 和 「获得结果」 中包含的安全发布属性也确保了这个方法是 「线程安全」的。 `Future.get` 的异常处理代码将处理两个可能的问题: - 任务遇到了一个 Exception - 或者调用 `get` 的线程在获得结果之前被中断(参见 5.5.2 和 5.4 节) `FutureRenderer` 使得渲染文本任务与下载图像数据的任务「并发」 地执行。 当所有图像下载完成后,会显示到页面上。 这将提升「用户体验」,不仅使用户更快地看到结果,还有效地利用了「并行性」,但我们还可以做的更好:**「用户不必等待所有图像都下载完成,而是希望每下载完成一个就显示出一个」**。 #### 6.3.4 在异构任务并行化中存在的具现 上个例子中,我们尝试并行地执行两个**「不同类型」**的任务 —— 「下载图像」与 「渲染页面」。然而,通过对 **「异构任务」** 进行并行化来获得重大的性能提升是很困难的。 如果工作类型相同,比如都是洗完,那么两个人可以很好地分摊工作,一个负责清洗,一个负责烘干,增加人手可以直接提升工作效率。然而,如果将「不同类型的任务」 平均分配给每个工人却并不容易。 当人数增加时,如果没有在「相似」 的任务之间找出细粒度的「并行性」,那么这种方法带来的好处将减少。 当在多个工人之间分配「异构」 任务时,还有一个问题就是各个任务的「大小」可能完全不同。 如果将两个任务 A 和 B 分配给两个工人,但是「 A 的执行时间是 B 的10倍。」,那么整个过程也只能加速 **9%** <---【整个任务的耗时时间取决于耗时最长的任务】 当在多个工人之间分解任务时,还需要一定的任务协调开销:「为了使任务分解能提高性能,这种开销不能高于并行性实现的提升。」 <---**【也就是协调任务的开销如果比并行执行任务的开销还大,那么并行执行任务也就没有意义了】** `FutureRenderer` 使用了两个任务,其中一个负责**「渲染文本」**,另一个负责**「下载图像」**。 如果渲染文本的速度远高于下载图像的速度(这个可能性很大),那么程序的最终性能与串行执行时的性能差别不大(因为需要等待 下载图像,而渲染文本的耗时可以忽略不计,所以和串行执行几乎差不多),而并行执行任务的代码却更复杂了。**<---【增加了复杂度,却没有提升多少性能】** 因此,虽然做了许多工作来并发执行「异构任务」 以提高并发度,但从中获得的 「并发性」 却十分有限。 (在 11.4.2 节 和 11.4.3节) 中的示例说明了同一个问题。 只有当大量「相互独立」 且 「同构」(相同类型工作)的任务可以进行并发处理时,才能体现出将程序的工作负载分配到「多个任务」 中带来的真正性能提升。 #### 6.3.5 CompletionService:Executor 与 BlockingQueue 如果向 `Executor` 提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与任务关联的 `Future`,然后反复使用 `get`方法,同时将参数 `timeout` 指定为 0,从而通过轮询来判断任务是否完成。 这种方法虽然可行,却有些「繁琐」。还有一种更好的方法:**「完成服务」**(`CompletionService`) `CompletionService` 将 `Executor` 和 `BlockingQueue` 的功能融合在一起。 你可以将 `Callable` 任务提交给它来执行,然后使用类似于队列操作的 `take` 和 `poll` 等方法来获得已完成的结果,而这些结果会在完成时被封装为 `Future`。 `ExecutorCompletionService` 实现了 `CompletionService`,并将**「计算部分」**委托给了一个 `Executor`。 `ExecutorCompletionService` 的实现非常简单。 在构造函数中创建一个 `BlockingQueue` 来保存计算完成的结果。 当计算完成时,调用 `FutureTask` 中的 `done` 方法。 当提交某个任务时,该任务将首先包装为一个 `QueueingFuture` ,这是 `FutureTask` 的一个子类,然后再改写子类的 `done` 方法,并将结果放入 `BlockingQueue` 中,如 **程序清单 6-14** 所示 —— `take` 和 `pool` 方法委托于 `BlockingQueue` ,这些方法在得到结果之前将被「阻塞」。 > 程序清单 6-14 由 `ExecutorCompletionService` 使用的 `QueueingFuture` 类: ```java // JDK 8 的 QueueingFuture private class QueueingFuture extends FutureTask { QueueingFuture(RunnableFuture task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future task; } // JDK 6的 QueueingFuture ,书中给的例子 private class QueueingFuture extends FutureTask { QueueingFuture(Callable c) {super(c);} QueueingFuture(RunnableFuture t,V r) { super(t r); } protected void done() { completionQueue.add(this); } } ``` 可以看到随着 `JDK` 的演化,底层的实现还是有些许不同的地方的。 #### 6.3.6 示例:使用 CompletionService 实现页面渲染器 可以通过 `CompletionService` 从两个方面来提高页面渲染器的性能: - 缩短总运行时间 - 提高响应性 为每一个图像的下载都创建一个「独立任务」,并在线程中执行它们,从而将「串行」的下载过程转变为「并行」过程 ——> 这将减少下载所有图像的总时间。 此外,通过从 `CompletionService` 中获取结果以及使每张图片在下载完成后「立刻」 显示出来,能使用户获得一个更加「动态」和「更高响应性」 的用户界面,如下面的代码所示: > **程序清单 6-15** 使用 `CompletionService` 使页面元素在下载完成后立即显示出来: ```java // 为每个图片分配一个线程进行下载,并且当其下载完成后立即进行渲染 public abstract class Renderer { private final ExecutorService executor; // 通过传入 ExecutorService 获得不同的特性 public Renderer(ExecutorService executor) { this.executor = executor; } void renderPage(CharSequence source) { final List info = scanForImageInfo(source); // 初始化 ExecutorCompletionService final ExecutorCompletionService completionService = new ExecutorCompletionService<>(executor); // 为每个图片分配一个线程进行下载 for (final ImageInfo imageInfo : info) { completionService.submit(imageInfo::downloadIamge); } // 渲染页面文字 renderText(source); try { for (int t = 0; t < info.size(); t++) { // 获取下载任务关联的 Future final Future f = completionService.take(); // 获取下载任务的结果 ——> ImageData final ImageData imageData = f.get(); // 渲染页面图片 renderImage(imageData); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { throw LaunderThrowable.launderThrowable(e); } } interface ImageData { } interface ImageInfo { ImageData downloadIamge(); } abstract void renderText(CharSequence s); abstract List scanForImageInfo(CharSequence s); abstract void renderImage(ImageData i); } ``` 多个 `ExecutorCompletionService` 可以 「共享」 一个 `Executor`。 因此可以创建一个对于「特定计算」私有,又能「共享」一个公共 `Executor` 的 `ExecutorCompletionService` 。因此,`CompletionService` 的作用就相当于一组**「计算句柄」**,这与 `Future` 作为单个计算句柄是非常类似的。 通过记录提交给 `CompletionService` 的任务数量,并计算出已经获得的已完成结果的数量,即使使用一个 「共享的」 `Executor`,也能知道已经获得了所有任务结果的「时间」。、 #### 6.3.7 为任务设置时限 有时候,如果某个任务无法在指定时间内完成,那么将不再需要它结果,此时可以放弃这个任务。【具有强时效性的任务】 例如:某个 Web 应用程序从外部的广告服务器上获取广告信息,但如果该应用程序在两秒内得不到响应,那么将显示一个默认的广告,这样即使不能获得广告信息,也不会「降低」 网站的响应性能。 类似地,一个门户网站可以从多个数据源「并行地」获取数据,但可能值会在「指定的时间」内等待数据,一旦超出了等待时间,那么将只显示已经获得的数据。 在有限的时间内执行任务的主要困难在于:**「确保得到答案的时间不会超过限定的时间」**,或者在限定的时间内无法获得答案时做出对应的处理。 在支持「时间限制」的 `Future.get` 中支持这种需求: 当结果可用时,它将立即返回,如果在指定时限内没有计算出结果,将抛出 `TimeoutException`。 在使用时限任务时需要注意,当这些任务超时后应该 「立即停止」,从而避免为无效的任务浪费计算资源。 要实现这个功能,可以由 「任务本身」 来管理它自己的限定时间,并且在超时后 「中止」 或 「取消」 任务。 此时可再次使用 `Future` ,如果一个限时的 `get` 方法抛出了 `TimeoutException` ,那么可以通过 `Future` 来取消任务。 如果编写的任务是「可取消」的(参见第7章),那么可以提前中止它,以免消耗过多的资源。 在程序清单 6-13 和 6-16 的代码中使用了这项技术。【提前中止】 程序清单 6-16 给出了限时 `Future.get` 的一种 「典型应用」 —— 在生成的页面中包括「响应用户请求的内容」以及从广告服务器上获得的「广告」。 它将获取广告的「任务」 提交给一个 Executor,然后计算剩余的文本页面内容,最后等待广告信息,直到超出指定的时间。(传递给 `get` 的 `timeout` 参数的计算方法是: 将指定时间 - 当前时间。 这样可能得到 「负数」,但 `java.util.concurrent` 中所有与「时限」 相关的方法都将负数视为零,因此不需要额外的代码来处理这种情况)。 如果 `get` 超时,那么将取消获取广告的任务,并转而使用默认的广告信息。(`Future.cancel` 的参数为 `true` 表示任务线程可以在运行中中断,详见 第7章) > **程序清单 6-16** 在指定时间内获取广告信息: ```java // 使用有时限的任务来放弃超时的失效 Task public class RederWithTimeBudget { // 广告信息,初始化时使用默认广告信息 private static final Ad DEFAULT_AD = new Ad(); // 超时时间 private static final long TIME_BUDGET = 1000; // 初始化任务执行框架 private static final ExecutorService exec = Executors.newCachedThreadPool(); // 这里没有处理被中断的异常,而是将其抛给了调用者进行处理 Page renderPageWithAd() throws InterruptedException { // 结束时间 final long endNanos = System.nanoTime() + TIME_BUDGET; // 提交一个获取广告的任务到任务执行框架中 final Future f = exec.submit(new FetchAdTask()); // Render the page while waiting for the ad 在等待获取广告的同事,渲染这个页面 final Page page = renderPageBody(); Ad ad; try { // Only wait for the remaining time budget // 获取任务执行时间,如果超时则直接抛弃任务 ,如果获取成功则将其赋值给之前定义的广告引用 final long timeLeft = endNanos - System.nanoTime(); ad = f.get(timeLeft, TimeUnit.NANOSECONDS); } catch (ExecutionException e) { // 发生异常时,将广告信息设置为默认信息 ad = DEFAULT_AD; } catch (TimeoutException e) { // 如果获取广告的任务超时,不仅将广告设置为默认信息,同时关闭这个获取广告的任务 ad = DEFAULT_AD; f.cancel(true); } page.setAd(ad); return page; } // 渲染页面的方法 Page renderPageBody() { return new Page(); } // 页面信息类 static class Page { // 设置页面内的广告内容 public void setAd(Ad ad) { } } // 获取广告的行为 static class FetchAdTask implements Callable { @Override public Ad call() throws Exception { return new Ad(); } } // 默认广告信息 static class Ad { } } ``` #### 6.3.8 示例:旅行预订门户网站 「预定时间」 方法可以很容易地 「扩展」 到任意数量的任务上。 例如这样一个旅行预定门户网站:用户输入旅行的「日期」 和其他要求,门户网站获取并显示来自多条航线,旅店或汽车租赁公司的报价。 在获取不同公司报价的过程中,可能会调用「Web服务」,「访问」 数据库,执行一个 EDI 事务或其他机制。在这种情况下,不应该让页面的响应时间 受限于 「最慢服务」的响应时间,而应该只显示在 「指定时间」内接收到的信息。 对于没有及时响应的服务提供者,页面可以忽略它们,或者显示一个提示信息,例如"未在指定时间内获取到 xxx 信息"。 从一个公司获得报价的过程 与 从其他公司获得报价的过程无关。【也就是这些获得报价的任务是独立的】,因此可以将获取报价的过程当成「一个任务」,从而使获得报价的过程能「并发执行」。 创建 n 个任务,将其提交到一个线程池,保留 n 个 `Future`,并使用限时的 `get` 方法通过 `Future` 串行地获取每一个结果 ,这一切都很简单,但还有个更简单的方法 ——> `invokeAll`。 下面的示例代码中使用了支持限时的 `invokeAll` ,将多个任务提交到 「一个」 `ExecutorService` 并获得结果。 `InvokeAll` 方法的参数为 「一组任务」,并返回一组 `Future`。 这两个集合有着相同的结构,`invokeAll` 按照任务集合中迭代器的顺序将所有的 `Future` 添加到返回的集合中,从而使调用者能降各个 `Future` 与其表示的 `Callable` 关联起来。 当所有任务都执行完毕时,或者调用线程被中断时,又或者超过指定时限时, `invokeAll` 将返回。 当超过 「指定时限」后,任务还未完成的任务都会「取消」。 当 `invokeAll` 返回后,每个任务要么正常地完成,要么被取消。 而客户端代码可以调用 `get` 或 `isCancelled` 来判断具体是什么情况。 > 程序清单 6-17 在预定时间内请求旅游报价: ```java // Requesting travel quotes under a time budget // 使用 invokeAll 来获取一组报价,这个类的设计非常严谨 public class TimeBudget { private static ExecutorService exec = Executors.newCachedThreadPool(); // 获取报价的方法 在这里调用 QuoteTask 中的方法 public List getRankedTravelQuotes(TravelInfo travelInfo, Set companies, Comparator ranking, long time, TimeUnit unit) throws InterruptedException { final List tasks = new ArrayList<>(); // 轮询调用每个旅行社指定 TravelInfo 的报价 for (TravelCompany company : companies) { tasks.add(new QuoteTask(company, travelInfo)); } // 通知任务执行框架开始这一组任务,并获取其 Future final List> futures = exec.invokeAll(tasks, time, unit); // 用来保存真正获取到的报价信息 其数量与获取报价任务的数量相等 final List quotes = new ArrayList<>(tasks.size()); // 获取任务的迭代器 final Iterator taskIter = tasks.iterator(); // 遍历 Future 获取其任务执行完成的信息 for (Future f : futures) { final QuoteTask task = taskIter.next(); try { quotes.add(f.get()); } catch (ExecutionException e) { // 发生异常时 ,在 task列表中 增加一个 获取失败的报价类 quotes.add(task.getFailureQuote(e.getCause())); } catch (CancellationException e) { // 收集因任务关闭导致获取报价失败的类 quotes.add(task.getTimeoutQuote(e)); } } // 排序 Collections.sort(quotes, ranking); return quotes; } } // 获取报价类的具体实现 class QuoteTask implements Callable { // 旅行社 private final TravelCompany company; // 不同航线 private final TravelInfo info; public QuoteTask(TravelCompany company, TravelInfo info) { this.company = company; this.info = info; } // 获取失败的报价信息 TravelQuote getFailureQuote(Throwable t) { return null; } // 获取超时的报价信息 TravelQuote getTimeoutQuote(CancellationException e) { return null; } @Override public TravelQuote call() throws Exception { // 调用旅行社的获取具体航线信息报价的方法 return company.solicitQuote(info); } } // 代表不同旅行社的类 interface TravelCompany { // 返回具体报价信息 TravelQuote solicitQuote(TravelInfo travelInfo) throws Exception; } // 报价 interface TravelQuote { } // 不同航线的信息 interface TravelInfo { } ``` **【这个类的设计真的太严谨了,非常具有参考价值】** ### 小结: 通过围绕「任务执行」 来设计应用程序,可以简化开发过程,并有助于实现并发。 `Executor` 框架将任务提交于执行策略「解耦」,同时支持多种不同类型的「执行策略」。 当需要创建线程来执行任务时,可以考虑 「`Executro`」。 要想在将应用程序分解为不同任务时获得「最大的好处」:必须定义清晰的「任务边界」。 某些应用程序中存在着比较明显的任务边界,而在其他一些程序中则需要进一步分析才能揭示出粒度更细的并行性。 ### 个人总结: 首先,引入了 「**任务执行**」 这个概念,任务在 Java 中的具体载体就是 `Runnable` 和 `Callable`。 第一个改造 —— Web服务器中,串行化接受用户请求的故武器改造为每个请求都创建一个线程,让这个线程去处理用户的请求。 #### 1、从单线程串行执行任务到使用多个线程并发执行任务: 从「串行化」到「多线程」处理 Web 服务器的请求: ```java /** * 一个单线程串行执行的 Web Server */ public class SingleThreadWebServer { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while (true) { // 接收客户端的请求 Socket connection = socket.accept(); // 以串行的形式处理请求 handleRequest(connection); } } // 具体对请求做处理的逻辑,在这里我们不需要关心 private static void handleRequest(Socket connection) { } } // 改造后 // 为每个请求都创建一个线程来对其进行处理 public class ThreadPerTaskWebServer { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(80); while (true) { final Socket connection = serverSocket.accept(); Runnable task = () -> handleRequest(connection); } } private static void handleRequest(Socket connection) { // 在这里做具体的业务逻辑处理 } } ``` **优点:** - 提高了响应性,之前需要在主循环中进行请求的处理,现在需要做的是在主循环中创建线程并将请求分配给线程,不需要进行处理。 - 提高了吞吐率,因为如果有因为等待 I/O ,等待获取锁,等待某些资源可用的情况的话,其他线程可以继续任务的执行,而不必非得等待阻塞完成后才顺序执行。 **致命缺陷:** - **线程生命周期的开销非常高。** 创建和销毁线程都是有代价的。 - **额外的资源消耗。** 过多的线程会导致线程的闲置,而这些闲置的线程相当于额外的消耗了系统的资源。 - **降低了稳定性。** 系统内存被耗尽时会抛出 `OutOfMemoryError` 异常,而从这个错误中恢复很危险,所以我们要做的就是避免出现这个错误。 #### 2、控制线程数量与确定边界:Executor 任务执行框架 于是,为了使任务被并行处理,而又不导致系统崩溃与出现额外的开销,就需要对任务的数量进行限制,于是引出了 `「Executor」` 框架。 书中在这里给 **「任务」** 与 线程之间的关系 下了一个清晰的定义: 「任务」 是一组逻辑工作单元,线程则是使任务异步执行的机制。 **任务也就是我们要完成的业务逻辑,线程则是任务具体运行的载体。** 「线程池」做了帮我们管理线程数量以及提前创建线程的工作,它是 `Executor` 任务执行框架组成的一部分。 Java 中 Executor 接口的代码非常简单: ```java public interface Executor { void execute (Runnable commoand); } ``` 这个 `execute` 就是执行线程中的「任务」 逻辑。 但是这个 `Executor` 的意义又非常大: - `Executor`为灵活且强大的**「异步任务执行框架」**提供了**基础**,**该框架能支持多种不同类型的任务**「执行策略」。 它提供了一种标准的方法将任务的**「提交」与「执行」**过程**解耦**,使用 `Runnbale`来表示任务。 - `Executor`的实现还提供了对**「生命周期」**的支持,以及「统计信息收集」、「应用程序管理机制」和「性能监视」等机制。 - `Executor`基于「生产者—消费者」模式,提交任务的操作相当于「生产者」(生产待完成的工作单元),执行任务的线程则相当于 「消费者」(执行完这些工作单元)。如果要在程序中实现一个 「生产者—消费者」的设计,最简单的方式就是通过使用 `Executor`。 【解耦,多种功能支持,生产者—消费者 模式最简单的实现方式, 文章后面的内容基本都围绕着 `Executor` 来展开。】 下面两个例子则是在 `Executor` 这个框架下 为每个连接启动一个线程以及以同步方式「串行」 执行所有任务的两种方式: ```java // 为每个请求都创建一个线程的 Executor public class ThreadPerTaskExecutor implements Executor { @Override public void execute(Runnable command) { new Thread(command).start(); } } // 以同步串行的形式执行所有任务的 Executor public class WithinThreadExecutor implements Executor { @Override public void execute(Runnable command) { command.run(); } } ``` `Executor`因为将任务的提交与任务的具体执行 进行了解耦,所以可以很轻易的**更换**「执行策略」,每种执行策略背后对应的都是一种对资源的管理,最佳的策略则取决于你的需求和可用的计算资源。 由于 `Executor` 执行任务的背后具体的执行者是线程池中的 「工作线程」 所以下面自然而然的引出了 「线程池」 这个概念对应的知识点。 首先是「线程池的优点」: - 可以有效控制线程数量,保证线程尽量处于忙碌姿态,同时不要创建过多的线程导致资源耗尽。 - 省去了重复创建/销毁线程的开销。 - 线程被提前准备好,提高了响应性,而不需要等使用的时候再去等待线程创建。 - 配置灵活,可以参数改变方便。 #### 3、 任务执行框架中的线程池类型介绍: 通过工厂方法创建不同的**「线程池」**: - `newFixedThreadPool`,创建一个**固定长度的线程池**,每提交一个任务该线程池中就创建一个线程,直到达到线程池「最大数量」。这时线程池的规模将不再变化(如果有线程在执行时遇到「未预期」的 `Exception`而结束,那么线程池会补充一个新的线程) <---【也就是对于意外减员的应对情况】 - `newCachedThreadPool`,创建了一个**「可缓存」的线程池**,如果线程池的当前规模超过了「处理器的需求」,那么将「回收」空闲的线程,而当需求增阿基时,则可以添加新的线程,「该线程池的规模不存在任何限制」。<---【那么是否存在线程创建过多导致资源耗尽的问题?】 - `newSingleThreadExecutor`,一个**单线程的线程池**,它创建单个工作线程来执行任务,如果这个线程「异常结束」,会创建另一个线程进行代替。`newSingleThreadExecutor`能确保依照任务在队列中的顺序来「串行」执行(例如FIFO,LIFO,优先级) 单线程的 `Executor`提供了大量的「内部」同步机制,从而确保了任务执行的任何内存写入操作对于后续任务来说都是「可见」的。这意味着,即使这个线程会时不时被「另一个线程替代」,但对象总是可以「安全」地「封闭」在「任务线程」 中。 - `newScheduledThreadPool`,创建了一个**固定长度的线程池**,而且以**「延迟」或定时的方式来执行任务**,类似于 `Timer`。 #### 4、 Executor 的生命周期与不同的关闭方式: 由于 JVM 只有在所有 **「非守护线程」** 全部终止后才会退出,而如果 `Executor` 没有正确的关闭的话,那么线程将一直存在,而导致 JVM 无法退出,所以 `Executor` 的 「关闭」 非常重要,这就涉及到了 **`Executor` 的生命周期* 为了解决 `Executor` 的生命周期问题,新增了一个接口 `ExecutorService` 扩展了 `Executor` 类,增加了一些管理生命周期的方法,还有一些用于提交任务的便利方法。 ```java public interface ExecutorService extends Executor { // 平缓的关闭一个任务执行框架,不再接受新的任务,同时等待已经提交的任务执行完成,包括未开始的任务 void shutdown(); // 粗暴的关闭任务执行框架,尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务 List shutdownNow(); // 返回该任务执行框架是否被调用 shutDown 方法 的结果 boolean isShutdown(); // 获取任务执行框架是否已经终止的状态 如果所有任务在关闭后都已完成,则返回true。 请注意,除非先调用shutdown或shutdownNow,否则isTerminated永远不会为真。 boolean isTerminated(); // 阻塞直到关闭请求后所有任务完成执行,或者发生超时,或者当前线程被中断(以先发生者为准)。 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // ... 其他用于任务提交的便利方法 ↓ // 提交一个 Callable 代表了一个任务,同时返回 Future,可以通过 get 获取任务执行的结果 Future submit(Callable task); // 对应的 Runnbale 形式的 submit Future submit(Runnable task); List> invokeAll(Collection> tasks) throws InterruptedException; List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException; T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException; T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } ``` 下面就提供了一个 「支持」 关闭操作的使用 `Executor` 执行任务的 Web 服务器的例子: ```java // 支持关闭操作的 Web 服务器 public class LifecycleWebServer { private final ExecutorService exec = Executors.newCachedThreadPool(); // 开始执行任务 public void start() throws IOException { ServerSocket socket = new ServerSocket(80); // 只要任务执行框架没有被通知关闭,则一直执行主循环,同时可能因为 ExecutorService 被关闭抛出 拒绝异常,此时需要对异常进行处理 while (!exec.isShutdown()) { try { final Socket conn = socket.accept(); exec.execute(() -> handleRequest(conn)); } catch (RejectedExecutionException e) { // 当 ExecutorService 关闭后提交的任务会抛出这个异常 如果不是因为 ExecutorService 被关闭而抛出的该异常,则打印该异常信息。因为当 「阻塞队列饱和」时,被提交的任务也会被拒绝。 if (!exec.isShutdown()) { log("task submission reject", e); } } } } private void log(String msg, Exception e) { Logger.getAnonymousLogger().log(Level.WARNING, msg, e); } // 这里定义了一个接口,模拟我们自己定义的业务接口 interface Request { } // 对连接进行业务处理,首先判断封装的程序是否处于关闭状态,如果是,则调用 ExecutorService 的shutdown() 方法,否则进行任务分发 void handleRequest(Socket connection) { Request req = readRequest(connection); if (isShutdownRequest(req)) { stop(); } else { dispatchRequest(req); } } private void dispatchRequest(Request req) { //...啥也不做 } private void stop() { exec.shutdown(); } private boolean isShutdownRequest(Request req) { return false; } private Request readRequest(Socket s) { return null; } } ``` 这里增加的是对 关闭操作的支持,以及对被拒绝任务异常的处理方法是比较有学习价值的地方。 并且在看到这行代码时我产生了疑惑 ` if (!exec.isShutdown())` 说明并不只是在 `ExecutorService ` 被调用 `shutdown` 之后提交的任务会被拒绝,肯定还有其他的任务被拒绝的情况,于是查了一下发现: ![](https://xuyanxin-blog-bucket.oss-cn-beijing.aliyuncs.com/blog/20200414205657.png) 抛出 `RejectedExcutionException` 的两种情况: 1. `Executor` 被关闭。 2. `Executor` 中使用了一个有界的队列,并且队列已满,此时再被提交的任务就会被拒绝。 #### 5、延迟任务与周期性执行任务 **延迟执行任务**与**定时执行任务**也是非常常见的两种任务类型。 在 Java5 之前一般使用 `Timer` 来支持这两种场景,但是 因为 Time 存在诸多问题 : - `Timer`在执行多个定时任务时,只会启动一个线程,如果其中某个任务阻塞时间过长,会导致 `TimerTask` 的定时精确性被破坏。 - 例如某个周期 `TimerTask` 需要 每 10ms 执行一次,而另一个 `TimerTask` 需要执行 40ms,那么这个周期任务或者在 40ms 任务执行完成后快速连续地调用 4次,或者彻底「丢失」4次调用(取决于它是基于**「固定速率」**还是基于**「固定延迟」来**进行调度)。 - `TimerTask` 如果抛出了未检查异常,则整个 `Timer` 被取消,那些已经被调度,但是尚未执行的 TimerTask 将不会执行,这个问题称为 「线程泄漏」。 而 `SCheduledThreadPoolExecutor` 能正确处理这些错误,于是在 `Java5.0` 或更高的 `JDK`中,很少使用 `Timer`。 这里还介绍了构建自己的调度服务可以使用的组件 —— `DelayQueue`,其实现了 阻塞队列 `BlockingQueue`,并为 `SCheduledThreadPoolExecutor` 提供「调度」功能。 `DelayQueue` 管理着一组 `Delayed` 对象。每个 `Delayed` 对象都有一个相应的延迟时间:在 `DelayQueue` 中,只有某个元素「逾期」后,才能从 `DelayQueue` 中执行 `take` 操作,从 `DelayQueue` 中返回的对象将根据它们的 「延迟时间」 进行排序。 #### 6、通过一个完整的例子演化从线程到任务执行框架的方方面面 关于任务的几个概念: - `Runnable` :线程接口, `run` 方法可以写入日志文件或者将结果放入共享的数据结构,但是 run 方法没有返回值,并且不能抛出受检查的异常。 - `Runnable` 对象可以直接通过 `Thread` 对象创建线程实例 - `Runnable` 也可以通过线程池的 `submit` 和 `execte` 方法成为线程池中的工作线程 - `Runnbale` 可以进一步封装为 `FutureTask`然后提交给线程池执行。 - `Callable`:其入口是 `call` 方法,可以返回一个值,并且可以抛出一个**「受检查的异常」**。 - `Callable` 不能直接创建线程 - `Callable` 不能通过 `execute` 提交给线程池与其中的工作线程绑定。 - `FutureTask` :对 `Runnable` 和 `Callable` 的进一步封装,并且这种任务存在返回值。相比直接把`Runnable`和`Callable`扔给线程池,`FutureTask`的功能更多,它可以监视任务在池子中的状态。用`Runnable`和`Callable`创建`FutureTask`的方法稍有不同。 `Runnable` 和 `Callable` 描述的都是「抽象的计算任务」。 这些任务通常是「有范围」的 —— 都有一个明确的起始点,并且最终会结束。 `Executor` 执行的任务有 4 个生命周期阶段:**「创建、提交、开始、完成」**。 `Future` 规范中包含的**「隐含意义」** 是:任务的生命周期「只能前进」,不能后退,就像 `ExecutorService` 的生命周期一样,当某个任务完成后,它就只能永远停留在「完成」状态上。 ### 类图: ![](https://xuyanxin-blog-bucket.oss-cn-beijing.aliyuncs.com/blog/20200414215257.png)