daggaz / json-stream

Simple streaming JSON parser and encoder.
MIT License

Streaming elements in a list #49

Closed jxnl closed 1 year ago

jxnl commented 1 year ago

Is there any way to stream the data within a list one by one?

# This is your existing generator that yields chunks of JSON string
def read_json_in_chunks(json_string):
    for i in range(0, len(json_string), 5):  # replace 5 with the chunk size you want
        ss = json_string[i : i + 5]
        yield ss
        print(f"generated {ss}")

import json_stream

json_string = b'{"tasks":[{"id":1,"title":"task1"},{"id":2,"title":"task2"},{"id":3,"title":"task3"}]}'
data = json_stream.load(read_json_in_chunks(json_string), persistent=True)

print(data["tasks"][0])

Currently it streams everything from [ to ] before returning data["tasks"][0].

or for

for task in data["tasks"]:
    ...

It streams everything first.

daggaz commented 1 year ago

Hi @jxnl,

The reason you're seeing this behaviour is that the data from the iterator is first being buffered (up to the system default buffer size, which for me is 8192 bytes).

If your data is longer than this buffer size, then you would see it being processed in chunks.

This is a side effect of wrapping the iterable. I'm not really sure if there's a way around that, but I'm thinking about it.
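To tell which case a given payload falls into, its encoded size can be compared against the buffer size. This is only a rough sketch: `BUFFER_SIZE` and `fits_in_buffer` are illustrative names, not part of json-stream, and the 8192-byte figure is an assumption (the running interpreter's own default is available as `io.DEFAULT_BUFFER_SIZE` and may differ).

```python
import io
import json

# Assumed buffer size for illustration only; check io.DEFAULT_BUFFER_SIZE
# for the value your interpreter actually uses.
BUFFER_SIZE = 8192


def fits_in_buffer(payload: bytes, buffer_size: int = BUFFER_SIZE) -> bool:
    """Return True if the whole payload fits into a single buffer fill."""
    return len(payload) <= buffer_size


# The short payload from the original question.
small = b'{"tasks":[{"id":1,"title":"task1"},{"id":2,"title":"task2"},{"id":3,"title":"task3"}]}'

# A payload with 1000 tasks, which is far larger than the assumed buffer.
large = json.dumps(
    {"tasks": [{"id": i, "title": f"task{i}"} for i in range(1000)]}
).encode()

print(len(small), fits_in_buffer(small))
print(len(large), fits_in_buffer(large))
print("interpreter default:", io.DEFAULT_BUFFER_SIZE)
```

A payload that fits entirely in the buffer would be consumed from the iterator before the first item is returned, which matches what the short example shows.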

If you use an unbuffered stream, for example a network socket stream with buffering=0 as below, then you will not see this behaviour:

import asyncio
import socket

import json_stream

json_string = b'{"tasks":[{"id":1,"title":"task1"},{"id":2,"title":"task2"},{"id":3,"title":"task3"}]}'

async def handle_client(_, writer):
    for i in range(0, len(json_string), 15):
        message = json_string[i:i + 15]
        print(f"Sending: {message!r}")
        writer.write(message)
        await writer.drain()
        await asyncio.sleep(1)

    print("Closing connection")
    writer.close()
    await writer.wait_closed()

def test():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('127.0.0.1', 8888))
    f = sock.makefile(mode='b', buffering=0)
    data = json_stream.load(f)

    for task in data["tasks"]:
        print(task)

async def main():
    server = await asyncio.start_server(handle_client, '127.0.0.1', 8888)
    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')
    async with server:
        await asyncio.to_thread(test)

asyncio.run(main())

daggaz commented 1 year ago

Here is an example where the data doesn't all fit inside an 8192-byte buffer:

import json

import json_stream

def read_json_in_chunks(json_string, chunk_size=100):
    for i in range(0, len(json_string), chunk_size):
        ss = json_string[i:i+chunk_size]
        yield ss
        print('.', end='', flush=True)

data = {
    'tasks': [{
        'id': i,
        'title': f'task{i}'
    } for i in range(1000)],
}
data = json_stream.load(read_json_in_chunks(json.dumps(data).encode()))

for task in data["tasks"]:
    print(task)

daggaz commented 1 year ago

Ah ha!

I have found the issue. There was an unnecessary io.BufferedReader wrapping the IterableStream that wraps the iterable.

Removing this means that each iterator chunk is passed directly out to the tokenizer unbuffered.

New version 2.3.2 is on its way soon.
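The difference can be reproduced with the standard library alone. The sketch below is not json-stream's code: `ChunkStream`, `chunks`, and the `chunks_pulled` counter are hypothetical names, and the `read(8192)` call simply stands in for whatever read size a consumer might request. It relies on documented `io` behaviour: `io.BufferedReader.read(n)` keeps calling the raw stream until it has `n` bytes or reaches EOF, while `io.RawIOBase.read(n)` makes a single `readinto` call and may return fewer bytes.

```python
import io


class ChunkStream(io.RawIOBase):
    """Hypothetical raw stream serving bytes from an iterable of chunks."""

    def __init__(self, iterable):
        self.iterator = iter(iterable)
        self.remainder = b""
        self.chunks_pulled = 0  # how many chunks were taken from the iterable

    def readable(self):
        return True

    def readinto(self, buffer):
        # Serve leftover bytes first, otherwise pull exactly one new chunk.
        chunk = self.remainder
        if not chunk:
            try:
                chunk = next(self.iterator)
            except StopIteration:
                return 0  # signal EOF
            self.chunks_pulled += 1
        n = min(len(buffer), len(chunk))
        buffer[:n] = chunk[:n]
        self.remainder = chunk[n:]
        return n


def chunks(data, size=5):
    """Yield successive fixed-size slices of data."""
    for i in range(0, len(data), size):
        yield data[i:i + size]


payload = b'{"tasks":[{"id":1,"title":"task1"},{"id":2,"title":"task2"}]}'

# Wrapped in BufferedReader: one large read drains the whole iterable.
raw_wrapped = ChunkStream(chunks(payload))
buffered_data = io.BufferedReader(raw_wrapped).read(8192)
pulled_buffered = raw_wrapped.chunks_pulled

# Used directly: the same read returns after a single chunk.
raw_direct = ChunkStream(chunks(payload))
raw_data = raw_direct.read(8192)
pulled_raw = raw_direct.chunks_pulled

print("buffered:", pulled_buffered, "chunks pulled")
print("raw:     ", pulled_raw, "chunk pulled, got", raw_data)
```

With the buffered wrapper, every chunk is consumed before anything is handed to the reader. With the raw stream, each chunk becomes available as soon as the iterable yields it.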