polarsignals / frostdb

❄️ Coolest database around 🧊 Embeddable column database written in Go.
Apache License 2.0
1.27k stars 65 forks source link

There is a bug in the table compactParts function #873

Closed jicanghaixb closed 2 months ago

jicanghaixb commented 2 months ago

I use frostdb to store a message column and I found a bug: after compressing the arrow record, values in the column are misaligned. The problem lies in the MergeDynamicRowGroups function. Below is a test case that reproduces the error:

package frostdb

import (
    "io"
    "os"
    "strings"
    "testing"

    "github.com/parquet-go/parquet-go"
    "github.com/stretchr/testify/require"

    "github.com/polarsignals/frostdb/dynparquet"
    schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1"
)

// schema is the table definition used to reproduce the compaction bug.
// It mirrors the reporter's production schema: a handful of statically
// typed columns plus two dynamic column groups ("labels" and "metrics"),
// with the "message" column stored plain (non-dictionary) — the column
// whose values end up misaligned after MergeDynamicRowGroups.
var schema = &schemapb.Schema{
    Name: "test",
    Columns: []*schemapb.Column{
        {
            Name: "name",
            StorageLayout: &schemapb.StorageLayout{
                Type:     schemapb.StorageLayout_TYPE_STRING,
                Encoding: schemapb.StorageLayout_ENCODING_RLE_DICTIONARY,
            },
            Dynamic: false,
        }, {
            Name: "instance",
            StorageLayout: &schemapb.StorageLayout{
                Type:     schemapb.StorageLayout_TYPE_STRING,
                Encoding: schemapb.StorageLayout_ENCODING_RLE_DICTIONARY,
            },
            Dynamic: false,
        }, {
            Name: "down_sample",
            StorageLayout: &schemapb.StorageLayout{
                Type:     schemapb.StorageLayout_TYPE_STRING,
                Encoding: schemapb.StorageLayout_ENCODING_RLE_DICTIONARY,
            },
            Dynamic: false,
        }, {
            // Dynamic column group: concrete columns are materialized per
            // label name at write time.
            Name: "labels",
            StorageLayout: &schemapb.StorageLayout{
                Type:     schemapb.StorageLayout_TYPE_STRING,
                Encoding: schemapb.StorageLayout_ENCODING_RLE_DICTIONARY,
                Nullable: true,
            },
            Dynamic: true,
        }, {
            Name: "timestamp",
            StorageLayout: &schemapb.StorageLayout{
                Type:        schemapb.StorageLayout_TYPE_INT64,
                Encoding:    schemapb.StorageLayout_ENCODING_DELTA_BINARY_PACKED,
                Compression: schemapb.StorageLayout_COMPRESSION_LZ4_RAW,
            },
            Dynamic: false,
        }, {
            Name: "trace",
            StorageLayout: &schemapb.StorageLayout{
                Type: schemapb.StorageLayout_TYPE_STRING,
                // NOTE: using RLE dictionary causes compaction to explode
                // in memory due to high cardinality of the dictionaries.
                Encoding:    schemapb.StorageLayout_ENCODING_PLAIN_UNSPECIFIED,
                Compression: schemapb.StorageLayout_COMPRESSION_LZ4_RAW,
                Nullable:    true,
            },
            Dynamic: false,
        }, {
            // Dynamic column group holding per-series metric values.
            Name: "metrics",
            StorageLayout: &schemapb.StorageLayout{
                Type:        schemapb.StorageLayout_TYPE_DOUBLE,
                Encoding:    schemapb.StorageLayout_ENCODING_PLAIN_UNSPECIFIED,
                Compression: schemapb.StorageLayout_COMPRESSION_LZ4_RAW,
            },
            Dynamic: true,
        }, {
            // The column whose values get scrambled by the merge in the
            // reproduction below.
            Name: "message",
            StorageLayout: &schemapb.StorageLayout{
                Type: schemapb.StorageLayout_TYPE_STRING,
                // NOTE: using RLE dictionary causes compaction to explode
                // in memory due to high cardinality of the dictionaries.
                Encoding:    schemapb.StorageLayout_ENCODING_PLAIN_UNSPECIFIED,
                Compression: schemapb.StorageLayout_COMPRESSION_LZ4_RAW,
                Nullable:    true,
            },
            Dynamic: false,
        },
    },
    // Sort order used by the merge; "trace" and "labels" sort with nulls
    // first since they are nullable.
    SortingColumns: []*schemapb.SortingColumn{
        {
            Name:      "name",
            Direction: schemapb.SortingColumn_DIRECTION_ASCENDING,
        }, {
            Name:      "instance",
            Direction: schemapb.SortingColumn_DIRECTION_ASCENDING,
        }, {
            Name:      "down_sample",
            Direction: schemapb.SortingColumn_DIRECTION_ASCENDING,
        }, {
            Name:      "timestamp",
            Direction: schemapb.SortingColumn_DIRECTION_ASCENDING,
        }, {
            Name:       "trace",
            Direction:  schemapb.SortingColumn_DIRECTION_ASCENDING,
            NullsFirst: true,
        }, {
            Name:       "labels",
            Direction:  schemapb.SortingColumn_DIRECTION_ASCENDING,
            NullsFirst: true,
        },
    },
}

