dotnet / runtime

.NET is a cross-platform runtime for cloud, mobile, desktop, and IoT apps.
https://docs.microsoft.com/dotnet/core/
MIT License

Let Utf8JsonReader process input with one complete JSON document per line #33030

Closed mrange closed 3 months ago

mrange commented 4 years ago

EDIT: see https://github.com/dotnet/runtime/issues/33030#issuecomment-2197778133 for an API proposal.

Apologies if this ticket has been created but I missed it or there's a flag that I missed. I looked around but didn't find a good match.

I work in BigData, and a common scenario is that we have large files that contain JSON documents separated by newlines, like so:

{}
[]
{}

This is not a valid JSON document obviously but it would help me a lot if the Utf8JsonReader could be configured to process a sequence of JSON documents like this.

My alternatives are to read line by line (forcing me to parse the data and thus losing performance) or to add extra complexity when iterating over the objects so I can keep track of when an object is done and I should continue to the next.

Neither is very attractive to me, and IMHO, since this is quite a common scenario, it would make sense to add it, especially since the parser already supports relaxed parsing with respect to trailing commas and comments.

mrange commented 4 years ago

Code example of workaround:


open System
open System.IO
open System.Text.Json

[<EntryPoint>]
let main argv =
  let options = JsonReaderOptions ()
  let input = File.ReadAllBytes "bigdata.json"

  let bytes = ReadOnlyMemory input

  let mutable objectsSeen = 0
  let mutable tokensSeen = 0

  for i = 1 to 1000 do
    let rec processAllLines offset =
      let bytes = bytes.Span

      // Advance the span to next available object
      let bytes = bytes.Slice offset
      let reader = Utf8JsonReader (bytes, options)

      objectsSeen <- objectsSeen + 1
      let mutable level = 0
      let mutable cont = true

      // Consume a single line that is assumed to contain a single object

      // reader is a ref struct which prevents usage of tail-rec
      //  :-(
      //  Not really a bug in Utf8JsonReader but rather in the F#
      //  story around ref structs
      while cont &&  reader.Read () do
        tokensSeen <- tokensSeen + 1
        match reader.TokenType with
        | JsonTokenType.StartArray
        | JsonTokenType.StartObject -> level <- level + 1
        | JsonTokenType.EndArray
        | JsonTokenType.EndObject ->
          level <- level - 1
          if level = 0 then
            cont <- false
        | _ -> ()

      let offset = offset + int reader.BytesConsumed

      // After consuming the line there are still trailing
      //  non-printable characters, consume them in order to be
      //  able to tell if we reached the end of input
      let rec consumeEndOfLine o =
        if o < input.Length && input.[o] < 32uy then
          consumeEndOfLine (o + 1)
        else
          o

      let offset = consumeEndOfLine offset

      // Done?
      if offset < input.Length then
        processAllLines offset
    processAllLines 0

  printfn "Objects seen : %d" objectsSeen
  printfn "Tokens seen : %d" tokensSeen

  0

layomia commented 4 years ago

From @int32overflow in https://github.com/dotnet/runtime/issues/36750

The following JSON file (with multi content) is not supported:

{ "name": "Admin" }{ "name": "Publisher" }

The following exception occurs: System.Text.Json.JsonReaderException: "'{' is invalid after a single JSON value. Expected end of data. LineNumber: 0 |

In the Newtonsoft Json.NET library there was the following property: "SupportMultipleContent". What is the solution here?

https://www.newtonsoft.com/json/help/html/ReadMultipleContentWithJsonReader.htm

Clockwork-Muse commented 4 years ago

... at the very least, using File.ReadLines(...) could help alleviate a large portion of your memory issues. What might be better for the workaround would be if there was some sort of File.ReadLinesAsBytes (since then you wouldn't be deserializing into string first) - essentially, something returning IEnumerable<ReadOnlySpan> that looks for newlines (or possibly something stranger, if you wanted to buffer the input).
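A minimal sketch of that idea, for illustration only. `SplitLines`, `IndexOfNewline`, and `IsBlank` are hypothetical helper names, not BCL APIs, and the sketch yields `ReadOnlyMemory<byte>` slices so they can be returned from an iterator. It splits a buffer that is already in memory; a streaming version would refill a buffer from the file instead.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Hypothetical helper: yields each non-blank line of a UTF-8 buffer as a slice,
// without decoding anything to string first.
static IEnumerable<ReadOnlyMemory<byte>> SplitLines(ReadOnlyMemory<byte> data)
{
    while (!data.IsEmpty)
    {
        int newline = IndexOfNewline(data);
        ReadOnlyMemory<byte> line = newline < 0 ? data : data.Slice(0, newline);
        data = newline < 0 ? ReadOnlyMemory<byte>.Empty : data.Slice(newline + 1);
        if (!IsBlank(line))
        {
            yield return line; // a trailing '\r' is left in place; the JSON parser treats it as whitespace
        }
    }
}

// Span work is kept in non-iterator helpers.
static int IndexOfNewline(ReadOnlyMemory<byte> m) => m.Span.IndexOf((byte)'\n');

static bool IsBlank(ReadOnlyMemory<byte> m)
{
    foreach (byte b in m.Span)
    {
        if (b > 32) return false; // anything above space counts as content
    }
    return true;
}

byte[] input = Encoding.UTF8.GetBytes("{\"a\":1}\n[1,2]\r\n\n{}");
int count = 0;
foreach (ReadOnlyMemory<byte> line in SplitLines(input))
{
    using JsonDocument doc = JsonDocument.Parse(line);
    Console.WriteLine(doc.RootElement.ValueKind);
    count++;
}
```

Each line is still parsed as its own document, so this only removes the string decoding step, not the per-line scan for newlines.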

angelobreuer commented 4 years ago

I have a similar issue to this, and my issue may fit into this issue:

I am reading UTF-8 JSON data encapsulated in JavaScript from a web server. I've created a stream that skips to the JSON data (according to a specific preamble the JSON data is prefixed by). After the JsonDocument was read, I get an exception that there is data after the object's last token. My current workaround is to save the entire web page in memory (about 2 MiB but I'm only interested in a few KiB) and put it into a JSON reader which works fine, e.g.:

var jsonData = Encoding.UTF8.GetBytes("{\"d\":\"test\"}}}}}"); //  A JSON object with additional data

// Wanted way
using var memoryStream = new MemoryStream(jsonData); // In my case this is a file stream or HTTP content stream
var document = await JsonDocument.ParseAsync(memoryStream); // FAILS!!
var jsonElement = document.RootElement;

// The "normal" way
var document = JsonDocument.Parse(jsonData.AsMemory()); // FAILS!!
var jsonElement = document.RootElement;

// Workaround:
var jsonReader = new Utf8JsonReader(jsonData.AsSpan(), isFinalBlock: true, default);
var jsonElement = (JsonElement)JsonSerializer.Deserialize<object>(ref jsonReader);

In the first and second sample, I get the following exception: System.Text.Json.JsonReaderException : '}' is invalid after a single JSON value. Expected end of data. LineNumber: 0 | BytePositionInLine: 12..

If I try to deserialize the object, it works fine, and additional data is ignored. The aforementioned is a considerable performance loss, as I have to read the web page and data I'm not interested in.

For example, a property AllowAdditionalContent in JsonDocumentOptions would be fine and would allow users to use System.Text.Json more flexibly.

I appreciate any help you can provide. Angelo Breuer

Clockwork-Muse commented 4 years ago

@angelobreuer -

using var memoryStream = new MemoryStream(jsonData); // In my case this is a file stream or HTTP content stream

If your stream is already skipping to the start, why not have it close at the end? Change the stream to look for the end of the JSON section and return EOF (Read returns 0) when it detects the end of the JSON section. If you don't have one, write a wrapping stream that does this.

// The "normal" way
var document = JsonDocument.Parse(jsonData.AsMemory()); // FAILS!!

Something similar is possible here, since memory and sequence objects can take a range, allowing you to extract just that portion.
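One way the range could be found, as a sketch only: use a `Utf8JsonReader` to locate the end of the first JSON value, then parse just that slice. This assumes the whole payload is already in memory and starts with the JSON value; the sample bytes are the ones from the comment above.

```csharp
using System;
using System.Text;
using System.Text.Json;

byte[] jsonData = Encoding.UTF8.GetBytes("{\"d\":\"test\"}}}}}"); // JSON object followed by extra data

// The reader works token by token, so it never looks at the trailing bytes
// as long as we stop reading once the first value is complete.
var reader = new Utf8JsonReader(jsonData, isFinalBlock: true, state: default);
reader.Read();  // positions the reader on StartObject
reader.Skip();  // advances to the matching EndObject
int length = (int)reader.BytesConsumed;

// Parse only the bytes that belong to the first value.
using JsonDocument document = JsonDocument.Parse(jsonData.AsMemory(0, length));
var value = document.RootElement.GetProperty("d").GetString();
Console.WriteLine(value);
```

The trade-off is that the value is tokenized twice: once to find its end and once to build the document.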

angelobreuer commented 4 years ago

@Clockwork-Muse

If your stream is already skipping to the start, why not have it close at the end? Change the stream to look for the end of the JSON section and return EOF (Read returns 0) when it detects the end of the JSON section. If you don't have one, write a wrapping stream that does this.

As a workaround, I've already done this, but scanning each "block" of bytes for the specific end sequence where the JSON ends has significant overhead.

I have migrated from Json.NET where the aforementioned was possible without any additional overhead:

using var streamReader = new StreamReader(someHttpResponseStream); // already skipped to start of JSON
using var jsonTextReader = new JsonTextReader(streamReader);
var data = await JObject.LoadAsync(jsonTextReader);

I've also checked other JSON serializers which allow the same (Utf8Json, Json.NET, ServiceStack.Text).

eiriktsarpalis commented 2 years ago

I think a SupportMultipleContent-like option would be a useful addition to Utf8JsonReader, but we'd need to write up an API proposal.

ghost commented 2 years ago

This issue has been marked with the api-needs-work label. This may suggest that the proposal requires further refinement before it can be considered for API review. Please refer to our API review guidelines for a detailed description of the process.

When ready to submit an amended proposal, please ensure that the original post in this issue has been updated, following the API proposal template and examples as provided in the guidelines.

darkguy2008 commented 2 years ago

Is there any progress on this feature as of today?

remcoros commented 2 years ago

I solved a similar case where I received json objects from a websocket stream, where the sender concats multiple json objects into a single websocket message. Maybe it helps:

The trick here is to check for the root object's '{' token, then skip all its children (TrySkip). If that fails, the buffer contains incomplete data and needs more. If it succeeds, the payload will contain one single json object and you can pass it to another Utf8JsonReader for 'actual' parsing.

        private bool TryParseMessage(ref ReadOnlySequence<byte> buffer, [NotNullWhen(returnValue: true)] out WebSocketMessage? message)
        {
            try
            {
                // Bitvavo sends multiple events in one message, like: { "event" : "candle"...}{ "event" : "ticker"...}
                // Split these messages by starting at the first json object and skipping all its children
                // if we can't skip, we need more data, since the object is not yet fully in the buffer
                // As a side-effect, we actually don't care about individual websocket frames or messages anymore, we request more raw data until we have a valid json object

                var rdr = new Utf8JsonReader(buffer, isFinalBlock: false, state: default);
                if (rdr.Read())
                {
                    if (rdr.TokenType != JsonTokenType.StartObject)
                    {
                        throw new JsonException("Invalid JSON, must start with an object");
                    }

                    if (!rdr.TrySkip())
                    {
                        // Need more data
                        message = null;
                        return false;
                    }

                    ReadOnlySequence<byte> payload;
                    if (buffer.GetOffset(rdr.Position) >= buffer.GetOffset(buffer.End))
                    {
                        // Skipped to end of buffer and have a single message
                        payload = buffer;
                        buffer = buffer.Slice(buffer.End);
                    }
                    else
                    {
                        // Still have data in the buffer, slice off the payload and set buffer to the remaining data
                        payload = buffer.Slice(0, rdr.Position);
                        buffer = buffer.Slice(rdr.Position);
                    }

                    message = _messageParser.ParseMessage(payload);

                    if (message != null)
                    {
                        return true;
                    }

                    // No valid message
                    message = new UnknownWebSocketMessage()
                    {
                        Payload = Encoding.UTF8.GetString(payload)
                    };

                    return true;
                }
            }
            catch (JsonException)
            {
                // TODO: log/put in message
            }

            // No valid json/event, fallback to plain text
            message = new UnknownWebSocketMessage()
            {
                Payload = Encoding.UTF8.GetString(buffer)
            };

            buffer = buffer.Slice(buffer.End);
            return true;
        }

darkguy2008 commented 2 years ago

@remcoros I think we are working on projects with similar websocket behavior! :D

Thanks for sharing that piece of code, it helped me to write my own version based on yours, my use-case was different (had to return a list of objects instead of parsing them inside the function itself) but most of the code was used.

Thank you!!!

P.S. Hard to know how come this isn't a core feature of System.Text.Json! :/

webczat commented 2 years ago

Skipping means you need the whole JSON object in memory for it to succeed. Also, because Utf8JsonReader itself does not support streams as input, it would be nice if you could deserialize to a sequence of objects in cases like networked streams. They might as well not be \r\n delimited; it should be doable in theory.

TheXenocide commented 2 years ago

I would also appreciate something similar to this. Perhaps an IAsyncEnumerable that can read a whole "root object" 1 at a time? My use cases are:

menees commented 1 year ago

@TheXenocide You may be able to use JsonSerializer.DeserializeAsyncEnumerable. Here's an example use from a StackOverflow answer that sounds like your use case:

With .NET 6 or later, we can use the DeserializeAsyncEnumerable method to read in streaming fashion over a large JSON file that has an array of items. I've used this to process a 5 GB JSON file with >100,000 items.

using var file = File.OpenRead(path);
var items = JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(file);
await foreach (var item in items)
{
    // Process JSON object
}

chaseaucoin commented 1 year ago

@TheXenocide You may be able to use JsonSerializer.DeserializeAsyncEnumerable. Here's an example use from a StackOverflow answer that sounds like your use case:

With .NET 6 or later, we can use the DeserializeAsyncEnumerable method to read in streaming fashion over a large JSON file that has an array of items. I've used this to process a 5 GB JSON file with >100,000 items.

using var file = File.OpenRead(path);
var items = JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(file);
await foreach (var item in items)
{
    // Process JSON object
}

That works well for well-formed JSON, but it doesn't work for NDJSON, which is popular in big data pipelines and mandatory if you want to support most append-only scenarios.

TheXenocide commented 1 year ago

append only scenarios

Yeah, the streams I'm reading come from append only scenarios that do not have array tokens at the beginning or end of file, though I suppose that could be provided by a relatively trivial wrapper stream if it's required. I'll see if I can find some time to try this out sometime soon.

LarinLive commented 1 year ago

@remcoros, thanks for your idea of the input buffer slicing.

I solved a similar case where I received json objects from a websocket stream, where the sender concats multiple json objects into a single websocket message. Maybe it helps:

I am working with WebSockets too and have encountered the same problem, and your example helped me a lot in finding a way to solve it!

benaadams commented 1 year ago

Can use System.IO.Pipelines and a PipeReader to create an async enumerable of JsonDocuments

using System.Buffers;
using System.IO.Pipelines;
using System.Text.Json;

var stream = new MemoryStream("{\"id\":67,\"jsonrpc\":\"2.0\"}[{\"id\":68,\"jsonrpc\":\"2.0\"},{\"id\":69,\"jsonrpc\":\"2.0\"}]{\"id\":70,\"jsonrpc\":\"2.0\"}"u8.ToArray());
var reader = PipeReader.Create(stream);

await foreach (var jsonDocument in ParseJson(reader))
{
    Console.WriteLine(jsonDocument.RootElement.ToString());
    jsonDocument.Dispose();
}

static async IAsyncEnumerable<JsonDocument> ParseJson(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (!buffer.IsEmpty && TryParseJson(ref buffer, out JsonDocument jsonDocument))
        {
            yield return jsonDocument;
        }

        if (result.IsCompleted)
        {
            break;
        }

        reader.AdvanceTo(buffer.Start, buffer.End);
    }

    reader.Complete();
}

static bool TryParseJson(ref ReadOnlySequence<byte> buffer, out JsonDocument jsonDocument)
{
    var reader = new Utf8JsonReader(buffer, isFinalBlock: false, default);

    if (JsonDocument.TryParseValue(ref reader, out jsonDocument))
    {
        buffer = buffer.Slice(reader.BytesConsumed);
        return true;
    }

    return false;
}

ewilansky commented 11 months ago

@benaadams solution worked nicely for me. In my case, I needed to read in a file stream instead of a memory stream, otherwise exactly the same.

var path = "./manifest.ndjson";
await using FileStream fileStream = new(path, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize: 4096, useAsync: true);
... 

If reading the fileStream synchronously, this simpler method call also works.

var path = "./manifest.ndjson";
using var fileStream = File.OpenRead(path);
...

0xced commented 7 months ago

Can use System.IO.Pipelines and a PipeReader to create an async enumerable of JsonDocuments

Awesome solution! Thanks for sharing, Ben.

I used it to migrate from Newtonsoft.Json to System.Text.Json in Docker.DotNet.

baterja commented 6 months ago

I'm also interested in JSONL handling but from the writer's side. I was looking for a way to produce JSONL/NDJSON necessary for Amazon Athena (Apache Hive).
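For the writer's side, a minimal sketch that relies only on the existing serializer: serialize each record with the default (non-indented) settings and append a newline. The record shape and the `MemoryStream` target are assumptions for illustration; any writable stream would do.

```csharp
using System;
using System.IO;
using System.Text;
using System.Text.Json;

// Illustrative records; real code would use its own types.
var records = new[]
{
    new { id = 1, name = "Admin" },
    new { id = 2, name = "Publisher" },
};

using var stream = new MemoryStream();
foreach (var record in records)
{
    // Default options produce compact output, and newlines inside string
    // values are escaped, so each document occupies exactly one line.
    JsonSerializer.Serialize(stream, record);
    stream.WriteByte((byte)'\n');
}

string ndjson = Encoding.UTF8.GetString(stream.ToArray());
Console.Write(ndjson);
```

This breaks if `WriteIndented` is enabled in the options, since indented output spans several lines per document.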

ericwj commented 6 months ago

I think it is very odd to prefer having to parse JSON twice instead of just implementing an option that doesn't throw if there is data left after parsing. All that would take is calling deserialize in a loop with options that specify the do not throw setting. Now even with a fair bit of code in some internal class still you have to go find that in dotnet/runtime and copy it wholesale into your project to get such a simple thing done.

Followin commented 6 months ago

Just add an option that doesn't try to read everything beyond the object it's provided... Provided an opening brace, stop at the corresponding closing one. And the same for the brackets. That's it. It taking 3 years of discussions, with a bunch of hoorays for solutions that read everything twice or into memory, is just disappointing to be honest.

SteveSandersonMS commented 3 months ago

AI-related use cases:

In both cases this can be addressed by having some API like:

T nextItem = await JsonSerializer.ReadNextAsync<T>(stream);

or:

IAsyncEnumerable<T> allItems = JsonSerializer.DeserializeAsyncEnumerable(stream, separator: string.Empty);

@stephentoub pointed out that the DeserializeAsyncEnumerable approach would perform better because the ReadNextAsync would be limited to reading a single character at a time from the stream, since it has no way to store not-yet-consumed data and in general we can't rewind the stream after a read.

SteveSandersonMS commented 3 months ago

I guess one other possible API design is like this:

using var readContext = new JsonStreamReadContext(stream); // TODO: better naming

while (await JsonSerializer.ReadNextAsync<T>(readContext) is {} nextItem)
{
    // ...
}

... as this would give a place to track read-but-not-yet-consumed data from the stream. Not saying it's any better than DeserializeAsyncEnumerable though.

eiriktsarpalis commented 3 months ago

Here is an API proposal based on a prototype I've been working on:

namespace System.Text.Json;

public partial struct JsonReaderOptions
{
    public bool AllowTrailingContent { get; set; }
}

public partial class JsonSerializerOptions
{
    public bool AllowTrailingContent { get; set; }
}

namespace System.Text.Json.Serialization;

public partial class JsonSourceGenerationOptionsAttribute
{
    public bool AllowTrailingContent { get; set; }
}

API Usage

Enabling the setting endows Utf8JsonReader with the ability to read through multiple root-level JSON documents that are separated by whitespace:

var reader = new Utf8JsonReader("null {} 1 \r\n [1,2,3]"u8, new() { AllowTrailingContent = true });

reader.Read();
Console.WriteLine(reader.TokenType); // Null

reader.Read();
Console.WriteLine(reader.TokenType); // StartObject
reader.Skip();

reader.Read();
Console.WriteLine(reader.TokenType); // Number

reader.Read();
Console.WriteLine(reader.TokenType); // StartArray
reader.Skip();

Console.WriteLine(reader.Read()); // False

This additionally makes it possible to read JSON from payloads that may contain trailing data that is invalid JSON:

var reader = new Utf8JsonReader("[1,2,3]    <NotJson/>"u8, new() { AllowTrailingContent = true });

reader.Read();
reader.Skip(); // Success
reader.Read(); // throws JsonReaderException

The equivalent JsonSerializerOptions setting can be enabled to support deserialization of values while discarding any trailing data:

JsonSerializerOptions options = new() { AllowTrailingContent = true };

JsonSerializer.Deserialize<int[]>("[1,2,3]   { }[]{}", options); // Success
JsonSerializer.Deserialize<int[]>("[1,2,3]   <!NOT JSON!>", options); // Success

New DeserializeAsyncEnumerable overloads

The following APIs make it possible to stream multiple root-level JSON values using IAsyncEnumerable:

namespace System.Text.Json;

+public enum JsonDeserializeAsyncEnumerableMode
+{
+   Array = 0,
+   RootLevelValues = 1,
+}

public partial static class JsonSerializer
{
    public static IAsyncEnumerable<T> DeserializeAsyncEnumerable<T>(Stream utf8Json, JsonSerializerOptions options = null, CancellationToken cancellationToken = default);
    public static IAsyncEnumerable<T> DeserializeAsyncEnumerable<T>(Stream utf8Json, JsonTypeInfo<T> jsonTypeInfo, CancellationToken cancellationToken = default);
+   public static IAsyncEnumerable<T> DeserializeAsyncEnumerable<T>(Stream utf8Json, JsonDeserializeAsyncEnumerableMode mode, JsonSerializerOptions options = null, CancellationToken cancellationToken = default);
+   public static IAsyncEnumerable<T> DeserializeAsyncEnumerable<T>(Stream utf8Json, JsonTypeInfo<T> jsonTypeInfo, JsonDeserializeAsyncEnumerableMode mode, CancellationToken cancellationToken = default);
}

Which enables scenarios like the following:

string json = """[0] [0,1] [0,1,1] [0,1,1,2] [0,1,1,2,3]""";
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(json));

await foreach(int[] item in JsonSerializer.DeserializeAsyncEnumerable<int[]>(stream, JsonDeserializeAsyncEnumerableMode.RootLevelValues))
{
    Console.WriteLine(item.Length);
}

cc @SteveSandersonMS @stephentoub

stephentoub commented 3 months ago

Here is an API proposal based on a prototype I've been working on

Thanks, that looks pretty good.

For the JsonDeserializeAsyncEnumerableMode, would we imagine a future where you could opt-in to both RootLevelValue and Array, i.e. it'd yield top-level objects but if they were arrays it would instead enumerate their contents? I don't know how valuable that would be, but it would impact the shape of the enum.

eiriktsarpalis commented 3 months ago

would we imagine a future where you could opt-in to both RootLevelValue and Array, i.e. it'd yield top-level objects but if they were arrays it would instead enumerate their contents?

Probably not, and that's because of the ambiguity arising from the case where the element types themselves serialize as arrays. E.g. it would be unclear if DeserializeAsyncEnumerable<int[]>("""[]""") should be returning an empty or a singleton IAsyncEnumerable.

bartonjs commented 3 months ago

Video

namespace System.Text.Json
{
    public partial struct JsonReaderOptions
    {
        public bool AllowMultipleValues { get; set; }
    }

    public partial static class JsonSerializer
    {
        public static IAsyncEnumerable<T> DeserializeAsyncEnumerable<T>(Stream utf8Json, bool topLevelValues, JsonSerializerOptions options = null, CancellationToken cancellationToken = default);
        public static IAsyncEnumerable<T> DeserializeAsyncEnumerable<T>(Stream utf8Json, JsonTypeInfo<T> jsonTypeInfo, bool topLevelValues, CancellationToken cancellationToken = default);
    }
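
A usage sketch of the approved shape, assuming a runtime where `AllowMultipleValues` and the `topLevelValues` overloads listed above are available. `CountRootValues` is a hypothetical helper name, and the inputs are adapted from the earlier proposal.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;

// Reader level: walk every root-level value in a whitespace-separated payload.
static int CountRootValues(byte[] utf8)
{
    var reader = new Utf8JsonReader(utf8, new JsonReaderOptions { AllowMultipleValues = true });
    int count = 0;
    while (reader.Read())
    {
        reader.Skip(); // does nothing for scalars; jumps to the end of objects and arrays
        count++;
    }
    return count;
}

int rootValues = CountRootValues(Encoding.UTF8.GetBytes("null {} 1 \r\n [1,2,3]"));
Console.WriteLine(rootValues);

// Serializer level: stream root-level values as an IAsyncEnumerable.
using var stream = new MemoryStream(Encoding.UTF8.GetBytes("[0] [0,1] [0,1,1]"));
var lengths = new List<int>();
await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<int[]>(stream, topLevelValues: true))
{
    lengths.Add(item!.Length);
}
Console.WriteLine(string.Join(",", lengths));
```

The reader part lives in a synchronous local function because `Utf8JsonReader` is a ref struct and cannot be held in a local across an `await`.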
}