Open zgq105 opened 4 years ago
什么是队列? 队列是一种先进先出(FIFO)数据结构。在现实生活中也是很常见,比如在食堂打饭排队的例子就是一个队列。 作用? 队列的作用就是让工作流程有条不紊的按照顺序进行作业。
在java中Queue接口是顶级接口,定义了add、offer、remove、poll、element、peek基础接口,其父类是集合Collection接口。
BlockingQueue是一个阻塞队列接口,所谓的阻塞队列就是线程安全的队列,可以使用在多线程环境,其实现类包括如下:
ArrayBlockingQueue(类):ArrayBlockingQueue是通过数组方式实现的线程安全的阻塞队列,查看源码可以知道内部是通过ReentrantLock来保证线程安全的。
DelayQueue(类):DelayQueue翻译过来叫延迟队列,是一个无界的BlockingQueue(内部动态扩充容量),通过ReentrantLock+PriorityQueue保证线程安全、动态扩容和延迟获取队列中元素等操作。应用场景像铁道部买火车票下单30分钟未支付的订单处理、缓存超期清理等等。实例代码如下:
import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit;
public class DelayQueueTest { public static void test() { DelayQueue delayQueue = new DelayQueue(); TaskItem item = new TaskItem(1000, "1"); delayQueue.add(item); delayQueue.add(new TaskItem(2000, "2")); delayQueue.add(new TaskItem(3000, "3")); delayQueue.add(new TaskItem(4000, "4")); while (true) { try { System.out.println(delayQueue.take().getName());
} catch (Exception e) { e.printStackTrace(); } } }
}
class TaskItem implements Delayed {
private String name; private long startTime = System.currentTimeMillis(); private long endTime; public TaskItem(long time, String name) { endTime = time + startTime; this.name = name; } public String getName() { return name; } public void setName(String name) { this.name = name; } /** * 获取延迟时间 * * @param unit * @return */ @Override public long getDelay(TimeUnit unit) { return endTime - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { TaskItem taskItem = (TaskItem) o; return (int) (this.getDelay(TimeUnit.MILLISECONDS) - taskItem.getDelay(TimeUnit.MILLISECONDS)); }
- **LinkedBlockingQueue**(类):LinkedBlockingQueue是一个单链表实现的阻塞队列,如果未指定队列容量,它是一个无界的队列。内部通过**ReentrantLock**实现线程安全。 - **PriorityBlockingQueue**(类):PriorityBlockingQueue是一个支持优先级的无界阻塞队列(数组实现,内部支持动态扩容)。队列中元素默认是自然顺序(即升序);也可以实现接口Comparator进行自定义排序。实例代码如下:
import java.util.Comparator; import java.util.Iterator; import java.util.concurrent.PriorityBlockingQueue;
public class PriorityBlockingQueueTest {
public static void test() { PriorityBlockingQueue priorityBlockingQueue = new PriorityBlockingQueue(); priorityBlockingQueue.add(1); priorityBlockingQueue.add(4); priorityBlockingQueue.add(2); priorityBlockingQueue.add(0); System.out.println("head:" + priorityBlockingQueue.peek());//输出0 Iterator iterator = priorityBlockingQueue.iterator(); while (iterator.hasNext()) { System.out.println(iterator.next());//默认升序依次输出:0,1,2,4 } } public static void test2() { PriorityBlockingQueue<Person> priorityBlockingQueue = new PriorityBlockingQueue(10, new Person()); priorityBlockingQueue.add(new Person(10, "张三")); priorityBlockingQueue.add(new Person(7, "李四")); priorityBlockingQueue.add(new Person(12, "王五")); System.out.println("head:" + priorityBlockingQueue.peek().getName());//输出王五 Iterator iterator = priorityBlockingQueue.iterator(); while (iterator.hasNext()) { Person person = (Person) iterator.next(); System.out.println(person.getName());//依次输出:王五,李四,张三 } }
class Person implements Comparator { private int age; private String name;
public Person() { } public Person(int age, String name) { this.name = name; this.age = age; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public int compare(Object o1, Object o2) { Person p1 = (Person) o1; Person p2 = (Person) o2; return p2.getAge() - p1.getAge();//降序 }
- **SynchronousQueue**(类):SynchronousQueue是一种没有缓冲的队列,生产者生产数据之后会直接被消费者消费。SynchronousQueue内部是通过CAS保证线程安全,其提供的方法基本是线程阻塞的,比如put、offer、take,实例代码如下:
package BlockingQueue;
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueTest {
public static void test() { SynchronousQueue synchronousQueue = new SynchronousQueue(); //生产者 new Thread(new Runnable() { @Override public void run() { try { synchronousQueue.put("dd"); } catch (Exception e) { e.printStackTrace(); } } }).start(); //消费者 new Thread(new Runnable() { @Override public void run() { try { Object res = synchronousQueue.take(); System.out.println(res);//输出:dd } catch (Exception e) { e.printStackTrace(); } } }).start(); }
### 2.1.2 双端队列Deque(接口) 双端队列Deque是指接口支持两种方式来存储数据,即队列(Queue)和栈(Stack )。当Deque作为队列使用时,数据按先进先出(FIFO )的方式存储;当Deque作为栈使用时,数据按后进先出(LIFO )的方式存储。具体如下图所示: 作为**队列**时: ![image](https://user-images.githubusercontent.com/17380142/74077877-970fd580-4a5e-11ea-8b61-82fdfe1e267d.png) 作为**栈**时: ![image](https://user-images.githubusercontent.com/17380142/74077884-aee75980-4a5e-11ea-90db-0f04de326a58.png) 接下来看看双端队列的实现类,如下: - **ArrayDeque**(类) :ArrayDeque是通过数组实现的无界双端队列,内部支持动态扩容,线程不安全,同时支持队列和栈的特性。实例代码如下:
public static void test() { ArrayDeque arrayDeque = new ArrayDeque(); arrayDeque.add(2); arrayDeque.add(1); arrayDeque.addFirst(11); for (Object item : arrayDeque) { System.out.println(item);//依次输出11,2,1 } }
- **ConcurrentLinkedDeque**(类):ConcurrentLinkedDeque是一个无界的双端队列,内部使用双链表存储数据,使用CAS保证线程安全,同时支持队列和栈的特性。实例代码如下:
public static void test() { ConcurrentLinkedDeque concurrentLinkedDeque=new ConcurrentLinkedDeque(); concurrentLinkedDeque.add("aa"); concurrentLinkedDeque.add("bb"); concurrentLinkedDeque.addFirst("kk"); for (Object item : concurrentLinkedDeque) { System.out.println(item);//依次输出kk,aa,bb } }
- **LinkedList**(类):LinkedList是一个无界的双端队列,其内部是使用双链表存储数据的,非线程安全的类;同时支持队列和栈的特性。LinkedList是允许添加null类型的元素,示例代码如下:
LinkedList linkedList=new LinkedList(); linkedList.add("1"); linkedList.addFirst("22"); linkedList.add(null); for (Object item:linkedList){ System.out.println(item);//依次输出:22,1,null }
### 2.1.3 阻塞双端队列BlockingDeque(接口) 阻塞双端队列BlockingDeque是线程安全的双端队列。其实现类如下所示: - **LinkedBlockingDeque**(类):LinkedBlockingDeque是一个无界的、阻塞的双端队列;其内部使用双链表存储数据,使用独占锁**ReentrantLock**实现线程安全,同时支持队列和栈的特性。 ### 2.1.4 传输队列TransferQueue(接口) 传输队列TransferQueue是一种适用于数据传输的队列,其实现类如下: - **LinkedTransferQueue**(类):LinkedTransferQueue是一个单链表实现的无界队列,是线程安全的类(内部通过CAS机制保证线程安全)。传输队列LinkedTransferQueue适用于生产者和消费者模式的数据传输。 - **线程阻塞的方法**:transfer、tryTransfer、take - **非线程阻塞的方法**:offer、 put,、add 以简单的实例代码演示阻塞方法:
LinkedTransferQueue linkedTransferQueue = new LinkedTransferQueue(); new Thread(new Runnable() { @Override public void run() { //消费者 try { Object res = linkedTransferQueue.take(); //队列中没有元素,开始阻塞,直到接收到元素为止 System.out.println("=====" ); //不会执行这句代码 } catch (InterruptedException e) { e.printStackTrace(); } } }).start();
# 3. 小结 - **CAS:** CAS是compare and swap的缩写,是一种实现并发编程的算法;其核心思想比较并替换的思想,在java.util.concurrent模块中有大量的并发操作使用了该算法,也是属于乐观锁的一种机制。 - CAS是**sun.misc.Unsafe**中的一部分接口,由JDK底层实现。接下来以AtomicInteger源码中部分代码来介绍下这种思想,如下所示:
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset;
static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } }
private volatile int value;
**说明**:valueOffset表示value的指针变量,Unsafe中也是通过这个指针获取内存中的值。volatile修饰的变量表示所有线程都具有可见性。
public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }
**说明**:expect表示期待的值,update表示需要更新的值,valueOffset是value的指针变量。如果expect等于通过valueOffset获取到的内存值,则把update更新到内存中,同时返回true;反之,则不做任何操作,同时返回false。
1. 什么是队列?作用?
什么是队列? 队列是一种先进先出(FIFO)数据结构。在现实生活中也是很常见,比如在食堂打饭排队的例子就是一个队列。 作用? 队列的作用就是让工作流程有条不紊的按照顺序进行作业。
2. java中的队列
2.1 Queue接口
在java中Queue接口是顶级接口,定义了add、offer、remove、poll、element、peek基础接口,其父类是集合Collection接口。
2.1.1 阻塞队列BlockingQueue(接口)
BlockingQueue是一个阻塞队列接口,所谓的阻塞队列就是线程安全的队列,可以使用在多线程环境,其实现类包括如下:
ArrayBlockingQueue(类):ArrayBlockingQueue是通过数组方式实现的线程安全的阻塞队列,查看源码可以知道内部是通过ReentrantLock来保证线程安全的。
DelayQueue(类):DelayQueue翻译过来叫延迟队列,是一个无界的BlockingQueue(内部动态扩充容量),通过ReentrantLock+PriorityQueue保证线程安全、动态扩容和延迟获取队列中元素等操作。应用场景像铁道部买火车票下单30分钟未支付的订单处理、缓存超期清理等等。实例代码如下:
public class DelayQueueTest { public static void test() { DelayQueue delayQueue = new DelayQueue();
TaskItem item = new TaskItem(1000, "1");
delayQueue.add(item);
delayQueue.add(new TaskItem(2000, "2"));
delayQueue.add(new TaskItem(3000, "3"));
delayQueue.add(new TaskItem(4000, "4"));
while (true) {
try {
System.out.println(delayQueue.take().getName());
}
class TaskItem implements Delayed {
}
import java.util.Comparator; import java.util.Iterator; import java.util.concurrent.PriorityBlockingQueue;
public class PriorityBlockingQueueTest {
}
class Person implements Comparator { private int age; private String name;
}
package BlockingQueue;
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueTest {
}
public static void test() { ArrayDeque arrayDeque = new ArrayDeque(); arrayDeque.add(2); arrayDeque.add(1); arrayDeque.addFirst(11); for (Object item : arrayDeque) { System.out.println(item);//依次输出11,2,1 } }
public static void test() { ConcurrentLinkedDeque concurrentLinkedDeque=new ConcurrentLinkedDeque(); concurrentLinkedDeque.add("aa"); concurrentLinkedDeque.add("bb"); concurrentLinkedDeque.addFirst("kk"); for (Object item : concurrentLinkedDeque) { System.out.println(item);//依次输出kk,aa,bb } }
LinkedTransferQueue linkedTransferQueue = new LinkedTransferQueue(); new Thread(new Runnable() { @Override public void run() { //消费者 try { Object res = linkedTransferQueue.take(); //队列中没有元素,开始阻塞,直到接收到元素为止 System.out.println("=====" ); //不会执行这句代码 } catch (InterruptedException e) { e.printStackTrace(); } } }).start();
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset;
private volatile int value;
public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }