Open keyfall opened 3 years ago
并发性和并行性都是关于同时执行任务,而不是按顺序执行。 并发性:多个任务不在同一时间点进行,由于cpu调度器实现线程进出的快速切换,所以看上去是同时执行 并行性:多个任务在同一时间点进行,在多核cpu上实现两个任务同时进行
python通过线程模块支持多线程编程,线程模块公开了一个Thread类,封装了 一个执行线程 1.锁(lock)对象对于同步受保护的共享资源的访问很有用,以及与其类似的RLock对象. 2.条件(condition)对象,对线程在等待任意条件时进行同步很有用的。 3.事件(event)对象,它在线程之间提供了基本的信号机制 4.信号量(semaphore)对象,它允许一组固定的线程相互等待,同步到一个特定的状态,接着继续往下执行. 5.界线(barrier)对象,它允许一组固定的线程相互等待,同步到一个特定的状态,接着继续往下执行
def thumbnail_image(url):
print(url)
img_urls = []
# 普通
for url in img_urls:
thumbnail_image(url)
# 多线程
import threading
for url in img_urls:
t = threading.Thread(target=thumbnail_image(), args=(url, ))
t.start()
现实生活中,img_urls不会写成固定列表,需要从数据库或者根据条件生成出来,生成的url进行处理。 这里为每一个url创建一个线程是对资源浪费,写一些方法重用创建的线程 对于这样的系统,包括一组生产数据的线程和另一组消费或处理数据的线程,生产者/消费者模型是最理想的选择: 1.生产者是用来专门生产数据的工作者(线程)类,可以从一个特定的源接收数据或者自己生产数据 2.生产者将数据添加到共享的同步队列中,在python中,这个队列由适当命名的队列模块里的队列类提供 3.另一组专门的工作者类,即消费者类,在队列上等待(消费)数据,一旦它们获得了数据,就会处理并产生结果 4.当生产者停止生产数据并且消费者缺乏数据时,程序就结束了。像超时,轮询这样的技术可以用来实现程序的终止 这里有几个例子,讲解生产者/消费者架构,多线程使用 生产者类: 生成不同大小,前景和背景颜色的url图像 有一个run方法,在一个循环产生一个随机的图像url,将其放到queue,然后休息配置的sleep_time秒 有一个stop方法,把flag变成False,导致循环终止,这个stop一般是通过主线程在外部调用
import threading
import time
import string
import random
import urllib.request
from queue import Queue
from PIL import Image
class ThumbnailURL_Generator(threading.Thread):
def __init__(self, queue, sleep_time=1):
self.sleep_time = sleep_time
self.queue = queue
# A flag for stopping
self.flag = True
# choice of sizes
self._sizes = (240, 320, 360, 480, 600, 720)
# URL scheme
self.url_template = 'https://dummyimage.com/%s/%s/%s.jpg'
threading.Thread.__init__(self, name='producter')
def __str__(self):
return 'Producter'
def get_size(self):
return '%dx%d' % (random.choice(self._sizes),
random.choice(self._sizes))
def get_color(self):
return ''.join(random.sample(string.hexdigits[:-6], 3))
def run(self):
while self.flag:
url = self.url_template % (self.get_size(),
self.get_color(),
self.get_color())
self.queue.put(url)
time.sleep(self.sleep_time)
def stop(self):
self.flag = False
消费者类 这是从队列中取出url,进行创建缩略图 run方法是在一个循环中红,从队列获取url,然后通过thumbnail_image转换成缩略图
class ThumbnailURL_Consumer(threading.Thread):
def __init__(self, queue):
self.queue = queue
self.flag = True
threading.Thread.__init__(self, name='consumer')
def __str__(self):
return 'Consumer'
def thumbnail_image(self, url, size=(64, 64), format='.png'):
im = Image.open(urllib.request.urlopen(url))
filename = url.split('/')[-1].split('.')[0] + '_thumb' + format
im.thumbnail(size, Image.ANTIALIAS)
im.save(filename)
print(self, 'Saved', filename)
def run(self):
while self.flag:
url = self.queue.get()
print(self, 'Got', url)
self.thumbnail_image(url)
def stop(self):
self.flag = False
运行: 这三段代码都在一个文件里
q = Queue(maxsize=200)
producters, consumers = [], []
for i in range(2):
t = ThumbnailURL_Generator(q)
producters.append(t)
t.start()
for i in range(2):
t = ThumbnailURL_Consumer(q)
consumers.append(t)
t.start()
启动显示
上面的没有停止条件,会一直运行,直到网络请求被拒绝或超市,或者由于缩略图太多,磁盘耗尽
实现计数器的同步单元将限制创建的图像数量
import threading
import time
import string
import random
import urllib.request
from queue import Queue
import uuid
from PIL import Image
import glob
class ThumbnailURL_Generator(threading.Thread):
def __init__(self, queue, sleep_time=1):
self.sleep_time = sleep_time
self.queue = queue
# A flag for stopping
self.flag = True
# choice of sizes
self._sizes = (240, 320, 360, 480, 600, 720)
# URL scheme
self.url_template = 'https://dummyimage.com/%s/%s/%s.jpg'
threading.Thread.__init__(self, name='producter')
def __str__(self):
return 'Producter'
def get_size(self):
return '%dx%d' % (random.choice(self._sizes),
random.choice(self._sizes))
def get_color(self):
return ''.join(random.sample(string.hexdigits[:-6], 3))
def run(self):
while self.flag:
url = self.url_template % (self.get_size(),
self.get_color(),
self.get_color())
print(self, 'Put', url)
self.queue.put(url)
time.sleep(self.sleep_time)
def stop(self):
self.flag = False
class ThumbnailImageSaver(object):
def __init__(self, limit=10):
self.limit = limit
self.lock = threading.Lock()
self.counter = {}
def thumbnail_image(self, url, size=(64, 64), format='.png'):
im = Image.open(urllib.request.urlopen(url))
pieces = url.split('/')
filename = ''.join((pieces[-2], '_', pieces[-1].split('.')[0], '_thumb', format))
im.thumbnail(size, Image.ANTIALIAS)
im.save(filename)
print('Save', filename)
self.counter[filename] = 1
return True
def save(self, url):
while self.lock:
if len(self.counter) >= self.limit:
return False
self.thumbnail_image(url)
print('Count=>', len(self.counter))
return True
class ThumbnailURL_Consumer(threading.Thread):
def __init__(self, queue, saver):
self.queue = queue
self.flag = True
self.saver = saver
self._id = uuid.uuid4().hex
threading.Thread.__init__(self, name='Consumer-' + self._id)
def __str__(self):
return "Consumer-" + self._id
def run(self):
while self.flag:
url = self.queue.get()
print(self, 'Got', url)
if not self.saver.save(url):
print(self, 'Set limit reached, quitting')
break
def stop(self):
self.flag = False
q = Queue(maxsize=2000)
saver = ThumbnailImageSaver(limit=100)
producers, consumers = [], []
for i in range(3):
t = ThumbnailURL_Generator(q)
producers.append(t)
t.start()
for i in range(5):
t = ThumbnailURL_Consumer(q, saver)
consumers.append(t)
t.start()
for t in consumers:
t.join()
print("Joined", t, flush=True)
while not q.empty():
item = q.get()
for t in producers:
t.stop()
print('Stopped', t, flush=True)
print("Total number of PNG image", len(glob.glob('*.png')))
运行结果
创建了一个新的ThumbnailImageSaver类实例,并在创建时将其传递给消费者线程 这里消费者使用join(),然后当达到限制时,消费者函数会自动退出 消费者退出后,再退出生产者函数
信号量用大于0的值初始化: 1.当一个线程调用获得一个具有正内部值的信号量时,该值会减1,并且线程会继续前进 2.当另一个线程调用释放这个信号量时,值会增加1 3.当值达到0时,任何线程调用获得的线程都被阻塞,直到它被另一个调用释放的线程唤醒
import threading
import time
import string
import random
import urllib.request
from queue import Queue
import uuid
from PIL import Image
import glob
class ThumbnailURL_Generator(threading.Thread):
def __init__(self, queue, sleep_time=1):
self.sleep_time = sleep_time
self.queue = queue
# A flag for stopping
self.flag = True
# choice of sizes
self._sizes = (240, 320, 360, 480, 600, 720)
# URL scheme
self.url_template = 'https://dummyimage.com/%s/%s/%s.jpg'
threading.Thread.__init__(self, name='producter')
def __str__(self):
return 'Producter'
def get_size(self):
return '%dx%d' % (random.choice(self._sizes),
random.choice(self._sizes))
def get_color(self):
return ''.join(random.sample(string.hexdigits[:-6], 3))
def run(self):
while self.flag:
url = self.url_template % (self.get_size(),
self.get_color(),
self.get_color())
print(self, 'Put', url)
self.queue.put(url)
time.sleep(self.sleep_time)
def stop(self):
self.flag = False
class ThumbnailImageSemaSaver(object):
def __init__(self, limit=10):
self.limit = limit
self.counter = threading.BoundedSemaphore(value=limit)
self.count = 0
def acquire(self):
return self.counter.acquire(blocking=False)
def release(self):
return self.counter.release()
def thumbnail_image(self, url, size=(64,64), format='.png'):
im = Image.open(urllib.request.urlopen(url))
pieces = url.split('/')
filename = ''.join((pieces[-2], '_', pieces[-1].split('.')[0], '_thumb', format))
try:
im.thumbnail(size, Image.ANTIALIAS)
im.save(filename)
print('Save', filename)
self.count += 1
except Exception as e:
print('Error saving URL', url, )
self.release()
return True
def save(self, url):
if self.acquire():
self.thumbnail_image(url)
return True
else:
print('Semaphore limit reached, returning False')
return False
class ThumbnailURL_Consumer(threading.Thread):
def __init__(self, queue, saver):
self.queue = queue
self.flag = True
self.saver = saver
self._id = uuid.uuid4().hex
threading.Thread.__init__(self, name='Consumer-' + self._id)
def __str__(self):
return "Consumer-" + self._id
def run(self):
while self.flag:
url = self.queue.get()
print(self, 'Got', url)
if not self.saver.save(url):
print(self, 'Set limit reached, quitting')
break
def stop(self):
self.flag = False
q = Queue(maxsize=2000)
saver = ThumbnailImageSemaSaver(limit=100)
producers, consumers = [], []
for i in range(3):
t = ThumbnailURL_Generator(q)
producers.append(t)
t.start()
for i in range(5):
t = ThumbnailURL_Consumer(q, saver)
consumers.append(t)
t.start()
for t in consumers:
t.join()
print("Joined", t, flush=True)
while not q.empty():
item = q.get()
for t in producers:
t.stop()
print('Stopped', t, flush=True)
print("Total number of PNG image", len(glob.glob('*.png')))
使用的是threading.BoundedSemaphore这个类来进行限制
把限制数limit通过value=limit
方式传给BoundedSemaphore
acquire()方法就会给limit-1,
release()方法就会给limit+1
这里每次consumer调用save方法后,就会调用self.acquire(),这时limit-1,if验证self.acquire()返回值>0(为True)
那么就会去调用thumbnail_image(url)去进行缩略图片
里面有个try-except,如果报错了,那么就使用release方法把刚才使用acquire的limit-1,再给加回来
线程中同步单元应用-条件对象 在生产者/消费者系统中,会出现3中情况: 1.生产者比消费者快,生产者产生的多余数据在队列中累积,导致每个循环队列都会消耗较多的内存和CPU使用 2.消费者比生产者快,消费者会保持闲置 3.生产者和消费者以同样速度工作,理想状态
解决方法: 1.固定队列大小 2.为工作者提供超市设定后其他职责 3.动态配置工作者类的数量:按需求自动增加或减少工作者类池的大小,如果生产者类多了,系统启动相同数量的消费者类保持平衡 4.调整数据生产速率:静态或动态调整生产者的数据生成速率
这里使用方法4 条件对象:它有一个隐含的内置锁,这个锁在线程wait时打开,然后等待任意一个条件,直到这个条件变成True为止
xx:公有变量 _xx:前置单下划线,私有化属性或方法,一般来讲,变量名_xx被看作是“私有 的”,在模块或类外不可以使用。当变量是私有的时候,用_xx 来表示变量是很好的习惯。类对象和子类可以访问,这并不能完全做到真正的私有,只是约定俗成的而已,这样写表示不希望这个变量在外部被直接调用 xx:前置双下划线,私有化属性或方法,无法在外部直接访问(名字重整所以访问不到,只能是允许这个类本身进行访问了。连子类也不可以) xx:前后双下划线,系统定义名字(这就是在python中强大的魔法方法),因为变量名xxx_对Python 来说有特殊含义,对于普通的变量应当避免这种命名风格。 xx:后置单下划线,用于避免与Python关键词的冲突
还可以通过外部访问,只是约定俗成的,所以是伪私有(直接访问__xx会报错,python把这个名称变成了_类型__xx了) 单下划线和双下划线完整版
dict的内在
dict对象的实现 dict源码解析
set集合的操作方法和dict一样,都是将实际值放入list中,唯一不同的在于hash函数操作的对象,对于dict,hash函数操作的是其key,而对于set是直接操作的它的元素