vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0

Performance issue with aiohttp and starmap #26

Closed · 1ycx closed 6 years ago

1ycx commented 6 years ago

Hello there Vincent! I've read your answer on Stack Overflow about asynchronous programming. After following it, I get this error:

Traceback (most recent call last):
  File "async_update_code.py", line 260, in <module>
    loop.run_until_complete(main())
  File "/usr/lib/python3.6/asyncio/base_events.py", line 468, in run_until_complete
    return future.result()
  File "async_update_code.py", line 208, in main
    with aiohttp.ClientSession() as session:
  File "/home/yahyaa/.local/lib/python3.6/site-packages/aiohttp/client.py", line 818, in __enter__
    raise TypeError("Use async with instead")
TypeError: Use async with instead
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f750a1e5080>

BTW, is it alright not to use a session? I also have another question on asyncio/aiostream on Stack Overflow, asked just now. Can you answer it too?

Thanks.

1ycx commented 6 years ago

Removed the session and changed the code to:

xs = stream.iterate(myList)
ys = stream.starmap(xs, download, ordered=True, task_limit=20)
await ys

But I don't get the expected asynchronous speed. It's all the same as before. IDK why? :thinking:

vxgmichel commented 6 years ago

Hi @Kogam22,

I'm not too familiar with aiohttp, but the initial issue is that aiohttp.ClientSession is meant to be used as an asynchronous context manager:

async def main():
    async with aiohttp.ClientSession() as client:
        ...
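
A minimal, self-contained version of that pattern looks like this (the URL is just a placeholder):

import asyncio
import aiohttp

async def main():
    # ClientSession must be entered with "async with", not plain "with"
    async with aiohttp.ClientSession() as session:
        async with session.get("http://example.com") as response:
            body = await response.text()
            print(len(body))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())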

I don't know about the performance issue; could you describe your timing method and the results you get for different values of task_limit?
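
By timing method I mean something along these lines, reusing the myList and download names from your snippet above (time.perf_counter is just one way to measure wall-clock time):

import time
from aiostream import stream

async def timed_run(task_limit):
    # myList and download are the names from your earlier snippet
    xs = stream.iterate(myList)
    ys = stream.starmap(xs, download, ordered=True, task_limit=task_limit)
    start = time.perf_counter()
    await ys
    print("task_limit=%s took %.2f seconds" % (task_limit, time.perf_counter() - start))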

1ycx commented 6 years ago

@vxgmichel:

I don't know about the performance issue; could you describe your timing method and the results you get for different values of task_limit?

I don't understand the term "timing method" (probably because I just started with async). The results I get for different task_limit values are the same. Another user on Stack Overflow who answered my question said that requests.get() should be changed to some aiohttp method to download asynchronously.

vxgmichel commented 6 years ago

Another user on Stack Overflow who answered my question said that requests.get() should be changed to some aiohttp method to download asynchronously.

Yes, he's right: you definitely have to use aiohttp instead of requests. He's also right that aiostream might be a bit overkill for your use case. Start with asyncio.gather, and get back to stream.map if you need to limit the number of tasks running concurrently.
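
A minimal sketch of the gather approach, where fetch and the URL list stand in for your own code:

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.read()

async def main(urls):
    # One shared session for every request; gather returns results in input order
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch(session, url) for url in urls))

loop = asyncio.get_event_loop()
pages = loop.run_until_complete(main(["http://example.com"]))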

1ycx commented 6 years ago

@vxgmichel Well, I fixed the code, and even with ordered=True it still downloads out of order. Can you check this? I also posted it on repl.it, where you can run it online for a manual check: https://repl.it/@Kogam22/Python-EPUB-Save

from aiostream import stream, pipe
import requests as req
from ebooklib import epub
from bs4 import BeautifulSoup as bs
import asyncio
from aiohttp import ClientSession
import html5lib  # not used directly; verifies the "html5lib" parser backend is installed

err, urls_titles, min_urls_titles = [], [], []
length, counter, start, end = 0, 0, 0, 0
book = epub.EpubBook()

def html_gen(elem, soupInstance, value, tagToAppend, insert_loc=None):
    element = soupInstance.new_tag(elem)
    element.string = value
    if not insert_loc:
        tagToAppend.append(element)
    else:
        tagToAppend.insert(insert_loc, element)

def get_urls_titles():
    global length
    for a in chapLi.findAll('a'):
        link = novelURL + a.get('href')
        l = (link, a.string)
        urls_titles.append(l)
    # Drop the first three entries (deleted in descending index order)
    del urls_titles[2], urls_titles[1], urls_titles[0]
    length = len(urls_titles)  # main() compares end against this count

async def fetch(url, title):
    global counter
    async with ClientSession() as session:
        async with session.get(url) as response:
            response = await response.read()
            try:
                #####################
                # Get, Pass & Select
                s = bs(response, "html5lib")    
                div = s.select_one("#chaptercontent")

                c2 = epub.EpubHtml(title=title, file_name=title+'.xhtml', lang='hr')
                c2.content = div.encode("utf-8")
                book.add_item(c2)

                # Add to table of contents & book ordering
                book.toc.append(c2)    
                book.spine.append(c2)

                # Print Created Chapter
                print("Parsed Chapter :", title)
                counter += 1

            except Exception as e:
                print(e)

def save():

    # Add Navigation Files
    book.add_item(epub.EpubNcx())
    book.add_item(epub.EpubNav())

    # Define & Add CSS Style
    style = 'p { text-align : left; }'
    nav_css = epub.EpubItem(uid="style_nav", file_name="style/nav.css", media_type="text/css", content=style)
    book.add_item(nav_css)
    print("Saving . . .")
    epub.write_epub(title + ".epub", book, {})

async def main():
    get_urls_titles()
    global start 
    global end 
    global counter

    start, end = 1, 100 #intro()

    if end == length:
        min_urls_titles = urls_titles[start:]
    else:
        min_urls_titles = urls_titles[start:end+1]

    counter = start - 1

    xs = stream.iterate(min_urls_titles)
    ys = stream.starmap(xs, fetch, ordered=True, task_limit=100)
    await ys

    if counter < 0: counter = 0

if __name__ == "__main__":
    book.set_language('en')

    # Enter The Novel URL Here
    novelURL = 'http://m.wuxiaworld.co/The-Magus-Era/'
    novelChapURL = novelURL + 'all.html'
    while novelURL == '':
        print("Novel URL Not Provided Inside The Script.")
        novelURL = str(input("Please Enter Novel URL : "))
    print("\r\nNovel URL Set")

    # Main Novel Page -- Get, Create Instance & Select
    page = req.get(novelURL)
    soup = bs(page.text, "html5lib")

    # Novel Chapters Page -- Get, Create Instance & Select
    pageChap = req.get(novelChapURL)
    so = bs(pageChap.text, "html5lib")
    chapLi = so.select_one("#chapterlist")

    # Book Title
    title = soup.select_one('header span').text
    book.set_title(title)
    book.spine = ['nav']
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    save()  # write the epub once all chapters are fetched

vxgmichel commented 6 years ago

ordered=True doesn't do anything in your example, since fetch doesn't return any value other than None, and nothing processes those ordered None values coming out of ys.

You probably want something like:

xs = stream.iterate(min_urls_titles)
ys = stream.starmap(xs, fetch, ordered=True, task_limit=100)
zs = stream.starmap(ys, process)
await zs

where fetch returns the values to be processed by process.
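
Since your script already imports pipe, the same pipeline can also be written with aiostream's pipe syntax:

xs = stream.iterate(min_urls_titles)
zs = xs | pipe.starmap(fetch, ordered=True, task_limit=100) | pipe.starmap(process)
await zs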

1ycx commented 6 years ago

@vxgmichel Even when I return something to ys, I straight away get an error:

Novel URL Set

Traceback (most recent call last):
  File "python", line 118, in <module>
  File "python", line 88, in main
NameError: name 'process' is not defined

Can you tell me what process is in

zs = stream.starmap(ys, process)

?

vxgmichel commented 6 years ago

Sure, it's up to you to write the process function. You probably want to define it as:

def process(response, title):
    global counter  # counter += 1 below needs the module-level counter
    s = bs(response, "html5lib")
    div = s.select_one("#chaptercontent")
    c2 = epub.EpubHtml(title=title, file_name=title + '.xhtml', lang='hr')
    c2.content = div.encode("utf-8")
    book.add_item(c2)
    book.toc.append(c2)
    book.spine.append(c2)
    print("Parsed Chapter :", title)
    counter += 1

With fetch returning (response, title).
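
For reference, a minimal sketch of fetch rewritten along those lines, keeping the session-per-request pattern from your script (a single shared session would be cheaper):

async def fetch(url, title):
    async with ClientSession() as session:
        async with session.get(url) as response:
            # Return the body together with the title so that stream.starmap
            # can unpack the pair into process(response, title)
            return await response.read(), title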