goccy / bigquery-emulator

BigQuery emulator server implemented in Go
MIT License
835 stars 109 forks source link

Storage Write API with proto wrapper types causes nil pointer #364

Open alfredgunnar opened 2 weeks ago

alfredgunnar commented 2 weeks ago

What happened?

When using the emulator to test code for batch writing protobuf messages to BigQuery using the Storage Write API, there's a nil pointer error from the emulator if the message contains a wrapper type.

Here's the stack trace from the emulator:

github.com/goccy/bigquery-emulator/types.normalizeData({0x2accf40?, 0xc000a90d90?}, 0x0)
      /work/types/types.go:547 +0xb0
github.com/goccy/bigquery-emulator/types.normalizeData({0x2b67120?, 0xc000ad0300?}, 0xc0004d87e0)
      /work/types/types.go:575 +0x488
github.com/goccy/bigquery-emulator/types.NewTableWithSchema(0xc00048ae00, {0xc000a8d818, 0x1, 0x10?})
      /work/types/types.go:479 +0x479
github.com/goccy/bigquery-emulator/server.(*storageWriteServer).insertTableData(0xc0009781e0, {0x30e0650, 0xc000775f50}, 0xc000648b19?, 0xc0005b5a40, {0xc000a8d818?, 0x4?, 0x76?})
      /work/server/storage_handler.go:662 +0x4e
github.com/goccy/bigquery-emulator/server.(*storageWriteServer).BatchCommitWriteStreams(0xc0009781e0, {0x30e0650, 0xc000775f50}, 0x0?)
      /work/server/storage_handler.go:727 +0x472
cloud.google.com/go/bigquery/storage/apiv1/storagepb._BigQueryWrite_BatchCommitWriteStreams_Handler({0x2d39ba0?, 0xc0009781e0}, {0x30e0650, 0xc000775f50}, 0xc0007f4480, 0x0)
      /go/pkg/mod/cloud.google.com/go/bigquery@v1.53.0/storage/apiv1/storagepb/storage.pb.go:3371 +0x169
google.golang.org/grpc.(*Server).processUnaryRPC(0xc000540000, {0x30e0650, 0xc000775ec0}, {0x30eff60, 0xc0000f5520}, 0xc00011d440, 0xc0009782a0, 0x4b5b248, 0x0)
      /go/pkg/mod/google.golang.org/grpc@v1.59.0/server.go:1343 +0xe03
google.golang.org/grpc.(*Server).handleStream(0xc000540000, {0x30eff60, 0xc0000f5520}, 0xc00011d440)
      /go/pkg/mod/google.golang.org/grpc@v1.59.0/server.go:1737 +0xc4c
google.golang.org/grpc.(*Server).serveStreams.func1.1()
      /go/pkg/mod/google.golang.org/grpc@v1.59.0/server.go:986 +0x86
created by google.golang.org/grpc.(*Server).serveStreams.func1 in goroutine 130
      /go/pkg/mod/google.golang.org/grpc@v1.59.0/server.go:997 +0x145

What did you expect to happen?

I'd expect this to work in the same way as BigQuery where wrapper types are supported.

How can we reproduce it (as minimally and precisely as possible)?

With this proto file:

syntax = "proto3";

package example.v1;

import "gen_bq_schema/bq_table.proto";
import "google/protobuf/wrappers.proto";

// An example message
message Message {
  option (gen_bq_schema.bigquery_opts).table_name = "message";

  // An example string field
  string string_field = 1;

  // An example double wrapper
  google.protobuf.DoubleValue double_wrapper_field = 2;
}
(which generates the following Go code; expand to see)

