scrapy / scrapy

Scrapy, a fast high-level web crawling & scraping framework for Python.
https://scrapy.org
BSD 3-Clause "New" or "Revised" License

execution of asyncio.ensure_future(coro()) ignored on close_spider() pipelines call #6238

Closed: abebus closed this issue 2 months ago

abebus commented 2 months ago

Description

Initialising asyncio-based library resources (clients/connections) wrapped in asyncio.ensure_future works fine in the pipeline's open_spider method, but executing async functions (closing connections/clients) fails in close_spider.

Steps to Reproduce

  1. Write a function that closes your async resources in a pipeline
  2. Pass it to an asyncio.ensure_future call in close_spider

Expected behavior:

The function (Future) should be executed (awaited)

Actual behavior:

The Future is ignored and never awaited

Reproduces how often:

Always

Versions

Scrapy       : 2.11.1
lxml         : 5.1.0.0
libxml2      : 2.12.3
cssselect    : 1.2.0
parsel       : 1.8.1
w3lib        : 2.1.2
Twisted      : 23.10.0
Python       : 3.12.1 (main, Dec 10 2023, 15:07:36) [GCC 11.4.0]
pyOpenSSL    : 24.0.0 (OpenSSL 3.2.1 30 Jan 2024)
cryptography : 42.0.4
Platform     : Linux-5.15.133.1-microsoft-standard-WSL2-x86_64-with-glibc2.35

Additional context

Code of pipeline:

import asyncio
import logging
from scrapy.utils.defer import deferred_from_coro

class AsynctestPipeline:
    async def ainit(self):
        logging.debug('async init')
        await asyncio.sleep(1)
        logging.debug('async initialised')

    async def adel(self):
        logging.debug('async closing resources')
        await asyncio.sleep(1)
        logging.debug('async resources closed')

    def open_spider(self, spider):
        asyncio.ensure_future(self.ainit()) # works as expected, engine executes it

    def close_spider(self, spider):
        logging.debug('call_adel_executed')
        # asyncio.ensure_future(self.adel()) # ignored by the engine
        return deferred_from_coro(asyncio.ensure_future(self.adel()))  # executed by the engine

    async def process_item(self, item, spider):
        logging.debug('executing async task')
        await asyncio.sleep(1)
        logging.debug('async task done')
        return item
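
For comparison, a stripped-down sketch of the variant that does run. The pipeline name below is made up, and it assumes the asyncio reactor is enabled; it passes the coroutine straight to deferred_from_coro, which should be equivalent to wrapping it in asyncio.ensure_future first, since deferred_from_coro accepts coroutines as well as futures:

import logging
from scrapy.utils.defer import deferred_from_coro

class ResourceClosingPipeline:  # hypothetical name
    async def adel(self):
        # Close async clients/connections here.
        logging.debug('async resources closed')

    def close_spider(self, spider):
        # Returning a Deferred makes the engine wait for the coroutine to finish,
        # whereas a bare asyncio.ensure_future() call is dropped at shutdown.
        return deferred_from_coro(self.adel())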
Gallaecio commented 2 months ago

Only process_item has coroutine support. Try using the spider_closed signal instead.

abebus commented 2 months ago

But why does open_spider work, then?

Gallaecio commented 2 months ago

My guess: both calls work, it is just that the spider closes before the close-time one gets executed. Do you get any mentions in the standard output about unawaited futures? If you add a long sleep in close_spider, the close-time one might run. In any case, it is not intended to work in open_spider either, so I strongly suggest not doing it there; there is also a spider_opened signal.
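
As a rough illustration outside Scrapy, here is a minimal, hypothetical pure-asyncio sketch (not the crawler's actual shutdown path): a task scheduled with ensure_future just before the loop shuts down never finishes, and asyncio typically logs a warning about the pending task once it is garbage-collected.

import asyncio

async def cleanup():
    print('closing resources')
    await asyncio.sleep(1)
    print('resources closed')  # never reached

async def main():
    # Schedule the cleanup but return immediately, similar to calling
    # asyncio.ensure_future() in close_spider right before shutdown.
    asyncio.ensure_future(cleanup())
    await asyncio.sleep(0)  # give the task one chance to start

loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()
# Typically logs "Task was destroyed but it is pending!"; 'resources closed' never prints.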

abebus commented 2 months ago

Thanks, I didn't know the crawler could automatically await async functions connected via signals. The following code works as expected:

import asyncio
import logging
from scrapy import signals
import aiohttp

class AsynctestPipeline:
    async def ainit(self):
        logging.critical('async init')
        self.client = aiohttp.ClientSession()
        self.something = await self.client.get('https://scrapy.org/')
        logging.critical('async initialised')

    async def adel(self):
        logging.critical('async closing resources')
        await self.client.close()
        logging.critical('async resources closed')

    @classmethod
    def from_crawler(cls, crawler):
        p = cls()
        crawler.signals.connect(p.ainit, signal=signals.spider_opened)
        crawler.signals.connect(p.adel, signal=signals.spider_closed)
        return p

    async def process_item(self, item, spider):
        logging.critical('executing async task')
        await asyncio.sleep(10)
        logging.critical('async task done')
        logging.critical(self.something)
        return item
mhdzumair commented 1 month ago

It would be nice to have the above solution as a sample in the async documentation. :+1: