open-contracting / ocdskit

A suite of command-line tools for working with OCDS data
https://ocdskit.readthedocs.io
BSD 3-Clause "New" or "Revised" License

combine-*-packages: Avoid buffering all inputs into memory (iterencode) [solved] #119

Closed jpmckinney closed 4 years ago

jpmckinney commented 4 years ago

We can postpone the aggregation of records by putting the for-loop in a function that yields each package's records instead of extending the output's records, and then running:

output['records'] = itertools.chain.from_iterable(func())

This works with simple inputs like:

python -c 'import json; print("\n".join(json.dumps({"records": list(range(1000))}) for x in range(10000)))' |
ocdskit combine-record-packages > /dev/null
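The lazy aggregation described above can be sketched as follows. This is a minimal illustration and not the ocdskit code: `iter_records` and `lazy_records` are hypothetical names, and `packages` is assumed to be any iterable of parsed record packages.

```python
import itertools


def iter_records(packages):
    """Yield each package's list of records, one package at a time."""
    for package in packages:
        # Nothing is appended to an output list here; the caller pulls lazily.
        yield package['records']


def lazy_records(packages):
    """Return an iterator over all records, without building a combined list."""
    return itertools.chain.from_iterable(iter_records(packages))
```

No package is read from the input until the first record is requested, so at most one package's records are held at a time.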

However, that loop also serves to deduplicate packages and extensions. (It also has logic to replace package metadata, but we can change the logic to use the first package's metadata.)

One way to postpone the evaluation of packages, extensions and records is to collect the packages and extensions in a first read of the input, while streaming the records into SQLite – and then streaming them out of SQLite. This effectively reads the inputs twice.
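A rough sketch of the SQLite idea, using only the standard library. The function name `spool_records`, the table layout and the `path` parameter are assumptions made for illustration; ocdskit does not provide this. Passing an empty string as the path asks SQLite for a private temporary on-disk database.

```python
import json
import sqlite3


def spool_records(packages, path=''):
    """Collect package URLs and extensions in one pass, spooling records to SQLite."""
    connection = sqlite3.connect(path)
    connection.execute('CREATE TABLE records (id INTEGER PRIMARY KEY, data TEXT)')

    # Dicts are used as ordered sets to deduplicate while preserving order.
    urls = {}
    extensions = {}
    for package in packages:
        urls.update(dict.fromkeys(package.get('packages', [])))
        extensions.update(dict.fromkeys(package.get('extensions', [])))
        # Each record is serialized and written out, not kept in memory.
        connection.executemany(
            'INSERT INTO records (data) VALUES (?)',
            ((json.dumps(record),) for record in package.get('records', [])),
        )
    connection.commit()

    def records():
        # Second pass: stream the records back out in insertion order.
        for (data,) in connection.execute('SELECT data FROM records ORDER BY id'):
            yield json.loads(data)

    return list(urls), list(extensions), records()
```

By the time the records are streamed out, the packages and extensions are fully known, so they can be written before the records.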

Another way is to deduplicate the packages and extensions while yielding the records to a JSON writer that won't yield a closing "}" until the packages and extensions are written. This moves packages and extensions to the end. To fit this into existing code:

For now, we are assuming that users aren't attempting to combine so many small packages that they exhaust memory.

https://github.com/python/cpython/blob/v3.8.1/Lib/json/encoder.py#L259

Adding link to this issue from code, in case of future use, and closing.

jpmckinney commented 4 years ago

The solution below has constant memory consumption for the CLI command. However, we want to keep parity between CLI commands and library methods, which isn't possible without requiring the user to do a lot of extra work, especially to dump the data. Also, I don't think we can assume the stability of json.JSONEncoder.iterencode's implementation. So, not implementing for now.

import itertools

from ocdskit.cli.commands.base import OCDSCommand
from ocdskit.util import (_empty_package, _remove_empty_optional_metadata, _resolve_metadata, _update_package_metadata,
                          iterencode, json_dumps)

class Command(OCDSCommand):
    name = 'combine-record-packages'
    help = 'reads record packages from standard input, collects packages and records, and prints one record package'

    def add_arguments(self):
        self.add_package_arguments('record')

    def handle(self):
        kwargs = {}
        if self.args.pretty:
            kwargs['indent'] = 2
        if self.args.ascii:
            kwargs['ensure_ascii'] = True

        metadata = self.parse_package_arguments()

        ### This section can be put in a separate method.
        output = _empty_package(metadata['uri'], metadata['publisher'], metadata['published_date'])
        output['packages'] = {}

        def items():
            for package in self.items():
                _update_package_metadata(output, package)
                if 'packages' in package:
                    output['packages'].update(dict.fromkeys(package['packages']))
                yield package['records']

        records = itertools.chain.from_iterable(items())
        ###

        encoder = iterencode({'records': records}, **kwargs)

        # This is very specific to the implementation of the json module, which puts the closing brace in one chunk…
        queue = [next(encoder)]
        # … and the indent in another chunk.
        if self.args.pretty:
            queue.append(next(encoder))

        # Print each chunk one step behind the encoder, so that the held-back closing chunk(s) are never printed.
        for chunk in encoder:
            print(queue.pop(0), end='')
            queue.append(chunk)

        print(',', end='')

        # All inputs have been read; we can clean up now. This section can be put in a separate method.
        if metadata['publisher']:
            output['publisher'] = metadata['publisher']
        _resolve_metadata(output, 'packages')
        _resolve_metadata(output, 'extensions')
        _remove_empty_optional_metadata(output)
        ###

        # Omit the opening brace.
        print(json_dumps(output, **kwargs)[1:])
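
The hold-back step in the code above can be isolated into a small helper. The sketch below uses only the standard library and is not part of ocdskit: `hold_back` and `splice` are hypothetical names, `records` is a plain list here because the stdlib encoder does not accept generators, and `metadata` is assumed to be a non-empty dict. It relies on the same assumption about the json module's chunking (a final "}" chunk, preceded by a newline chunk when indenting), so it shares the stability concern raised above.

```python
import json
from collections import deque


def hold_back(chunks, count):
    """Yield every chunk except the last `count`, without knowing the length in advance."""
    queue = deque()
    for chunk in chunks:
        queue.append(chunk)
        if len(queue) > count:
            yield queue.popleft()


def splice(records, metadata, indent=None):
    """Yield JSON text for an object whose "records" come first and metadata last."""
    encoder = json.JSONEncoder(indent=indent)
    # Assumption: the closing brace is its own chunk, preceded by a newline chunk when indenting.
    count = 1 if indent is None else 2
    yield from hold_back(encoder.iterencode({'records': records}), count)
    # Continue the object where the closing brace would have been.
    yield ','
    # Serialize the (non-empty) metadata and omit its opening brace.
    yield encoder.encode(metadata)[1:]
```

Joining the chunks from `splice([1, 2, 3], {'uri': 'x'})` should give a single JSON object with "records" first and "uri" last.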