funnycoding / blog

A Github Issue Blog
22 stars 0 forks source link

《Java并发编程实战》7.取消与关闭(中) 停止基于线程的服务 #34

Open funnycoding opened 4 years ago

funnycoding commented 4 years ago

应用程序通常会创建拥有多个线程的服务,例如线程池。

并且这些服务的生命周期通常比创建它们的方法的生命周期更长。 如果应用程序准备退出,那么这些服务所拥有的线程也需要结束。

由于无法通过抢占式的方法来停止线程,因此它们需要「自行结束」。 **正确的封装原则**是:**除非拥有某个线程,否则不能对线程进行操控。** 例如:中断线程或者修改线程的优先级等。 在线程 API 中,并没有对线程的所有权给出正式的定义:**线程由 Thread 对象表示**,并且像其他对象一样可以被自由共享。 然而线程有一个**相应的所有者**,即**创建该线程的类**。 因此**线程池是其工作者线程的所有者**<---工作线程在线程池内被创建,**如果要中断这些线程,那么应该使用线程池**。【<---通过线程池来做中断,而不是直接通过工作线程本身来中断?】 与其他封装对象一样,**线程的所有权是不可传递的**:应用程序可以拥有服务,服务也可以拥有工作线程,但应用程序并不能拥有工作者线程,因此**应用程序不能直接停止工作者线程**。 相反,服务应该提供**生命周期方法(Lifecycle Method)**来关闭它自己以及它所拥有的线程。 这样,当应用程序关闭该服务时,服务就可以关闭所有的线程了。 在 `ExecutorService` 中提供了 `shutdon` 和 `shutdownNot` 等方法。 同样,在其他拥有线程的服务中,也应该提供类似的关闭机制。 > 对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。 #### 7.2.1 示例:日志服务 在大多数服务器应用程序中都会用到日志,例如在代码中插入 `println` 语句就是一种简单的日志。 像 `PrintWriter` 这样的**字符流类是线程安全**的,因此这种简单的方法不需要显示的同步。 但是如果需要在**单条日志信息中写入多行**,那么要通过**「客户端加锁」**来避免多个不正确地交错输出。如果两个线程同时把多行栈追中信息**(Stack Trace)** 添加到同一个流中,并且每行信息对应一个 `println` 调用,那么这些**信息在输出中将交错在一起**,看上去就是一些虽然庞大但毫无意义的栈追踪信息。 然而,在11.6节中将看到这种**内联日志功能会给一些高容量的(Highvolume)应用程序带来一定的性能开销。** 另一种替代方法是通过调用 `log` 方法**将日志消息放入某个「队列」中,并由「其他线程」来处理。** 在**程序清单 7-13** 的 `LogWriter` 中给出了一个简单的日志服务示例,其中日志操作是在单独的**「日志线程」** 中执行的。产生日志信息的线程并不会将消息直接写入输出流,而是由 `LogWriter` 通过 `BlockingQueue` 将信息提交给日志线程,并由日志线程写入。 这是一种 **多生产者单消费者(Multiple-Producer,Single-Consumer)** 的设计方式:每个调用 `log` 的操作都相当于一个生产者,而后台的日志线程则相当于消费者。 如果消费者的处理速度低于生产者的生成速度,那么 `BlockingQueue` 将阻塞生产者,直到日志线程有能力处理新的日志消息。 > **程序清单 7-13** 不支持关闭的 生产者—消费者 日志服务: ```java // 不支持关闭的 生产者-消费者服务,没有终止线程的方法 public class LogWriter { private final BlockingQueue queue; private final LoggerThread logger; private static final int CAPACITY = 1000; public LogWriter(Writer writer) { this.queue = new LinkedBlockingQueue(CAPACITY); this.logger = new LoggerThread(writer); } // 开始产生日志 public void start() { logger.start(); } // 将日志放入阻塞队列中 public void log(String msg) throws InterruptedException { queue.put(msg); } // 消费者,将队列中的日志写入文件 private class LoggerThread extends Thread { private final PrintWriter writer; public LoggerThread(Writer writer) { this.writer = new PrintWriter(writer, true); } @Override public void run() { try { while (true) { writer.println(queue.take()); } } catch (InterruptedException e) { // 忽略 } finally { writer.close(); } } } } ``` 为了使像 `LogWriter` 这样的服务在软件产品中能发挥实际的作用,还需要实现一种 **终止日志线程**的方法,从而避免使 `JVM` 无法正常关闭。 要**停止日志线程是很容易**的,因为它会反复调用 `take`,而 `take`方法可以响应中断。 **如果将日志线程修改为捕获到 `InterruptException` 时退出,那么只需要中断日志线程就可以停止服务。** **<--- 【使线程退出的一种解决方案,但并不是一个完善的关闭机制,会丢失队列中的信息】** 然而**,如果只是日志线程退出,那么还不是一种完备的关闭机制**。 这种直接关闭的做法会丢失那些正在等待被写入到日志的信息,不仅如此,其他下次线程将在调用 `log` 时被阻塞,**因为日志消息队列是满的**,因此这些线程将**无法解除阻塞状态**。 当**取消一个 生产者—消费者 操作时,需要同时取消生产者和消费者**。 在中断日志线程时会处理消费者,但在这个示例中,由于**生产者并不是专门的线程,因此要取消它们将非常困难**。 另一种关闭 `LogWriter` 的方法是:设置某个 **「已请求关闭」**的标志位,以避免进一步调教日志消息,如**程序清单 7-14所示** 在收到关闭请求后,消费者会把队列中的所有消息写入日志,并解除所有在调用 `log` 时阻塞的生产者。 然而这个方法中存在着 **「竞态条件」** 问题,使得该方法并不可靠。 `log` 的实现是一种**「先判断,再运行」**的代码序列:生产者发现该服务还没有关闭,因此在关闭服务后仍然会将日志消息放入队列,这同样会使得生产者可能在调用 log 时阻塞并无法接触阻塞状态。 可以通过一些**技巧**来降低这种情况的发生概率(例如:**在宣布队列在清空之前,让消费者等待数秒**)但**这些都没有解决问题的本质**:即使很小的概率也可能导致程序发生故障。 > **程序清单 7-14** : 通过一种不可靠的方式为日志服务增加关闭支持: ```java public void log(String msg) throws InterruptedException { if(!shutdownRequested) { queue.put(msg); } else { throw new IllegalStateException("logger is shut down") } } ``` 为 `LogWriter` 提供**可靠的关闭操作**的方法是**解决竞态条件问题**,因而要使**日志消息的提交成为 「原子操作」**。然而我们不希望在消息加入队列时持有一个锁,因为 `put` 方法本身就可以阻塞。 我们采用的方法是:通过**原子方式**来**检查关闭请求**,并且有条件地**递增一个计数器**来"保持"提交消息的权利。 > 程序清单 7-15 向 LogWriter 添加可靠的取消操作: ```java // 存在可靠的取消服务的 LogWriter 类 通过原子方式检查关闭请求,有条件地递增一个计数器来保证提交消息的权利 public class LogService { private final BlockingQueue queue; private final LoggerThread loggerThred; private final PrintWriter writer; @GuardedBy("this") private boolean isShutdown; @GuardedBy("this") private int reservations; public LogService(PrintWriter writer) { this.queue = new LinkedBlockingQueue<>(); this.loggerThred = new LoggerThread(); this.writer = new PrintWriter(writer); } public void start() { loggerThred.start(); } /** * 关闭服务的方法,将中断标志设为true,并且申请中断该线程 */ public void stop() { synchronized (this) { isShutdown = true; } // 中断该线程 loggerThred.interrupt(); } /** * 产生 log 信息的方法,当服务被终止时 调用该方法抛出异常 * * @param msg 日志信息 * @throws InterruptedException */ public void log(String msg) throws InterruptedException { synchronized (this) { if (isShutdown) { throw new IllegalStateException("线程已被终止"); } // 工作队列中待处理信息的数量 ++reservations; } queue.put(msg); } // 写入日志的工作线程 private class LoggerThread extends Thread { @Override public void run() { try { while (true) { try { synchronized (LogService.this) { if (isShutdown && reservations == 0) { break; } } final String msg = queue.take(); synchronized (LogService.this) { --reservations; } // 写入数据 writer.println(msg); } catch (InterruptedException e) { // 重试 } } } finally { writer.close(); } } } } ``` **【↑ 可以看到这里增加了可靠的对服务的取消的方法,使用内置锁保证了变量不会产生竞态条件问题,使日志消息的提交成为原子操作。】** #### 7.2.2 关闭 ExecutorService 在 [6.2.4]() 节中,我们看到 `ExecutorService` 提供了两种关闭方法: - 使用 `shutdown` 正常关闭。 - 使用 `shutdownNow` 强行关闭。 在进行强行关闭时,`shutdownNow` **首先关闭正在执行的任务,然后返回所有未启动的任务清单。** 这两种关闭的方式的差别在于各自的「安全性」 和 「响应性」: **强行关闭的关闭速度更快**,但相应的,其风险也更大,因为**任务很可能在执行到一半时被结束**。 **而正常关闭虽然速度慢,但却更安全,因为 `ExecutorService` 会一直等到队列中的所有任务都执行完才关闭。** 在其他拥有线程的服务中,也应该考虑提供类似的关闭方式以供选择。 **【↑ 意思是我们需要提供一个快速关闭的服务,和一个安全关闭的服务,这样可以适用于不同的场景。】** 简单的程序可以直接在 `main` 函数中启动和关闭全局的 `ExecutorService` 。 而在**复杂程序**中,通常会将 `ExecutorService` **封装在某个更高级别的服务中**,并且**该服务能提供其自己的生命周期方法**。 例如**程序清单 7-16** 中 `LogService` 的一种变化形式,它将**管理线程**的工作**委托**给一个 `ExecutorService`,而不是由其自行管理。 **通过封装 `ExecutorService` 可以将「所有权链(Ownership Chain 」 从应用程序扩展到服务以及线程,所有权链上的各个成员都将管理它所拥有的服务或线程的生命周期。** **【↑ 这句话很重要,点明了封装 `ExecutorService` 的意义所在。】** > **程序清单 7-16** 使用 ExecutorService 的日志服务: ```java public class LogService { private final ExecutorService exec = newSingleThreadExecutor(); ... public void start() { } // 封装 ExecutorService 的关闭该方法 public void stop() throws InterruptException { try { exec.shutdown(); exec.awatiTermination(TIMEOUT,UNIT) } finally { writer.closer(); } } public void log(String msg) { try { exec.execute(new WriterTask(msg)); } catch (RejectedExecutionException ignored) { // 忽略该异常} } } ``` #### 7.2.3 "毒丸" 对象 **另一种关闭 「生产者—消费者」 服务的方式就是使用「毒丸」 Poison Pill 对象。** **所谓 「毒丸」 是指: 一个放在队列上的对象**,其含义是:**当得到这个对象的时候,立即停止**。 在`FIFO` 先进先出队列中,"毒丸" 对象将确保消费者在关闭之前首先完成队列中的所有工作,在提交 "毒丸" 对象之前提交的所有工作都会被处理,而生产者在提交了毒丸对象之后,将不会再提交任何工作。 【↑ 也就是代表了关闭意义的一个对象】 在程序清单 `7-17`、`7-18`、`7-19` 中给出一个单 生产者—消费者的 桌面搜索示例(来自程序清单 5-8) ,在这个示例中,使用了 "毒丸" 对象来关闭服务 > **程序清单 7-17** 通过 毒丸 对象来关闭服务:(7-18,7-19 分别为具体的生产者和消费者代码片段,这里融合在一个完整的类中) ```java // 使用毒丸对象终止服务 public class IndexingService { private static final int CAPACITY = 1000; private static final File POISON = new File(""); private final IndexerThread consumer = new IndexerThread(); private final CrawlerThread producer = new CrawlerThread(); private final BlockingQueue queue; private final FileFilter fileFilter; private final File root; public IndexingService(BlockingQueue queue, FileFilter fileFilter, File root) { this.queue = new LinkedBlockingQueue<>(CAPACITY); this.fileFilter = f -> f.isDirectory() || fileFilter.accept(f); this.root = root; } private boolean alreadyIndexed(File entry) { return false; } // 消费者的逻辑,判断从队列中取到是否为毒丸对象,如果不是就持续消费 class IndexerThread extends Thread { @Override public void run() { try { while (true) { final File file = queue.take(); if (file == POISON) { break; } else { indexFile(file); } } } catch (InterruptedException e) { e.printStackTrace(); } } private void indexFile(File file) { // ... 具体业务逻辑 这里省略 } } // 生产者的逻辑,当文件抓取完成后持续向队列中放入毒丸对象 class CrawlerThread extends Thread { @Override public void run() { try { crawl(root); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 当抓取完成后开始持续放入毒丸对象来关闭这个任务 while (true) { try { // 放入毒丸对象 queue.put(POISON); break; } catch (InterruptedException e) { // 重试 } } } } } // 抓取文件的方法 private void crawl(File root) throws InterruptedException { final File[] entries = root.listFiles(fileFilter); if (entries != null) { for (File entry : entries) { if (entry.isDirectory()) { crawl(entry); } else if (!alreadyIndexed(entry)) { queue.put(entry); } } } } } ``` **只有在生产者和消费者数量都已知的情况下,才可以使用"毒丸"对象。** 在 `IndexingService`中采用的解决方案可以扩展到 「多个生产者」: 只需给每个生产者队列中都放入一个"毒丸"对象,并且消费者仅当在接收 生产者数量个毒丸对象时才会停止。 这种方法也可以扩展到**多个消费者**:只需生产者将 消费者数量的毒丸对象放入队列。 **然而,当生产者和消费者的数量较大时,这种方法变得难以使用。** <---【瓶颈】 只有在无界队列中,"毒丸"对象才能可靠的工作。**<---【数据结构要求】** #### 7.2.4 示例:只执行一次的服务 如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一个**私有的** `Executor` 来简化服务的生命周期管理,其中该 `Executor` 的生命周期是由这个方法来控制的。(在这种情况下 `invokeAll` 和 `invokeAny` 等方法通常会起较大作用) **【↑ 给了场景和解决方案,需要记录在册】** #### 7.2.5 shutdownNow 的局限性 **当通过 `shutdonwNow` 来强行关闭 `ExecutorService` 时,它会尝试取消正在执行的任务,并返回所有已提交但尚未开始的任务,从而将这些任务写入日志或者保存起来以便之后进行处理。** 【`shutdownNow` 返回的 `Runnable` 对象可能与提交给 `ExecutorService` 的 `Runnable` 对象 **「不相同」**:它们可能是已经被封装过的已提交任务。】 然而我们无法通过「常规」 的方法来找出哪些任务已经开始但尚未结束。 这意味着我们无法在关闭过程中知道正在执行的任务的状态,除非任务本身会执行某种检查。 要知道哪些任务还没有完成,你不仅需要知道哪些任务还没有开始,还需要知道当 `Executor` 关闭时,哪些任务正在执行。【然而,**在关闭过程中只会返回尚未开始的任务,而不会返回正在执行的任务**。 如果能返回这两种类型的任务,那么就不需要这种不确定的中间状态】 > **程序清单 7-21** 在 `ExecutorService` 中**跟踪在关闭之后被取消的任务**: ```java // 跟踪 ExecutorService 中被关闭的任务 public class TrackingExecutor extends AbstractExecutorService { private final ExecutorService exec; // 用来保存被关闭的任务 private final Set taskCancelledAtShutdown = Collections.synchronizedSet(new HashSet<>()); public TrackingExecutor(ExecutorService exec) { this.exec = exec; } @Override public void shutdown() { exec.shutdown(); } @Override public List shutdownNow() { return exec.shutdownNow(); } @Override public boolean isShutdown() { return exec.isShutdown(); } @Override public boolean isTerminated() { return exec.isTerminated(); } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return exec.awaitTermination(timeout,unit); } public List getCancelledTasks() { if (!exec.isTerminated()) { throw new IllegalStateException("当前服务尚未关闭"); } return new ArrayList<>(taskCancelledAtShutdown); } @Override public void execute(Runnable runnable) { exec.execute(() -> { try { runnable.run(); }finally { if (isShutdown() && Thread.currentThread().isInterrupted()) { taskCancelledAtShutdown.add(runnable); } } }); } } ``` 在 **程序清单 7-22** 的 `WebCrawler` 中给出了 `TrackingExecutor` 的用法。 网页爬虫程序的工作通常是无穷尽一直进行的,因此当爬虫程序必须关闭时,我们通常希望**保存它的状态**,以便稍后重新启动。 CrawlTask 提供了一个 getPage 方法,该方法能找出正在处理的页面。 当爬虫程序关闭时,无论是**「尚未开始新任务」** 还是 **「那些被取消的任务」**,都会记录它们的 **URL**,当爬虫程序重新启动,就可以将这些 URL 的页面抓取任务加入到任务队列中重新执行。 ↑【断点续传】 > **程序清单 7-22** 使用 `TrackingExecutorService` 来保存未完成的任务以备后续执行: ```java // 使用 TrackingExecutorService 来保存未完成的抓取任务,以便下次启动时重新抓取 public abstract class WebCrawler { private volatile TrackingExecutor exec; @GuardedBy("this") private final Set urlsToCrawl = new HashSet<>(); private final ConcurrentMap seen = new ConcurrentHashMap<>(); private static final long TIMEOUT = 500; private static final TimeUnit UNIT = TimeUnit.MILLISECONDS; public WebCrawler(URL startUrl) { urlsToCrawl.add(startUrl); } public synchronized void start() { exec = new TrackingExecutor(Executors.newCachedThreadPool()); urlsToCrawl.forEach(this::submitCrawlTask); urlsToCrawl.clear(); } public synchronized void stop() throws InterruptedException { try { saveUncrawled(exec.shutdownNow()); // 作用? if (exec.awaitTermination(TIMEOUT, UNIT)) { saveUncrawled(exec.getCancelledTasks()); } } finally { exec = null; } } protected abstract List processPage(URL url); // 保存尚未执行的抓取任务 private void saveUncrawled(List unCrawledTask) { for (Runnable task : unCrawledTask) { urlsToCrawl.add(((CrawlTask) task).getPage()); } } private void submitCrawlTask(URL url) { exec.execute(new CrawlTask(url)); } private class CrawlTask implements Runnable { private final URL url; public CrawlTask(URL url) { this.url = url; } // 这个计数器没有使用上 private int count = 1; // 判断该任务是否已经被抓取过 boolean alreadyCrawled() { return seen.putIfAbsent(url, true) != null; } // 将已经抓取的任务移除队列 void makeUncrawled() { seen.remove(url); System.out.printf("marking %s uncrawled %n", url); } @Override public void run() { for (URL link : processPage(url)) { if (Thread.currentThread().isInterrupted()) { return; } submitCrawlTask(link); } } public URL getPage() { return url; } } } ``` 在 `TrackingExecutor` 中存在一个**「不可避免的竞态条件」**,从而产生误报问题:一些被认为已经取消的任务实际上已经执行完成。 产生这个问题的原因是:在任务执行最后一条指令以及线程池将任务记录为"结束"的两个**时刻之间**,线程池可能关闭。 如果任务是**「幂等」**的,(任务多次执行与执行一次的结果相同),那么不会存在问题,在网页爬虫中就是这种情况。 否则,在应用程序中必须考虑这种风险,并对误报问题做好准备。 **↑【给出了问题,给出了原因,给出了场景,包括这个代码,都很值得好好消化,反复咀嚼。】**