lixi5338619 / asyncpy

使用asyncio和aiohttp开发的轻量级异步协程web爬虫框架
110 stars 27 forks source link

有没有异步pipeline的参考? #6

Open triangle959 opened 2 years ago

triangle959 commented 2 years ago

我在尝试写支持异步的pipeline,以提高保存数据速度,但是我发现loop没法嵌套,如果我在pipeline创建一个loop会抛出这个错误:

This event loop is already running

我尝试使用nest_asyncio进行补救,但是发现这样起来效率并不高,同步和异步的插入速度差别不大。 以下是我的代码参考

from motor.motor_asyncio import AsyncIOMotorClient
class MongoPipeLine(SpiderPipeline):

    def __init__(self, logger):
        super().__init__(logger)
        import nest_asyncio
        self.mongo_client = MongoClient(host=MONGO_HOST, port=MONGO_PORT, username=MONGO_USER, password=MONGO_PWD)
        self.async_connection = AsyncIOMotorClient(host=MONGO_HOST, port=MONGO_PORT, username=MONGO_USER, password=MONGO_PWD)
        self.loop = self.async_connection.get_io_loop()
        nest_asyncio.apply(self.loop)

    def insert_data_sync(self, database_name, collection_name, data):
        self.mongo_client.get_database(database_name).get_collection(collection_name).insert_one(data)

    async def insert_data_async(self, database_name, collection_name, data):
        # https://motor.readthedocs.io/en/stable/tutorial-asyncio.html
        await self.async_connection[database_name][collection_name].insert_one(data)

    def process_item(self, item, spider_name):
        database_name = "test"
        collection_name = "test"

        if MONGO_ASYNC:
            self.loop.run_until_complete(self.insert_data_async(database_name, collection_name, item))
        else:
            self.insert_data_sync(database_name, collection_name, item)
tuhaolam commented 1 year ago

1、nest-asyncio不光在这里效果不明显,尝试写了个demo效果也不明显。参考https://stackoverflow.com/questions/59740704/correct-use-constraints-of-use-of-nest-asyncio 可以看到有一些局限。 2、另外nest-asyncio官方文档说了,只支持asyncio创建的时间循环,不支持其他的,而本项目在环境有uvloop库时会优先使用uvloop,也可能导致nest-asyncio不生效。 3、那么,有没有办法实现异步pipeline呢,需要结合实际场景来替换,比如http请求用aiohttp,mysql用fivem-mysql-async

triangle959 commented 1 year ago

已收到