daggaz / json-stream

Simple streaming JSON parser and encoder.
MIT License

Read data from non-normalized objects with possible fields missing #56

Closed comonadd closed 7 months ago

comonadd commented 7 months ago

Problem

Suppose I have an object

{
  "id": 1,
  "firstName": "Josh",
  "lastName": "Joshua",
  "posts": [...a lot of objects here...],
  "total_score": 150
}

I want to read all the user fields ("id", "firstName", "lastName", and "total_score"), and also read each post from "posts" separately, on the assumption that some fields might be missing.

If all the specified fields were always present, I could have done a simple lookup:

import json_stream

raw_stream = get_stream()
stream = json_stream.load(raw_stream)
user = {}
for key in ['id', 'firstName', 'lastName']:
  user[key] = stream[key]
for post in stream['posts']:
  # save posts data...
  ...
user['total_score'] = stream['total_score']

But this does not work for me, because some fields might be missing from the stream. So instead I use stream.items() to read at most k fields, where k is the number of fields I expect, via this function:

def read_streamed_fields_in_order(stream, output, fields_to_read):
    """Read at most len(fields_to_read) fields from the stream into output,
    tolerating expected fields that are missing."""
    if len(fields_to_read) == 0:
        return
    remaining_fields = set(fields_to_read)
    for idx_in_stream, (key, value) in enumerate(stream.items()):
        if idx_in_stream >= len(fields_to_read):
            return
        should_see_field = fields_to_read[idx_in_stream]
        if key != should_see_field:
            # The key we were expecting at this position is missing
            if should_see_field in remaining_fields:
                remaining_fields.remove(should_see_field)
        if key in remaining_fields:
            output[key] = value
            remaining_fields.remove(key)
        if len(remaining_fields) == 0:
            return

My calling code looks like this, and the third call throws an exception:

read_streamed_fields_in_order(...)
read_posts()
read_streamed_fields_in_order(...) # <-- This here throws json_stream.base.TransientAccessException: Cannot restart iteration of transient JSON stream

I suppose this exception is thrown because I'm creating a new stream.items() iterator, which tries to start from the beginning of the object. How can I instead resume from where read_posts() left off? In transient mode, shouldn't these iterators share the same seek position?

daggaz commented 7 months ago

You simply need to move the stream.items() call up to the top level and pass the resulting iterator in instead:

items = stream.items()
your_function(items, ...)
your_function(items, ...)
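
Putting the two pieces together, here is a minimal runnable sketch of the fix. A plain dict iterator stands in for json_stream's transient items() view (so the example runs without the library), and "firstName" is deliberately left out of the data to exercise the missing-field handling; the field names are taken from the example above.

```python
# Stand-in for the transient stream: "firstName" is deliberately missing.
data = {
    "id": 1,
    "lastName": "Joshua",
    "posts": [{"title": "a"}, {"title": "b"}],
    "total_score": 150,
}


def read_streamed_fields_in_order(items, output, fields_to_read):
    # Same logic as the helper above, but it takes the shared items
    # iterator instead of the stream itself, so successive calls resume
    # where the previous one stopped.
    if len(fields_to_read) == 0:
        return
    remaining = set(fields_to_read)
    for idx, (key, value) in enumerate(items):
        if idx >= len(fields_to_read):
            return
        expected = fields_to_read[idx]
        if key != expected and expected in remaining:
            # The key we were expecting at this position is missing
            remaining.remove(expected)
        if key in remaining:
            output[key] = value
            remaining.remove(key)
        if len(remaining) == 0:
            return


items = iter(data.items())  # created once, shared by every call
user = {}
read_streamed_fields_in_order(items, user, ["id", "firstName", "lastName"])
_, posts = next(items)                      # the "posts" entry
titles = [post["title"] for post in posts]  # save posts data...
read_streamed_fields_in_order(items, user, ["total_score"])
print(user)  # "firstName" is simply absent from the result
```

With a real json_stream object, `iter(data.items())` would be replaced by `stream.items()`; the important part is that the iterator is created exactly once and threaded through every reader, so no call tries to restart iteration of the transient stream.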
comonadd commented 7 months ago

Thanks, that works!