segmentio / parquet-go

Go library to read/write Parquet files
https://pkg.go.dev/github.com/segmentio/parquet-go
Apache License 2.0

Library sometimes flips boolean values #501

Closed: joe-elliott closed this issue 1 year ago

joe-elliott commented 1 year ago

Somewhat regularly, parquet-go flips boolean values. The test below reproduces the issue.

Some things I've noticed:

package parquet_test

import (
    "io"
    "math/rand"
    "os"
    "testing"

    "github.com/segmentio/parquet-go"
)

type Bools struct {
    A bool
    B bool
}

func TestBools(t *testing.T) {
    f, err := os.CreateTemp(t.TempDir(), "data.parquet")
    if err != nil {
        t.Fatal(err)
    }
    defer f.Close()
    w := parquet.NewGenericWriter[*Bools](f)

    // write 100 rows, each with exactly one of A or B set to true
    for i := 0; i < 100; i++ {
        st := &Bools{}
        switch rand.Intn(2) {
        case 0:
            st.A = true
        case 1:
            st.B = true
        }
        if _, err := w.Write([]*Bools{st}); err != nil {
            t.Fatal(err)
        }
        // flush (cut a row group) whenever i is a multiple of 10
        if i%10 == 0 {
            if err := w.Flush(); err != nil {
                t.Fatal(err)
            }
        }
    }
    if err := w.Close(); err != nil {
        t.Fatal(err)
    }

    stat, err := f.Stat()
    if err != nil {
        t.Fatal(err)
    }

    pf, err := parquet.OpenFile(f, stat.Size())
    if err != nil {
        t.Fatal(err)
    }

    r := parquet.NewGenericReader[*Bools](pf)
    defer r.Close()
    for {
        sts := make([]*Bools, 1)
        n, err := r.Read(sts)
        // every row had exactly one true field, so both true means a flipped bit
        if n > 0 && sts[0] != nil && sts[0].A && sts[0].B {
            t.Fatal("both true")
        }
        if err == io.EOF {
            break
        }
        if err != nil {
            t.Fatal(err)
        }
    }
}
joe-elliott commented 1 year ago

Row group size matters. If I flush every 7 rows, the issue does not occur:

        if i%7 == 0 {
            w.Flush()
        }

If I flush every 8 rows, it does:

        if i%8 == 0 {
            w.Flush()
        }
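One reading of the 7-vs-8 difference that is consistent with the write path quoted later in this thread: booleans are bit-packed eight per byte (value n sits in byte n/8 at bit n%8), and a single-value write only takes the byte-alignment branch, the one that merges with `|=`, when `r = 8 - n%8 <= 1`, i.e. when it lands on bit 7. Assuming each `w.Write` of one row reaches the column buffer as a single-value write, every row group after the first holds k rows, so only groups of 8 or more rows ever hit that branch. The helper `alignmentWrites` below is a hypothetical sketch of that arithmetic, not parquet-go code.

```go
package main

import "fmt"

// alignmentWrites lists the value indexes, within a row group of k rows
// written one value at a time, whose write satisfies r = 8 - n%8 <= 1 and
// would therefore take the alignment branch (hypothetical model).
func alignmentWrites(k int) []int {
	var idx []int
	for n := 0; n < k; n++ {
		if r := 8 - n%8; r <= 1 {
			idx = append(idx, n)
		}
	}
	return idx
}

func main() {
	for _, k := range []int{7, 8, 10} {
		fmt.Println(k, "rows per group:", alignmentWrites(k))
	}
}
```

Under this model, groups of 7 rows never write bit 7 of a byte, while groups of 8 or 10 rows (as in the original test) do.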
joe-elliott commented 1 year ago

The issue appears to be in booleanColumnBuffer. After the first row group is flushed, a new row is written and this line is hit:

https://github.com/segmentio/parquet-go/blob/main/column_buffer.go#L833

to extend the slice to hold the new values. Because Reset() only truncates col.bits to length 0, the re-extended slice reuses the same backing array, so some of its bits are already 1 from the previous row group, and the code that follows does not clear them.
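The underlying Go behavior can be shown without parquet-go: truncating a slice with `s[:0]` keeps its backing array, so re-extending it within capacity exposes whatever bytes were written before. This is a minimal illustration with a plain `[]byte`; the function name `reuseAfterTruncate` is hypothetical.

```go
package main

import "fmt"

// reuseAfterTruncate fills a buffer with data, truncates it the way a Reset
// that only does bits = bits[:0] would, then re-extends it to n bytes
// without writing anything new (n must not exceed the capacity of 16).
func reuseAfterTruncate(data []byte, n int) []byte {
	bits := make([]byte, 0, 16)
	bits = append(bits, data...)

	bits = bits[:0] // length 0, but the backing array still holds data

	// Re-extending within capacity does not zero the reused bytes.
	return bits[:n]
}

func main() {
	fmt.Printf("%08b\n", reuseAfterTruncate([]byte{0xFF}, 1)[0]) // prints 11111111
}
```

This is why zeroing each byte before truncating, as the workaround diff does, hides the stale bits.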

This diff fixes the issue (I can confirm it also fixes it in Tempo), but I assume there is a more correct fix:

--- a/column_buffer.go
+++ b/column_buffer.go
@@ -775,6 +775,9 @@ func (col *booleanColumnBuffer) Pages() Pages { return onePage(col.Page()) }
 func (col *booleanColumnBuffer) Page() Page { return &col.booleanPage }

 func (col *booleanColumnBuffer) Reset() {
+       for i := range col.bits {
+               col.bits[i] = 0
+       }
        col.bits = col.bits[:0]
        col.offset = 0
kevinburkesegment commented 1 year ago

Hi Joe, thanks very much for the report. I'm wondering whether the race detector catches this.

joe-elliott commented 1 year ago

I believe this happens entirely in one goroutine. I don't think there are concurrency issues here.

joe-elliott commented 1 year ago

Yeah, I can reproduce it directly against the column buffer with this:

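package parquet

import (
    "fmt"
    "testing"
)

// Must live in package parquet: newBooleanColumnBuffer and valueAt are unexported.
func TestAnother(t *testing.T) {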
    col := newBooleanColumnBuffer(BooleanType, 0, 2055208)

    writeBool(col, false)
    writeBool(col, false)
    writeBool(col, false)
    writeBool(col, false)
    writeBool(col, true)
    writeBool(col, false)
    writeBool(col, true)
    writeBool(col, true)
    writeBool(col, true)
    writeBool(col, true)
    col.Reset()
    writeBool(col, false) // 0
    writeBool(col, true)  // 1
    writeBool(col, false) // 2
    writeBool(col, true)  // 3
    writeBool(col, true)  // 4
    writeBool(col, false) // 5
    writeBool(col, false) // 6
    writeBool(col, false) // 7
    writeBool(col, true)  // 8

    fmt.Println(0, col.valueAt(0))
    fmt.Println(1, col.valueAt(1))
    fmt.Println(2, col.valueAt(2))
    fmt.Println(3, col.valueAt(3))
    fmt.Println(4, col.valueAt(4))
    fmt.Println(5, col.valueAt(5))
    fmt.Println(6, col.valueAt(6))
    fmt.Println(7, col.valueAt(7)) // prints true, but false was written
    fmt.Println(8, col.valueAt(8))
}

func writeBool(col *booleanColumnBuffer, b bool) {
    _, err := col.WriteBooleans([]bool{b})
    if err != nil {
        panic(err)
    }
}
joe-elliott commented 1 year ago

This reproduces it even more dramatically:

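package parquet

import (
    "fmt"
    "testing"
)

// Must live in package parquet: newBooleanColumnBuffer and valueAt are unexported.
func TestAnother(t *testing.T) {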
    col := newBooleanColumnBuffer(BooleanType, 0, 2055208)

    _, err := col.WriteBooleans([]bool{true, true, true, true, true, true, true, true})
    if err != nil {
        panic(err)
    }
    col.Reset()
    writeBool(col, false) // 0
    _, err = col.WriteBooleans([]bool{false, false, false, false, false, false, false})
    if err != nil {
        panic(err)
    }

    fmt.Println(0, col.valueAt(0)) // prints false
    fmt.Println(1, col.valueAt(1)) // prints true, expected false
    fmt.Println(2, col.valueAt(2)) // prints true, expected false
    fmt.Println(3, col.valueAt(3)) // prints true, expected false
    fmt.Println(4, col.valueAt(4)) // prints true, expected false
    fmt.Println(5, col.valueAt(5)) // prints true, expected false
    fmt.Println(6, col.valueAt(6)) // prints true, expected false
    fmt.Println(7, col.valueAt(7)) // prints true, expected false
}

func writeBool(col *booleanColumnBuffer, b bool) {
    _, err := col.WriteBooleans([]bool{b})
    if err != nil {
        panic(err)
    }
}

In this example, only one boolean is set correctly after calling Reset(). The issue appears to be this line:

https://github.com/segmentio/parquet-go/blob/main/column_buffer.go#L851

The code packs the incoming booleans into a byte b and shifts it into position within the current byte, col.bits[x]. It then merges the result with |=. Because |= can only set bits, this path can never clear a bit that is already 1, so whatever was left over in the buffer persists. (The col.bits[x] & ^(0xFF << y) term, meant to preserve the y low bits already written, is redundant under |=.)
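To make the difference concrete, here is the byte merge from that line in isolation, applied to the state from the second reproduction: after Reset() and the single false write, byte 0 holds 0b11111110 (stale bits 1 to 7 from the earlier eight true values), and the next seven false values arrive as b = 0 with y = 1. The function names `mergeOr` and `mergeAssign` are hypothetical; they only mirror the original and proposed expressions.

```go
package main

import "fmt"

// mergeOr mirrors the original: col.bits[x] |= (b << y) | (old & ^(0xFF << y)).
// Because of |=, any bit already set in old stays set.
func mergeOr(old, b byte, y uint) byte {
	return old | (b << y) | (old & ^(byte(0xFF) << y))
}

// mergeAssign mirrors the proposed fix: keep the low y bits already
// written and take bits y..7 from b.
func mergeAssign(old, b byte, y uint) byte {
	return (b << y) | (old & ^(byte(0xFF) << y))
}

func main() {
	old := byte(0b11111110) // stale bits 1..7 left over from before Reset()
	fmt.Printf("%08b\n", mergeOr(old, 0, 1))     // prints 11111110
	fmt.Printf("%08b\n", mergeAssign(old, 0, 1)) // prints 00000000
}
```

The assignment still preserves bits below y, e.g. mergeAssign(0x01, 0x7F, 1) yields 0xFF, so values written earlier in the same byte are kept.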

I believe this is the fix:

@@ -848,7 +851,7 @@ func (col *booleanColumnBuffer) writeValues(rows sparse.Array, _ columnLevels) {
                        }
                        x := uint(col.numValues) / 8
                        y := uint(col.numValues) % 8
-                       col.bits[x] |= (b << y) | (col.bits[x] & ^(0xFF << y))
+                       col.bits[x] = (b << y) | (col.bits[x] & ^(0xFF << y))
                        col.numValues += int32(i)
                }
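An end-to-end check of the proposed fix can be sketched with a simplified model. `toyBuffer` below is hypothetical, not parquet-go code: it follows the structure of the write path discussed above (alignment step, whole bytes, then single values), replaces the library's bit-gathering helper with a plain loop, and replays the second reproduction with either `|=` or `=` in the alignment step.

```go
package main

import "fmt"

// toyBuffer is a hypothetical, simplified bit-packed boolean buffer.
type toyBuffer struct {
	bits      []byte
	numValues int
	fixed     bool // true: alignment step uses "=", false: "|="
}

// Reset truncates without zeroing, like the original Reset.
func (c *toyBuffer) Reset() {
	c.bits = c.bits[:0]
	c.numValues = 0
}

// pack packs up to 8 booleans into a byte, value j at bit j.
func pack(vals []bool) (b byte) {
	for j, v := range vals {
		if v {
			b |= 1 << uint(j)
		}
	}
	return b
}

func (c *toyBuffer) Write(vals []bool) {
	numBytes := (c.numValues + len(vals) + 7) / 8
	if cap(c.bits) < numBytes {
		c.bits = append(make([]byte, 0, numBytes), c.bits...)
	}
	c.bits = c.bits[:numBytes] // may expose stale bytes after Reset

	i := 0
	if r := 8 - c.numValues%8; r <= len(vals) {
		if r < 8 {
			// Alignment step: merge r values into the partially filled byte.
			x, y := c.numValues/8, uint(c.numValues%8)
			merged := (pack(vals[:r]) << y) | (c.bits[x] & ^(byte(0xFF) << y))
			if c.fixed {
				c.bits[x] = merged
			} else {
				c.bits[x] |= merged
			}
			c.numValues += r
			i = r
		}
		// Whole bytes are assigned outright, so stale bits are overwritten.
		for ; len(vals)-i >= 8; i += 8 {
			c.bits[c.numValues/8] = pack(vals[i : i+8])
			c.numValues += 8
		}
	}
	// Remaining values one at a time: clear the target bit, then set it.
	for ; i < len(vals); i++ {
		x, y := c.numValues/8, uint(c.numValues%8)
		c.bits[x] &^= 1 << y
		if vals[i] {
			c.bits[x] |= 1 << y
		}
		c.numValues++
	}
}

// replay mirrors the second reproduction and returns values 0..7.
func replay(fixed bool) []bool {
	c := &toyBuffer{fixed: fixed}
	c.Write([]bool{true, true, true, true, true, true, true, true})
	c.Reset()
	c.Write([]bool{false})
	c.Write([]bool{false, false, false, false, false, false, false})
	out := make([]bool, 8)
	for n := range out {
		out[n] = c.bits[n/8]&(1<<uint(n%8)) != 0
	}
	return out
}

func main() {
	fmt.Println("|=:", replay(false)) // [false true true true true true true true]
	fmt.Println("= :", replay(true))  // [false false false false false false false false]
}
```

In this model, the whole-byte and single-value branches assign or clear bits explicitly, so only the alignment step can leak stale bits, matching the pattern the reproductions show.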