slingdata-io / sling-cli

Sling is a CLI tool that extracts data from a source storage/database and loads it in a target storage/database.
https://docs.slingdata.io
GNU General Public License v3.0

Parquet outputs empty strings instead of nulls #327

Closed joshuajohananorthos closed 3 hours ago

joshuajohananorthos commented 1 week ago

Creating separate issue for fix of parquet nulls from #304

johanan commented 1 week ago

@flarco Do you have a direction I can take on this? I have been running into dead ends.

Here is what I was able to test:

In parquet.go in the function NewParquetWriter it builds the schema based on the columns from the source. That function calls NewRecNode which builds a node for each column. From there I just statically set every field as optional by changing the line

field.Node = parquet.Optional(nodeOf(col, []string{}))

This changes the schema, but does not work. Here is the output of fmt.Printf("%+v\n", config.Schema):

fields: [required binary (STRING); required binary (STRING);]
message {
        optional binary uuid (STRING);
        optional binary another_col (STRING);
}

The schema says they are optional, but the fields are not optional. From here I have been trying to find the correct type to change to make the fields optional.

I have added these receiver functions trying to find the type that I need to modify:

func (f *structField) Optional() bool { return true }

func (f *structField) Required() bool { return false }
// later in the parquet.go
func (f *goNode) Optional() bool { return true }

func (f *goNode) Required() bool { return false }

I really thought updating structField would work, as it is the type defined for fields in RecNode. I just cannot get the fields to be optional.

The code will compile and run, but it just outputs NULL for everything. It has the correct columns and rows, but literally everything is NULL.

I also could not see where parquet_arrow.go was called during my tests of going from a database -> file. From what I can tell, the parquet file was written using parquet-go in parquet.go.

flarco commented 2 days ago

Thanks, sorry for the delay. I am facing the same thing, everything comes out as null, very strange. When I put Required() bool { return true }, the values are written correctly, but not the nulls. Looking more...

joshuajohananorthos commented 1 day ago

Thanks for checking. I wanted to make sure I was not just missing something, as I do not have a lot of Go experience. I started a new project to see if I can write a parquet file with nulls, and then take what I learned there and apply it here.

flarco commented 1 day ago

I just added TestParquetWrite3 here: https://github.com/slingdata-io/sling-cli/pull/318/commits/a26a8747228321df6a80af28496adfbf022dfa01

Running it with go test -run TestParquetWrite3 will write a new parquet file.

If you inspect that file, you'll see the proper nulls!

So that proves it is doable, it's just finding that logic in the github.com/parquet-go/parquet-go lib. In TestParquetWrite3 I am using NewGenericWriter with Go generics, which is the recommended approach. The issue is that the struct needs to be defined as a hard-coded schema (which doesn't work with sling, since the schema needs to be defined at run-time to adhere to the source stream column names/types).

So now I need to find out how to reproduce this behavior with the parquet.NewWriter method, which has actually been marked as deprecated... But I'm not sure how else to do this.

flarco commented 1 day ago

Something like this could work, but it needs the correct config.Schema. getParquetSchema(columns) doesn't work for optional fields at the moment:


    config, err := parquet.NewWriterConfig()
    g.LogFatal(err)

    columns := NewColumns(
        Column{Name: "col_int1", Type: IntegerType},
        Column{Name: "col_int2", Type: IntegerType},
        Column{Name: "col_time", Type: TimestampType},
    )
    config.Schema = getParquetSchema(columns)
    config.Compression = &parquet.Snappy

    w := parquet.NewGenericWriter[map[string]any](file, config)

    now := time.Now()
    rows := []map[string]any{
        {"col_int1": g.Int(1), "col_int2": 0, "col_time": &now},
        {"col_int1": nil, "col_int2": 1, "col_time": nil},
    }

flarco commented 1 day ago

This works!

func TestParquetWrite4(t *testing.T) {

    file, err := os.CreateTemp(env.GetTempFolder(), "*.parquet")
    g.Info(file.Name())
    g.LogFatal(err)

    config, err := parquet.NewWriterConfig()
    g.LogFatal(err)

    columns := NewColumns(
        Column{Name: "col_int1", Type: IntegerType},
        Column{Name: "col_int2", Type: IntegerType},
        Column{Name: "col_time", Type: TimestampType},
    )
    config.Schema = getParquetSchema(columns)
    config.Compression = &parquet.Snappy

    w := parquet.NewGenericWriter[map[string]any](file, config)

    now := time.Now()
    rows := []map[string]any{
        {"col_int1": 1, "col_int2": 0, "col_time": now},
        {"col_int1": nil, "col_int2": 1, "col_time": nil},
    }

    _, err = w.Write(rows)
    g.LogFatal(err)

    err = w.Close()
    g.LogFatal(err)

    err = file.Close()
    g.LogFatal(err)

}

I'll have to refactor a few things. I'll take it from here, thanks for your help.

flarco commented 3 hours ago

Fixed with https://github.com/slingdata-io/sling-cli/pull/318/commits/beb7ae77fb6df8ee15a9b618c2c305fa1762a564 Releasing v1.2.12 soon. Closing.