open-contracting / ocdskit

A suite of command-line tools for working with OCDS data
https://ocdskit.readthedocs.io
BSD 3-Clause "New" or "Revised" License

split-*-packages: Avoid buffering each input into memory (revparse) #118

Closed: jpmckinney closed this issue 4 years ago

jpmckinney commented 4 years ago

We need to read an entire package into memory, because the package metadata can come after the records array. If the metadata were guaranteed to come first (or if we parsed backwards from the end of the file, though parsing JSON backwards would be very complicated, and we don't want to write a JSON parser), we could do something like the following to yield packages containing a partially read records array.

First, subclass ijson.common.ObjectBuilder and edit a copy of event():

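```python
import ijson
from ijson.common import ObjectBuilder

from ocdskit.cli.commands.base import StandardInputReader


class ChunkObjectBuilder(ObjectBuilder):
    def __init__(self, chunk_key, **kwargs):
        super().__init__(**kwargs)
        self.chunk_key = chunk_key
        self.items = []
        self.key = None

    # A copy of ObjectBuilder.event(); only the 'start_array' branch differs.
    def event(self, event, value):
        if event == 'map_key':
            self.key = value
        elif event == 'start_map':
            mappable = self.map_type()
            self.containers[-1](mappable)
            def setter(value):
                mappable[self.key] = value
            self.containers.append(setter)
        elif event == 'start_array':
            # Use an instance variable for the records/releases array.
            array = self.items if self.key == self.chunk_key else []
            self.containers[-1](array)
            self.containers.append(array.append)
        elif event in ('end_array', 'end_map'):
            self.containers.pop()
        else:
            self.containers[-1](value)
```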
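To see why the records array is kept on the instance, the sketch below re-implements the same container-stack logic without ijson (`MiniChunkBuilder` is a hypothetical, stdlib-only stand-in for `ChunkObjectBuilder`) and feeds it hand-written events for a tiny package. The package's `"records"` value is the very list held in `builder.items`, so truncating that list in place also empties the array inside the package.

```python
class MiniChunkBuilder:
    """Hypothetical stdlib-only stand-in for ChunkObjectBuilder, with the same event logic."""

    def __init__(self, chunk_key):
        self.chunk_key = chunk_key
        self.items = []
        self.key = None
        self.value = None
        # Each entry is a callable that stores a value into the enclosing container.
        self.containers = [self._set_root]

    def _set_root(self, value):
        self.value = value

    def event(self, event, value):
        if event == 'map_key':
            self.key = value
        elif event == 'start_map':
            mapping = {}
            self.containers[-1](mapping)

            def setter(value):
                mapping[self.key] = value

            self.containers.append(setter)
        elif event == 'start_array':
            # Reuse self.items for the array under chunk_key, so it can be truncated later.
            array = self.items if self.key == self.chunk_key else []
            self.containers[-1](array)
            self.containers.append(array.append)
        elif event in ('end_array', 'end_map'):
            self.containers.pop()
        else:
            self.containers[-1](value)


# Hand-written (event, value) pairs for {"records": [1, 2], "uri": "x"}.
events = [
    ('start_map', None),
    ('map_key', 'records'),
    ('start_array', None),
    ('number', 1),
    ('number', 2),
    ('end_array', None),
    ('map_key', 'uri'),
    ('string', 'x'),
    ('end_map', None),
]
builder = MiniChunkBuilder('records')
for event, value in events:
    builder.event(event, value)

print(builder.value)  # {'records': [1, 2], 'uri': 'x'}
print(builder.value['records'] is builder.items)  # True
del builder.items[:]  # truncating the shared list empties the package's array too
print(builder.value)  # {'records': [], 'uri': 'x'}
```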

Then, edit a copy of ijson.common.items():

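```python
def items(prefixed_events, prefix, size, map_type=None):
    """
    A copy of ijson.common.items() that yields the package each time its records array reaches
    ``size`` items. ``size`` is a parameter, because ``self.args`` isn't in scope here.
    """
    prefixed_events = iter(prefixed_events)
    try:
        while True:
            current, event, value = next(prefixed_events)
            if current == prefix:
                if event in ('start_map', 'start_array'):
                    # Set the prefix for the records array.
                    item_prefix = []
                    if prefix:
                        item_prefix.append(prefix)
                    if event == 'start_array':
                        item_prefix.append('item')
                    item_prefix = '.'.join(item_prefix + ['records', 'item'])

                    builder = ChunkObjectBuilder('records', map_type=map_type)
                    end_event = event.replace('start', 'end')
                    while (current, event) != (prefix, end_event):
                        builder.event(event, value)
                        current, event, value = next(prefixed_events)

                        # After adding an item to the array, if its size reached the limit, yield the
                        # package and truncate the array.
                        if (current == item_prefix and event not in ('start_map', 'map_key')
                                and len(builder.items) == size):
                            yield builder.value
                            del builder.items[:]

                    del builder.containers[:]

                    # Yield the package, if there are any items in the array.
                    if builder.items:
                        yield builder.value
                else:
                    yield value
    except StopIteration:
        pass
```

Finally, iterate over items():

```python
# In the command, where self.args holds the parsed command-line arguments.
file = StandardInputReader(self.args.encoding)
for item in items(ijson.parse(file, multiple_values=True), self.args.root_path, self.args.size):
    # Print each package before asking for the next, because items() then truncates its records array.
    self.print(item)
```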
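Because `items()` yields the same package object each time and then truncates its records array in place, each package has to be written out before the loop asks for the next one, as printing inside the loop does. The generator below (`reuse_chunks` is a hypothetical name, for illustration only) mimics that yield-then-truncate pattern with the standard library, and shows that serializing inside the loop works while collecting the objects first does not.

```python
import json


def reuse_chunks(values, size):
    """Yield the same package dict repeatedly, truncating its array in place (like items())."""
    records = []
    package = {'records': records}
    for value in values:
        records.append(value)
        if len(records) == size:
            yield package
            del records[:]
    if records:
        yield package


# Serializing inside the loop captures each chunk before it is truncated.
serialized = [json.dumps(package) for package in reuse_chunks(range(5), 2)]
print(serialized)  # ['{"records": [0, 1]}', '{"records": [2, 3]}', '{"records": [4]}']

# Collecting the objects first keeps three references to one dict, mutated to its last state.
collected = list(reuse_chunks(range(5), 2))
print(collected)  # [{'records': [4]}, {'records': [4]}, {'records': [4]}]
```

This is why the consumer must not buffer the yielded packages: doing so would reintroduce the memory use the chunking avoids, and would also return wrong data.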

To test:

```shell
python -c 'import json; print(json.dumps({"records": list(range(2000000))}))' |
ocdskit split-record-packages 100 > /dev/null
```
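The shell test exercises the command end to end. To compare the memory footprint of the two strategies in isolation, here is a minimal sketch using the standard library's `tracemalloc` module. `buffered()` and `streamed()` are hypothetical stand-ins that chunk a `range` of integers rather than an OCDS package: one holds every record before chunking, the other holds at most `size` records at a time.

```python
import tracemalloc
from itertools import islice


def peak_bytes(func, *args):
    """Run func(*args) and return (result, peak traced allocation in bytes)."""
    tracemalloc.start()
    try:
        result = func(*args)
        return result, tracemalloc.get_traced_memory()[1]
    finally:
        tracemalloc.stop()


def buffered(n, size):
    """Hypothetical: hold every record in memory, then slice it into chunks."""
    records = list(range(n))
    return sum(1 for start in range(0, n, size) if records[start:start + size])


def streamed(n, size):
    """Hypothetical: hold at most ``size`` records in memory at a time."""
    iterator = iter(range(n))
    chunks = 0
    while list(islice(iterator, size)):
        chunks += 1
    return chunks


buffered_chunks, buffered_peak = peak_bytes(buffered, 200_000, 100)
streamed_chunks, streamed_peak = peak_bytes(streamed, 200_000, 100)
print(buffered_chunks, streamed_chunks)  # 2000 2000
print(streamed_peak < buffered_peak)  # True
```

The absolute peaks depend on the Python version, so the sketch only compares them.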

Adding a link to this issue from the code, in case it's useful in future, and closing.