penglongli / blog

18 stars 1 forks source link

ReentrantLock、Condition 介绍 #115

Open penglongli opened 6 years ago

penglongli commented 6 years ago

直接上例子,我们用 ReentrantLock 实现一个简单的“FIFO 的阻塞队列”

FiFoBlockingQueue.java

/**
 * A bounded first-in-first-out blocking queue backed by a fixed-size circular array.
 *
 * <p>All mutable state is guarded by a single {@link ReentrantLock}. Two conditions
 * created from that lock let callers wait: {@code notFull} for writers when the array
 * is full, and {@code notEmpty} for readers when no element is stored.
 *
 * <p>{@code null} is stored and returned like any other element; no null check is made.
 *
 * @param <E> type of the queued elements
 */
public class FiFoBlockingQueue<E> {

    /** Circular buffer holding the queued elements; its length is the capacity. */
    private final Object[] elems;
    /** Slot the next {@link #add} writes to; wrapped to 0 lazily once it reaches the array length. */
    private int index = 0;
    /** Slot the next {@link #remove} reads from; wrapped to 0 lazily once it reaches the array length. */
    private int removeIndex = 0;
    /** Number of elements currently stored. */
    private int count = 0;

    private final Lock lock = new ReentrantLock();
    /** Signalled after every successful {@link #add}. */
    private final Condition notEmpty = lock.newCondition();
    /** Signalled after every successful {@link #remove}. */
    private final Condition notFull = lock.newCondition();

    /**
     * Creates a queue that can hold at most {@code capacity} elements.
     *
     * <p>With a capacity of 0 the queue is always full, so {@link #add} never returns.
     *
     * @param capacity maximum number of stored elements
     * @throws NegativeArraySizeException if {@code capacity} is negative
     */
    public FiFoBlockingQueue(int capacity) {
        this.elems = new Object[capacity];
    }

    /**
     * Appends an element, waiting while the queue is full.
     *
     * @param elem element to enqueue; may be {@code null}
     * @throws InterruptedException if the calling thread is interrupted while waiting for space
     */
    public void add(E elem) throws InterruptedException {
        lock.lock();
        try {
            // Loop rather than "if": the condition is re-checked after every wake-up.
            while (count == elems.length) {
                notFull.await();
            }

            if (index == elems.length) {
                index = 0;
            }
            elems[index++] = elem;
            count++;

            notEmpty.signal();
        } finally {
            // Release the lock even when await() throws; otherwise an interrupted
            // waiter would keep the lock forever and block every other thread.
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, waiting while the queue is empty.
     *
     * @return the element that was enqueued earliest; may be {@code null} if {@code null} was added
     * @throws InterruptedException if the calling thread is interrupted while waiting for an element
     */
    public E remove() throws InterruptedException {
        lock.lock();
        try {
            while (count <= 0) {
                notEmpty.await();
            }

            if (removeIndex == elems.length) {
                removeIndex = 0;
            }
            // Safe: add(E) is the only writer into elems, so every slot holds an E or null.
            @SuppressWarnings("unchecked")
            E elem = (E) elems[removeIndex];
            // Clear the slot so the queue does not keep the element reachable.
            elems[removeIndex++] = null;
            count--;

            notFull.signal();
            return elem;
        } finally {
            lock.unlock();
        }
    }

}

Message.java

/**
 * Mutable holder pairing a phone number with a message text.
 */
public class Message {

    /** Phone number associated with the message. */
    private String phone;
    /** Text of the message. */
    private String msg;

    /**
     * Creates a message with both fields set. Package-private.
     *
     * @param phone phone number to store
     * @param msg   message text to store
     */
    Message(String phone, String msg) {
        this.msg = msg;
        this.phone = phone;
    }

    // --- accessors ---

    /** @return the stored phone number */
    public String getPhone() {
        return this.phone;
    }

    /** @return the stored message text */
    public String getMsg() {
        return this.msg;
    }

    // --- mutators ---

    /** @param phone new phone number */
    public void setPhone(String phone) {
        this.phone = phone;
    }

    /** @param msg new message text */
    public void setMsg(String msg) {
        this.msg = msg;
    }
}

Producer.java

/**
 * Thread that prints and enqueues 20 messages, then enqueues a single {@code null}
 * as an end-of-stream marker.
 */
public class Producer extends Thread {

    private final FiFoBlockingQueue<Message> queue;

    /**
     * @param queue queue the generated messages are added to
     */
    Producer(FiFoBlockingQueue<Message> queue) {
        this.queue = queue;
    }

    /**
     * Generates the messages and the trailing {@code null} marker. If the thread is
     * interrupted while adding to the queue, production stops immediately and
     * the marker is not enqueued.
     */
    @Override
    public void run() {
        try {
            for (int i = 0; i < 20; i++) {
                Message message = new Message(
                        // Random integer in [0, 9999999) rendered as the phone number.
                        Integer.toString((int)(Math.random() * 9999999)),
                        "Message " + i
                );
                System.out.println("[Producer] phone=" + message.getPhone()
                        + ", message=" + message.getMsg());

                queue.add(message);
            }
            // null tells a consumer that no more messages will follow.
            queue.add(null);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt status so code further up the stack
            // (e.g. a direct caller of run()) can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}

Consumer.java

/**
 * Thread that repeatedly takes messages from the queue and prints them until it
 * receives the {@code null} end-of-stream marker or is interrupted.
 */
public class Consumer extends Thread {
    private final FiFoBlockingQueue<Message> queue;

    /**
     * @param queue queue the messages are taken from
     */
    Consumer(FiFoBlockingQueue<Message> queue) {
        this.queue = queue;
    }

    /**
     * Consumes messages until the {@code null} marker arrives. The marker is put back
     * into the queue before exiting so that every other consumer sharing the queue
     * also sees it and terminates; a single marker therefore stops any number of consumers.
     */
    @Override
    public void run() {
        // The try block wraps the whole loop: an interrupt ends the thread instead of
        // being swallowed and silently dropping the message that was being processed.
        try {
            while (true) {
                Message message = queue.remove();
                if (null == message) {
                    // Re-publish the end-of-stream marker for sibling consumers.
                    // NOTE(review): assumes the producer adds nothing after the marker,
                    // so the slot just freed by remove() is still available.
                    queue.add(null);
                    break;
                }
                // Simulate the time it takes to consume a message.
                Thread.sleep(1000);

                System.out.println("[Consumer: "+
                        Thread.currentThread().getName() +"]: phone=" +
                        message.getPhone() + ", message=" +
                        message.getMsg()
                );
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt status before leaving run().
            Thread.currentThread().interrupt();
        }
    }
}

Test.java

/**
 * Demo entry point: one producer and two consumers sharing a queue of capacity 10.
 */
public class Test {

    /** Queue shared by the producer and both consumers. */
    static final FiFoBlockingQueue<Message> queue = new FiFoBlockingQueue<>(10);

    /**
     * Creates the producer followed by the two consumers, then starts them in that order.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Array initializers are evaluated left to right, so the threads are
        // constructed producer first, then the two consumers.
        Thread[] workers = {
                new Producer(queue),
                new Consumer(queue),
                new Consumer(queue)
        };

        for (Thread worker : workers) {
            worker.start();
        }
    }
}

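我们借助 Condition 让线程在队列已满或为空时等待(队列是否已满、是否为空由 count 判断):队列已满时,Producer 在 notFull 上等待;队列为空时,Consumer 在 notEmpty 上等待。每次 add 成功后调用 notEmpty.signal() 唤醒一个等待中的 Consumer 去消费,每次 remove 成功后调用 notFull.signal() 唤醒一个等待中的 Producer 继续生产。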

关于 Condition 原理,参考: