penglongli / blog

18 stars 1 forks source link

Java 的 Callable 与 Future 使用 #110

Open penglongli opened 6 years ago

penglongli commented 6 years ago

Future 表示异步计算的结果,使用 get() 方法可以获得结果,当调用 get() 方法时如果未计算完成则会陷入阻塞状态。下边是用 Future 能实现的一些功能:

并行计算

如下,我们要计算 1-3E 之间的累加和:

public class Test {

    static class Calc implements Callable<Long> {
        private Long start;
        private Long end;

        Calc(Long start, Long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public Long call() {
            Long sum = 0L;
            for (long i = start; i <= end; i++) {
                sum += i;
            }

            return sum;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Calc c1 = new Calc(1L, 100000000L);
        Calc c2 = new Calc(100000001L, 200000000L);
        Calc c3 = new Calc(200000001L, 300000000L);

        Future<Long> r1 = executorService.submit(c1);
        Future<Long> r2 = executorService.submit(c2);
        Future<Long> r3 = executorService.submit(c3);

        System.out.println(r1.get() + r2.get() + r3.get());

        executorService.shutdown();
    }
}

我们可以把其分为三个过程:1-1e、1e-2e、2e-3e,这样子就可以把计算并行为三个过程。

任务手动取消

这里调用的是 Future.cancel(bool mayInterruptIfRunning) 方法

使用方法如下:

public static void main(String[] args) throws InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    Calc c1 = new Calc(1L, 10000000000L);

    Future<Long> r1 = executorService.submit(c1);
    Thread.sleep(2000);
    r1.cancel(true);
    try {
        System.out.println(r1.get());
    } catch (CancellationException e) {
        System.out.println("任务被取消");
    } finally {
        executorService.shutdown();
    }
}

我在用的时候发现了一个线程池泄露的问题,当调用 Future.cancel() 时候,Future 并未被取消掉。这里需要明白一个概念:Java 中没有显示定义线程立即中断或停止的方法,需要手工解决,参见:https://github.com/penglongli/blog/issues/109

这里我们做一下改动:

public class Test {

    static class Calc implements Callable<Long> {

        private Long start;
        private Long end;

        Calc(Long start, Long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public Long call() {
            Long sum = 0L;
            for (long i = start; i <= end; i++) {
                if (Thread.currentThread().isInterrupted()) {
                    return null;
                }
                sum += i;
            }

            return sum;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Calc c1 = new Calc(1L, 10000000000L);

        Future<Long> r1 = executorService.submit(c1);
        Thread.sleep(2000);
        r1.cancel(true);
        try {
            System.out.println(r1.get());
        } catch (CancellationException e) {
            System.out.println("任务被取消");
        } catch (ExecutionException e) {
            System.out.println("任务执行异常");
        } finally {
            executorService.shutdown();
        }
    }
}

任务限时自动取消

任务限时取消的方法是:Future.get(long timeout, TimeUnit unit) ,当然这里也会出现线程池泄露的情况,如下方法:

public class Test3 {
    static class Calc implements Callable<Long> {
        private Long start;
        private Long end;

        Calc(Long start, Long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public Long call() {
            Long sum = 0L;
            for (long i = start; i <= end; i++) {
                if (Thread.currentThread().isInterrupted()) {
                    // If interrupt, stop thread
                    return null;
                }
            }

            return sum;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Calc c1 = new Calc(1L, 10000000000L);

        Future<Long> r1 = executorService.submit(c1);
        try {
            r1.get(2000, TimeUnit.MILLISECONDS);
        } catch (CancellationException e) {
            System.out.println("Task is canceled");
        } catch (ExecutionException e) {
            System.out.println("Task occurred an exception when executing");
        } catch (TimeoutException e) {
            System.out.println("Task timeout");

            // do cancel if Timeout
            r1.cancel(true);
        } finally {
            executorService.shutdown();
        }
    }
}

异构任务并行计算

例如对于 Markdown 解析来说,第一步是解析所有文本,第二步是上传图片。解析文本是 CPU 密集型,上传图片是网络 I/O 密集型。对于此可以把这两步分为两个任务来并行执行