open-contracting / ocdskit

A suite of command-line tools for working with OCDS data
https://ocdskit.readthedocs.io
BSD 3-Clause "New" or "Revised" License

compile: Add --stream CLI option (support large files) #83

Closed jpmckinney closed 4 years ago

jpmckinney commented 5 years ago

The compile command presently needs to buffer all the data to be sure it has all releases with the same ocid before merging releases. Datasets like Colombia are several GBs, exceeding the memory available on many machines. An optimized process would be to:

  1. un-package the releases (using jq)
  2. group them by ocid (perhaps by appending to files named according to the OCID)
  3. pass them to the compile command with a --stream flag, which tells it that the input is grouped, so that it only needs to buffer releases until the ocid changes

If, as it runs through releases, it notices that releases are not in fact in order, it should error. To do so, it would only need to keep a set of OCIDs, which would consume little memory.
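A minimal sketch of the buffering and ordering check described above, assuming the releases arrive as already-parsed dicts with an `ocid` key. The names `iter_grouped_releases` and `OutOfOrderError` are hypothetical, not part of OCDS Kit, and the merge step itself is left out:

```python
from itertools import groupby


class OutOfOrderError(Exception):
    """Raised when an ocid reappears after its group has already ended."""


def iter_grouped_releases(releases):
    """Yield (ocid, [releases]) for input that is already grouped by ocid.

    Only one group is buffered at a time. The set of seen OCIDs is the only
    state that grows with the size of the input.
    """
    seen = set()
    for ocid, group in groupby(releases, key=lambda release: release["ocid"]):
        if ocid in seen:
            raise OutOfOrderError(f"releases for {ocid} are not contiguous")
        seen.add(ocid)
        yield ocid, list(group)


releases = [
    {"ocid": "ocds-213czf-1", "id": "1"},
    {"ocid": "ocds-213czf-1", "id": "2"},
    {"ocid": "ocds-213czf-2", "id": "3"},
]
for ocid, group in iter_grouped_releases(releases):
    # A real implementation would merge `group` here and write one output.
    print(ocid, len(group))
```

Each yielded group would be handed to the merge routine and then discarded, so memory use is bounded by the largest single contracting process plus the set of OCIDs.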


This assumes that no single contracting process has so many releases that we would want to further optimize the process e.g. by using external sorting so that OCDS Merge doesn't have to buffer those releases in order to sort them itself.

duncandewhurst commented 4 years ago

I've been running into memory issues when trying to help a user compile a 1.4 GB file on a machine with 12 GB of RAM.

I discussed with @kindly and came to a similar conclusion on the preferred approach.

I'd suggest this should be a priority.

jpmckinney commented 4 years ago

Sounds good – feel free to add it to the Trello board. An early step would be to share the proposed implementation at a high-level, as otherwise I'm likely to request more changes :)

I figure there would have to be a two-step process, one to group releases by ocid, and another to compile each group. The first step can stream input packages using ijson and append individual releases to line-delimited JSON files, named according to OCID. Those files can be streamed into the second step using cat, which will compile releases more or less as is done now, except it will produce an output whenever the ocid changes.
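A rough sketch of that two-step process, using only the standard library so that it runs as-is. It assumes the packages are already parsed; in practice they would be streamed with ijson. The function names and the `.jsonl` extension are hypothetical, and the OCID is percent-encoded so that it is safe as a file name:

```python
import json
import tempfile
from pathlib import Path
from urllib.parse import quote


def append_releases_by_ocid(packages, directory):
    """Step 1: append each release to a line-delimited file named after its ocid."""
    directory = Path(directory)
    for package in packages:
        for release in package["releases"]:
            # quote() escapes characters like "/" that can't appear in a file name.
            path = directory / (quote(release["ocid"], safe="") + ".jsonl")
            with path.open("a", encoding="utf-8") as f:
                f.write(json.dumps(release) + "\n")


def iter_release_groups(directory):
    """Step 2: yield the releases of one ocid at a time, like cat-ing the files."""
    for path in sorted(Path(directory).glob("*.jsonl")):
        with path.open(encoding="utf-8") as f:
            yield [json.loads(line) for line in f]


packages = [
    {"releases": [{"ocid": "ocds-213czf-1", "id": "1"},
                  {"ocid": "ocds-213czf-2", "id": "2"}]},
    {"releases": [{"ocid": "ocds-213czf-1", "id": "3"}]},
]
with tempfile.TemporaryDirectory() as directory:
    append_releases_by_ocid(packages, directory)
    for group in iter_release_groups(directory):
        print(group[0]["ocid"], [release["id"] for release in group])
```

Opening a file for every release is slow. It is kept here only to make the grouping logic obvious.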

jpmckinney commented 4 years ago

It might help to do #104 first (it's not long, I think).

duncandewhurst commented 4 years ago

ocdskit compile: improve support for large files

jpmckinney commented 4 years ago

The implementation options are a temporary SQLite database or multiple individual files on disk.

Since the input size is unknown, writing to files named after the ocid can generate millions of files; mechanisms used by caching systems can be used to create file hierarchies to avoid exceeding filesystem limits for the number of files per directory. However, this is likely too much effort for the (slight?) performance gain.
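For reference, the kind of hierarchy that caching systems use can be sketched as follows: hash the OCID and use the leading hex digits as nested directory names. The function name and the choice of SHA-1 with two levels are assumptions for illustration only:

```python
import hashlib
from pathlib import PurePosixPath


def sharded_path(base, ocid, levels=2, width=2):
    """Return a path like base/ab/cd/abcd....jsonl derived from a hash of the ocid.

    With width=2, each directory level holds at most 256 subdirectories, so
    files are spread over up to 256 ** levels leaf directories.
    """
    digest = hashlib.sha1(ocid.encode("utf-8")).hexdigest()
    parts = [digest[i * width:(i + 1) * width] for i in range(levels)]
    return PurePosixPath(base, *parts, digest + ".jsonl")


print(sharded_path("groups", "ocds-213czf-1"))
```

The same OCID always maps to the same path, so releases can still be appended to the right file in a single pass.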

So, SQLite is preferred.

jpmckinney commented 4 years ago

https://stackoverflow.com/questions/1711631/improve-insert-per-second-performance-of-sqlite

jpmckinney commented 4 years ago

Copying relevant comments from Trello:

@jpmckinney: SQLite simplifies sorting by date. So we'd insert rows with ocid, date and JSON blob for the release in a first pass (letting SQLite write to disk and manage its own memory consumption), and then we'd read from the DB in a more predictable way (compared to reading a stream from the command line).

@kindly: Yes, the SQLite approach discussed would write to disk in the way you describe. The idea is that it would be a temporary database file that would be deleted after the process finishes. It's not much different from the file-per-ocid option, but it gives better sorting options and removes the need to have potentially millions of files on disk.

SQLite is in the Python standard library (the sqlite3 module), so the whole thing would be transparent to users. I think it would be easy enough to use this library directly, without requiring an abstraction/ORM layer (like SQLAlchemy). See the docs: https://docs.python.org/3.7/library/sqlite3.html#module-sqlite3. All we need is basically outlined in the examples.

Also, by selecting from the database using a cursor, the data can be streamed out.
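A minimal sketch of that approach with the sqlite3 module: insert one row per release, then read the rows back ordered by ocid and date, grouping them as the cursor streams them out. The table, column, and function names are assumptions for illustration, not the actual implementation, and an in-memory database stands in for the temporary file:

```python
import json
import sqlite3
from itertools import groupby


def load_releases(connection, releases):
    """First pass: store each release with the columns needed for sorting."""
    connection.execute("CREATE TABLE releases (ocid TEXT, date TEXT, release TEXT)")
    for release in releases:
        connection.execute(
            "INSERT INTO releases VALUES (?, ?, ?)",
            (release["ocid"], release.get("date", ""), json.dumps(release)),
        )
    # Index after loading, so the ordered read doesn't need a full sort.
    connection.execute("CREATE INDEX releases_ocid_date ON releases (ocid, date)")
    connection.commit()


def iter_release_groups(connection):
    """Second pass: yield (ocid, [releases sorted by date]), one ocid at a time."""
    cursor = connection.execute("SELECT ocid, release FROM releases ORDER BY ocid, date")
    for ocid, rows in groupby(cursor, key=lambda row: row[0]):
        yield ocid, [json.loads(row[1]) for row in rows]


connection = sqlite3.connect(":memory:")  # a real run would use a temporary file
load_releases(connection, [
    {"ocid": "ocds-213czf-2", "date": "2020-01-02T00:00:00Z", "id": "3"},
    {"ocid": "ocds-213czf-1", "date": "2020-02-01T00:00:00Z", "id": "2"},
    {"ocid": "ocds-213czf-1", "date": "2020-01-01T00:00:00Z", "id": "1"},
])
for ocid, group in iter_release_groups(connection):
    print(ocid, [release["id"] for release in group])
connection.close()
```

Iterating over the cursor fetches rows lazily, so only the releases of the current ocid are held in memory at any point.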

My main concern would be speed, mainly the write performance of inserts. There are several ways to improve this, like using bulk inserts and turning off some consistency safeguards, as outlined here: https://stackoverflow.com/questions/1711631/improve-insert-per-second-performance-of-sqlite
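As an illustration of those two ideas, the sketch below batches rows through `executemany` inside a single transaction and relaxes two durability settings with PRAGMA statements. The table layout, function name, and batch size are assumptions. Turning `synchronous` off is only reasonable here because the database is temporary and can be rebuilt from the input:

```python
import json
import sqlite3
from itertools import islice


def bulk_insert(connection, releases, batch_size=10_000):
    """Insert releases in batches, committing once at the end."""
    # Trade durability for speed: acceptable for a throwaway database.
    connection.execute("PRAGMA synchronous = OFF")
    connection.execute("PRAGMA journal_mode = MEMORY")
    connection.execute(
        "CREATE TABLE IF NOT EXISTS releases (ocid TEXT, date TEXT, release TEXT)"
    )
    rows = ((r["ocid"], r.get("date", ""), json.dumps(r)) for r in releases)
    while True:
        # Pull a bounded batch from the generator, so the input is never
        # fully materialized in memory.
        batch = list(islice(rows, batch_size))
        if not batch:
            break
        connection.executemany("INSERT INTO releases VALUES (?, ?, ?)", batch)
    connection.commit()


connection = sqlite3.connect(":memory:")
bulk_insert(connection, ({"ocid": f"ocds-213czf-{i % 3}", "id": str(i)} for i in range(25)), batch_size=10)
print(connection.execute("SELECT COUNT(*) FROM releases").fetchone()[0])
connection.close()
```

Whether these settings make a measurable difference would need to be checked against a realistic input.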

jpmckinney commented 4 years ago

Using the 1.4GB file (384,528 releases) from update 6 in CRM issue 5075, I first had to fix:

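# fix.py
import json
import sys
from decimal import Decimal

import ijson


class FixId(dict):
    """A dict that stores MongoDB's '_id' key as 'id'."""

    def __setitem__(self, key, value):
        if key == '_id':
            key = 'id'
        super().__setitem__(key, value)


def fix_id(obj):
    """Recursively replace {'$oid': ...} objects in 'id' fields with the bare value."""
    if isinstance(obj, dict):
        if 'id' in obj and isinstance(obj['id'], dict):
            obj['id'] = obj['id']['$oid']
        for value in obj.values():
            fix_id(value)
    elif isinstance(obj, list):
        for value in obj:
            fix_id(value)


def _default(obj):
    # ijson parses non-integer numbers as Decimal, which json.dumps can't serialize.
    if isinstance(obj, Decimal):
        return float(obj)
    raise TypeError('%r is not JSON serializable' % obj)


# Stream the items of the top-level array, so the whole file is never in memory.
for release in ijson.items(sys.stdin.buffer, 'item', map_type=FixId):
    if 'date' in release:
        # Unwrap the {'$date': ...} object.
        release['date'] = release['date']['$date']
        fix_id(release)
        print(json.dumps(release, default=_default))
    else:
        # Releases without a date go to standard error.
        sys.stderr.write(json.dumps(release, default=_default) + '\n')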

The script takes two minutes to run:

cat all_ti_190723.json | python fix.py > clean.json 2>error.json

The 17,058 releases without dates are omitted, leaving 367,470 releases. Run:

cat clean.json | ocdskit upgrade 1.0:1.1 | ocdskit compile > compiled.json

With the new code, this takes ~7-8 min and uses <150MB of memory. With the old code, it's known to consume GBs of memory just to read in all the data, and it's anticipated to take much longer (would still need to test) if memory were exhausted.

The test suite (which has very small inputs) performs similarly before and after the changes. If we want, we can test with medium-sized inputs that won't exhaust memory to see if there's a difference in performance between the SQLite approach and the old approach. SQLite doesn't write uncommitted changes to disk unless it has to, so the performance might be similar (viz. avoiding I/O), though there's likely some CPU overhead to using SQLite (creating the index, etc.). Update: After profiling on the above input, most time is spent in OCDS Merge.

Still need to add tests for:

jpmckinney commented 4 years ago

Noting that on small inputs, most of the time is spent sending two web requests (for OCDS tags and the OCDS schema), e.g.:

curl -sS 'http://200.13.162.86/api/v1/record/ocds-lcuori-P2013-1801-3-84/?format=json' | python -m cProfile -o ocdskit.prof ocdskit/cli/__main__.py compile --versioned --root-path releases.item > /dev/null
gprof2dot -f pstats ocdskit.prof | dot -Tpng -o output.png
open output.png
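If Graphviz isn't available, a profile can also be summarized as text with the standard library's pstats module. The sketch below profiles a stand-in function rather than OCDS Kit itself. The `summarize` helper is hypothetical, and it also accepts the path of a file written by `-o`, like ocdskit.prof above:

```python
import cProfile
import io
import pstats


def summarize(profile_or_path, limit=10):
    """Return the `limit` entries with the highest cumulative time as text."""
    stream = io.StringIO()
    stats = pstats.Stats(profile_or_path, stream=stream)
    stats.sort_stats("cumulative").print_stats(limit)
    return stream.getvalue()


def work():
    # Stand-in workload, only here so the example has something to profile.
    return sorted(range(1000), reverse=True)


profiler = cProfile.Profile()
profiler.runcall(work)
report = summarize(profiler)
print(report)
```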

If there had been a performance issue, we could have added a --backend {sqlite,python} option, for the user to select the fastest option for their input.
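For illustration only, since the option was not added, such a flag could be declared with argparse roughly like this. The program name, default, and help text are assumptions:

```python
import argparse


def build_parser():
    """Build a parser with a hypothetical --backend option (not in OCDS Kit)."""
    parser = argparse.ArgumentParser(prog="compile-sketch")
    parser.add_argument(
        "--backend",
        choices=["sqlite", "python"],
        default="sqlite",
        help="where to buffer releases before merging",
    )
    return parser


args = build_parser().parse_args(["--backend", "python"])
print(args.backend)
```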