dlt-hub / verified-sources

Contribute to dlt verified sources 🔥
https://dlthub.com/docs/walkthroughs/add-a-verified-source
Apache License 2.0
70 stars 50 forks source link

[web scraper] verified source #262

Closed rudolfix closed 8 months ago

rudolfix commented 1 year ago

Quick source info

Current Status

What source does/will do

The idea is to base the source on scrapy. In theory, scrapy can be used with dlt directly because you can get the scraped data as a generator, on the other hand it is typically wrapped in opaque process from where there's no way to get data. scrapy has its own framework so we can fit dlt into scrapy (ie as export option)

We must investigate if to use scrapy or switch to beautiful soup and write own spider.

The requirements [for scrapy]

Test account / test data

Looks like we'll have plenty of websites to test against

Additional context

Please provide one demo where we scrap PDFs and parse them in transformer as in

sultaniman commented 10 months ago

I did a small research on this topic and I think we can use

  1. Custom item exporter or
  2. Custom item pipeline item which basically runs dlt
  3. And feeds json values or uses dlt to create load packages and then finally starts exporting process to destination.
sultaniman commented 10 months ago

@rudolfix @burnash wdyt?

burnash commented 10 months ago

@sultaniman good points. Could you please do a proof of concept of this?

sultaniman commented 10 months ago

I will do it once I check out this one https://github.com/dlt-hub/dlt/issues/811

sultaniman commented 10 months ago

So I created a draft prototype where

  1. pipeline runs in a separate thread,
  2. communication between the spider and pipeline goes via in-memory queue,
  3. manually start crawler process,
  4. manually start pipeline in separate thread.
flowchart LR
    queue[[queue]]
    pipeline[[dlt pipeline]]
    exit{{scraping done}}
    save([exit & save data])
    nodata{scraping done?}
    spider-- push results -->queue
    spider-- no more data -->exit
    queue-->pipeline
    pipeline-->nodata
    nodata-- NO -->queue
    nodata-- DONE -->save
    exit-. no data .->queue

Pipeline and scaffolding

from queue import Queue
import threading
import dlt
from scrapy.crawler import CrawlerProcess

from quotes_spider import QuotesSpider

result_queue = Queue(maxsize=1000)

class SpiderResultHandler(threading.Thread):
    def __init__(self, queue: Queue):
        super().__init__(daemon=True)
        self.result_queue = queue

    def run(self):
        @dlt.resource(name="quotes")
        def get_results():
            # keep pulling items from queue
            # until we get "done" in message
            while True:
                result = self.result_queue.get()
                if "done" in result:
                    break
                yield result

        pipeline = dlt.pipeline(
            pipeline_name="issue_262",
            destination="postgres",
        )

        load_info = pipeline.run(
            get_results,
            table_name="fam_quotes",
            write_disposition="replace",
        )

        print(load_info)

process = CrawlerProcess()

process.crawl(QuotesSpider, queue=result_queue)
handler = SpiderResultHandler(queue=result_queue)
handler.start()
process.start()
handler.join()

Spider source

from queue import Queue
from typing import Any
import scrapy

class QuotesSpider(scrapy.Spider):
    name = "quotes"
    start_urls = [
        "https://quotes.toscrape.com/page/1/",
    ]
    custom_settings = {"LOG_LEVEL": "INFO"}

    def __init__(
        self,
        name: str | None = None,
        queue: Queue | None = None,
        **kwargs: Any,
    ):
        super().__init__(name, **kwargs)
        self.queue = queue

    def parse(self, response):
        for quote in response.css("div.quote"):
            data = {
                "headers": dict(response.headers.to_unicode_dict()),
                "quote": {
                    "text": quote.css("span.text::text").get(),
                    "author": quote.css("small.author::text").get(),
                    "tags": quote.css("div.tags a.tag::text").getall(),
                },
            }

            # here we push result to queue
            self.queue.put(data)

        next_page = response.css("li.next a::attr(href)").get()
        if next_page is not None:
            next_page = response.urljoin(next_page)
            yield scrapy.Request(next_page, callback=self.parse)
        else:
            # finally if there are no more results then send "done"
            self.queue.put({"done": True})
burnash commented 10 months ago

@sultaniman thanks for the POC, this looks great. Please go ahead and make a verified source from this POC. As you mentioned, you'd need to devise a nice way to wrap it into a source definition that hides some complexities while giving enough ways to configure the source. Please take a look at other verified source in this repo for the inspiration. Please submit a draft PR and we'll iterate on the source interface.