funnycoding / blog

A Github Issue Blog
22 stars 0 forks source link

《Java 并发编程实战》 5. 基础构建模块 #23

Open funnycoding opened 4 years ago

funnycoding commented 4 years ago

第四章介绍了构造线程安全类时采用的一些技术:例如将线程安全性委托给现有的线程安全类

「委托」是创建线程安全类的一个最有效的策略:只需让现有的线程安全类 「管理所有的状态」 即可。

Java 平台类库包含了丰富的并发基础构建模块,例如线程安全的容器类以及各种用于协调多个相互协作的线程控制流的同步工具类(Synchronizer)

本章将介绍其中最有用的并发构建模块。特别是在 「Java5.0」「Java6」 中引入的新模块,以及在使用这些模块来构造并发应用程序时的一些常用模式

【这里的常用模式如果是设计的模式的话,就可以跟刚买的 《图解 Java多线程设计模式》 这本书联系起来了,不过在这个语境下的模式不一定是设计模式的意思。】

5.1 同步容器类

同步容器类包括 VectorHashtable,二者是早起 JDK 的一部分,此外还包括在 JDK 1.2 中添加的一些功能相似的类,这些同步的封装器是由 Collections.synrhonizedXxx 等一列工厂方法创建的。

这些类实现线程安全的方式是:将它们的「状态」封装起来,并对每个公有方法进行同步,使得每次只有一个线程能访问容器的同步状态。【保持了互斥,就保持了线程安全】

下图是 Collections.java 中的包装器类,将非线程安全的类包装为线程安全的类:

【以 对 List 的包装为例:】

// java.util.Collections JDK中自带的 Collections Framework 工具类

        /**
     *  将非线程安全的 List 包装为 线程安全的类的包装器类 , 可以看到重点就是只要操作容器的方法,都使用内置锁, 保证同一时间只有一个线程可以对该容器进行操作,保证了互斥性。
     */
    static class SynchronizedList<E>
        extends SynchronizedCollection<E>
        implements List<E> {
        private static final long serialVersionUID = -7754090372962971524L;

        final List<E> list;

        SynchronizedList(List<E> list) {
            super(list);
            this.list = list;
        }
        SynchronizedList(List<E> list, Object mutex) {
            super(list, mutex);
            this.list = list;
        }

        public boolean equals(Object o) {
            if (this == o)
                return true;
            synchronized (mutex) {return list.equals(o);}
        }
        public int hashCode() {
            synchronized (mutex) {return list.hashCode();}
        }

        public E get(int index) {
            synchronized (mutex) {return list.get(index);}
        }
        public E set(int index, E element) {
            synchronized (mutex) {return list.set(index, element);}
        }
        public void add(int index, E element) {
            synchronized (mutex) {list.add(index, element);}
        }
        public E remove(int index) {
            synchronized (mutex) {return list.remove(index);}
        }

        public int indexOf(Object o) {
            synchronized (mutex) {return list.indexOf(o);}
        }
        public int lastIndexOf(Object o) {
            synchronized (mutex) {return list.lastIndexOf(o);}
        }

        public boolean addAll(int index, Collection<? extends E> c) {
            synchronized (mutex) {return list.addAll(index, c);}
        }

        public ListIterator<E> listIterator() {
            return list.listIterator(); // Must be manually synched by user
        }

        public ListIterator<E> listIterator(int index) {
            return list.listIterator(index); // Must be manually synched by user
        }

        public List<E> subList(int fromIndex, int toIndex) {
            synchronized (mutex) {
                return new SynchronizedList<>(list.subList(fromIndex, toIndex),
                                            mutex);
            }
        }

        @Override
        public void replaceAll(UnaryOperator<E> operator) {
            synchronized (mutex) {list.replaceAll(operator);}
        }
        @Override
        public void sort(Comparator<? super E> c) {
            synchronized (mutex) {list.sort(c);}
        }

        /**
         * SynchronizedRandomAccessList instances are serialized as
         * SynchronizedList instances to allow them to be deserialized
         * in pre-1.4 JREs (which do not have SynchronizedRandomAccessList).
         * This method inverts the transformation.  As a beneficial
         * side-effect, it also grafts the RandomAccess marker onto
         * SynchronizedList instances that were serialized in pre-1.4 JREs.
         *
         * Note: Unfortunately, SynchronizedRandomAccessList instances
         * serialized in 1.4.1 and deserialized in 1.4 will become
         * SynchronizedList instances, as this method was missing in 1.4.
         */
        private Object readResolve() {
            return (list instanceof RandomAccess
                    ? new SynchronizedRandomAccessList<>(list)
                    : this);
        }
    }

【第一节读完的一个小疑问包装器类可以保证将线程不安全的容器包装为线程安全的类,但是后面为什么又添加了 CopyOnWriteArrayList,和 ConcurrentHashMap 这样为并发而生的容器呢?

被包装器包装的类的最大的问题应该就是效率问题,互斥带来的影响就是如果容器非常大,对容器的操作需要花费很长时间的话,程序会进行长时间的阻塞,带来死锁与饥饿问题。】

5.1.1 同步容器类的问题

同步容器类都是线程安全的,但在某些情况下,可能需要额外的客户端加锁来保护复合操作。

容器上常见的复合操作包括:

同步容器类中,这些复合操作在没有客户端加锁的情况下仍然是线程安全的,但当其他线程并发地修改容器时,这些操作可能出现意料之外的行为

【也就是虽然不加锁使用线程安全的同步容器是安全的,但是不加锁的话就可能存在其他线程在访问的同时对容器进行修改,这时会出现问题。】

程序清单 5-1 给出了在 Vector 中定义的两个方法: getLast 和 deleteLast ,它们都会执行 「先检查再运行」 操作,「每个方法首先获得数组的大小,然后通过结果来获取或删除最后一个元素」

程序清单 5-1 , Vector 上可能导致混乱结果的复合操作

// 非线程安全的类,对 Vector 进行复合操作可能带来令人困惑的结果
// UnsafeVecotrHelpers.java
public class UnsafeVecotrHelpers {
    /**
     * 获取当前 Vector 的最后一个元素
     *
     * @param list
     * @return
     */
    public static Object getLast(Vector list) {
        int lastIndex = list.size() - 1;
        return list.get(lastIndex);
    }

    /**
     * 移除当前 Vector 的最后一个元素
     *
     * @param list
     */
    public static void deleteLast(Vector list) {
        int lastIndex = list.size() - 1;
        list.remove(lastIndex);
    }
}

这些方法看似没有任何问题,从某种意义上来看也是如此 —— 无论多少个线程同时调用它们,也不会破坏 Vector 本身的线程安全性。

但从这些方法的 「调用者」 角度来看,情况就不同了。

如果 「线程A」 在包含 10个元素的 Vector 上调用 getLast 方法。同时 「线程B」 在同一个 Vector 上调用 deleteLast 方法,这些操作的交替执行图 如图 5-1 所示,getLast 将抛出 ArrayIndexOutOfBoundsException异常。在调用 size()与调用 getLast() 这两个操作之间,Vector 变小了,因此调用 size() 时得到的 Vector 长度信息变成了失效值。

这种情况很好滴遵循了 Vector「规范」 —— 如果请求一个不存在的元素,那么将抛出一个异常。

但是这并不是 getLast() 调用者想获得的结果(即使在并发修改的情况下也不希望看到),除非 Vector 从一开始就是一个空的容器。

由于 「同步容器类」 要遵守同步策略 —— 支持客户端加锁(这只在 Java5.0 的 Javadoc 中作为迭代示例简要地提了一下),因此可能会创建一些新的操作。

只要我们知道应该使用哪一个锁,那么这些新操作就与容器的其他操作一样都是原子操作。

同步容器类通过其自身的锁来保护它的每个方法,通过获得容器类的锁,我们可以使 getLastdeleteLast 成为原子操作,并确保 Vector 的大小在 调用 sizeget 之间不会发生变化,如 程序清单 5-2 所示。

程序清单 5-2 在使用客户端加锁的 Vector 上的复合操作:

// 在调用线程安全的容器类时进行客户端加锁来保证操作的原子性
public class SafeVectorHelpers {
    /**
     * 原子操作的获取 Vector 上的最后一个元素
     *
     * @param list
     * @return
     */
    public static Object getLast(Vector list) {
        // 获取容器类的锁,保证在操作 list 容器时 没有其他线程可以修改该容器
        synchronized (list) {
            int lastIndex = list.size() - 1;
            return list.get(lastIndex);
        }
    }

