daggaz / json-stream

Simple streaming JSON parser and encoder.
MIT License

# How to perform a "streaming transform" on some JSON? #32

Closed zacharysyoung closed 1 year ago

zacharysyoung commented 1 year ago

I'm struggling to put together the concepts you've laid out well in the docs into a solution where I can stream in some JSON, update some values, and stream it out.

I have this toy example JSON:

{
    "0": {"foo": "bar"},
    "1": {"foo": "bar"},
    "2": {"foo": "bar"},
    "3": {"foo": "bar"},
    "4": {"foo": "bar"},
    "5": {"foo": "bar"},
    "6": {"foo": "bar"},
    "7": {"foo": "bar"},
    "8": {"foo": "bar"},
    "9": {"foo": "bar"}
}

where I want to update the value for every odd (int-ified) key to {"foo": "BAR"}:

{
    "0": {"foo": "bar"},
    "1": {"foo": "BAR"},
    "2": {"foo": "bar"},
    "3": {"foo": "BAR"},
    "4": {"foo": "bar"},
    "5": {"foo": "BAR"},
    "6": {"foo": "bar"},
    "7": {"foo": "BAR"},
    "8": {"foo": "bar"},
    "9": {"foo": "BAR"}
}

The only thing I've made work is:

import json

import json_stream
from json_stream.writer import streamable_dict

@streamable_dict
def update(data):
    for key, value in data.items():
        if int(key) % 2 == 1:
            value = {"foo": "BAR"}
        else:
            value = dict(value)

        yield key, value

with open("input.json") as f_in:
    data = json_stream.load(f_in, persistent=True)
    updated_data = update(data)
    with open("output.json", "w") as f_out:
        json.dump(updated_data, f_out, indent=1)

But I have to use persistent=True to make that work, and that uses about twice as much memory as the standard lib's load and dump functions.

I've looked at the Encoding json-stream objects section, but cannot figure out what it'd take to make either json-stream's default function or JSONStreamEncoder class work for me. I've also tried to figure out if the visitor pattern is applicable.

Generally, my stumbling block seems to be getting the worker/processor in between json-stream's decoder and the standard lib's encoder.

Do you have a concrete example of doing a "streaming transform" of some JSON?

daggaz commented 1 year ago

You have hit upon some interesting edge cases here.

Turns out that the json.dump() support only covers persistent streams, which is why you were getting TransientAccessExceptions.

And since the implementation of default uses dict(obj), that's why you were getting the same error with your value = dict(value).

I think that the code you've written (without persistent=True) should "just work", and I'm working on a fix.

In the meantime, with the current version, you can achieve what you want by moving the persistent part from the top-level dict to the individual dict items:

import json

import json_stream
from json_stream.writer import streamable_dict

@streamable_dict
def update(data):
    # persistent() call here causes the individual items to be persistent while
    # keeping the top-level dict transient
    for key, value in data.persistent().items():
        if int(key) % 2 == 1:
            value = {"foo": "BAR"}
        yield key, value

with open("input.json") as f_in:
    data = json_stream.load(f_in)  # no persistent=True
    updated_data = update(data)
    with open("output.json", "w") as f_out:
        json.dump(updated_data, f_out, indent=1)

The persistent() method is documented here: https://github.com/daggaz/json-stream#mixed-mode

daggaz commented 1 year ago

I have created a PR that removes the need to call persistent() at all. See #33.

Would you be able to test your use case against that branch?

The change incorporates @smheidrich's to_standard_types from #16 to iteratively convert any streaming object into regular Python dicts/lists before returning them to the json encoder.

This is effectively equivalent to calling persistent() as in my previous comment, but it's a bit more magic.
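
To illustrate the general idea of converting container-like objects into plain dicts and lists before handing them to the json encoder, here is a minimal sketch using only the standard library. It is not the to_standard_types implementation from the PR: the to_plain name is made up for this example, it uses plain recursion, and MappingProxyType and tuples stand in for streaming objects.

```python
import json
from collections.abc import Mapping, Sequence
from types import MappingProxyType


def to_plain(obj):
    """Recursively copy mapping-like and sequence-like objects into dicts and lists."""
    if isinstance(obj, Mapping):
        return {key: to_plain(value) for key, value in obj.items()}
    if isinstance(obj, Sequence) and not isinstance(obj, (str, bytes, bytearray)):
        return [to_plain(item) for item in obj]
    return obj  # scalars (str, int, float, bool, None) pass through unchanged


# Stand-ins for non-dict containers; these are NOT json-stream objects.
wrapped = MappingProxyType({"1": MappingProxyType({"foo": ("bar", "baz")})})

plain = to_plain(wrapped)
encoded = json.dumps(plain)
print(encoded)
```

The point of the sketch is only that the encoder sees ordinary dicts and lists; how json-stream walks its own objects may differ.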

daggaz commented 1 year ago

Released 2.2.0 containing these updates

zacharysyoung commented 1 year ago

Thank you for the updates and the help. I'm sorry I didn't respond earlier; I got kind of burned out trying to answer this SO question: https://stackoverflow.com/q/39339044/246801.

I'm curious if I implemented json-stream well enough. I'm guessing the 2-3x time overhead for reading/transforming comes down to lots of pure-Python calls. Did I miss something obvious?

Thanks again!

daggaz commented 1 year ago

You were right: the many Python function calls and attribute lookups were slowing things down significantly. I've created a PR #34 with some basic changes that improve things by 28% (according to my tests). I'd appreciate any insight you could give based on your test results.

zacharysyoung commented 1 year ago

I ran the current version of json-stream against this PR with the test harness that generated the table in that SO post, then did a delta of the values. I also ran the standard lib's json module as a control to show how inaccurate this method can be: even when nothing's changed, the values can still vary. That 1.63% is for the sys time, which was about 1s.

Generate

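| Method   | Items | Real (s) | User (s) | Sys (s) | Mem (MB) |
|----------|-------|----------|----------|---------|----------|
| standard | 1e+07 | 1.02%    | 1.00%    | 1.63%   | 1.00%    |
| stream   | 1e+07 | 0.99%    | 0.99%    | 0.80%   | 0.96%    |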

Read

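| Method   | Items | Real (s) | User (s) | Sys (s) | Mem (MB) |
|----------|-------|----------|----------|---------|----------|
| standard | 1e+07 | 0.98%    | 1.00%    | 0.91%   | 1.00%    |
| stream   | 1e+07 | 0.83%    | 0.83%    | 0.89%   | 1.04%    |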

Transform

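| Method   | Items | Real (s) | User (s) | Sys (s) | Mem (MB) |
|----------|-------|----------|----------|---------|----------|
| standard | 1e+07 | 1.00%    | 1.00%    | 0.97%   | 1.00%    |
| stream   | 1e+07 | 0.87%    | 0.87%    | 0.84%   | 0.97%    |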

So, I'm definitely not seeing 28% "in the wild", and the changes seem to favor reading.

daggaz commented 1 year ago

Ah, no, I meant a 28% speedup comparing this branch with the head of master.

zacharysyoung commented 1 year ago

Hmm, I believe I tested the perf branch against v2.2.0:

% pip3 list
...
json-stream              2.2.0
...

And I'm using /usr/bin/time on my Mac for a broader measurement. My tables include all fields, like before, but I think only Real (s) has any meaning for the comparison. How did you measure to arrive at a 28% speedup?
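
For anyone who wants the same kind of whole-process measurement from inside Python, a rough sketch follows. It is not the harness from the SO post: time_command is a name invented here, the child command is a placeholder, and the resource module is only available on Unix-like systems.

```python
import resource
import subprocess
import sys
import time


def time_command(argv):
    """Run argv as a child process and return (real, user, sys) in seconds."""
    before = resource.getrusage(resource.RUSAGE_CHILDREN)
    start = time.perf_counter()
    subprocess.run(argv, check=True)
    real = time.perf_counter() - start
    after = resource.getrusage(resource.RUSAGE_CHILDREN)
    # RUSAGE_CHILDREN is cumulative over all waited-for children, so take the difference
    user = after.ru_utime - before.ru_utime
    system = after.ru_stime - before.ru_stime
    return real, user, system


# Placeholder workload; substitute the script under test.
real, user, system = time_command([sys.executable, "-c", "sum(range(10**6))"])
print(f"real {real:.2f}s  user {user:.2f}s  sys {system:.2f}s")
```

Because this times the whole interpreter process, startup and file I/O are included, which is what makes it a broader measurement than profiling individual calls.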

daggaz commented 1 year ago

I was using the numbers spat out by cProfile, but that may not match real time as you measured, as the profiling itself will add different overheads to different operations.
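
A small standard-library sketch of that effect: the same function is timed once plainly and once under cProfile. The work function is an arbitrary stand-in with many tiny calls, not json-stream code, and the exact numbers will vary by machine.

```python
import cProfile
import time


def work(n=200_000):
    # Many tiny function calls, where per-call profiling overhead matters most
    return sum(abs(i) for i in range(n))


def wall_time(func):
    """Time func with a plain wall-clock timer."""
    start = time.perf_counter()
    func()
    return time.perf_counter() - start


def profiled_time(func):
    """Time func while it runs under cProfile."""
    profiler = cProfile.Profile()
    start = time.perf_counter()
    profiler.runcall(func)
    return time.perf_counter() - start


plain = wall_time(work)
profiled = profiled_time(work)
print(f"plain: {plain:.3f}s  under cProfile: {profiled:.3f}s")
```

Call-heavy code typically slows down more under the profiler than code that spends its time inside a few C-level calls, so a percentage derived from profiler output need not carry over to wall-clock time.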

Instead I followed your approach, and ran a script with the time command against the perf and master branches.

The script was:

from collections import deque

import json_stream

def read_stream(fname: str):
    with open(fname) as f:
        deque(json_stream.load(f), maxlen=0)

for _ in range(20):
    read_stream('gen/1000000.json')
    print('.', flush=True, end='')
print()

The input file was generated from your script with 1 million items.
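
In case the deque(..., maxlen=0) idiom in the script is unfamiliar: it drains an iterator as fast as possible without keeping any of the items. A tiny illustration with a plain generator (the counting generator is just for demonstration):

```python
from collections import deque

stats = {"consumed": 0}


def items(n):
    """Yield n integers, counting how many were actually pulled."""
    for i in range(n):
        stats["consumed"] += 1
        yield i


# maxlen=0 means every item is discarded as soon as it is read
sink = deque(items(1000), maxlen=0)
print(stats["consumed"], len(sink))
```

So the benchmark measures only the cost of iterating over the stream, with no memory spent on storing results.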

Results:

| branch | result                                         |
|--------|------------------------------------------------|
| master | 62.34s user 0.60s system 98% cpu 1:03.85 total |
| perf   | 47.66s user 0.34s system 98% cpu 48.554 total  |

That appears to be about 24% faster.
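
For reference, the arithmetic behind that figure, using the numbers from the results above (1:03.85 taken as 63.85 seconds). The helper name is made up for this sketch.

```python
def percent_reduction(before, after):
    """Percentage by which the time went down, relative to the earlier time."""
    return (before - after) / before * 100


user = percent_reduction(62.34, 47.66)     # user CPU time, master vs perf
total = percent_reduction(63.85, 48.554)   # total elapsed time, master vs perf
throughput_ratio = 63.85 / 48.554          # how many times more work per second

print(f"user: {user:.1f}%  total: {total:.1f}%  throughput: {throughput_ratio:.2f}x")
```

Note that "time reduced by about 24%" and "throughput up by about 1.3x" describe the same measurement, so it helps to say which one a quoted speedup refers to.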