daggaz / json-stream

Simple streaming JSON parser and encoder.
MIT License

How to export elements from a JSON array and process them batch-wise #20

Closed pbel78 closed 2 years ago

pbel78 commented 2 years ago

Hi

Thank you first of all for this cool project. I have a requirement to process a list of items in batches of 1000 records.

I started by loading the JSON file:

data = json_stream.load(f)

After that, I iterated over the rows:

for row in data.persistent():
    print(json.dumps(row, default=default))

And this works. But now I wanted to exit the loop after 1000 rows and continue later:

index = 0
batch = ""
for row in data.persistent():
    index += 1
    batch += json.dumps(row, default=default)
    if index == 1000:
        break

# do something with the batch => that works

I thought that since I quit the for loop early, I could re-enter it and proceed where I left off:

index = 0
batch = ""
for row in data.persistent():

=> but this gives me an error saying the element has already been used up and can't be restarted. So I assume my understanding that data can be used like a generator is wrong. I have also tried for row in data: without the persistent() part, but in that case I don't know how to export the whole JSON element from the row, since it looks like json.dumps can't be applied in that situation.

What would be the correct way to read x elements from a JSON array? Input:

[{ "key":"value"},{"key":"value1", "anotherkey":"anothervalue"},{"key":"value2"}]

Let's say I want to process the first 2 entries in a first batch, then continue and process the next batch. By the way: I don't know the names of the keys, and I don't know how many keys are in there. I'm interested in the full element, including all keys and values.

batch one, a string containing: [{ "key":"value"},{"key":"value1", "anotherkey":"anothervalue"}]
batch two, a string containing: [{"key":"value2"}]

Thank you for any idea of what the right solution could be.

daggaz commented 2 years ago

Hey!

Your issue is that when you start the second batch, you're trying to restart iteration on the top-level transient list:

from io import StringIO

import json_stream

f = StringIO('[{ "key":"value"},{"key":"value1", "anotherkey":"anothervalue"},{"key":"value2"}]')

# data is a transient (non-restartable) list
data = json_stream.load(f)

# start iterating - this is your first "chunk" (a chunk of size 1 is still a chunk, right?)
for item in data.persistent():  
    print(item)  # item is a persistent dict
    break

# now try iterating again for the second "chunk"
for item in data.persistent():  
    print(item)  # you'll get a TransientAccessException here
    break

The above approach wouldn't work for a regular list either, as you'd just always get the first chunk over and over.
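
To see the same thing with a plain Python list (no json-stream involved):

data = [1, 2, 3, 4]

# first "chunk"
for item in data:
    print(item)  # prints 1
    break

# second "chunk" - but each for statement builds a fresh iterator,
# so this starts over from the beginning
for item in data:
    print(item)  # prints 1 again, not 2
    break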

Instead, you need to keep a reference to an iterator (by the way, a for loop automatically creates a new iterator each time it starts):

data = json_stream.load(f)

# it is an iterator that will produce the next list item when asked
it = iter(data.persistent())

# use the iterator to get the first "chunk"
for item in it:
    print(item)
    break

# use the same iterator to get the next "chunk"
for item in it:
    print(item)
    break
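
If you prefer the standard library for the chunk-taking part, itertools.islice can pull up to n items from the same shared iterator. A minimal sketch, assuming the same f as above:

from itertools import islice

data = json_stream.load(f)
it = iter(data.persistent())

while True:
    # islice consumes up to 2 items from the shared iterator;
    # dict(item) materializes the top level of each streamed object
    chunk = [dict(item) for item in islice(it, 2)]
    if not chunk:
        break
    print(chunk)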

Finally, here's a nice neat function to achieve what you wanted:

def chunker(iterable, n):
    it = iter(iterable)  # single shared iterator across all chunks
    done = False

    def _chunker():
        nonlocal done
        # yield up to n items; flag when the underlying iterator runs out
        for _ in range(n):
            try:
                yield next(it)
            except StopIteration:
                done = True
                return

    while not done:
        yield _chunker()

data = json_stream.load(f)

for chunk in chunker(data.persistent(), 2):
    print('-'*10)
    for item in chunk:
        print(dict(item))
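
Since you wanted each batch as a JSON string, you can also serialize each chunk with json.dumps. A sketch assuming flat objects like in your example (nested values would need a deeper conversion than dict()):

import json

data = json_stream.load(f)

for chunk in chunker(data.persistent(), 2):
    # materialize each streamed item, then encode the whole
    # chunk as one JSON array string
    batch = json.dumps([dict(item) for item in chunk])
    print(batch)
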
pbel78 commented 2 years ago

Thank you so much for the very detailed explanation. That makes it very clear, and it works! Is there any way to support you and/or the project with a financial contribution?

daggaz commented 2 years ago

Sure! That's very generous!

You can send something through PayPal here.