    /**
     * 原子操作的删除 Vector 上的最后一个元素
     *
     * @param list
     */
    public static void deleteLast(Vector list) {
        synchronized (list) {
            int lastIndex = list.size() - 1;
            list.remove(lastIndex);
        }
    }
}

【这里的关键就是在对 Vector 进行操作时,在调用端将这个 Vector 本身作为锁,使用内置锁来确保后续的操作是一个原子操作,同时只能有一个线程对该容器进行操作,保证互斥性】

在调用 size 和 相应的 get 之间, Vector 的长度 可能会发生变化。 这种风险在对 Vector 中的元素进行「迭代」时仍然会出现,如程序清单 5-3 所示:

for (int i = 0; i<vector.size(); i++) {
        doSomething(vector.get(i));
}

这种迭代操作的正确性要依赖于运气,即在调用 size()get() 之间没有「其他线程」修改 Vector

「单线程」 环境中,这种假设完全成立,但在有其他线程 并发地修改 Vector 时,则可能导致错误的出现。

getLast() 一样,如果在对 Vector 进行迭代时,「另一个线程删除了一个元素」,并且这两个操作「交替执行」,那么这种迭代方法将抛出 ArrayIndexOutOfBoundsException 异常。

【所以错误的原因跟之前的例子一样,同一时间,可能有多个线程同时修改容器的内容,导致获取的元素已经被修改或者不存在。】

虽然在 程序清单5-3 中的 「迭代」操作可能抛出异常,但这并不意味着 Vector 不是线程安全的类。

Vector 的状态仍然是有效的,而抛出异常的情况也与其规范保持了一致。然而,读取最后一个元素或是迭代 这样的简单操作中抛出异常显然不是我们预期的结果。

我们可以通过客户端加锁来解决不可靠地迭代问题,但要牺牲一些伸缩性。

通过在「迭代期间」 持有 Vector 的锁,可以防止其他线程在迭代期间修改 Vector,如 程序清单 5-4 所示。 然而这同样会导致其他线程在迭代期间无法访问它,因此降低了并发性能

程序清单5-4 带有客户端加锁的迭代:

// 加锁保证了只有一条线程操作 vector
synchonized(vector) {
        for (int i = 0; i < vector.size(); i++) {
                doSomething(vector.get(i));
        }
}

2个名词解释,我之前阅读的时候有的一些疑问不过现在已经明白了,所以在这里记录下来,希望帮助到有同样疑问的人:

1、这里一直提的 「客户端加锁」,其实就是在调用的处加锁,你就是客户端开发者。

2、这里的 「牺牲一些伸缩性」,中的伸缩性,指的是横向扩容时对机器的提升,比如给予更多的CPU,更大的内存时对程序性能的提升,而当使用过多的客户端加锁,就会导致过多的互斥,同一时间只有一个线程能对共享资源进行操作,导致增加 硬件 也没法提升程序的性能,这就是我对牺牲伸缩性的理解。

5.1.2 迭代器与 ConcurrentModificationException

为了将问题阐述清楚,我们使用 Vector,虽然这是一个 "古老" 的容器类。 然而,许多"现代" 的容器类也并没有消除复合操作中的问题。 无论在直接迭代还是在 Java5.0 引入 for-each 循环语法中,对容器类进行迭代的标准方式都是使用 Iterator 迭代器

然而,如果有其他线程并发地修改容器,那么即使使用迭代器也无法避免在迭代期间对容器进行加锁。【解决的方法只有加锁,但是这会导致性能下降。】

设计同步容器类的迭代器时并没有考虑到 并发修改 的问题,并且它们表现出的行为是 "及时失败"(fail-fast) 的。 这意味着,当它们(迭代器)发现容器在迭代过程中被修改时,就会抛出一个 ConcurrentModificationException 并发修改异常

这种 "及时失败" 的迭代器并不是一种完备的处理机制, 而只是 "善意地" 捕获并发错误,因此只能作为并发问题的预警指示器

它们采用的实现方式是,将计数器的变化与容器关联起来:如果在迭代期间计数器被修改,那么 hasNext 或 next 将抛出 ConcurrentModificationException。 然而,这种检查是在没有同步的情况下进行的,因此可能会看到 失效的计数值, 而 迭代器可能并没有意识到计数值已经发生了修改。

这是一种 设计上的权衡 ,从而降低并发修改操作的检测代码对程序性能带来的影响。

Tips:在单线程代码中也可能抛出 ConcurrentModificationException。 当对象直接从容器中删除而不是通过 Iterator.remove 来删除时,就会抛出这个异常。

【这个在我们刚学习 Java语法 并解除对容器的内容进行操作时应该都犯过这种错误,直接list.remove() 导致的异常】

程序清单5-5 说明了如何使用 for-each 循环语法对 List 容器进行迭代。 从内部来看,javac 将生成使用 Iterator 的代码,反复调用 hasNext 和 next 来迭代 Lisdt 对象。 与迭代 Vector 一样,要想避免出现 ConcurrentModificationException,就必须在迭代过程中持有容器的锁。

【保证在同一时间只有一个线程能对这个容器进行操作】

程序清单 5-5 通过 Iterator 来迭代 List:

List<Widget> widgetList = Collections.synchronizedList(new ArrayList<>());
...
// 可能抛出并发修改异常的迭代,不能保证当前线程对 List widgeList 独占
for (Widget w :widgetList) {
        dosomething(w);
}

然而,有时候开发人员并不希望在 迭代期间 对容器加锁。

例如,某些线程在可以访问容器之前,必须等待迭代过程结束。 而如果容器的规模很大,或者在某个元素上执行的操作时间很长,那么这些线程将长时间等待。

同样,如果容器像程序清单5-4 中那样加锁(使用了内置锁将容器本身作为锁),那么在调用 doSomething时将持有一个锁,这样可能导致死锁(参见 [第十章]())。

即使不存在饥饿或者死锁等风险,长时间地对容器加锁也会 降低 程序的可伸缩性。 持有锁的时间越长,在锁上的竞争可能就越激烈,如果许多线程都在等待锁被释放,那么将极大地降低 吞吐量 和 CPU 的利用率。(参见

[第十一章]())

如果不希望在迭代期间对容器加锁,那么一种 替代方法 就是 "克隆" 容器,并在副本上进行迭代。 由于副本被封闭在线程内,因此其他线程不会在迭代期间对其进行修改。这样就避免了抛出 ConcurrentModificationException(但是在克隆的过程中仍然需要对容器加锁)。

在克隆容器时存在 显著的性能开销,这种方式的好坏取决于多个因素:容器的大小在每个元素上执行的工作迭代操作相对于容器其他操作的调用频率,以及在响应时间和吞吐量等方面的要求。

5.1.3 隐藏迭代器

虽然加锁可以防止 迭代器抛出 ConcurrentModificationException,但是关键点在于 所有 对 共享容器 进行 迭代 的地方都需要加锁。

实际情况更加复杂,因为在某些情况下,迭代器会隐藏起来,如 程序清单5-6 中的 HiddenIterator 所示。

在 HiddenIterator 中没有显示的 迭代操作,但在粗体标出的代码中将执行 迭代操作。 编译器将 字符串的链接操作转换为调用 StringBuilderappend(Object) ,这个方法又会调用容器的 toString 方法标准容器的 toString 方法将迭代容器,并在每个元素上调用 toString 来生成容器内容的格式化表示。

程序清单5-6 隐藏在字符串中的迭代操作(不要这么做)

// 通过 toString 隐藏的调用容器的迭代器
public class HiddenIterator {
    @GuardedBy("this")
    private final Set<Integer> set = new HashSet<>();

    public synchronized void add(Integer integer) {
        set.add(integer);
    }

    public synchronized void remove(Integer integer) {
        set.remove(integer);
    }

    public void addTenThings() {
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            add(random.nextInt());
        }
        // 在这里打印集合将迭代集合中的每个元素,并对每个元素调用 toString() 方法
        System.out.println("Debug: added ten elemnts to " + set);
    }

    public static void main(String[] args) {
        HiddenIterator hi = new HiddenIterator();
        hi.addTenThings();
    }
}

addTenThings()方法可能会抛出 ConcurrentModificationException,因为在生成调试消息的过程中,toString 对容器进行迭代。

