daggaz / json-stream

Simple streaming JSON parser and encoder.

Read multiple JSON objects in a stream #30

Open ygoe opened 1 year ago

ygoe commented 1 year ago

It's not mentioned in the documentation, so I'm wondering if this is already possible. I need a streaming JSON parser that can give me one JSON object after the other. I don't need streaming within each object, but I'm going to receive multiple JSON objects over a byte stream (TCP or similar). The problem is that these are byte streams, not message streams. I can parse a single JSON object by whatever means, but I first need to separate the objects from the stream. And one object (or more) might be read completely while the next has only been read partially. Can this library already do this?

The intended purpose is JSON-RPC over TCP. I couldn't find any Python library (with a free license) for that. JSON-RPC itself is actually very simple; it's the message splitting that's hard. It can be done with separate protocol overhead (as WebSockets does with its framing) or by reading complete JSON objects. (I could do that myself, too, but wanted to see if there's a ready-made solution.)
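For illustration, the "separate protocol overhead" option usually means length-prefix framing: each message is preceded by its byte length, so the receiver knows exactly how many bytes belong to it. Here is a minimal sketch of that idea over a plain socket; the 4-byte big-endian prefix is an arbitrary choice for this sketch, not part of any JSON-RPC spec:

import json
import socket
import struct

def send_message(sock: socket.socket, obj) -> None:
    # frame: 4-byte big-endian length prefix, then the JSON payload
    payload = json.dumps(obj).encode()
    sock.sendall(struct.pack(">I", len(payload)) + payload)

def recv_exactly(sock: socket.socket, n: int) -> bytes:
    # loop because TCP recv() may return fewer bytes than requested
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("stream closed mid-message")
        buf += chunk
    return buf

def recv_message(sock: socket.socket):
    # read the prefix, then exactly that many payload bytes
    (length,) = struct.unpack(">I", recv_exactly(sock, 4))
    return json.loads(recv_exactly(sock, length))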

daggaz commented 1 year ago

I think this is possible:

from time import sleep

from json_stream.tokenizer import tokenize

import json_stream

def input_stream():
    # simulate JSON-RPC messages
    yield '{"bob": 1, "bobby": 4}'.encode()
    sleep(5)
    yield '{"bobo": 3}'.encode()

def json_documents(f):
    # process each new JSON document in the stream and yield
    # a json_stream parser
    try:
        while True:
            yield json_stream.load(f, tokenizer=tokenize)
    except StopIteration:
        pass

f = input_stream()
for document in json_documents(f):
    # once for each new JSON-RPC message
    print("got new message")
    for k, v in document.items():
        # process message
        print(f"{k} = {v}")
    print("end of message")

Note: due to an issue with the Rust tokenizer, this code uses the non-default pure-Python tokenizer implementation.

jorektheglitch commented 1 year ago

There is one tricky point: the JSON documents may be fragmented across chunks in different ways.

Example:

"{'alpha': 'bra"
"vo'}{'chadlie': 'delta'}"

Does the suggested method work in this case?

daggaz commented 1 year ago

@jorektheglitch I'm not 100% clear on what you're saying; that just looks like malformed JSON?

jorektheglitch commented 1 year ago

@daggaz, I meant the situation in which the JSON is read chunk by chunk. There is absolutely no guarantee that each JSON document will be read from start to end and nothing else. In the example I just showed two possible chunks read from the stream.
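A quick way to exercise that concern against the snippet above is to feed json_documents() a stream that splits one document mid-string and starts the next document in the same chunk. This sketch assumes the json_documents() helper from daggaz's comment, and uses valid double-quoted JSON in place of the single-quoted example:

def chunked_stream():
    # first document split in the middle of a string value;
    # second document starts in the same chunk the first one ends in
    yield b'{"alpha": "bra'
    yield b'vo"}{"charlie": "delta"}'

for document in json_documents(chunked_stream()):
    print({k: v for k, v in document.items()})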

ygoe commented 1 year ago

I've since lost interest in this, but I did the same thing in C# after the application moved to that language.

I implemented this with a preparser. It doesn't deserialise JSON objects into anything; it just tracks the syntax and tells me whether and where an object is complete, so I can extract that part of the data and pass it to a real deserialiser. It then continues with whatever comes afterwards.
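In Python, that preparser idea can be sketched as a small state machine that tracks string/escape state and brace depth and emits the bytes of each complete top-level object. The JSONPreparser name and API here are invented for illustration; this is not code from json-stream:

import json

class JSONPreparser:
    # tracks just enough JSON syntax to find where each top-level
    # object ends, without deserialising anything
    def __init__(self):
        self.buffer = bytearray()
        self.pos = 0           # next byte to scan
        self.start = 0         # start of the current (possibly partial) object
        self.depth = 0         # brace nesting depth
        self.in_string = False
        self.escaped = False

    def feed(self, chunk):
        # add a chunk; return a list of complete objects as raw bytes
        self.buffer.extend(chunk)
        complete = []
        while self.pos < len(self.buffer):
            char = chr(self.buffer[self.pos])
            if self.escaped:
                self.escaped = False
            elif self.in_string:
                if char == "\\":
                    self.escaped = True
                elif char == '"':
                    self.in_string = False
            elif char == '"':
                self.in_string = True
            elif char == "{":
                self.depth += 1
            elif char == "}":
                self.depth -= 1
                if self.depth == 0:
                    complete.append(bytes(self.buffer[self.start:self.pos + 1]))
                    self.start = self.pos + 1
            self.pos += 1
        # drop bytes that belong to already-emitted objects
        del self.buffer[:self.start]
        self.pos -= self.start
        self.start = 0
        return complete

p = JSONPreparser()
for chunk in (b'{"alpha": "bra', b'vo"}{"charlie": "delta"}'):
    for raw in p.feed(chunk):
        print(json.loads(raw))  # hand each complete object to a real deserialiser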

chrishas35 commented 1 year ago

I think this is possible:

This specific example is working for me, but I'm having a hard time making it work with an httpx stream. I feel like I'm missing something obvious, but any pointers would be appreciated. I swapped in the json_stream.httpx.load function.

import json_stream.httpx
from json_stream.tokenizer import tokenize

def json_documents(f):
    # process each new JSON document in the stream and yield
    # a json_stream parser
    try:
        while True:
            yield json_stream.httpx.load(f, tokenizer=tokenize)
    except StopIteration:
        pass

with self.client.stream("POST", url, json=data) as resp:
    for document in json_documents(resp):
        # once for each new JSON-RPC message
        print("got new message")
        for k, v in document.items():
            # process message
            print(f"{k} = {v}")
        print("end of message")

This gets me the first object returned, but the next one throws httpx.StreamConsumed.

chrishas35 commented 1 year ago

Classic situation of needing to ask the question to be able to figure out the answer (I think?)

import json_stream
from json_stream.tokenizer import tokenize

def json_documents(f):
    # process each new JSON document in the stream and yield
    # a json_stream parser
    try:
        while True:
            yield json_stream.load(f, tokenizer=tokenize)
    except StopIteration:
        pass

with self.client.stream("POST", url, json=data) as resp:
    for document in json_documents(resp.iter_bytes()):
        # once for each new JSON-RPC message
        print("got new message")
        for k, v in document.items():
            # process message
            print(f"{k} = {v}")
        print("end of message")

daggaz commented 1 year ago

@chrishas35 great that it works for you, but I think it's only by accident!

We will be merging big changes to the backend tokenizer API to allow this to work all the time.

chrishas35 commented 1 year ago

I may not be the best programmer, but I get lucky sometimes! 🤣