// openParquetFile opens the named file and parses it as a parquet file.
// On success it returns the parsed *parquet.File together with an io.Closer
// the caller must Close to release the underlying OS file. On failure the
// file is closed here and a nil closer is returned, so callers only need to
// handle the closer on the success path. (Previously the open *os.File was
// returned as the closer alongside the error; callers in this file never
// closed it on that path, leaking the descriptor.)
func openParquetFile(file string) (*parquet.File, io.Closer, error) {
    f, err := os.Open(file)
    if err != nil {
        return nil, nil, err
    }
    stats, err := f.Stat()
    if err != nil {
        f.Close()
        return nil, nil, err
    }
    pf, err := parquet.OpenFile(f, stats.Size())
    if err != nil {
        f.Close()
        return nil, nil, err
    }

    return pf, f, nil
}

func TestMergeDynamicRowGroupsBug(t *testing.T) {
    pf1, closer1, err := openParquetFile("01HYD0A3WQJQ8S8CFBJJQ1KADS.parquet")
    require.NoError(t, err)
    defer closer1.Close()
    buf1, err := dynparquet.NewSerializedBuffer(pf1)
    require.NoError(t, err)
    pf2, closer2, err := openParquetFile("01HYD0AN2JY68GP4EMQ98JGDT1.parquet")
    require.NoError(t, err)
    defer closer2.Close()
    buf2, err := dynparquet.NewSerializedBuffer(pf2)
    require.NoError(t, err)
    sc, err := dynparquet.SchemaFromDefinition(schema)

    mbuf1 := buf1.MultiDynamicRowGroup()
    var rowsBuf = make([]parquet.Row, 8192)
    reader := mbuf1.Rows()
    for {
        n, err := reader.ReadRows(rowsBuf)
        if err != nil && err != io.EOF {
            require.NoError(t, err)
        }
        if n == 0 {
            break
        }
        for i := 0; i < n; i++ {
            require.True(t, strings.HasPrefix(rowsBuf[i][8].String(), "10."))
        }
    }
    mbuf2 := buf2.MultiDynamicRowGroup()
    reader = mbuf2.Rows()
    for {
        n, err := reader.ReadRows(rowsBuf)
        if err != nil && err != io.EOF {
            require.NoError(t, err)
        }
        if n == 0 {
            break
        }
        for i := 0; i < n; i++ {
            require.True(t, strings.HasPrefix(rowsBuf[i][8].String(), "10."))
        }
    }
    merged, err := sc.MergeDynamicRowGroups([]dynparquet.DynamicRowGroup{mbuf1, mbuf2})
    reader = merged.Rows()
    for {
        n, err := reader.ReadRows(rowsBuf)
        if err != nil && err != io.EOF {
            require.NoError(t, err)
        }
        if n == 0 {
            break
        }
        for i := 0; i < n; i++ {
            require.True(t, strings.HasPrefix(rowsBuf[i][8].String(), "10."))
        }
    }
}

The test case can't pass: the message column values have been misplaced.

Details

=== RUN TestMergeDynamicRowGroupsBug xxxx_test.go:183: Error Trace: github.com/polarsignals/frostdb/xxxx_test.go:183 Error: Should be true Test: TestMergeDynamicRowGroupsBug --- FAIL: TestMergeDynamicRowGroupsBug (0.15s)

test_parquet_file.zip

thorfour commented 2 months ago

Thanks for the bug report! Will take a look!

thorfour commented 2 months ago

Any chance you can either upload the files or create some files in the unit test that exacerbate the error you're seeing?

(the test_parquet_file.zip doesn't link to any files to download)

jicanghaixb commented 2 months ago

Sorry, I have updated the attached file. The new download URL is "https://github.com/polarsignals/frostdb/files/15396553/test_parquet_file.zip"

thorfour commented 2 months ago

Currently this looks like a bug in the parquet-go library. The merged row reader correctly reads the rows into the buffer, but a subsequent read from the row group then scrambles the rows that were already correctly buffered.

jicanghaixb commented 2 months ago

Thanks, looking forward to this bug being fixed as soon as possible