```go // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 // protoc (unknown) // source: example/v1/message.proto package examplev1 import ( _ "github.com/GoogleCloudPlatform/protoc-gen-bq-schema/protos" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" wrapperspb "google.golang.org/protobuf/types/known/wrapperspb" reflect "reflect" sync "sync" ) const ( // Verify that this generated code is sufficiently up-to-date. _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) // Verify that runtime/protoimpl is sufficiently up-to-date. _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) // An example message type Message struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields // An example string field StringField string `protobuf:"bytes,1,opt,name=string_field,json=stringField,proto3" json:"string_field,omitempty"` // An example double wrapper DoubleWrapperField *wrapperspb.DoubleValue `protobuf:"bytes,2,opt,name=double_wrapper_field,json=doubleWrapperField,proto3" json:"double_wrapper_field,omitempty"` } func (x *Message) Reset() { *x = Message{} if protoimpl.UnsafeEnabled { mi := &file_example_v1_message_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } func (x *Message) String() string { return protoimpl.X.MessageStringOf(x) } func (*Message) ProtoMessage() {} func (x *Message) ProtoReflect() protoreflect.Message { mi := &file_example_v1_message_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) } return ms } return mi.MessageOf(x) } // Deprecated: Use Message.ProtoReflect.Descriptor instead. 
func (*Message) Descriptor() ([]byte, []int) { return file_example_v1_message_proto_rawDescGZIP(), []int{0} } func (x *Message) GetStringField() string { if x != nil { return x.StringField } return "" } func (x *Message) GetDoubleWrapperField() *wrapperspb.DoubleValue { if x != nil { return x.DoubleWrapperField } return nil } var File_example_v1_message_proto protoreflect.FileDescriptor var file_example_v1_message_proto_rawDesc = []byte{ 0x0a, 0x18, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x76, 0x31, 0x1a, 0x1c, 0x67, 0x65, 0x6e, 0x5f, 0x62, 0x71, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2f, 0x62, 0x71, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x77, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8a, 0x01, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x5f, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x12, 0x4e, 0x0a, 0x14, 0x64, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x5f, 0x77, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x5f, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x12, 0x64, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x57, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x3a, 0x0c, 0xea, 0x3f, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0xb4, 0x01, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x2e, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x76, 
0x31, 0x42, 0x0c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x4b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x65, 0x69, 0x6e, 0x72, 0x69, 0x64, 0x65, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2d, 0x70, 0x6c, 0x61, 0x6e, 0x6e, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x67, 0x6f, 0x2f, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2f, 0x76, 0x31, 0x3b, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x45, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x45, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0a, 0x45, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x16, 0x45, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x45, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( file_example_v1_message_proto_rawDescOnce sync.Once file_example_v1_message_proto_rawDescData = file_example_v1_message_proto_rawDesc ) func file_example_v1_message_proto_rawDescGZIP() []byte { file_example_v1_message_proto_rawDescOnce.Do(func() { file_example_v1_message_proto_rawDescData = protoimpl.X.CompressGZIP(file_example_v1_message_proto_rawDescData) }) return file_example_v1_message_proto_rawDescData } var file_example_v1_message_proto_msgTypes = make([]protoimpl.MessageInfo, 1) var file_example_v1_message_proto_goTypes = []any{ (*Message)(nil), // 0: example.v1.Message (*wrapperspb.DoubleValue)(nil), // 1: google.protobuf.DoubleValue } var file_example_v1_message_proto_depIdxs = []int32{ 1, // 0: example.v1.Message.double_wrapper_field:type_name -> google.protobuf.DoubleValue 1, // [1:1] is the sub-list for method output_type 1, // [1:1] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name 1, 
// [1:1] is the sub-list for extension extendee 0, // [0:1] is the sub-list for field type_name } func init() { file_example_v1_message_proto_init() } func file_example_v1_message_proto_init() { if File_example_v1_message_proto != nil { return } if !protoimpl.UnsafeEnabled { file_example_v1_message_proto_msgTypes[0].Exporter = func(v any, i int) any { switch v := v.(*Message); i { case 0: return &v.state case 1: return &v.sizeCache case 2: return &v.unknownFields default: return nil } } } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_example_v1_message_proto_rawDesc, NumEnums: 0, NumMessages: 1, NumExtensions: 0, NumServices: 0, }, GoTypes: file_example_v1_message_proto_goTypes, DependencyIndexes: file_example_v1_message_proto_depIdxs, MessageInfos: file_example_v1_message_proto_msgTypes, }.Build() File_example_v1_message_proto = out.File file_example_v1_message_proto_rawDesc = nil file_example_v1_message_proto_goTypes = nil file_example_v1_message_proto_depIdxs = nil } ```

and this generated BigQuery schema (produced with protoc-gen-bq-schema):

[
 {
  "name": "string_field",
  "type": "STRING",
  "mode": "NULLABLE",
  "description": "An example string field"
 },
 {
  "name": "double_wrapper_field",
  "type": "FLOAT",
  "mode": "NULLABLE",
  "description": "An example double wrapper"
 }
]

it can be reproduced with this test:

package test

import (
    "context"
    "fmt"
    "os"
    "reflect"
    "testing"

    "cloud.google.com/go/bigquery"
    "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    "cloud.google.com/go/bigquery/storage/managedwriter"
    "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
    "github.com/docker/go-connections/nat"
    examplev1 "SOME IMPORT PATH TO GO CODE GENERATED FROM PROTOBUF"
    "github.com/testcontainers/testcontainers-go"
    "github.com/testcontainers/testcontainers-go/wait"
    "google.golang.org/api/option"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/protobuf/proto"

    "google.golang.org/protobuf/types/known/wrapperspb"
    "gotest.tools/v3/assert"
)

// Test reproduces the emulator failure reported in this issue: a protobuf row
// whose wrapper-typed field (google.protobuf.DoubleValue) is set is written
// through the Storage Write API using a pending stream, which is then
// finalized and batch-committed.
//
// The test starts the emulator in a container, creates dataset "test" and
// table "test" from the generated schema file "message.schema", appends a
// single row and commits the stream. Every step is expected to succeed.
func Test(t *testing.T) {
    ctx := context.Background()

    // start emulator image
    projectID := "test"
    ctainer, httpPort, grpcPort, err := StartBigQueryEmulatorContainer(ctx, t, projectID)
    assert.NilError(t, err)
    t.Cleanup(func() {
        assert.NilError(t, ctainer.Terminate(ctx))
    })

    // BigQuery client (REST endpoint of the emulator)
    bqClient, err := bigquery.NewClient(
        ctx,
        projectID,
        option.WithEndpoint(fmt.Sprintf("http://localhost:%s", httpPort.Port())),
        option.WithoutAuthentication(),
    )
    assert.NilError(t, err)

    // Storage Write API client (gRPC endpoint of the emulator, plaintext)
    conn, err := grpc.NewClient(
        fmt.Sprintf("localhost:%s", grpcPort.Port()),
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    assert.NilError(t, err)
    writerClient, err := managedwriter.NewClient(
        ctx,
        projectID,
        option.WithGRPCConn(conn),
    )
    assert.NilError(t, err)

    // setup table
    schemaBytes, err := os.ReadFile("message.schema") // the generated BigQuery schema
    assert.NilError(t, err)
    schema, err := bigquery.SchemaFromJSON(schemaBytes)
    assert.NilError(t, err)
    dataset := bqClient.Dataset("test")
    assert.NilError(t, dataset.Create(ctx, nil))
    table := dataset.Table("test")
    assert.NilError(t, table.Create(ctx, &bigquery.TableMetadata{
        Schema: schema,
    }))

    // setup stream
    msg := &examplev1.Message{
        StringField:        "test",
        DoubleWrapperField: &wrapperspb.DoubleValue{Value: 56.18}, // this is what causes the issue
    }
    // A fresh zero-valued message of the same type is used only to obtain
    // the message descriptor for the stream's schema.
    emptyMsg := reflect.New(reflect.TypeOf(msg).Elem()).Interface().(*examplev1.Message)
    schemaDescriptor, err := adapt.NormalizeDescriptor(emptyMsg.ProtoReflect().Descriptor())
    assert.NilError(t, err)
    tableIdentifier, err := table.Identifier(bigquery.StorageAPIResourceID)
    assert.NilError(t, err)
    stream, err := writerClient.NewManagedStream(
        ctx,
        managedwriter.WithDestinationTable(tableIdentifier),
        managedwriter.WithSchemaDescriptor(schemaDescriptor),
        managedwriter.WithType(managedwriter.PendingStream),
        managedwriter.EnableWriteRetries(true),
    )
    assert.NilError(t, err)

    // Write to stream
    row, err := proto.Marshal(msg)
    // Check the marshal error before err is reused by AppendRows; otherwise
    // a serialization failure would go unnoticed.
    assert.NilError(t, err)
    result, err := stream.AppendRows(ctx, [][]byte{row})
    assert.NilError(t, err)
    _, err = result.GetResult(ctx) // this seems to be the call that triggers the issue
    assert.NilError(t, err)

    // Close stream: finalize it, then commit it so the rows become visible.
    _, err = stream.Finalize(ctx)
    assert.NilError(t, err)
    _, err = writerClient.BatchCommitWriteStreams(ctx, &storagepb.BatchCommitWriteStreamsRequest{
        Parent:       tableIdentifier,
        WriteStreams: []string{stream.StreamName()},
    })
    assert.NilError(t, err)
}

// logger forwards container log entries to a test's log output.
type logger struct {
    t *testing.T // test that receives the forwarded log lines
}

// Accept writes a single container log entry to the test log, formatted as
// "<log type>: <content>".
func (l *logger) Accept(log testcontainers.Log) {
    kind, body := log.LogType, string(log.Content)
    l.t.Logf("%s: %s", kind, body)
}

// Container ports exposed by the emulator image, in "port/protocol" form.
const (
    // bqHttpPort is the container port the test's BigQuery (HTTP) client
    // connects to once mapped to a host port.
    bqHttpPort = "9050/tcp"
    // bqGrpcPort is the container port the test's Storage Write API (gRPC)
    // client connects to once mapped to a host port.
    bqGrpcPort = "9060/tcp"
)

// StartBigQueryEmulatorContainer starts the
// ghcr.io/goccy/bigquery-emulator:0.6.0 image for the given project with
// debug logging, waits until both the HTTP and gRPC ports are listening, and
// returns the running container together with the host ports mapped to
// bqHttpPort and bqGrpcPort (in that order).
//
// Container log output is forwarded to t.Logf. The reaper is skipped, so the
// caller is responsible for calling Terminate on the returned container.
//
// On failure the returned container is nil and the ports are empty. If the
// container was already started when a port lookup failed, it is terminated
// here (best effort) because the caller never receives a handle to it.
func StartBigQueryEmulatorContainer(
    ctx context.Context,
    t *testing.T,
    projectID string,
) (testcontainers.Container, nat.Port, nat.Port, error) {
    t.Helper()

    testLogger := logger{t: t}
    req := testcontainers.ContainerRequest{
        Image:        "ghcr.io/goccy/bigquery-emulator:0.6.0",
        ExposedPorts: []string{bqHttpPort, bqGrpcPort},
        SkipReaper:   true,
        Cmd:          []string{"bigquery-emulator", "--project", projectID, "--log-level", "debug"},
        WaitingFor: wait.ForAll(
            wait.ForListeningPort(bqHttpPort),
            wait.ForListeningPort(bqGrpcPort),
        ),
        LogConsumerCfg: &testcontainers.LogConsumerConfig{
            Consumers: []testcontainers.LogConsumer{&testLogger},
        },
    }
    bqContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
        ContainerRequest: req,
        Started:          true,
    })
    if err != nil {
        return nil, "", "", err
    }

    // From here on the container is running. With SkipReaper set nothing
    // else will reclaim it, so every error path must terminate it itself.
    terminate := func() {
        if termErr := bqContainer.Terminate(ctx); termErr != nil {
            // The port lookup error is the one worth returning; the
            // cleanup failure is only logged.
            t.Logf("terminating bigquery emulator container: %v", termErr)
        }
    }

    httpPort, err := bqContainer.MappedPort(ctx, bqHttpPort)
    if err != nil {
        terminate()
        return nil, "", "", err
    }

    grpcPort, err := bqContainer.MappedPort(ctx, bqGrpcPort)
    if err != nil {
        terminate()
        return nil, "", "", err
    }

    return bqContainer, httpPort, grpcPort, nil
}

Anything else we need to know?

It seems to be the call to result.GetResult that triggers the issue. If the DoubleWrapperField is not set, the issue is not seen.

mdelapenya commented 1 week ago

Just in case, there is a BigQuery container in the Google Cloud module: https://golang.testcontainers.org/modules/gcloud/#bigquery

I think it could simplify the container setup in this reproduction.