真正的问题在于 HiddenIterator 不是线程安全的,在使用 println 中的 set 之前必须先获取 HiddenIterator 的锁,但在调试代码和日志代码中通常会忽视这个要求。

这里得到的教训是:一个状态与保护它的同步代码之间相隔的距离越远,开发人员就越容易忘记在访问状态时使用正确的同步。如果 HiddenIterator 用 synchronizedSet 来包装 HashSet 并对同步代码进行封装,那么就不会发生这种错误。

正如封装对象的状态有助于维持 不变性条件 一样,封装对象的同步机制 同样有助于确保实施同步策略。

容器的 hashCode() 和 equals() 等方法也会间接地执行迭代操作,当容器作为另一个容器的元素或键值时,就会出现这种情况。

同样: contaisAll、removeAll、retainAll 等方法,以及把容器作为参数的构造函数,都会对容器进行迭代。

所有这些间接的迭代操作都可能抛出 ConcurrentModificationException

【本小节总结:】

疑问:

当我运行 程序清单5-6时,运行了多次,甚至将循环的次数扩大到了100,000 ,但是仍然没有出现并发修改异常,是因为我只是模拟了单线程的情况吗?

那么问题来了,怎样模拟多线程?我知道的是使用线程池多开几个线程然后进行操作。

【于是将代码改为使用线程池模拟多线程环境,错误出现了:

    public static void main(String[] args) {
        HiddenIterator hi = new HiddenIterator();
        ExecutorService executorService = Executors.newFixedThreadPool(6);
        executorService.execute(() -> hi.addTenThings());
        executorService.execute(() -> hi.addTenThings());
        executorService.execute(() -> hi.addTenThings());
        executorService.execute(() -> hi.addTenThings());
    }

image-20200405232241151

所以还是得在多线程环境下对程序进行测试。】

5.2 并发容器

Java 5.0 提供了多种 并发容器类 来改进 同步容器 的性能。 同步容器将所有对容器状态的访问都 串行化 。以实现它们的线程安全性。 这种方法的代价是严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重降低

另一方面,并发容器是针对多个线程并发访问设计的。 在Java 5.0 中增加了 ConcurrentHashMap,用来替代基于散列的 HashMap。增加了 CopyOnWriteArrayList,用于在遍历操作为主的情况下替代同步的 List。

在新的 ConcurrentMap 接口中增加了对一些常见复合操作的支持:例如 "若没有则添加"、替换以及有条件的删除等。

使用并发容器来替代同步容器,可以极大地提高伸缩性降低风险

Java 5.0 增加了两种新的容器类型 :QueueBlockingQUeue

Queue 用来临时保存一组等待处理的元素。它提供了几种实现:

Queue 上的操作不会发生阻塞,如果队列为空,那么获取元素的操作将返回空值。虽然可以用 List 来模拟 Queue 的行为 —— 事实上,正是通过 LinkedList 来实现 Queue 的,但还需要一个 Queue 类,因为它能去掉 List 的随机访问需求,从而实现更高效的并发

BlockingQueue 扩展Queue,增加了可阻塞的插入和获取等操作。 如果队列为空, 那么获取元素的操作将一直阻塞,直到队列中出现一个可用的元素。 如果队列已满(对于有界队列来说),那么插入元素的操作将一直阻塞,直到队列中出现一个可用的空间。

