francoispqt / gojay

high performance JSON encoder/decoder with stream API for Golang
MIT License
2.11k stars, 112 forks

Infinite loop on decode stream #84

xocasdashdash closed this issue 5 years ago

xocasdashdash commented 5 years ago

(Sorry for the empty post, I fat-fingered the keyboard.) I've found an issue when decoding a stream: gojay goes into an infinite loop on some chunks of the stream. The code was basically this:

rawBytes, bufferErr := b.ReadBytes('\n')
entries := 1
breakOut := false
logger.Log.Debug("First line read, going into loop")
//TODO: Review loop code
for (bufferErr == io.EOF || bufferErr == nil) && !readCanceled && !breakOut {
    if len(rawBytes) > 0 {
        logger.Log.Debugf("Going to decode %d bytes", len(rawBytes))
        rawBuffer := bytes.NewBuffer(rawBytes)
        dec := gojay.Stream.BorrowDecoder(rawBuffer)

        // DecodeStream requires streamChan to implement gojay.UnmarshalerStream;
        // this assumes its UnmarshalStream method sends each decoded value on the channel.
        err := dec.DecodeStream(streamChan)
        if err != nil {
            logger.Log.Errorf("error decoding %q: %+v", rawBytes, err)
        }
        dec.Release()
    }
    rawBytes, bufferErr = b.ReadBytes('\n')
    entries++
    logger.Log.Debugf("Read %d bytes", len(rawBytes))
    if len(rawBytes) == 0 {
        if bufferErr == io.EOF {
            breakOut = true
        } else {
            logger.Log.Debugf("No data but not at EOF. Error: %+v", bufferErr)
        }
    }
    logger.Log.Debugf("Exit? Err: %+v, RC: %+v, BO: %+v", bufferErr, readCanceled, breakOut)
}

Some chunks of "rawBuffer" would make the end-of-buffer logic at https://github.com/francoispqt/gojay/blob/master/decode.go#L961-L990 fail, and then the loop would get stuck.

I worked around it by decoding specific messages and putting them on a channel myself.

francoispqt commented 5 years ago

Hi, the only way this can happen is if it constantly reads 0 bytes and doesn't return an error. Why would it do that? I'm going to run a few tests to reproduce it and try to figure out a solution.
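That condition is easy to trigger with the standard library alone: bytes.Buffer.Read returns 0 and a nil error (never io.EOF) when it is handed a zero-length slice, so a refill loop whose buffer has no spare capacity keeps "succeeding" without making progress. The sketch below is illustrative only and is not gojay's code; countZeroReads and the 4-byte capacity are hypothetical, and maxAttempts exists only so the demo terminates.

```go
package main

import (
	"bytes"
	"fmt"
)

// countZeroReads wraps src in a bytes.Buffer and reads it into a buffer of
// fixed capacity that is never grown. It returns how many bytes were read and
// how many Read calls returned (0, nil). maxAttempts bounds the loop so the
// demo terminates instead of spinning forever.
func countZeroReads(src []byte, capacity, maxAttempts int) (int, int) {
	r := bytes.NewBuffer(src)
	buf := make([]byte, 0, capacity)
	zeroReads := 0
	for i := 0; i < maxAttempts; i++ {
		// Read into the unused tail of buf; once buf is full this slice is empty.
		n, err := r.Read(buf[len(buf):cap(buf)])
		buf = buf[:len(buf)+n]
		if err != nil {
			break // io.EOF: a well-behaved loop would stop here
		}
		if n == 0 {
			zeroReads++
		}
	}
	return len(buf), zeroReads
}

func main() {
	filled, zero := countZeroReads([]byte(`{"id":1}`+"\n"), 4, 10)
	fmt.Println(filled, zero) // prints "4 9": full after one read, then 9 zero-byte reads with a nil error
}
```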

berglh commented 5 years ago

I'm actually confused: does the Gojay Stream API expect a struct defining the fields for the JSON objects being streamed, or does it work with something similar to map[string]interface{}, like encoding/json?

I'm interested in the Stream API to stream arbitrary JSON from a file and manually walk the map to find/replace values. Both the source JSON and the path to the value being modified change on each run. Rather than recompile every time I run this process on a data set, I'd rather write a generic tool that's relatively high performance.
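For comparison, a minimal sketch of that generic approach using the standard library's encoding/json (not gojay): decode each document from a stream into map[string]interface{} and overwrite the value at a key path. The helper replaceAt and the []string path format are hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
)

// replaceAt walks doc along path (one object key per element) and, if the
// whole path exists, overwrites the final key with value. It reports whether
// a replacement happened.
func replaceAt(doc map[string]interface{}, path []string, value interface{}) bool {
	if len(path) == 0 {
		return false
	}
	cur := doc
	for _, key := range path[:len(path)-1] {
		next, ok := cur[key].(map[string]interface{})
		if !ok {
			return false // key missing or not a JSON object
		}
		cur = next
	}
	last := path[len(path)-1]
	if _, ok := cur[last]; !ok {
		return false
	}
	cur[last] = value
	return true
}

func main() {
	// Two concatenated documents, as read from a stream.
	input := `{"user":{"name":"a","pw":"x"}} {"user":{"name":"b"}}`
	dec := json.NewDecoder(strings.NewReader(input))
	for {
		var doc map[string]interface{}
		err := dec.Decode(&doc)
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			fmt.Println("decode error:", err)
			return
		}
		replaced := replaceAt(doc, []string{"user", "pw"}, "REDACTED")
		out, err := json.Marshal(doc)
		if err != nil {
			fmt.Println("encode error:", err)
			return
		}
		fmt.Println(replaced, string(out))
	}
}
```

For this input it reports a replacement in the first document and none in the second, which has no pw key. Paths through JSON arrays would need an extra case for []interface{} indices, left out here to keep the sketch short.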

The example code by @xocasdashdash didn't define a struct for streamChan or even instantiate streamChan using make(chan *type).

I know it seems like a fool's game to optimise something so dynamic in nature. jsoniter improved my throughput by ~15%, but everything seems to say Gojay is the fastest, so I'm wondering what the Gojay equivalent of the json.NewDecoder approach would be. My current approach works well enough, but I'm interested in learning Gojay and whether it is more performant for this kind of usage. Here is the code I'm looking to adapt to Gojay:

    // Open file for streaming
    jsonFile, err := os.Open(file) // Open up a file to read in bytes
    if err != nil {
        fmt.Fprintf(os.Stderr, "Error opening file: %s", err)
        os.Exit(1)
    }
    defer jsonFile.Close() // Close the file when finished

    // Set vars for decoding
    var json = jsoniter.ConfigCompatibleWithStandardLibrary // Use jsoniter instead of encoding/json
    decoder := json.NewDecoder(jsonFile)                    // Create a new JSON decoder for the JSON file
    ID := 0                                                 // Set Job ID value

    // Decoding loop
    for {
        data := make(MapStr)         // Local map string interface required for job submission
        err := decoder.Decode(&data) // Decode the next JSON document
        if err == io.EOF {
            break // No more documents in the file
        }
        if err != nil {
            fmt.Fprintf(os.Stderr, "\nError decoding JSON: %s", err)
            break
        }
        ID++                             // Increment Job ID counter
        jobs <- &Job{ID: ID, Work: data} // Submit a job to the jobs channel

        if ID%1000 == 0 {
            fmt.Fprintf(os.Stderr, "\rTotal JSON docs: %d", ID) // Print our totals periodically
        }
    }

ref: https://github.com/berglh/json-salt/blob/workers/json-salt.go#L119-L147

francoispqt commented 5 years ago

Hi, sorry for taking so long to answer your request; I was very busy these past few months. The bug you mentioned is fixed: it was a problem where the buffer was reset to 0 and wouldn't grow anymore.

Regarding gojay, it's not meant to decode arbitrary JSON, only data implementing the Unmarshaler interface, so it won't really be much more performant in your case.

I'm closing this issue as the problem is fixed.

xocasdashdash commented 5 years ago

@berglh Sorry, just saw this. I do have those interfaces implemented, but I did not think it was necessary for this issue.