segmentio / parquet-go

Go library to read/write Parquet files
https://pkg.go.dev/github.com/segmentio/parquet-go
Apache License 2.0

Dynamic schema generation #392

Closed tschaub closed 1 year ago

tschaub commented 1 year ago

I was hoping I might be able to build a schema without having a struct type ahead of time. The use case is reading other structured data (where the type can only be inferred at runtime) and generating a Parquet file.

I see lots of useful logic used by the SchemaOf function, but I'm not yet seeing any exported functions that might let me build a schema without having a struct to use as a model.

It looks like I could write my own implementation of the Node interface and use that as the root in the NewSchema function. Is there another way that a schema might be built dynamically, or does this go against the aim of this library?

tschaub commented 1 year ago

I came across Group and other Node-returning functions like String and Leaf. I think these will allow me to build up the root for a schema. In this case I'm trying to read in JSON data and convert to an appropriate Parquet schema. Given the limited types in JSON, this looks pretty doable with the functions that are exported.

I'll close this unless anyone has any other suggestions.
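Since `encoding/json` decodes values into only a handful of Go types, the value-to-leaf mapping can be sketched with the standard library alone. The `inferKind` helper below is hypothetical, not part of parquet-go; the `parquet.*` calls in the comments are the Node constructors mentioned above:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// inferKind maps a decoded JSON value to a coarse column kind.
// JSON only has strings, numbers, booleans, nulls, arrays, and
// objects, which is what makes runtime inference tractable.
func inferKind(v any) string {
	switch v.(type) {
	case string:
		return "string" // e.g. parquet.String()
	case float64:
		return "double" // encoding/json decodes every JSON number as float64
	case bool:
		return "boolean" // e.g. parquet.Leaf(parquet.BooleanType)
	default:
		return "unknown" // nulls, arrays, and objects need more handling
	}
}

func main() {
	var rec map[string]any
	if err := json.Unmarshal([]byte(`{"name":"Luke","number":42}`), &rec); err != nil {
		panic(err)
	}
	fmt.Println(inferKind(rec["name"]), inferKind(rec["number"])) // prints "string double"
}
```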

tschaub commented 1 year ago

Hmm, I can create the schema, but am unable to create rows that can be written. Here is a modification of the write (any) example that demonstrates the issue I'm running into:

package main

import (
    "bytes"
    "log"

    "github.com/segmentio/parquet-go"
)

func main() {
    schema := parquet.SchemaOf(struct {
        Name   string
        Number int
    }{})

    buf := new(bytes.Buffer)
    err := parquet.Write(
        buf,
        []any{
            map[string]any{"Name": "Luke", "Number": 42},
            map[string]any{"Name": "Han", "Number": 21},
        },
        schema,
    )
    if err != nil {
        log.Fatal(err)
    }
}

This panics with panic: cannot create parquet value of type BYTE_ARRAY from go value of type interface {} in value.go.

In my case, I'm dynamically generating the schema instead of calling SchemaOf with a struct. I'm reading JSON data, so my rows end up as map[string]any. It looks like this write-any path only works with maps that have a more specific value type.

Curious if anybody thinks there is hope to use this library to write parquet data where the schema is only known at runtime.

hhoughgg commented 1 year ago

    ds := someData()
    schema := generateAtRunTime()
    structFields := make([]reflect.StructField, 0, len(schema))
    for _, field := range schema {
        structFields = append(structFields, reflect.StructField{
            Name: capitalizeString(field.Name),
            Type: field.Type,
            Tag:  reflect.StructTag(field.Tag),
        })
    }

    customStruct := reflect.StructOf(structFields)
    customStructSlice := reflect.MakeSlice(reflect.SliceOf(customStruct), len(ds), len(ds))

You can use reflection to dynamically build a struct type from a runtime schema. Then you can also set each field using reflection: customStructSlice.Index(0).Field(0).SetInt(100).

You can convert each struct back to an interface with customStructSlice.Index(0).Interface() and use it as an argument to Write as well as SchemaOf().

I don't think this library supports map[string]any. There are other libraries that do, but I would do the above instead, as in my experience the performance is much better: you can allocate all your memory in a single slice, whereas map[string]any requires an allocation for every single map key.

tschaub commented 1 year ago

Thank you for the suggestion, @hhoughgg. I'll give this approach a try.

I can't generate the schema until I parse (some of) the JSON data, so I'm going to have some map[string]any at some point. I guess the steps will look something like:

  1. decode JSON records as []map[string]any
  2. generate a struct type representing the decoded records above
  3. generate a schema from the struct type
  4. convert the map[string]any elements into structs
  5. write the structs to parquet

tschaub commented 1 year ago

The approach above is working for me, so I'll close this. Thanks for the pointers, @hhoughgg.

KasonBraley commented 1 year ago

@tschaub I am having the same problem you did, and your steps have helped me get most of the way there. But I am stuck on step 5: I can't get my reflect-built struct to be written correctly. Were you able to use the GenericWriter with your approach, or did you have to fall back to the deprecated Writer type? I've been trying to use the GenericWriter, but the problem I'm running into is that a reflect.Type or reflect.Value does not appear to satisfy its generic type parameter. I am not very familiar with the reflect package, nor with Parquet in general, so I could just be missing something.

This is the error I mostly get: panic: reflect: call of reflect.Value.MapIndex on struct Value. I've tried various approaches with no success.

This is essentially what I have, after retrieving the details needed for schema creation from somewhere else. I am just not sure what the rows type in WriteFile should be; the code below does not compile.

    group := parquet.Group{}
    structFields := make([]reflect.StructField, 0, len(dtcs))
    for _, dtc := range dtcs {
        tag := reflect.StructTag("parquet:" + strconv.Quote(dtc.ColumnName+",optional"))
        var reflectType reflect.Type

        switch dtc.DatastoreType {
        case "string":
            reflectType = reflect.TypeOf(string(""))
            group[dtc.ColumnName] = parquet.Optional(parquet.String())
        case "int":
            reflectType = reflect.TypeOf(int32(0))
            group[dtc.ColumnName] = parquet.Optional(parquet.Int(32))
        case "long":
            reflectType = reflect.TypeOf(int64(0))
            group[dtc.ColumnName] = parquet.Optional(parquet.Int(64))
        case "double":
            reflectType = reflect.TypeOf(float64(0))
            group[dtc.ColumnName] = parquet.Optional(parquet.Leaf(parquet.DoubleType))
        case "float":
            reflectType = reflect.TypeOf(float32(0))
            group[dtc.ColumnName] = parquet.Optional(parquet.Leaf(parquet.FloatType))
        default:
            log.Println("unsupported datastore type")
        }

        structFields = append(structFields, reflect.StructField{
            Name: caser.String(dtc.ColumnName),
            Type: reflectType,
            Tag:  tag,
        })
    }

    structForSchemaType := reflect.StructOf(structFields)

    newInput := reflect.New(structForSchemaType)

    for logKey, logValue := range m {
        structFieldValue := newInput.Elem().FieldByName(caser.String(logKey))

        if !structFieldValue.IsValid() {
            log.Printf("not valid: %v: %v", logKey, logValue)
            continue
        }

        switch structFieldValue.Kind() {
        case reflect.String:
            structFieldValue.SetString(fmt.Sprintf("%v", logValue))
        case reflect.Int, reflect.Int32, reflect.Int64:
            switch typedLogValue := logValue.(type) {
            case int:
                structFieldValue.SetInt(int64(typedLogValue))
            case string:
                val, err := strconv.ParseInt(typedLogValue, 10, 32)
                if err != nil {
                    // return fmt.Errorf("failed to parse 'int' value to type int: %w", err)
                }
                structFieldValue.SetInt(int64(val))
            }
        case reflect.Float32, reflect.Float64:
            switch typedLogValue := logValue.(type) {
            case float64:
                structFieldValue.SetFloat(typedLogValue)
            case string:
                val, err := strconv.ParseFloat(typedLogValue, 64)
                if err != nil {
                    // return fmt.Errorf("failed to parse 'double' value to type int: %w", err)
                }
                structFieldValue.SetFloat(val)
            }
        }
    }

    schema := parquet.NewSchema("Record", group)

    // doesn't compile. Error is `variable of type reflect.Value is not a type`
    err = parquet.WriteFile("./foo.parquet", []newInput{newInput}, schema)
    if err != nil {
        log.Fatal(err)
    }

Sorry for commenting on an old & closed issue. I'll probably create a new issue with this question if I don't get a response.

kevinburkesegment commented 1 year ago

Please reopen against parquet-go/parquet-go; we are moving library development there.
