meishaoming / blog

MIT License
1 stars 2 forks source link

python 中的 Queue 与 SimpleQueue #91

Open meishaoming opened 4 years ago

meishaoming commented 4 years ago

SimpleQueue 在 3.7 版本引入

它的几个特点:

  1. 它是一个 FIFO,
  2. 创建的时候不指定队列大小
  3. put 不会阻塞(block和timeout参数不使用)。而对于 Queue 的 put,如果队列已满可以选择等,最多等 timeout 时间
  4. 如果队列为空,get 可以等 timeout 时间
  5. 用一个信号量 threading.Semaphore(0) 来标定队列里是否有数据,每入队一个数据就 release() 一次,每出队一个数据就 acquire 一次
  6. 没有 full() 判断
  7. 缺少 Queue() 中的 task_done() 和 join() 等任务跟踪高级功能

Queue() 里的 task_done() 和 join() 的作用:

想象一下 put() 操作就是往 queue 里放任务,每 put 一次,未完成的任务就加 1 (self.unfinished_tasks += 1)

每次 get() 取出之后,再调用一次 task_done() 告诉 queue 已经完成了一次任务。当把所有任务都作完了之后,未完成任务就是 0 (self.unfinished_tasks = 0)

join() 就是阻塞等待,直到队列里的所有任务都被执行完毕。

举个例子:

有 A、B 两个线程:A 往 q 里添加任务,B 从 q 里取任务出来执行。 要等到任务执行完毕后才能继续做其它事情。那么我们怎么知道任务都被执行完毕了?用 q.join()

附 SimpleQueue 的实现:

import threading
from collections import deque

class Empty(Exception):
    'Exception raised by Queue.get(block=0)/get_nowait()'
    pass

class SimpleQueue:

    def __init__(self):
        self._queue = deque()
        self._count = threading.Semaphore(0)

    def put(self, item, block=True, timeout=None):
        self._queue.append(item)
        self._count.release()

    def get(self, block=True, timeout=None):
        if timeout is not None and timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        if not self._count.acquire(block, timeout):
            raise Empty
        return self._queue.popleft()

    def put_nowait(self, item):
        return self.put(item, block=False)

    def get_nowait(self):
        return self.get(block=False)

    def empty(self):
        return len(self._queue) == 0

    def qsize(self):
        return len(self._queue)