alephdata / memorious

Lightweight web scraping toolkit for documents and structured data.
https://docs.alephdata.org/developers/memorious
MIT License

Implement Aleph bulk upload as an aggregation operation #61

Closed: sunu closed this issue 4 years ago

sunu commented 5 years ago

Whenever a scraper uses balkhash_emit to put entities into a Balkhash dataset, we want to push all those entities to an Aleph collection after the crawler has finished running.

pudo commented 5 years ago

I wonder if we should also move the aleph_emit operation into the alephclient package.

thimios commented 4 years ago

@pudo I see that the aleph_emit method was moved to alephclient: https://github.com/alephdata/memorious/pull/69/files. I wonder if there is a nice way to schedule the export of crawled lists from memorious to Aleph datasets using the memorious scheduling system, rather than relying on the alephclient command and some external scheduling mechanism (cron or whatever else).

sunu commented 4 years ago

Hi @thimios, you can define an aggregator function in a crawler's config. The function runs when the crawler has finished. It can be any Python function with the signature function(context, params). For example, we have written an aggregator function to push FtM entities made by a memorious crawler into Aleph in bulk. Here's the config file: https://github.com/alephdata/opensanctions/blob/6928fd94efee1027935bb40aa701a7bfa731f155/opensanctions/config/au_dfat_sanctions.yml#L17 And here's what the function itself looks like: https://github.com/alephdata/balkhash/blob/master/balkhash/memorious.py#L43
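
For reference, a minimal aggregator sketch could look like the one below. This is not the exact balkhash.memorious:aleph_bulkpush implementation, just an illustration of the function(context, params) contract; the function name, the balkhash iterate() call, and the use of alephclient's AlephAPI here are assumptions based on the linked code.

import balkhash
from alephclient.api import AlephAPI

def push_to_aleph(context, params):
    # Aggregator signature expected by memorious: function(context, params).
    # AlephAPI() with no arguments picks up ALEPHCLIENT_HOST and
    # ALEPHCLIENT_API_KEY from the environment.
    api = AlephAPI()
    foreign_id = params.get("foreign_id", context.crawler.name)
    collection = api.load_collection_by_foreign_id(foreign_id)

    # Assumption: the crawler stored its FtM entities in a balkhash dataset
    # named after the crawler, and iterate() yields the merged entities.
    dataset = balkhash.init(context.crawler.name)
    entities = (entity.to_dict() for entity in dataset.iterate())

    api.write_entities(collection.get("id"), entities)
    context.log.info("Pushed entities to Aleph collection %s", foreign_id)

Such a function would then be referenced from the crawler config's aggregator section by its (hypothetical) module path, just like aleph_bulkpush is below.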

thimios commented 4 years ago

@sunu thanks for the pointers. In order for that function to work I would also need to have alephclient installed on the worker container with:

pip install alephclient

and also set the two env variables:

ALEPHCLIENT_HOST: http://aleph_api_1:5000
ALEPHCLIENT_API_KEY: 047df233f8cb4b96bfd23b25a1bf2xxx

right? Or is there another way to configure the host and api key for that function?

thimios commented 4 years ago

Also, I do not need a postgres container for opensanctions if I only use crawlers that push to Aleph, right?

sunu commented 4 years ago

> In order for that function to work I would also need to have alephclient installed on the worker container with pip install alephclient, and also set the two env variables ALEPHCLIENT_HOST and ALEPHCLIENT_API_KEY, right? Or is there another way to configure the host and api key for that function?

Yes, that should do. Make sure the Aleph host is actually reachable from your crawler worker container. Docker's networking can make that complicated sometimes.
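
If you want to double-check reachability from inside the worker container, a quick sketch like this can help; the /api/2/metadata endpoint and the explicit host/api_key keyword arguments are assumptions, so adjust them to your setup.

import os
import requests
from alephclient.api import AlephAPI

# A connection error or a non-200 status here points at Docker networking
# rather than at the crawler or the aggregator.
host = os.environ["ALEPHCLIENT_HOST"]
resp = requests.get(host + "/api/2/metadata", timeout=10)
print(resp.status_code)

# The host and API key can also be passed explicitly instead of relying
# on the ALEPHCLIENT_* environment variables.
api = AlephAPI(host=host, api_key=os.environ.get("ALEPHCLIENT_API_KEY"))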

> Also, I do not need a postgres container for opensanctions if I only use crawlers that push to Aleph, right?

In the case of OpenSanctions, we use Balkhash to store FtM entity fragments before pushing them to Aleph. Balkhash works with multiple backends, Postgres being one of them. If you don't use Postgres, Balkhash will fall back to using LevelDB. You would have to change https://github.com/alephdata/opensanctions/blob/6928fd94efee1027935bb40aa701a7bfa731f155/setup.py#L19 to balkhash[leveldb] to install the appropriate dependencies.
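
As a small sketch of that fallback (the dataset name is just an example, and the exact backend-selection mechanism is an assumption, so check balkhash's settings/README):

import balkhash

# With no Postgres backend configured, balkhash.init() falls back to a
# local LevelDB store on disk, so no extra database container is needed;
# the balkhash[leveldb] extra provides the LevelDB bindings.
dataset = balkhash.init("us_bis_denied")
print(type(dataset))  # shows which backend was actually picked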

thimios commented 4 years ago

It seems that it is not working. I have put the two containers in the same Docker network, so that I can do:

INFO:alephclient.cli:[un_sc_sanctions] Bulk load entities: 1000...
INFO:alephclient.cli:[un_sc_sanctions] Bulk load entities: 2000...
/memorious # 

from the opensanctions worker container shell.

I have configured all memorious crawlers to use the aleph_bulkpush aggregator:

name: us_bis_denied
description: "[OSANC] US BIS Denied Persons List"
schedule: daily
pipeline:
  init:
    method: seed
    params:
      url: 'https://www.bis.doc.gov/dpl/dpl.txt'
    handle:
      pass: fetch
  fetch:
    method: fetch
    handle:
      pass: parse
  parse:
    method: opensanctions.crawlers.us_bis_denied:parse
aggregator:
  method: balkhash.memorious:aleph_bulkpush

and I have also tried specifying the foreign_id of the Aleph dataset:

name: interpol_red_notices
description: "[OSANC] Interpol Red Notices"
schedule: daily
pipeline:
  init:
    method: seed
    params:
      url: 'https://www.interpol.int/en/How-we-work/Notices/View-Red-Notices'
    handle:
      pass: fetch
  fetch:
    method: fetch
    handle:
      pass: index
  index:
    method: opensanctions.crawlers.interpol_red_notices:index
    handle:
      pass: fetch_notices
  fetch_notices:
    method: fetch
    handle:
      pass: parse_noticelist
  parse_noticelist:
    method: opensanctions.crawlers.interpol_red_notices:parse_noticelist
    handle:
      pass: fetch_notice
  fetch_notice:
    method: fetch
    handle:
      pass: parse_notice
  parse_notice:
    method: opensanctions.crawlers.interpol_red_notices:parse_notice
aggregator:
  method: balkhash.memorious:aleph_bulkpush
  params:
    foreign_id: 26e399bfc91a4000893985f95b23830e

but the datasets are not getting filled, either by letting the crawlers follow their schedule or by running them manually:

INFO:us_bis_denied.init:[us_bis_denied->init(seed)]: 2cbb539567714df5bbeca2a984443e69
INFO:us_bis_denied.fetch:[us_bis_denied->fetch(fetch)]: 2cbb539567714df5bbeca2a984443e69
INFO:us_bis_denied.fetch:Fetched [200]: 'https://www.bis.doc.gov/dpl/dpl.txt'
INFO:us_bis_denied.parse:[us_bis_denied->parse(opensanctions.crawlers.us_bis_denied:parse)]: 2cbb539567714df5bbeca2a984443e69
/memorious # 

I do not get any explanatory logs on the console or in the Docker logs. Any ideas what I should look into?
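
One way to narrow this down might be to run the two halves of the aggregator manually from the worker container shell, so any exception surfaces instead of failing silently. This is only a sketch: the dataset name, the iterate() call, and to_dict() are assumptions about how the entities are stored.

import balkhash
from alephclient.api import AlephAPI

# Step 1: check that the crawler actually wrote entities into the store.
dataset = balkhash.init("interpol_red_notices")
entities = list(dataset.iterate())
print("entities in store:", len(entities))

# Step 2: if there are entities, try pushing them with alephclient directly;
# ALEPHCLIENT_HOST / ALEPHCLIENT_API_KEY are read from the environment.
if entities:
    api = AlephAPI()
    collection = api.load_collection_by_foreign_id("26e399bfc91a4000893985f95b23830e")
    api.write_entities(collection.get("id"), (e.to_dict() for e in entities))
    print("pushed", len(entities), "entities")

If the count is zero, the problem is upstream of the aggregator (the parse stage never emitted entities); if the push raises, the error message should say why.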