mediacloud / story-indexer

The core pipeline used to ingest online news stories in the Media Cloud archive.
https://mediacloud.org
Apache License 2.0

Infrastructure for queuing/processing historical data and archives, and queue-based fetcher #207

Closed: philbudne closed this 10 months ago

philbudne commented 10 months ago

Summary of changes by file:

new: bin/run-arch-queuer.sh: runs indexer/workers/arch-queuer.py
new: bin/run-hist-queuer.sh: runs indexer/workers/hist-queuer.py
new: bin/run-hist-fetcher.sh: runs indexer/workers/hist-fetcher.py
new: stubs/warcio/{archiveiterator,recordloader}.pyi
pyproject.toml: add explicit dependencies on beautifulsoup4 (and its type stubs) and boto3-stubs
regenerated: requirements[-dev].txt
docker/deploy.sh:
    add -T option for pipeline types: batch-fetcher, historical, archive, queue-fetcher
    add deployment_.... vars for docker-compose comments
    add deployment_id: unique id for each deployment
    add xxx_port_exported vars: xxx_port is now INTERNAL port number
    add parser_replicas
docker/dev.sh: add QUEUER_S3_.... var placeholders
docker/docker-compose.yml.j2:
    comments to show deployment_... vars
    add DEPLOYMENT_ID worker environment variable
    pass --type {{pipeline_type}} to configure-pipeline
    add if's for pipeline_type / queuers
    use parser_replicas
    pass importer_args to importer
indexer/app.py:
    rename ArgsProtocol to AppProtocol (now includes stats), add process_args
    App: honor PROCESS_NAME for alternate queues
    Add "run" function (from worker.py)
    Test app now logs at different levels
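
A hedged sketch of the new entry point; the run() signature and the main_loop() hook shown here are assumptions based on the summary above:

```python
# Hypothetical use of indexer.app.run(); the signature is an assumption.
from indexer.app import App, run

class HelloApp(App):              # hypothetical demo app
    def main_loop(self) -> None:  # assumed per-app work method
        pass

if __name__ == "__main__":
    run(HelloApp, "hello-app", "demonstrates the run() entry point")
```
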
indexer/blobstore/S3.py: quiet a mypy warning (now that boto stubs installed)
indexer/elastic.py: use AppProtocol, use as base class
indexer/pipeline.py:
    take --type argument for different pipeline types
    subclass to MyPipeline, move topology to "lay_pipe" method
    get "run" from indexer.app
    add fast_queue support (for queue-fetcher)
    prefix private classes with _
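
In sketch form, the reorganization looks roughly like this (the body of lay_pipe is hypothetical):

```python
# MyPipeline subclass with the topology moved into lay_pipe, per the
# summary above; the method body is illustrative only.
from indexer.pipeline import Pipeline

class MyPipeline(Pipeline):
    def lay_pipe(self) -> None:
        # declare queues for the type selected via --type
        # (batch-fetcher, historical, archive, queue-fetcher);
        # queue-fetcher topologies also wire up a fast_queue
        ...
```
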
new: indexer/queuer.py:
    Queuer base class for programs that queue stories (historical, archive, rss)
    Subclass supplies a "process_file" method that is passed a BinaryIO stream (see the sketch after this list)
    Supports reading:
        local files
        traversing local directory trees
        reading http/https on the fly
        optional gzip file expansion
        reading S3 objects (prefix match)
        uses "Tracker" class to keep track of ingested files
    Checks output queue(s) to see if filling up
    By default processes one file if space is available, then quits
    Option to loop, sleeping
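
A minimal sketch of a Queuer subclass, assuming only the process_file contract described above (the subclass and its body are hypothetical):

```python
# The Queuer base class locates inputs (local files, directory trees,
# http/https URLs, S3 prefixes), gunzips when needed, checks output
# queue space, and consults the tracker; the subclass parses the stream.
from typing import BinaryIO

from indexer.queuer import Queuer

class LineQueuer(Queuer):  # hypothetical example
    def process_file(self, fobj: BinaryIO) -> None:
        for line in fobj:
            ...  # build a Story from the line and queue it
```
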
indexer/worker.py: moved all Story related classes to new indexer/storyapp.py
    use DEPLOYMENT_ID for Pika configuration semaphore
    remove old semaphores
    add fast_queue_name
    moved --from-quarantine from QApp to QWorker (always takes input)
    removed ptlogger (pika thread logger)
    add msglogger (for tag # debug messages)
    let pika thread process_data_events sleep for a day (was 10 seconds)
    _stop_pika_thread queues "nop" callback to wake Pika thread
    let _process_messages return (for multi-thread apps)
    Worker:
        honor NO_QUARANTINE member for exceptions to discard
        make RETRY_DELAY_MINUTES a class member
    removed all Story related classes to indexer/storyapp.py
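
The two class-member knobs might be used like this (the subclass, exception choice, and value are illustrative; NO_QUARANTINE is assumed to be a tuple of exception types):

```python
from indexer.worker import Worker

class ExampleWorker(Worker):  # hypothetical
    # exceptions to discard rather than quarantine (assumed tuple form)
    NO_QUARANTINE = (ValueError,)
    # per-worker override of the retry delay
    RETRY_DELAY_MINUTES = 30
```
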
indexer/storyapp.py:
    split from indexer/worker.py
    add MAX_HTML_BYTES, non_news_fqdn
    StoryMixin class, with helpers:
        incr_stories method
        check_story_length (calls incr_stories)
        check_story_url (calls incr_stories)
    Use StoryMixin in StoryProducer, StoryWorker
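
A hedged sketch of the helpers in use; the process_story signature, helper argument order, and send_story call are assumptions:

```python
from indexer.storyapp import StoryWorker

class CheckingWorker(StoryWorker):  # hypothetical
    def process_story(self, sender, story) -> None:
        url = ...   # the story's URL
        html = ...  # the story's raw HTML bytes
        # both helpers call incr_stories, so skips are counted/logged
        if not self.check_story_url(url, story):
            return  # e.g. a non-news fqdn
        if not self.check_story_length(html, story):
            return  # e.g. empty, or over MAX_HTML_BYTES
        sender.send_story(story)  # assumed sender API
```
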
indexer/tracker.py: FileTracker class for recording processed files
    the only implementations so far:
        DummyFileTracker for testing
        LocalFileTracker (base for local file based classes)
        DBMFileTracker: LocalFileTracker using gdbm
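
Not the repo's code, but a minimal stand-alone illustration of the DBM-tracker idea using only the stdlib dbm module:

```python
import dbm

class TinyTracker:
    """Remember which files were processed, across runs."""

    def __init__(self, path: str):
        self.db = dbm.open(path, "c")  # create the database if missing

    def seen(self, fname: str) -> bool:
        return fname.encode() in self.db

    def mark(self, fname: str) -> None:
        self.db[fname.encode()] = b"1"
```
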
indexer/workers/arch-queuer.py:
    Queuer that reads local/S3 WARC files and queues Stories
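
The presumed shape of its process_file, using warcio's documented iterator API (the Story-building step is elided):

```python
from typing import BinaryIO

from warcio.archiveiterator import ArchiveIterator

def process_warc(fobj: BinaryIO) -> None:
    for record in ArchiveIterator(fobj):
        if record.rec_type != "response":
            continue  # skip request/metadata records
        url = record.rec_headers.get_header("WARC-Target-URI")
        html = record.content_stream().read()
        ...  # wrap url/html in a Story and queue it
```
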
indexer/workers/archiver.py:
    use indexer.path.DATAROOT
    use indexer.storyapp
indexer/workers/hist-queuer.py:
    queue Stories from historical .csv files (on S3)
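
Its process_file presumably looks something like this sketch (CSV layout and Story plumbing elided):

```python
import csv
from io import TextIOWrapper
from typing import BinaryIO

def process_csv(fobj: BinaryIO) -> None:
    # Queuer hands us a binary stream; wrap it for the csv module
    for row in csv.DictReader(TextIOWrapper(fobj, encoding="utf-8")):
        ...  # queue a Story; hist-fetcher later pulls its HTML from S3
```
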
indexer/workers/hist-fetcher.py:
    process Stories from hist-queuer, fetching HTML from S3
indexer/workers/importer.py:
    use indexer.app.run, indexer.storyapp
    add --no-output option (no output to archiver)
indexer/workers/parser.py:
    use indexer.app.run, indexer.storyapp, incr_stories
    use BeautifulSoup4's UnicodeDammit for character set detection, BOM removal, and decoding
    (needed for historical Stories and any stories fetched using requests)
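
The UnicodeDammit pattern, sketched (not the parser's exact code):

```python
from bs4 import UnicodeDammit

def decode_html(raw: bytes) -> str:
    dammit = UnicodeDammit(raw)
    # dammit.original_encoding records the detected character set;
    # dammit.unicode_markup is the decoded text, BOM removed
    return dammit.unicode_markup or ""
```
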
indexer/fetch_worker.py: use indexer.app.run, indexer.storyapp.StoryProducer, incr_stories
indexer/scripts/qutil.py: use indexer.storyapp
indexer/story_archive_writer.py: IO typing cleanup (prefer BinaryIO)
philbudne commented 10 months ago

@pgulley no threaded fetcher, but it includes the Queuer class that rss-queuer depends on.