segmentio / parquet-go

Go library to read/write Parquet files
https://pkg.go.dev/github.com/segmentio/parquet-go
Apache License 2.0

corruption of already read byte values on read of subsequent pages #475

Closed asubiotto closed 1 year ago

asubiotto commented 1 year ago

When reading rows from a file, ReadRows reads at most the smallest number of rows remaining in the current page across all columns (https://github.com/segmentio/parquet-go/blob/1d85e81366814efe13170410a86be216a8c9b996/row_group.go#L311-L312). When ReadRows is called multiple times (once per page), the byte values returned by earlier calls appear to be overwritten with the values from the new page. I would expect the rows returned by ReadRows to remain valid across later calls, without having to deep-copy them.

To reproduce, run the following test:

import (
    "bytes"
    "errors"
    "fmt"
    "io"
    "testing"

    "github.com/segmentio/parquet-go"
)

func TestReadRowNoOverwrite(t *testing.T) {
    type model struct {
        Bytes []byte
    }
    var b bytes.Buffer
    // Use a tiny page buffer so that each row is intended to land in its own page.
    w := parquet.NewGenericWriter[model](&b, &parquet.WriterConfig{
        PageBufferSize: 1,
    })

    testBytes := make([][]byte, 2)
    for i := range testBytes {
        testBytes[i] = []byte(fmt.Sprintf("%d", i))
        if _, err := w.Write([]model{{Bytes: testBytes[i]}}); err != nil {
            t.Fatal(err)
        }
    }

    if err := w.Close(); err != nil {
        t.Fatal(err)
    }

    rows := parquet.NewGenericReader[model](bytes.NewReader(b.Bytes()))
    defer rows.Close()
    rowBuf := make([]parquet.Row, rows.NumRows())
    for n := 0; int64(n) < rows.NumRows(); {
        readN, err := rows.ReadRows(rowBuf[n:])
        n += readN
        if err != nil {
            if errors.Is(err, io.EOF) {
                break
            }
            t.Fatal(err)
        }
    }
    for i := range testBytes {
        if !bytes.Equal(testBytes[i], rowBuf[i][0].Bytes()) {
            t.Fatalf(
                "row mismatch at %d, expected %v but found %v",
                i, string(testBytes[i]), string(rowBuf[i][0].Bytes()),
            )
        }
    }
}

cc @achille-roussel

achille-roussel commented 1 year ago

You are correct: the values written to the rows by ReadRows are only valid until the next call to the method, because BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY values may refer to memory buffers held within the reader.

I realize that this is not properly documented on the RowReader interface, which is something we should fix: https://pkg.go.dev/github.com/segmentio/parquet-go#RowReader

The main reason behind this design decision is to avoid memory copies when the application can work on one batch of rows, then move on to the next, without retaining values across calls to ReadRows (e.g. when merging row groups or streaming rows over the network).

If you need to retain rows across calls to ReadRows, you need to copy them with parquet.Row.Clone. Alternatively, you can work with higher-level types using GenericReader[T].Read, which copies byte slice and string values and does not share memory with the underlying page buffers.
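
To make the aliasing concrete, here is a minimal standard-library sketch of the behavior described above. It does not use parquet-go: `pageReader`, `read`, and `collect` are hypothetical names invented for illustration, and the reader is assumed to reuse one internal buffer per page, which is a simplification of what the library does. It shows why values retained without a copy end up pointing at the latest page, and why copying each value before the next read avoids that.

```go
package main

import "fmt"

// pageReader is a hypothetical stand-in for a reader that decodes every page
// into one internal buffer and hands out slices aliasing that buffer.
type pageReader struct {
	pages [][]byte // one value per page, to keep the sketch small
	next  int      // index of the next page to decode
	buf   []byte   // reused across calls to read
}

// read returns the next value and true, or nil and false once exhausted.
// The returned slice aliases r.buf and is only valid until the next call.
func (r *pageReader) read() ([]byte, bool) {
	if r.next >= len(r.pages) {
		return nil, false
	}
	r.buf = append(r.buf[:0], r.pages[r.next]...) // overwrites the previous page
	r.next++
	return r.buf, true
}

// collect drains the reader and retains every value. When clone is true,
// each value is copied before the next call to read can overwrite it.
func collect(r *pageReader, clone bool) [][]byte {
	var out [][]byte
	for {
		v, ok := r.read()
		if !ok {
			return out
		}
		if clone {
			v = append([]byte(nil), v...) // private copy, independent of r.buf
		}
		out = append(out, v)
	}
}

func main() {
	pages := [][]byte{[]byte("0"), []byte("1")}

	aliased := collect(&pageReader{pages: pages}, false)
	cloned := collect(&pageReader{pages: pages}, true)

	fmt.Printf("aliased: %q\n", aliased) // prints: aliased: ["1" "1"]
	fmt.Printf("cloned:  %q\n", cloned)  // prints: cloned:  ["0" "1"]
}
```

In the reproduction test from this issue, the equivalent change would be to clone each row filled by a ReadRows call (for example with parquet.Row.Clone) before calling ReadRows again.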

Let me know if that helps!

asubiotto commented 1 year ago

Gotcha, makes sense. Added a PR to mention this in the comment: https://github.com/segmentio/parquet-go/pull/476