scrapy / scrapy

Scrapy, a fast high-level web crawling & scraping framework for Python.
https://scrapy.org
BSD 3-Clause "New" or "Revised" License

add support for custom exporter class #6273

Open guillermo-bondonno opened 2 months ago

guillermo-bondonno commented 2 months ago

Opening this PR to discuss an implementation of support for custom exporter classes. The idea comes from wanting stream exporters that don't rely on local storage. The usage I had in mind would be something like:

custom_settings = {
    "FEEDS": {
        "custom:": {
            "custom_exporter_class": "project.exporters.StreamExporter",
            "format": None,
            "item_export_kwargs": {"myarg1": "myvalue1"},  # needs dont_fail=True
        }
    }
}

import requests

from scrapy.exporters import BaseItemExporter


class CustomExporter(BaseItemExporter):
    def __init__(self, *, dont_fail=True, **kwargs):
        self._configure(kwargs, dont_fail=dont_fail)
        self.this_slot_items = []

    def export_item(self, item):
        # for example, stream each item to an external service...
        requests.post("https://some-api.com", json=item)
        # ...or queue it for a batched export
        self.this_slot_items.append(item)

    def finish_exporting(self):
        # flush queued items, close connections, etc.
        pass

Batching would work the same way, since a new instance is created for every new slot, and the custom exporter can handle queuing, etc.
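
To make the per-slot batching concrete, here is a minimal sketch; the batch endpoint and batch size are made up for the example and not part of this PR:

import requests
from itemadapter import ItemAdapter

from scrapy.exporters import BaseItemExporter


class BatchingStreamExporter(BaseItemExporter):
    BATCH_SIZE = 100  # hypothetical tuning knob

    def __init__(self, *, dont_fail=True, **kwargs):
        self._configure(kwargs, dont_fail=dont_fail)
        self.this_slot_items = []

    def export_item(self, item):
        self.this_slot_items.append(ItemAdapter(item).asdict())
        if len(self.this_slot_items) >= self.BATCH_SIZE:
            self._flush()

    def _flush(self):
        if self.this_slot_items:
            # hypothetical batch endpoint
            requests.post("https://some-api.com/batch", json=self.this_slot_items)
            self.this_slot_items = []

    def finish_exporting(self):
        # each slot gets its own instance, so flushing here guarantees
        # the last partial batch is delivered
        self._flush()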

After this, reusable exporters for widely used services could be implemented. Tests and docs are still missing.

codecov[bot] commented 2 months ago

Codecov Report

Merging #6273 (bb8ff74) into master (e208f82) will increase coverage by 0.20%. Report is 50 commits behind head on master. The diff coverage is n/a.

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##           master    #6273      +/-   ##
==========================================
+ Coverage   88.67%   88.88%   +0.20%
==========================================
  Files         161      161
  Lines       11788    11971     +183
  Branches     1912     1929      +17
==========================================
+ Hits        10453    10640     +187
+ Misses       1007      980      -27
- Partials      328      351      +23
```

[see 81 files with indirect coverage changes](https://app.codecov.io/gh/scrapy/scrapy/pull/6273/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=scrapy)

VMRuiz commented 2 months ago

Hello, I like the idea of using the Feed Exporters rather than Feed Storages to do the actual exporting. You don't actually need to write any additional code to support this right now; this should be similar to your solution:

FEEDS = {
    "stdout://": {
        "format": "StreamingAPI"
    }
}
FEED_EXPORTERS = {
    "StreamingAPI": "StreamingAPIExporter"
}

The methods start_exporting and finish_exporting could be used for initialization, authentication, batching, etc.
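
As a rough sketch of that split, assuming a made-up API and token (nothing here is provided by Scrapy):

import requests
from itemadapter import ItemAdapter

from scrapy.exporters import BaseItemExporter


class StreamingAPIExporter(BaseItemExporter):
    def __init__(self, file, *, dont_fail=False, **kwargs):
        self._configure(kwargs, dont_fail=dont_fail)
        self.file = file  # passed by the feed slot; unused for a pure streaming target
        self.session = None

    def start_exporting(self):
        # authenticate once per slot and reuse the connection
        self.session = requests.Session()
        self.session.headers["Authorization"] = "Bearer <token>"  # hypothetical credentials

    def export_item(self, item):
        # hypothetical endpoint
        self.session.post("https://example.com/items", json=ItemAdapter(item).asdict())

    def finish_exporting(self):
        self.session.close()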

However, there are several downsides that we would need to figure out:

  1. export_item is a sync method, meaning that any blocking call like requests.post would block the entire Scrapy process.
  2. We would need to figure out how to properly manage networking / API errors and allow for retries.
ftadao commented 1 month ago
> 1. export_item is a sync method, meaning that any blocking call like requests.post would block the entire Scrapy process.

Wouldn't this be something that the developer needs to take care of in their customized exporter code? If we're allowing the customization of export_item, any developer can also adapt it to be an async function and remove that barrier on their own terms. Of course, that might call for some adaptations to the underlying exporter code (e.g., allowing export_item to be an async function), but it might make the code a little more robust once we go through it.
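
As a sketch of that direction, assuming the asyncio reactor and aiohttp, and noting that, as far as I can tell, the feed export machinery does not currently wait on whatever export_item returns:

import aiohttp
from itemadapter import ItemAdapter

from scrapy.exporters import BaseItemExporter
from scrapy.utils.defer import deferred_from_coro


class AsyncStreamExporter(BaseItemExporter):
    def __init__(self, file, *, dont_fail=False, **kwargs):
        self._configure(kwargs, dont_fail=dont_fail)
        self.file = file

    async def _post_item(self, item):
        # hypothetical endpoint
        async with aiohttp.ClientSession() as session:
            await session.post("https://example.com/items", json=ItemAdapter(item).asdict())

    def export_item(self, item):
        # wrap the coroutine in a Deferred so it runs on the reactor
        return deferred_from_coro(self._post_item(item))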

> 2. We would need to figure out how to properly manage networking / API errors and allow for retries.

Couldn't all of this be managed by the custom exporter class itself? If we assume that only a limited set of exceptions is covered by the engine, wouldn't this be something the exporter code itself should handle?
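
For illustration, a minimal retry policy inside the exporter could look like this; the endpoint, attempt count, and backoff are all made up, and the time.sleep also blocks the reactor, so this only illustrates the policy, not how the waiting should be done:

import time

import requests
from itemadapter import ItemAdapter

from scrapy.exporters import BaseItemExporter


class RetryingExporter(BaseItemExporter):
    MAX_ATTEMPTS = 3  # hypothetical

    def __init__(self, *, dont_fail=True, **kwargs):
        self._configure(kwargs, dont_fail=dont_fail)

    def export_item(self, item):
        payload = ItemAdapter(item).asdict()
        for attempt in range(1, self.MAX_ATTEMPTS + 1):
            try:
                # hypothetical endpoint
                response = requests.post("https://example.com/items", json=payload, timeout=10)
                response.raise_for_status()
                return
            except requests.RequestException:
                if attempt == self.MAX_ATTEMPTS:
                    raise
                time.sleep(2 ** attempt)  # simple exponential backoff; blocks the reactor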

I think those questions can be answered better once we have an example of a customized exporter that matches those requirements. @VMRuiz, what about the Airtable exporter we've previously discussed?

guillermo-bondonno commented 1 month ago

Reopening this since it was closed when I reset the branch, because the changes were not necessary. I tried @VMRuiz's proposal for custom exporters and it runs just fine. Since the item export goes through defer.maybeDeferred, you can run async requests inside export_item, like below.

custom_settings = {
    "FEEDS": {
        "stdout://": {
            "format": "StreamingAPI"
        }
    },
    "FEED_EXPORTERS": {
        "StreamingAPI": "stream_export_project.exporters.StreamExporter"
    }
}

import requests
from twisted.internet import threads

from scrapy.exporters import BaseItemExporter


class StreamExporter(BaseItemExporter):
    """
    A new instance will be created for each slot/batch.
    """

    def __init__(self, file, *args, dont_fail=False, **kwargs):
        self._kwargs = kwargs
        self.file = file  # or another name if a different format is used
        self._configure(kwargs, dont_fail=dont_fail)

    def start_exporting(self):
        pass

    def send_request(self, item):
        # runs in a pool thread, so the blocking call doesn't stall the reactor
        requests.post("https://example.com/endpoint", json=item | self._kwargs)

    def export_item(self, item):
        dfd = threads.deferToThread(self.send_request, item)
        # maybe collect these to ensure they complete in finish_exporting
        return dfd

    def finish_exporting(self):
        pass
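
One way to act on that "maybe collect these" comment would be to aggregate the deferreds in finish_exporting with DeferredList. Note that, as far as I can tell, Scrapy does not currently wait on finish_exporting's return value, so this sketch only helps if the calling code is adapted to do so:

import requests
from itemadapter import ItemAdapter
from twisted.internet import defer, threads

from scrapy.exporters import BaseItemExporter


class CollectingStreamExporter(BaseItemExporter):
    def __init__(self, file, *args, dont_fail=False, **kwargs):
        self.file = file
        self._pending = []
        self._configure(kwargs, dont_fail=dont_fail)

    def send_request(self, item):
        # hypothetical endpoint
        requests.post("https://example.com/endpoint", json=ItemAdapter(item).asdict())

    def export_item(self, item):
        dfd = threads.deferToThread(self.send_request, item)
        self._pending.append(dfd)
        return dfd

    def finish_exporting(self):
        # consumeErrors=True so a failed request doesn't leave an
        # unhandled error in the logs
        return defer.DeferredList(self._pending, consumeErrors=True)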

The failing tests confuse me: from the error they seem to be related to ExceptionJsonItemExporter, but the actual tests don't use it.

guillermo-bondonno commented 1 month ago

It turns out that, without changing any Scrapy code, I get the same behavior by calling deferToThread from export_item. I'll look deeper into whether it's safe and reliable to do that.

VMRuiz commented 1 month ago

@Gallaecio @wRAR, do you think using deferToThread is appropriate here, or could there be performance issues if there are too many calls?

wRAR commented 1 month ago

I suggest deferToThread to people from time to time, but I've never used it myself :) It won't create a new thread for every item, as it uses a thread pool, so I think performance shouldn't be worse.
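
For reference, deferToThread uses the Twisted reactor thread pool, and Scrapy exposes its size through the REACTOR_THREADPOOL_MAXSIZE setting (10 by default), so one knob to experiment with would be:

custom_settings = {
    # more threads allow more simultaneous blocking requests.post calls
    # (20 is an arbitrary example value)
    "REACTOR_THREADPOOL_MAXSIZE": 20,
}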