在 "生产者——消费者" 这种设计模式中,阻塞队列是非常有用的。 [5.3节](#5.3 阻塞队列和生产者 — 消费者模式) 将会详细介绍。

正如 ConcurrentHashMap 用于代替 HashMap 一样, Java6 也引入了 ConcurrentSkipListMapConcurrentSkipListSet,分别作为同步的 SortedMapSortedSet 的并发替代品(例如用 synchronizedMap 包装的 TreeMapTreeSet

5.2.1 ConcurrentHashMap

同步容器类在执行每个操作期间都持有一个锁。【<---先给出了一个重要的结论】 在一些操作中,例如 HashMap.getList.contains ,背后可能包含大量的工作当遍历散列桶或链表来查找某个特定的对象化时,必须在许多元素上调用 equals (并且 equals 本身还包含着一定的计算量)。 在基于散列的容器中,如果 hashCode 不能很均匀地分布散列值,那么容器中的元素不会均匀地分布在整个容器中

某些情况下,某个糟糕的散列函数还会把一个散列表变成线性链表。当遍历很长的链表并且在某些 或者 全部 元素上调用 equals 方法时,会花费很长的时间,而其他线程在这段时间内都不能访问该容器。

【使用同步容器类在并发环境下的窘境,所以 JDK5的时候引入了并发包,引入了并发容器,这是并发容器为什么出现的技术背景】

与 HashMap 一样,ConcurrentHashMap 也是一个 基于散列 的Map,但是它使用了一种完全不同的加锁策略来提供更高的并发性伸缩性

ConcurrentHashMap 并不是将每个方法都在同一个锁上同步,并使得每次都只能有一个线程访问容器。 而是使用了一种更细粒度的加锁机制来实现更大程度的共享,这种机制称为 分段所(Lock Stripinmg,参见 [11.4.3]() 节)

在这种分段锁机制中,任意数量的 读取线程 可以 并发地 访问 Map,执行 读取操作 的线程和执行 写入操作 的线程可以并发地访问 Map,并且一定数量的写入线程可以并发地修改 Map。

ConcurrentHashMap 带来的结果是:在并发环境下将实现更高的吞吐量,而在单线程环境中只损失非常小的性能。

ConcurrentHashMap 与其他并发容器一起增强了 同步容器类 :

尽管有这些改进,但仍然需要一些权衡的因素。 对于一些需要在整个 Map 上进行计算的方法,例如 sizeisEmpty这些方法的语义被略微减弱了,以反映容器的并发特性

由于 size返回的结果在计算时可能已经过期了,它实际上只是一个估计值,因此允许 size 返回一个近似值而不是 精确值。 虽然这看上去令人有些不安,但事实上, sizeisEmpty这样的方法在并发环境下的用处很小,因为它们的返回值总在不断地变化。因此,这些操作的需求被弱化了,以换取对其他更重要操作的性能优化:比如 get、put、containsKey 和 remove 等。

【这就是典型的根据场景,决定需求,然后进行 trade-off 取舍,舍弃相对不重要的东西,强化使用的更多,更重要的操作,这设计理念太清晰了。】

ConcurrentHashMap 中没有实现对 Map 的加锁以提供独占访问。 在 HashtablesynchronizedMap,获得 Map 的锁能防止其他线程访问这个Map【互斥】。

在一些不常见的情况中需要这种功能,例如:通过原子方式添加一些映射,或者对 Map 迭代若干次并在此期间保持元素顺序相同。然而,总体来说这种权衡还是合理的,因为并发容器的内容会持续变化。<----【这是并发容器要面对的场景的核心特性】

5.2.2 额外的原子 Map 操作

由于 ConcurrentHashMap 不能被加锁来执行独占访问,因此我们无法使用客户端加锁来创建新的原子操作,例如 [4.4.1]() 节中对 Vector 增加原子操作 "若没有则添加"、"若相等则移除(Remove-If-Equal)"和 "若相等则替换(Replace-If-Equal)"等,「都已经实现为原子操作并且在 ConcurrentMap 的接口中声明」。<----【 感觉这一句翻译的不太好,我看了一下原文和源码,应该改为 "都已经在 ConcurrentMap 接口中声明并实现为原子操作" 更好一些】

【并且回顾了一下 4.4.1 其实只有一个 putIfAbsent 也就是 若没有则添加 的操作实现了,另外两个好像没找到。】

程序清单 5-7 所示,如果你需要在现有的 同步Map 中添加这样的功能,那么很可能就意味着应该考虑使用 ConcurrentMap了。

这是 ConcurrentMap 的接口方法列表,确实包含上述的 "若没有则添加(putIfAbsent),若相等则移除(remove),若相等则替换(replace)" 从它们的注释中可以清晰的看到,有这样的描述「Removes the entry for a key only if currently mapped to a given value.」

确实是一个先观察后执行的操作。

【↑ ---- 一部分是我根据书中内容找到原文和源码的探寻,目的是为了求真和印证,同时也是加强学习的过程】

程序清单 5-7 ConcurrentMap 接口 --> 其实就是一个 ConcurrentMap 的 JavaDoc 的链接

// 以下是从源码中拷贝出了几个书中介绍的方法,有一些 1.8 新加的 defaultMethods 没有复制过来,还是建议大家直接去看源码

public interface ConcurrentMap<K, V> extends Map<K, V> {

    /**
     * {@inheritDoc}
     *
     * @implNote This implementation assumes that the ConcurrentMap cannot
     * contain null values and {@code get()} returning null unambiguously means
     * the key is absent. Implementations which support null values
     * <strong>must</strong> override this default implementation.
     *
     * @throws ClassCastException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     * @since 1.8
     */
    @Override
    default V getOrDefault(Object key, V defaultValue) {
        V v;
        return ((v = get(key)) != null) ? v : defaultValue;
    }

    /**
     * If the specified key is not already associated
     * with a value, associate it with the given value.
        当没有 K 值时 才插入
     */
     V putIfAbsent(K key, V value);

    /**
     * Removes the entry for a key only if currently mapped to a given value.
     * This is equivalent to
             若一致则移除
     */
    boolean remove(Object key, Object value);

    /**
     * Replaces the entry for a key only if currently mapped to a given value.
     * This is equivalent to
         仅当Key 被映射到  oldValue 才被替换为 newValue 
         也就是 Key 和 OldValue 的值要一致?
     */
    boolean replace(K key, V oldValue, V newValue);

    /**
     * Replaces the entry for a key only if currently mapped to some value.
       仅当 Key 被映射为 某个值时 才替换为 newValue
     */
    V replace(K key, V value);
}

5.2.3 CopyOnWriteArrayList

CopyOnWriteArrayList 用于替代同步的 List在某些情况下它提供了更好的并发性能,并且在迭代期间 不需要对容器进行「加锁」或 「复制」。(类似地,CopyOnWriteArraySet 的作用是替代同步的 Set)

"写入时复制 (Copy-On-Write)" 容器的线程安全性在于,只要正确地发布一个 「事实不可变的对象」,那么在访问该对象时就不再需要进一步的使用同步手段。

在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。<---【之前没有看过这个源码,但是看这个原理描述,感觉跟不可变类 String 挺像的,String,每次返回的都是一个新的实例,而已经构造好的是无法改变的。】

"写入时复制" 容器的迭代器保留一个指向底层 基础数组的引用,这个数组当前位于 迭代器的 起始位置,由于它不会被修改,因此在对其进行同步时只需要确保数组内容的 可见性。

因此,多个线程可以同时对这个容器进行迭代,而不会彼此干扰或者与修改容器的线程相互干扰。

"写入时复制" 容器返回的迭代器不会抛出 ConcurrentModificationException,并且返回的元素与迭代器创建时的元素完全一致,而不必考虑之后修改操作带来的影响。

显然,每当「修改容器」时都会「复制底层数组」,这需要一定的「开销」,特别是当「容器的规模较大」时。仅当迭代操作远远多于修改操作时,才应该使用 CopyOnWrite 容器。

这个准则很好地描述了许多事件通知系统:在分发通知时,需要迭代已注册监听器的链表,并调用链表内的每一个监听器,在大多数情况下,注册和注销事件监听器的操作远少于接收事件通知的操作。(关于"写入时复制"的更多信息参见。[CPJ 2.4.4]。)

5.3 阻塞队列和生产者 — 消费者模式

阻塞队列提供了可阻塞的 put 和 take 方法,以及支持定时的 offer 和 poll 方法。 如果队列已经满了,那么 put 方法将阻塞直到有空间可用; 如果队列为空,那么 take 方法将阻塞 直到有元素可取。

队列可以是有界的也可以是无界的,无界队列永远都不会被填充满,因此 无界队列上的 put 方法永远不会阻塞。

阻塞队列支持 "生产者——消费者" 这种设计模式,该模式将 "找出需要完成的工作" 与 "执行工作" 这两个过程分离开,并把工作项放入一个 "待完成" 列表中以便在随后处理,而不是找出后 立即处理。

"生产者——消费者" 模式能简化开发过程,因为它消除了生产者类和消费者类之间的代码依赖性【解耦】,此外该模式还将生产数据的过程与使用数据的过程 解耦 以简化 工作负载 的管理,因为生产和消费 这两个过程在处理数据的速率上有所不同。

在基于 阻塞队列 构建的 "生产者——消费者" 设计中,当数据生成时,生产者把数据放入队列,当消费者准备处理数据时,从队列中获取数据。

生产者不需要知道消费者的标识或数量,或者它们是否是唯一的生产者,它们只需要将数据放入队列即可。

同样 ,消费者也不需要知道生产者是谁,或者工作来自何处,只需要从队列中取出数据进行处理即可。

BlockingQueue 简化了 "生产者——消费者" 设计的实现过程,它支持任意数量的生产者和消费者。一种最常见的 "生产者——消费者" 模式就是 「线程池与工作队列的组合」,在 Excutor 任务执行框架中就体现出了这种模式,这也是 [第6章]() 和 [第8章]() 的主题。

以两个人洗盘子为例,二者的劳动分工也是一种 "生产者——消费者" 模式:

在这个例子中,盘架相当于 「阻塞队列」,如果盘架上没有盘子,那么消费者会一直等待,直到有盘子需要烘干。 如果盘架放满了,那么生产者会停止清洗,直到盘架上又空间来摆放盘子。

我们可以将这种类比扩展为 多个生产者(虽然可能存在对 「水槽」 的竞争) 和多个消费者,每个工人只需要与盘架打交道。人们不需要知道究竟有多少 生产者 或 消费者,或者谁生产了某个指定的工作项。

"生产者" 和 "消费者" 的角色是相对的,某种环境中的消费者在另一种不同的环境中可能成为生产者。

烘干盘子的工人将"消费" 洗干净的湿盘子,从而生产烘干的盘子。第三个人把洗干净的盘子整理好,在这种情况中,烘干盘子的人既是消费者,也是生产者,从而就有了两个共享的工作队列。(每个队列都可能阻塞 烘干 工作的运行)。

阻塞队列简化了消费者程序的编码【通过数据结构的特性,减少了逻辑代码的编写】,因为 take 操作会一直阻塞到有可用的数据。

如果生产者不能尽快地产生工作项,使消费者保持忙碌,那么消费者就只能一直等待,直到有工作可做。

在某些情况下,这种方式是非常合适的(例如在服务器应用程序中,没有任何客户请求服务时,则一直闲置),而在其他一些情况下,也表示需要调整生产者线程数量和消费者线程数量之间的比率,从而实现更高的资源利用率。(例如:爬虫 WebCrawler 或其他应用程序中,有无穷的工作需要完成)

如果生产者完成工作的速率比消费者处理工作的速率快,那么工作项会在队列中积累起来,最终 耗尽内存。 同样,put 方法的阻塞特性也极大地简化了生产者的编码。如果使用 「有界队列」,那么当队列充满时,生产者将阻塞并且不能继续工作,而消费者就有时间来赶上工作处理进度。

阻塞队列同样提供了一个 offer 方法,如果数据项不能被添加到 队列中,那么将返回一个 失败 状态。

这样你就能够创建更多的灵活策略来处理负荷过载的情况,例如:

在构建「高可靠」的应用程序时,有界队列是一种强大的「资源管理工具」:它们能抑制并防止产生过多的工作项,使应用程序在「过载」的情况下变得更加「健壮」。

虽然 "生产者——消费者" 模式能够将 生产者和消费者之间的代码通过队列进行解耦,但是它们的行为仍然也会通过共享工作队列间接的耦合在一起。 【解决了直接耦合,产生了间接耦合。】

开发人员总会假设消费者处理工作的速率能赶上生产者生成工作项的速率,因此通常不会为工作队列的大小设置 边界。 但这将导致之后需要重新设计系统架构。【因为生产速率和消费速率可能并不如我们设想的那样,所以需要从设计层面来认真考虑是否需要引入 阻塞队列 并明确设置其边界大小】

因此应该尽早地通过阻塞队列在设计中构建资源管理机制 —— 这就事儿做的越早,就越容易。在许多情况下,「阻塞队列」能使这项工作更加简单,如果阻塞队列不完全符合设计需求,那么还可以通过 「信号量」Semaphore 来创建其他的 「阻塞数据结构」。参见 [5.5.3节](#5.5.3 信号量)。

在类库中包含了 BlockingQueue 的多种实现,其中 LinkedBlockingQueue 和 ArrayBlockingQueue 是 FIFO 队列【先进先出】,二者分别与 LinkedList 和 ArrayList 类似,但比同步 List 拥有更好的并发性能。

PriorityBlockingQueue 是一个按优先级排序的队列,当你希望按照某种顺序而不是 先进先出 来处理元素时,这个队列将非常有用。【比如 VIP 用户的排队插队场景】

正如其他有序的容器一样,PriorityBlockingQueue 既可以根据元素的 「自然顺序」 来比较元素(如果它们实现了 Comparable 方法),也可以使用 Comparator 来比较。 <----【实现 Comparable 方法与 使用 Comparator 比较的区别?】

最后一个 BlockingQueue 的实现是 SynchronousQueue ,实际上它不是一个真正的队列,因为它不会为队列中的元素维护存储空间。与其他队列不同的是,它维护 「一组线程」,这些线程在等待着把元素加入或移出队列。

如果以洗盘子作为比喻,那么SynchronousQueue 相当于没有盘架,而是直接将洗好的盘子放入下一个空闲的烘干机中。【也就是没有工作队列,而是多个操作员?】

这种实现队列的方式看似很奇怪,但由于可以直接交付工作,从而降低了将数据从生产者移动到消费者的延迟。(在传统的队列中,在一个工作单元可以 交付之前,必须通过 「串行」 方式首先完成 入列 [Enqueue] 或者出列[Dequeue] 等操作) 【也就是跟队列有一个互动,而这种 SynchronousQueue 则去掉了队列,没有中间商赚差价?233】 直接交付方式还会将更多关于人物状态的信息反馈给 「生产者」。 当交付被接受时,它就知道生产者已经得到了任务,而不是简单地把任务放入了一个队列 —— 这种区别就好比将文件直接交给同事,或者是将文件通过邮箱发给同事,并希望她能尽快拿到文件。【一个是及时的,一个是异步的。】

因为 SynchronousQueue 没有存储功能,因此 put 和 take 会一直阻塞【相当于管道一样?直接把生产者和消费者对接,当有数据产生则直接发送,否则就一直阻塞】,直到另一个线程已经准备好参与到交付过程中。

仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用「同步队列」<---【使用场景,但是光这一句话还是太单一了,我需要对其进行扩充,因为我也没有在实际工作中使用过这种同步队列】

5.3.1 示例:桌面搜索

有一种类型的程序适合被分解为 生产者 和 消费者,例如 代理程序,它将扫描本地驱动器上的文件并建立索引以便随后进行搜索,类似于某些桌面搜索程序或者 Windows 索引服务。

程序清单 5-8 DiskCrawler 中给出了一个生产者任务,即在某个文件层次结构中搜索 符合索引标准的文件,并将它们的文件名 放入 「工作队列」。而且 Indexer 中还给出了一个消费者任务,从队列中取出文件名称并对它们建立索引。

程序清单 5-8 桌面搜索应用程序中的生产者任务和消费者任务

// 一个关于 桌面搜索程序的对生产者和消费者的应用实例
public class ProducerConsumer {

    /**
     * 用来抓取桌面文件内容的生产者
     */
    static class FileCrawler implements Runnable {
        // 阻塞队列,用于存储要索引的文件
        private final BlockingQueue<File> fileQueue;
        // 文件的过滤器,判断是否要将文件放入队列
        private final FileFilter fileFilter;
        // 具体的被处理的队列中的生产资源,在这个例子中是file
        private final File root;

        public FileCrawler(BlockingQueue<File> fileQueue, FileFilter fileFilter, File root) {
            this.fileQueue = fileQueue;
            this.root = root;
            // 重写了 accept 方法
            this.fileFilter = f -> f.isDirectory() || fileFilter.accept(f);
        }

        // 返回是否已经对文件进行索引,但是这里没有进行判断,直接返回的就是 false
        private boolean alreadyIndexed(File file) {
            return false;
        }

        @Override
        public void run() {
            try {
                crawl(root);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * 将桌面中的所有 文件放入 fileQueue 中
         *
         * @param root
         * @throws InterruptedException
         */
        private void crawl(File root) throws InterruptedException {
            File[] entries = root.listFiles(fileFilter);
            if (entries != null) {
                for (File entry : entries) {
                    if (entry.isDirectory()) {
                        crawl(entry);
                    }
                    // 但是这里 返回的肯定是 false 然后 变成 true 进入 put 逻辑
                    else if (!alreadyIndexed(entry)) {
                        fileQueue.put(entry);
                    }
                }
            }
        }
    }

    // 消费者,获取队列中的元素,这里是文件,然后对其进行索引(方法逻辑未实现,只是列出其逻辑)
    static class Indexer implements Runnable {
        private final BlockingQueue<File> queue;

        public Indexer(BlockingQueue<File> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    indexFile(queue.take());
                }
            } catch (InterruptedException e) {
                // 发生异常就使线程停止
                Thread.currentThread().interrupt();
            }
        }

        public void indexFile(File file) {
            // 对文件进行索引编号的方法,这里书中并没有实现具体逻辑
        }
    }

}

生产者 — 消费者 模式提供了一种 适合线程的方法将 桌面搜索问题分解为 更简单的组件。 将 「文件遍历」 与建立索引等功能分解为独立的操作,比将所有功能都放到一个操作中实现有着更高的 「代码可读性」 和 「可重用」性:每个操作只需要完成一个任务,并且阻塞队列将负责所有的控制流,因此每个功能的代码都更加简单清晰。

【也就是将逻辑控制完全交给了阻塞队列,从而使 生产 和 消费 2个动作独立,各自逻辑自己处理,分解成2个独立的块,更加清晰,这部分设计的意图我弄明白了,这是 生产者—消费者模式在代码设计上带来的好处

生产者 — 消费者 模式同样能带来许多「性能优势」。 生产者 — 消费者 可以并发地执行。如果一个是 I/O 密集型,另一个是 CPU 密集型,那么并发执行的吞吐率要高于 串行执行的吞吐率。

如果生产者和消费者的「并行度」不同,那么将它们紧密耦合在一起会把整体并行度降低为二者中更小的并行度。

【↑也就是木桶理论,性能最差的部分决定整个程序的性能】

程序清单 5-9 中启动了多个爬虫程序和索引建立程序,每个程序都在各自的线程中运行。前面提到过,消费者线程永远不会退出,因而程序无法终止, [第7章]() 将介绍多种技术来解决这个问题。<---【并发中的终止问题】

虽然这个示例使用了 显示管理 的线程,但许多 生产者—消费者 的设计也可以通过 Executor 任务执行框架来实现,其本身使用的也是 生产者 — 消费者模式。 【 使用线程池 Executor 也间接的使用了 生产者 — 消费者 模式】

程序清单 5-9 启动桌面搜索

 // 设定队列的界限
    private static final int BOUND = 10;

    // 设置消费者的数量,这里是获取当前运行环境的CPU核心数并将其设置为消费者数量
    private static final int N_CONSUMERS = Runtime.getRuntime().availableProcessors();

    public static void startIndexing(File[] roots) {
        BlockingQueue<File> queue = new LinkedBlockingQueue<>(BOUND);

        FileFilter filter = pathname -> true;

        // 生产者,将文件放入 queue 中,填充要标记的文件队列
        for (File root : roots) {
            // 每个 roots 分配一个线程用来填充要进行标记的文件队列
            new Thread(new FileCrawler(queue, filter, root)).start();
        }

        // 消费者,消费队列中的文件,对其进行 indexFile() 标记方法
        for (int i = 0; i < N_CONSUMERS; i++) {
            new Thread(new Indexer(queue)).start();
        }
    }

【这里只是对 之前 5-8 的节选,也就是 startIndexing() 方法的代码。 所以 5-8,5-9 加起来是一个完整的例子,5-8 是生产者—消费者 2部分的代码,5-9 是启动搜索的代码。 】

5.3.2 串行线程封闭

java.util.concurrent Java 并发包中实现的各种阻塞队列 都包含了足够的 内部同步机制,从而安全地将对象从 「生产者线程」 发布到 「消费者线程」。

对于可变对象, 生产者—消费者 这种设计与阻塞队列一起,促进了线程封闭,从而将对象所有权从生产者交付给消费者。

线程封闭对象只能被单个线程持有,但可以通过安全地 「发布」 该对象来 "转移" 对象的所有权。 在转移所有权之后也只有另一个线程能获得这个对象的访问权限,并且发布对象的线程不会再访问它。

这种安全的发布确保了对象状态对于 新的所有者 来说是可见的,并且由于最初的所有者不会再访问它,因此对象被封闭在新的 线程中。

新的所有者线程可以任意对该对象进行修改,因为它具有 独占 的访问权。

「对象池」 利用了 串行线程封闭,将对象 "借给" 一个请求线程。 只要对象池内包含足够的 内部同步 来安全地发布池中的对象,并且只要客户代码本身不会发布池中对象,或者在将对象返回给对象池之后就不再使用它,那么就可以安全地在线程之间传递所有权。

【↑ 这段话有点绕的,作者陈述了线程之间安全传递对象的前提,里面包含了好几个要点,需要总结出来】

我们也可以使用其他发布机制来传递 可变对象 的所有权,但必须确保只有一个线程能接受被转移的对象。<---【基本条件】 阻塞队列简化了这项工作,除此之外,还可以通过 ConcurrentMap 的原子方法 remove 或者 AtomicReference 的原子方法 compareAndSet 来完成这项工作。<---【这里只给出了结论,同样缺少了具体案例,需要我们自己进行补充】

5.3.3 双端队列与工作密取

Java 6 增加了两种容器类型 ,Deque(发音为 "deck")和 BlockingDeque ,它们分别对 QueueBlockingQueue 进行了扩展。

Deque 是一个 「双端队列」,实现了在队列头和队列尾的高效插入和移除【也就是高效修改操作】,具体实现包括 ArrayDequeLinkedBlockingDeque

正如阻塞队列适用于 "生产者—消费者" 模式,所有消费者有一个共享的工作队列,而在 「工作密取」设计中,每个消费者都有各自的双端队列。<---【从共享变为了独享】

如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者双端队列末尾 「秘密地获取工作」。<---【说明消费者可以获取其他消费者队列的末尾的数据】

在 「大多数时候」,消费者只访问自己的双端队列,从而极大地减少了竞争。 当工作者线程需要访问另一个队列时,它会从队列的「尾部」 而不是 「头部」获取工作,因此进一步降低了队列上的竞争程度。

「工作密取」非常适用于既是消费者,也是生产者的问题 —— 当执行某个工作时可能导致出现更多的工作。

例如: 在网页爬虫程序中处理一个页面时,通常会发现有更多的页面需要处理。类似的还有很多 图 搜索的算法,例如在 垃圾回收阶段对 堆进行标记,都可以通过 工作密取 机制来实现 「高效并行」。 当一个工作线程找到新的任务单元时,它会将其放到自己队列的末尾(或者在工作共享设计模式中,放入其他工作者线程队列中)。

当「双端队列」 为空时,它会在另一个线程的队列队伍查找新的任务,从而确保每个线程都保持忙碌状态。【这个设计可以很好的提高线程的利用率】

【这一节的信息量很大,引入了很多数据结构和概念名词,例子却少了点,所以有很多是需要自己去寻找和探索来对文章进行补充的,因为光靠概念是没办法深入学习的,talk is cheap,但是第一遍的时候我不打算去做这件事,先过一遍,第二遍才是深度学习与精度】

5.4 阻塞方法与中断方法

线程可能会阻塞或暂停执行,原因有多种:

线程阻塞时,它通常被「挂起」,并处于某种『阻塞状态』**(BLOCK、WAITING 或 TIMED_WAITING)。阻塞操作与执行时间很长的普通操作的差别在于 : 被阻塞的线程必须等待某个不受它控制**的事件发生后 才能继续执行,例如「等待 I/O 操作完成」,「等待某个锁可用」,或者「等待外部计算的结束」。

当某个外部事件发生时,线程被置回 RUNABLE 可运行状态,并可以再次被调度执行。

BlockingQueueputtake 等方法会抛出 受检查异常Checked ExceptionInterruptedException,这与类库中其他一些方法的做法相同,例如 Thread.sleep

当某方法抛出 InterruptedException 时,表示该方法是一个阻塞方法,如果这个方法被中断,那么它将努力提前结束 「阻塞状态」。<---【阻塞方法的定义,比如 Thread.slee

Thread 提供了 interrupt 方法,用于中断线程或者查询线程是否已经被中断。 每个线程都有一个布尔类型的属性,表示线程的中断状态,当中断线程时,将设置这个状态。

中断是一种「协作机制」,一个线程不能强制其他线程停止正在执行的操作而去执行其他的操作。<---【也就是只能建议,而不能强制停止】

当 「线程A」 中断 「线程B」 时,A 仅仅是要求 B 在执行到某个可以暂停的地方停止正在执行的操作 —— 前提是 「线程B」 愿意停止下。虽然在 API 或者 语言规范中并没有为中断定义任何特定应用级别的语义,但最常使用中断的情况就是 「取消某个操作」。

方法对中断请求的响应度越高,就越容易及时取消那些执行时间很长的操作。<---【那么方法的响应度是可以设置的吗? 总是把话说一半。。。】

当在代码中调用了一个将抛出 InterruptedException 异常的方法时,你自己的方法也就变成了一个阻塞方法,并且必须要处理对中断的响应,对于库代码来说,有两种基本选择:

程序清单 5-10 恢复中断状态以屏蔽中断:

// Restoring the interrupted status so as not to swallow the interrupt
// 恢复中断状态以避免中断被屏蔽
public class TaskRunnable implements Runnable {

    BlockingQueue<Task> queue;

    @Override
    public void run() {
        try {
            processTask(queue.take());
        } catch (InterruptedException e) {
            // 恢复被中断的状态 将调用者线程的中断状态设为true。
            Thread.currentThread().interrupt();
        }
    }

    // 对Task进行处理的业务逻辑代码
    void processTask(Task task) {
        // 处理 task
    }

    interface Task {
    }
}

还可以采用一些更复杂的终端处理方法,但是上述两种方法已经可以应付大多数情况了。 然而在出现 InterruptedException不应该做的事情是:捕获它但是不做出任何响应。【也就是异常的侵吞,这种操作在任何时候应该都是不推荐的】。这将使调用栈更上层的代码无法对中断采取处理措施,因为线程被中断的证据已经丢失。

只有在一种特殊的情况下才能屏蔽中断 —— 对 Thread 进行扩展,并且能够调用栈上更高层的代码。 [第7章]() 将进一步介绍 「取消」 和 「中断」 等操作。

5.5 同步工具类

在容器类中,「阻塞队列」是一种独特的类:它们不仅作为保存对象的容器,还能协调生产者和消费者等线程之间的控制流。【也就是除了容器功能还带有逻辑控制功能】,因为 take 和 put 方法将阻塞,直到队列到达期望的状态。(队列既非空,也非满)。

同步工具类可以是任何一个对象,只要它根据自身的状态来协调线程的控制流。<---【这里任何一个对象 怎么理解?不是就那几个容器类吗?】

阻塞队列可以作为同步工具类,其他类型的同步工具类还包括:

在 JDK 中还包含其他一些同步工具类的类,如果这些类无法满足需要,那么可以按照 [第14章]() 中给出的机制来创建自己的同步工具类。

所有的同步工具类都包含一些特定的「结构化属性」:它们封装了一些状态,这些状态将决定执行同步工具类的线程是继续执行还是等待,此外还提供了一些方法对「状态」进行操作,以及另一些方法用于高效地等待同步工具类进入到 「预期状态。」

5.5.1 闭锁

「闭锁」是一种「同步工具类」,可以延迟线程的进度直到其到达终止状态(delay the progress of threads until it reaches its terminal state)。[CPJ 3.4.2] <--- 【这里的CPJ 指的是什么?】

闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能够通过,当到达结束状态时,这扇门会打开并允许所有线程通过。

当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。【那这就是一次性的?】

闭锁可以用来确保某些活动直到其他活动都完成才继续执行<---【 闭锁的使用场景】 例如:

CountDownLatch 是一种灵活的 「闭锁实现」,可以在上述情况中使用,它可以使一个或多个线程等待一组事件发生。

闭锁状态包括一个「计数器」,该计数器被初始化为一个正数,表示需要等待的事件数量。 countDown 方法递减计数器,表示有一个事件已经发生了,而 await 方法等待计数器到达零,这表示所有需要等待的事件都已经发生。

如果计数器的值非零,那么 await 会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时

在程序清单 5-11 的 TestHarness 中给出了 闭锁的两种常见用法TestHarness 创建一定数量的线程,利用它们并发地执行指定的任务。它使用两个闭锁,分别表示 "起始门(Starting Gate)""结束门(Ending Gate)"

起始门计数器的初始值为1,而结束门计数器的初始值为 「工作线程」 的数量。 每个工作线程首先要做的事情就是在起始门上等待,从而确保所有线程都就绪后才开始执行。

而每个线程要做的最后一件事情就是调用 结束门 的 countDown 方法使计数器 -1 ,这能使主线程高效地等待直到所有工作线程都之心古玩城,因此可以统计所消耗的时间。

程序清单 5-11 在计时测试中使用 CountDownLatch 来启动和停止线程:

// Using CountDownLatch for starting and stopping threads in timing tests
// 使用 CountDownLatch 来计算 方法执行的耗时
public class TestHarness {
    public long timeTask(int nThreads, final Runnable task) throws InterruptedException {
        // 起始闭锁 初始值为1
        final CountDownLatch startGate = new CountDownLatch(1);
        // 结束闭锁 初始值为传入的线程值
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        // 根据传入的线程数 循环执行task 每次执行 endGate 都会 -1
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                @Override
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }

                    } catch (InterruptedException e) {
                        // 啥也不做
                    }
                }
            };
            // 使线程开始执行
            t.start();
        }
        long start = System.nanoTime();
        // 起始闭锁 -1 ,闭锁结束
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end - start;
    }
}

为什么要在 TestHarness 中使用闭锁而不是在线程创建后就立即启动? 或许我们希望测试 N 个线程并发执行某个任务需要的时间。

如果在创建线程后立即启动它们,那么先启动的线程将 "领先" 后启动的线程,并且活跃线程数量会随着时间的推移而增加或减少,竞争程度也在不断发生变化。

「启动门 」 startGate 将使得 主线程能够同时释放所有工作线程,而 「结束门」 endGate 则使主线程能够等待最后一个线程执行完成,而不是顺序地等待每个线程执行完成。 <---【这个对程序的分析值得好好细品,因为之前从未使用过 CountDownLatch,第一次接触难免有些懵逼】

5.5.2 FutureTask

FutureTask」 也可以用做闭锁(FutureTask 实现了 Future语义,表示一种抽象的可生成结果的计算 )。

FutureTask 表示的计算是通过 Callable 回调来实现的,相当于一种可生成结果的 Runnable,并且可以处于以下 3种状态:

"执行完成" 表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。

当 FutureTask 进入完成状态后,它会永远停止在这个状态上。

Future.get 的行为取决于任务的状态。如果任务已经完成,那么 get 会立即返回结果,否则 get 将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。

FutureTask 将计算结果从「执行计算的线程」传递到「获取这个结果的线程」,而 FutureTask 的规范确保了这种传递过程能实现结果的安全发布。 <--- 【那么 FutureTask 的规范是什么?】

FutureTask 在 Executor 框架中表示异步任务,此外还可以用来表示一些时间较长的计算,这些计算可以在使用计算结果之前启动。

程序清单 5-12 中的 Preloader 就使用了 FutureTask 来执行一个高开销的计算,并且计算结果将在稍后使用。

通过提前启动计算,可以减少等待结果时需要的时间:

程序清单 5-12 使用 FutureTask 来提前加载稍后需要的数据

public class PreLoader {

    ProductInfo loadProductInfo() throws DataLoadException {
        return null;
    }

    // 使用 Future 提前调用 长耗时的方法 loadProductInfo()
    private final FutureTask<ProductInfo> future = new FutureTask<>(new Callable<ProductInfo>() {
        @Override
        public ProductInfo call() throws Exception {
            return loadProductInfo();
        }
    });

    // FutureTask 继承于 Runnable
    private final Thread thread = new Thread(future);

    public void start() {
        thread.start();
    }

    public ProductInfo get() throws InterruptedException, DataLoadException {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof DataLoadException) {
                throw (DataLoadException) cause;
            } else {
                throw LaunderThrowable.launderThrowable(cause);
            }
        }
    }

    interface ProductInfo {

    }
}

class DataLoadException extends Exception {
}

PreLoader 创建了一个 FutureTask,其中包含从数据库加载产品信息的任务,以及一个执行运算的线程。

由于在构造函数或静态初始化方法中启动线程并不是一个好方法,因此提供了一个 start 方法来启动线程。

当程序随后需要 ProductInfo 时,可以调用 get 方法,如果数据已经加载,那么将返回这些数据,否则将等待加载完成后再返回。

Callable 表示的任务可以抛出 受检查的或未受检查 的异常,并且任何代码都可能抛出一个 Error。无论任务代码抛出什么异常,都会被封装到一个 ExecutionException 中,并在 Future.get 中被重新抛出

这将使调用 get 的代码变得复杂,因为它不仅需要处理可能出现的 ExecutionException(以及未检查的 CancellationException) 而且还由于 ExecutionException 是作为一个 Throwable 类返回的,因此处理起来并不容易。 【怎么理解?】

Preloader 中,当 get 方法抛出 ExecutionException时,可能是以下三种情况之一:

我们必须对每种情况进行单独处理,但我们将使用 程序清单 5-13 中的 launderThrowable 辅助方法来封装一些 复杂的异常处理逻辑。

在调用 launderThrowable 之前, Preloader 会首先检查已知的受检查异常,并重新抛出它们。剩下的是未检查异常,Preloader 将调用 launderThrowable 并抛出结果。

如果 Throwable 传递给 launderThrowable 的是一个 Error,那么 launderThrowable 将直接再次抛出它;如果不是,RuntimeException,那么将抛出一个 IllegalStateException 表示这是一个逻辑错误。 剩下的 RuntimeExceptionlaunderThrowable 将把它们返回给调用者,而调用者通常会重新抛出它们。

【这一大段都是对这个异常处理逻辑的描述,其实就是分情况就行了3个if判断然后进行处理】

程序清单 5-13 强制将未检查的Throwable 转为 RuntimeException

public class LaunderThrowable {

    /**
     * Coerce an unchecked Throwable to a RuntimeException
     * 如果 ThrowAble 是一个 Error 则抛出,如果是 RuntimeException 直接返回,否则抛出 IllegalStateException 异常
     * <p/>
     * If the Throwable is an Error, throw it; if it is a
     * RuntimeException return it, otherwise throw IllegalStateException
     */
    public static RuntimeException launderThrowable(Throwable t) {
        if (t instanceof RuntimeException)
            return (RuntimeException) t;
        else if (t instanceof Error)
            throw (Error) t;
        else
            throw new IllegalStateException("Not unchecked", t);
    }
}

5.5.3 信号量

计数信号量Counting Semaphore) 用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。

Semaphore 中管理者一组虚拟的许可(permit),许可的初始数量可以通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么 acquire 将阻塞直到有许可(或者直到被中断或者操作超时)。

release 方法将返回一个许可给信号量。

计算信号量的一种简化形式是 「二值信号量」,即初始值为 1 的Semaphore。 二值信号量可以用作互斥体(mutex),并具备不可重入锁的语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。

Semaphore 可以用于实现 资源池,例如数据库连接池。

我们可以构造一个固定长度的资源池,当池为空时,请求资源将会失败,但你真正希望看到的行为是阻塞而不是失败,并且当池变为非空状态时解除阻塞。

如果将 Semaphore 的计数值初始化为池的大小,并且从池中获取一个资源之前先调用 acquire 方法获取一个许可,在将资源返回给池之后调用 release 释放许可,那么 acquire 将一直阻塞直到资源池不为空。

[第12章]() 有界缓冲类中将使用这项技术。(在构造阻塞对象池时,一种更简单的方法是使用 BlockingQueue 来保存池的资源)

同样,你也可以使用 Semaphore 将任何一种容器变成 「有界阻塞容器」, 如 程序清单 5-14 中的 BoundedHashSet 所示,信号量的计数值会初始化为容器容量的最大值。 add 操作在向底层容器中添加一个元素之前,首先要获取一个许可。如果 add 操作没有添加任何元素,那么会立刻释放许可。

同样 remove 操作释放一个许可,使更多的元素能够添加到容器中。底层的 Set 实现并不知道关于边界的任何信息,这是由 BoundedHashSet 来处理的。

程序清单 5-14 使用 Semaphore 为容器设置边界:

// 使用信号量 Semaphore 给 容器设置边界
public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<>());
        // 设置边界
        sem = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        // 信号量 +1
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            // 如果没有成功保存元素,则信号量 -1
            if (!wasAdded) {
                sem.release();
            }
        }
    }

    public boolean remove(Object o) {
        boolean wasRemoved = set.remove(o);

        if (wasRemoved) {
            // 信号量 -1
            sem.release();
        }
        return wasRemoved;
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedHashSet<String> strSet = new BoundedHashSet<>(3);
        strSet.add("1");
        strSet.add("2");
        strSet.add("3");
        System.out.println(strSet.set);
        System.out.println(strSet.sem.toString());
        strSet.add("4");
    }
}
/**
输出
[1, 2, 3]
java.util.concurrent.Semaphore@6f94fa3e[Permits = 0]
*/

【可以看到,当添加3个元素之后, 到达了上线,此时 permits = 0,再添加元素 也无法添加进去了 同时线程进入阻塞状态。】

5.5.4 栅栏

我们已经看到通过闭锁来启动一组相关的操作,或者等待一组相关的操作结束。 闭锁是一次性对象,一旦进入终止状态,就不能被重置。

栅栏(Barrier) 类似于闭锁,它能阻塞一组线程直到某个事件发生。

栅栏与闭锁的关键区别在于:所有线程都必须「同时」到达栅栏位置,才能继续执行。 闭锁用于等待事件,而栅栏用于等待其他线程。

栅栏用于实现一些协议,例如:几个家庭决定在某个地方集合"所有人决定6:00 在 麦当劳碰头到了以后要等其他人,等所有人到齐之后再讨论下一步要做的事情"。

CylicBarrier 可以使一定数量的参与方 反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的 子问题。当有线程都到达了栅栏位置时将调用 await 方法,这个方法将阻塞直到 所有线程都到达栅栏位置。

如果所有线程都到达了额 栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置 以便下次使用。

如果对 await 的调用超时,那么 await 阻塞的线程被中断,那么栅栏就认为被打破,所有阻塞的 await 调用都将终止并抛出 BrokenBarrierException

如果成功地通过 栅栏,那么 await 将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来 "选举" 产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。

CyclicBarrier 还可以让你将一个栅栏操作传递给构造函数,这是一个 Runnable ,当成功通过栅栏时会在一个子任务线程中执行它,但在阻塞线程被释放之前,该任务是不能执行的。

在模拟程序中通常需要使用栅栏,例如某个步骤中的计算可以 「并行执行」 但必须等到该步骤中的所有计算都执行完毕才能进入下一个步骤。

例如,在 n-body 粒子模拟系统中,每个步骤都根据其他粒子的位置和属性来计算各个粒子的新位置。通过在每两次更新之间等待栅栏,能够确保在 第 k 步中的所有更新操作都已经计算完毕,才进入 第 k+1 步。

在程序清单 5-15 的 CelluarAutomata 中给出了如何通过栅栏来计算细胞的自动化模拟,例如 Conway 的生命游戏。

在对模拟过程并行化时,为每个元素(在这个示例中相当于一个细胞)分配一个独立的线程是不现实的,因为这将产生过多的线程,而在协调这些线程上导致的开销将降低计算性能。

合理的做法是:将问题分解成一定数量的 子问题,为每个字问题分配一个线程来进行求解,之后再将所有的结果合并起来。

CellularAutomate 将问题分解为 Ncpu 个子问题,其中 Ncpu 等于当前环境下的可用 CPU 数量,并将给每个子问题分配一个线程。

【在这种不涉及 I/O 操作 或共享数据访问的计算问题中,当线程数量为 CPU数量 或者 CPU + 1 的数量时将获得最优的吞吐量。 更多的线程不会带来任何帮助,甚至在某种程度上降低性能,因为多个线程会在 CPU 和 内存等资源上发生竞争。】

在每个步骤中,工作线程都为各自子问题中的所有细胞计算新值。当所有工作线程都到达栅栏时,栅栏会把这些新值提交给数据模型。

在栅栏的操作执行完以后,工作线程将开始下一步的计算,包括调用 isDone 方法来判断是否需要进行下一次迭代。

程序清单 5-15 通过 CyclicBarrier 协调细胞自动衍生系统中的计算:

// 使用栅栏协调计算细胞衍生系统
public class CellularAutomate {
    private final Board mainBoard;
    private final CyclicBarrier barrier;
    private final Worker[] workers;

    public CellularAutomate(Board board) {
        this.mainBoard = board;
        int count = Runtime.getRuntime().availableProcessors();
        this.barrier = new CyclicBarrier(count, new Runnable() {
            @Override
            public void run() {
                mainBoard.commitNewValues();
            }
        });
        // 有几个 CPU 就创建一个对应数量的 worker 数组 用来真正的执行计算
        this.workers = new Worker[count];
        for (int i = 0; i < count; i++) {
            // 构造 worker 对象
            workers[i] = new Worker(mainBoard.getSubBoard(count, i));
        }
    }

    /**
     * 真正执行计算的类
     */
    private class Worker implements Runnable {
        private final Board board;

        public Worker(Board board) {
            this.board = board;
        }

        @Override
        public void run() {
            while (!board.hasConverged()) {
                for (int x = 0; x < board.getMaxX(); x++) {
                    for (int y = 0; y < board.getMaxY(); y++) {
                        board.setNewValue(x, y, computeValue(x, y));
                    }
                }
            }
        }

        private int computeValue(int x, int y) {
            // 一个根据 x,y值 计算新的 value 的业务逻辑方法
            return 0;
        }
    }

    // 启动所有 worker
    public void start() {
        for (int i = 0; i < workers.length; i++) {
            new Thread(workers[i]).start();
        }
        mainBoard.waitForConvergence();
    }

    interface Board {
        int getMaxX();

        int getMaxY();

        int getValue(int x, int y);

        int setNewValue(int x, int y, int value);

        void commitNewValues();

        boolean hasConverged();

        //Convergence -> 自动收敛的点
        void waitForConvergence();

        // Partitions -> 隔离物
        Board getSubBoard(int numPartitions, int index);
    }
}

【讲道理,第一次接触,觉得还是挺复杂的。需要多回顾多思考几遍,然后跑起来看看结果。】

另一种形式的栅栏是 Exchanger,它是一种两方(Two-Party)栅栏,各方在栅栏位置上交换数据。当两方执行不对称操作时, Exchanger 会非常有用。

例如当一个线程向缓冲区写入数据,而另一个线程从缓冲区读取数据。这些线程可以使用 Exchanger 来汇合,并将写满的缓冲区与空的缓冲区交换。当两个线程通过 Exchanger 交换对象时,这种交换就把这两个对象安全地发布给另一方。

数据交换的时机取决于应用程序的想要需求。最简单的方案是:当缓冲区被填满时,由填充任务进行交换,当缓冲区为空时,由清空任务进行交换。

这样会把需要交换的次数降至最低。

但如果新数据的到达率不可预测,那么一些数据的处理过程就将延迟。另一个方法是:不仅当缓冲区被填满时进行交换,并且当缓冲区被填充到一定程度并保持一定的时间后,也进行交换。

【上面这段话,太抽象了如果没有使用过且没有写过例子,那理解起来很困难。所以作为第一次接触大量并发工具的初学者,需要放低心态,同时多写例子,将理论与实际场景关联】

5.6 构建高效且可伸缩的结果缓存

Todo:

疑问:

  1. [5.2.1中](#5.2.1 ConcurrentHashMap)尽管有这些改进,但仍然需要一些权衡的因素。 对于一些需要在整个 Map 上进行计算的方法,例如 sizeisEmpty ,这些方法的语义被略微减弱了,以反映容器的并发特性。

【这句话怎么理解?怎样把这句话转换为我自己的理解,什么是语义被略微减弱了,怎样减弱的?

  1. 方法对中断请求的响应度越高,就越容易及时取消那些执行时间很长的操作。<---【那么方法的响应度是可以设置的吗? 总是把话说一半。。。】

  2. 所有的同步工具类都包含一些特定的「结构化属性」:它们封装了一些状态,这些状态将决定执行同步工具类的线程是继续执行还是等待,此外还提供了一些方法对「状态」进行操作,以及另一些方法用于高效地等待同步工具类进入到 「预期状态。」 <----【这里的预期状态怎么理解?】

  3. 「闭锁」是一种「同步工具类」,可以延迟线程的进度直到其到达终止状态。[CPJ 3.4.2] <--- 【这里的CPJ 指的是什么?】