cgarciae / pypeln

Concurrent data pipelines in Python >>>
https://cgarciae.github.io/pypeln
MIT License

TypeError: 'coroutine' object is not iterable in pypeln 0.4.2 #46

Closed · charlielito closed this issue 4 years ago

charlielito commented 4 years ago

I have a scraper that uses pypeln to open different pages concurrently. With the latest version, however, the production code broke, so I had to stay on version 0.3.0. A minimal version of the code looks like the following:

import asyncio
import typing as tp
from copy import copy

import pypeln as pl
import pyppeteer

async def basic(result: dict) -> tp.AsyncIterable[dict]:
    result = copy(result)
    yield result

async def process_url(
    result: dict, search_contact: bool = True
) -> tp.AsyncIterable[dict]:

    results = basic(result)

    async for result in results:
        yield result

async def scrape() -> tp.AsyncIterable[dict]:
    async def get_urls(page_offset):
        browser = await pyppeteer.launch(headless=True)
        page = await browser.newPage()
        await page.goto(
            f"https://google.com/search?q=some python example&num=10&start={page_offset}",
        )
        search_results = await page.evaluate(
            """() => {
                var search_rows = Array.from(document.querySelectorAll("div.r"));
                var data = search_rows.map((row, index) => {
                    //Get the url for this search result
                    return { "url": row.querySelector("a").href, "rank": index }
                });
                return (data)
            }
            """
        )
        return [
            dict(url=element["url"], rank=int(element["rank"]) + int(page_offset))
            for element in search_results
        ]

    offsets = [i * 10 for i in range(1)]
    search_urls = pl.task.flat_map(get_urls, offsets, workers=10)
    stage = pl.task.flat_map(
        lambda url_obj: process_url(url_obj), search_urls, workers=10,
    )

    async for result in stage:
        yield result

async def main():
    results = scrape()
    data = []
    async for result in results:
        data.append(result)

    print(data)

asyncio.get_event_loop().run_until_complete(main())
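
For clarity on the failing shape: get_urls is a plain async def that returns a list, so get_urls(page_offset) is a coroutine object rather than something iterable, and the traceback below suggests this is what the first flat_map stage ends up trying to iterate. A standalone sketch of that failure mode (illustrative names, no pypeln involved):

import asyncio

# Standalone illustration: a plain `async def` returning a list,
# the same shape as `get_urls` above.
async def returns_list(x):
    return [x, x + 1]

async def main():
    obj = returns_list(0)  # calling it yields a coroutine, not a list
    try:
        for _ in obj:  # iterating the coroutine directly...
            pass
    except TypeError as e:
        print(e)  # ...raises: 'coroutine' object is not iterable
    await obj  # await it so Python doesn't warn "never awaited"

asyncio.get_event_loop().run_until_complete(main())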

In version 0.4.2 I get the following error:

/home/charlie/miniconda3/lib/python3.7/asyncio/base_events.py:1776: RuntimeWarning: coroutine 'scrape.<locals>.get_urls' was never awaited
  handle = None  # Needed to break cycles when an exception occurs.
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Traceback (most recent call last):
  File "test_bug.py", line 68, in <module>
    asyncio.get_event_loop().run_until_complete(main())
  File "/home/charlie/miniconda3/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "test_bug.py", line 62, in main
    async for result in results:
  File "test_bug.py", line 55, in scrape
    async for result in stage:
  File "/home/charlie/data/snappr/pg-scraper/.venv/lib/python3.7/site-packages/pypeln/task/stage.py", line 98, in to_async_iterable
    async for elem in main_queue:
  File "/home/charlie/data/snappr/pg-scraper/.venv/lib/python3.7/site-packages/pypeln/task/queue.py", line 79, in __aiter__
    raise exception
TypeError: ("'coroutine' object is not iterable",)

Traceback (most recent call last):
  File "/home/charlie/data/snappr/pg-scraper/.venv/lib/python3.7/site-packages/pypeln/task/worker.py", line 100, in __call__
    for key, value in kwargs.items()
  File "/home/charlie/data/snappr/pg-scraper/.venv/lib/python3.7/site-packages/pypeln/task/worker.py", line 248, in __aexit__
    await self.join()
  File "/home/charlie/data/snappr/pg-scraper/.venv/lib/python3.7/site-packages/pypeln/task/worker.py", line 242, in join
    await asyncio.gather(*self.tasks)
  File "/home/charlie/data/snappr/pg-scraper/.venv/lib/python3.7/site-packages/pypeln/task/worker.py", line 222, in get_task
    await coro
  File "/home/charlie/data/snappr/pg-scraper/.venv/lib/python3.7/site-packages/pypeln/task/api/flat_map.py", line 38, in apply
    for i, y in enumerate(ys):
TypeError: 'coroutine' object is not iterable
cgarciae commented 4 years ago

The problem was that the case where the function given to flat_map returns an Awaitable[Iterable] wasn't being taken into account.
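
In other words, a function handed to flat_map may produce an AsyncIterable (an async generator, like process_url above), a plain Iterable, or, as with get_urls, an Awaitable[Iterable] that must be awaited before it can be iterated. A minimal sketch of that kind of dispatch; iter_result is a hypothetical helper for illustration, not pypeln's actual implementation:

import asyncio
import inspect
import typing as tp

# Hypothetical helper (illustration only, not pypeln's code): normalize
# the possible result shapes of a user function into one async iterator.
async def iter_result(result) -> tp.AsyncIterator:
    if inspect.isawaitable(result):
        # Awaitable[Iterable] -- the previously unhandled case: await first.
        result = await result
    if hasattr(result, "__aiter__"):
        # AsyncIterable (e.g. an async generator): consume with async for.
        async for x in result:
            yield x
    else:
        # Plain Iterable: iterate synchronously.
        for x in result:
            yield x

async def coro_list(x: int) -> tp.List[int]:  # Awaitable[Iterable]
    return [x, x + 1]

async def main():
    async for y in iter_result(coro_list(0)):
        print(y)  # prints 0 then 1 instead of raising TypeError

asyncio.get_event_loop().run_until_complete(main())

Until a release that accounts for this case, rewriting get_urls as an async generator (yielding each dict instead of returning the list) should sidestep the problem.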