BruceChen7 / gitblog

My blog
6 stars 1 forks source link

fluent-python #69

Open BruceChen7 opened 1 year ago

BruceChen7 commented 1 year ago

参考资料

named tuple

>>> Card = collections.namedtuple('Card', ['rank', 'suit'])
>>> beer_card = Card('7', 'diamonds')
>>> beer_card
Card(rank='7', suit='diamonds')

FrenchDeck

Card = collections.namedtuple('Card', ['rank', 'suit'])
class FrenchDeck:
    ranks = [str(n) for n in range(2, 11)] + list("JQKA")
    suits = 'spades diamonds clubs hearts'.split()
    def __init__():
        self._cards=[Card(rank, suit) for suit in self.suits
                                      for rank in self.rands]
    def __len__(self):
        return len(self._cards)
    def __getitem__(self, position):
        return self._cards[position]

>>> deck = FrenchDeck()
>>> len(deck)
52
# 能随意的选择卡牌
>>> from random import choice
>>> choice(deck)
Card(rank='3', suit='hearts')
Category Method names
String/bytes representation __repr__, __str__, __format__, __bytes__
Conversion to number __abs__, __bool__, __complex__, __int__, __float__, __hash__,__index__
Emulating collections len, getitem, setitem, delitem, contains
Context management __enter__, __exit__
Instance creation and destruction __new__, __init__, __del__
Attribute management _getattr, getattribute, setattr, delattr, dir__
Attribute descriptors get, set, delete
Class services _prepare, instancecheck, subclasscheck__
Unary numeric operators neg -, pos +, abs abs()

数据结构

>>> colors = ['black', 'white']
>>> sizes = ['S', 'M', 'L']
>>> tshirts = [(color, size) for color in colors for size in sizes]
tshirts
[('black', 'S'), ('black', 'M'), ('black', 'L'), ('white', 'M'), ('white', 'L')]。每个Unicode standard都是4到6个16进制的字符。

生成表达式

Text versus Bytes

character issues

Byte Essentials

序列类型

处理文本文件

默认编码

Concurrent with futures

import os
import time
import sys
import requests

POP20_CC = ('CN', 'IN', 'US')
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'

def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, "wb") as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

def download_many(cc_list):
    for cc in sorted(cc_list):
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')

    return len(cc_list)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == "__main__":
    main(download_many)

download with concurrent.futures

def download_once(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + ".gif")
    return cc

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))

    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_once, sorted(cc_list))
def download_many(cc_list):
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers = 3) as executor:
        to_do = []
        for cc in sorted(cc_list):
            future = exector.submit(download_one, cc)
            to_do.append(future)
            msg = 'Scheduled for {} : {}'
            # 存储每个 future,这样能能够获取它通过 as_complete
            print(msg.format(cc, future))

        results = []
        # as_completed 将会产生 futures
        for future in futures.as_completed(to_do):
            res = future.result()
            msg = '{} result: {!r}'
            print(msg.format(future, res))
            results.append(res)

    return len(results)

futures

使用多进程

def download_many(cc_list):
    with futures.ProccessPoolExector() as executor

体验 Exector.map

from time import sleep, strftime
from concurrent import futures
def display(*args):
    print(strftime['%H:%M:%S'], end = ' ')
    print(*args)
def loiter(n):
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}):done'
    display(msg.format('\t'*n, n))
    return n * 10
def main():
    display('scripting starting')
    executor = futures.ThreadPoolExecutor(max_workers = 3)
    results = executor.map(loiter, range(5))
    display('results', results)
    display('waiting for individual results:')

    for i, result in enumerate(results):
        display('{result {} : {}}'.format(i, result))

main()

Thread versus coroutine

import threading
import itertools
import time
import sys
class Signal:
   go = True

def spin(msg, signal):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        time.sleep(.1)
        if not signal.go:
            break
    write(' ' * len(status) + '\x08' * len(status))

def slow_function():
    time.sleep(3)
    return 42

def supervisor():
    signal = Signal()
    spinner = threading.Thread(target = spin, args=('thinking', signal))
    print('spinnner object', spinner)
    spinner.start()
    result = slow_function()
    signal.go = False
    spinner.join()
    return result

def main():
    result = supervisor()
    print('Answer:', result)
if __name__="__main__":
    main()

coroutine 版本

import asyncio
import itertoos
import sys
@asyncio.coroutine
def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            yield from asyncio.sleep(.1) # 将执行流交给事件循环这里假装在进行 io 操作。
        except asyncio.CancelledError:
            break
    write(' ' * len(status) + '\x08' * len(status))

@asyncio.coroutine
def slow_function()
    yield_from asyncio.sleep(3) # 将执行流交给事件循环,在 3 秒后,该 coroutine 将会执行
    return

@asyncio.coroutine
def supervisor()
    spinner = asyncio.async(spin('thinking')) # 调度 spin coroutine 来执行,wrapping it in a Task object.
    print('spinner object:', spinner)
    result = yield from slow_function() 等待执行完
    spinner.cancel() #取消 coroutine
    return result

def main():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(supervisor())
    loop.close()
    print("Answer:", result)

if __name__ == "__main__":
    main()

Yielding from futures, tasks and coroutines

在 coroutine 中使用 yield from 需要注意的是

而在 asyncio 中。

type/python #async #public