connectrpc / connect-go

The Go implementation of Connect: Protobuf RPC that works.
https://connectrpc.com
Apache License 2.0

Support for dynamic or reflected types for client #523

Closed ewr23523 closed 9 months ago

ewr23523 commented 1 year ago

Disclaimer: Connect may already support this, and I may simply not understand how to make it work. However, I couldn't find anything in the GitHub issues or in the docs. (There is a similar issue, #312, but its author decided to go with another approach, and it contains no answer about dynamic types.)

Is your feature request related to a problem? Please describe. How can one perform a Connect request with dynamic types, for example when a request is created using the google.golang.org/protobuf/types/dynamicpb package and no concrete Go type is available (the service, method, and type information is obtained via reflection or read from a file containing a file descriptor set)?

Take tools like grpcurl as an example: how would one do what such a tool does, but over the Connect protocol?

Describe the solution you'd like I imagine something like an UntypedClient (in contrast to the currently available Client, which uses generics) or special handling of dynamic messages inside the current implementation.

It feels like it should be possible to call a service using Connect starting with just a "google.golang.org/protobuf/reflect/protoreflect".MethodDescriptor and without reimplementing large parts of the connect-go package.

Describe alternatives you've considered I have read about (and experimented with) creating a custom codec, but it feels like it should be possible to just use Connect with dynamic messages, without special hacks, especially since dynamicpb is part of protobuf in Go.

I have also attached the results of my experiments in the following comments: a workaround for anyone who needs to achieve this with Connect.

Additional context If I'm wrong and there is another issue about this, or it is mentioned in the documentation, then I'm sorry to have bothered you. If Connect already supports this, please explain how in this issue so that others (like me ;) ) can find it.

ewr23523 commented 1 year ago

For anyone interested, here is a workaround with a custom codec that uses dynamicpb.Message:

```go
// Imports needed by this snippet: bytes, encoding/json, errors, fmt,
// github.com/bufbuild/connect-go,
// google.golang.org/protobuf/encoding/protojson,
// google.golang.org/protobuf/proto,
// google.golang.org/protobuf/reflect/protoreflect,
// google.golang.org/protobuf/types/dynamicpb.

// NewDynamicpbClient returns a client that uses the binary codec below.
func NewDynamicpbClient(descriptor protoreflect.MethodDescriptor, httpClient connect.HTTPClient, url string, options ...connect.ClientOption) *connect.Client[dynamicpb.Message, dynamicpb.Message] {
	codec := &dynamicpbCodec{
		responseDescriptor: descriptor.Output(),
	}
	optionsWithCodec := make([]connect.ClientOption, 0, len(options)+1)
	optionsWithCodec = append(optionsWithCodec, options...)
	optionsWithCodec = append(optionsWithCodec, connect.WithCodec(codec))
	return connect.NewClient[dynamicpb.Message, dynamicpb.Message](httpClient, url, optionsWithCodec...)
}

// NewDynamicpbJSONClient returns a client that uses the JSON codec below.
func NewDynamicpbJSONClient(descriptor protoreflect.MethodDescriptor, httpClient connect.HTTPClient, url string, options ...connect.ClientOption) *connect.Client[dynamicpb.Message, dynamicpb.Message] {
	codec := &dynamicpbJSONCodec{
		name:               "json",
		responseDescriptor: descriptor.Output(),
	}
	optionsWithCodec := make([]connect.ClientOption, 0, len(options)+1)
	optionsWithCodec = append(optionsWithCodec, options...)
	optionsWithCodec = append(optionsWithCodec, connect.WithCodec(codec))
	return connect.NewClient[dynamicpb.Message, dynamicpb.Message](httpClient, url, optionsWithCodec...)
}

// dynamicpbCodec is a binary codec that knows the response descriptor.
type dynamicpbCodec struct {
	responseDescriptor protoreflect.MessageDescriptor
}

func (c *dynamicpbCodec) Name() string {
	return "proto"
}

func (c *dynamicpbCodec) Marshal(message any) ([]byte, error) {
	dynamicpbMessage, ok := message.(*dynamicpb.Message)
	if !ok {
		return nil, errNotDynamicpbMessage(message)
	}
	return proto.Marshal(dynamicpbMessage)
}

func (c *dynamicpbCodec) MarshalStable(message any) ([]byte, error) {
	dynamicpbMessage, ok := message.(*dynamicpb.Message)
	if !ok {
		return nil, errNotDynamicpbMessage(message)
	}
	options := proto.MarshalOptions{Deterministic: true}
	return options.Marshal(dynamicpbMessage)
}

func (c *dynamicpbCodec) Unmarshal(data []byte, message any) error {
	dynamicpbMessage, ok := message.(*dynamicpb.Message)
	if !ok {
		return errNotDynamicpbMessage(message)
	}
	// The zero value passed in by connect has no descriptor, so replace it
	// with a message built from the response descriptor before decoding.
	*dynamicpbMessage = *dynamicpb.NewMessage(c.responseDescriptor)
	return proto.Unmarshal(data, dynamicpbMessage)
}

func (c *dynamicpbCodec) IsBinary() bool {
	return true
}

// dynamicpbJSONCodec is the JSON counterpart of dynamicpbCodec.
type dynamicpbJSONCodec struct {
	name               string
	responseDescriptor protoreflect.MessageDescriptor
}

func (c *dynamicpbJSONCodec) Name() string {
	return c.name
}

func (c *dynamicpbJSONCodec) Marshal(message any) ([]byte, error) {
	dynamicpbMessage, ok := message.(*dynamicpb.Message)
	if !ok {
		return nil, errNotDynamicpbMessage(message)
	}
	var options protojson.MarshalOptions
	return options.Marshal(dynamicpbMessage)
}

func (c *dynamicpbJSONCodec) MarshalStable(message any) ([]byte, error) {
	messageJSON, err := c.Marshal(message)
	if err != nil {
		return nil, err
	}
	compactedJSON := bytes.NewBuffer(messageJSON[:0])
	if err = json.Compact(compactedJSON, messageJSON); err != nil {
		return nil, err
	}
	return compactedJSON.Bytes(), nil
}

func (c *dynamicpbJSONCodec) Unmarshal(data []byte, message any) error {
	dynamicpbMessage, ok := message.(*dynamicpb.Message)
	if !ok {
		return errNotDynamicpbMessage(message)
	}
	if len(data) == 0 {
		return errors.New("zero-length payload is not a valid JSON object")
	}
	*dynamicpbMessage = *dynamicpb.NewMessage(c.responseDescriptor)
	options := protojson.UnmarshalOptions{DiscardUnknown: true}
	return options.Unmarshal(data, dynamicpbMessage)
}

func (c *dynamicpbJSONCodec) IsBinary() bool {
	return false
}

func errNotDynamicpbMessage(message any) error {
	return fmt.Errorf("%T isn't *dynamicpb.Message", message)
}
```
ewr23523 commented 1 year ago

And another workaround, with a custom codec that uses either dynamicpb.Message or protoreflect.MessageType (depending on what is available):

```go
// Imports needed by this snippet: bytes, encoding/json, errors, fmt,
// github.com/bufbuild/connect-go,
// google.golang.org/protobuf/encoding/protojson,
// google.golang.org/protobuf/proto,
// google.golang.org/protobuf/reflect/protoreflect,
// google.golang.org/protobuf/reflect/protoregistry,
// google.golang.org/protobuf/types/dynamicpb.

// DynamicProtoMessage wraps any proto.Message so that it can be used as a
// type parameter of connect.Client.
type DynamicProtoMessage struct {
	Content proto.Message
}

// protoMessageConstructorFromDescriptor prefers a registered Go type and
// falls back to dynamicpb when the type is not in the global registry.
func protoMessageConstructorFromDescriptor(descriptor protoreflect.MessageDescriptor) func() proto.Message {
	messageType, err := protoregistry.GlobalTypes.FindMessageByName(descriptor.FullName())
	if err != nil {
		return func() proto.Message {
			return dynamicpb.NewMessage(descriptor)
		}
	}
	return func() proto.Message {
		return messageType.New().Interface()
	}
}

func NewDynamicProtoClient(descriptor protoreflect.MethodDescriptor, httpClient connect.HTTPClient, url string, options ...connect.ClientOption) *connect.Client[DynamicProtoMessage, DynamicProtoMessage] {
	codec := &dynamicProtoBinaryCodec{
		newResponse: protoMessageConstructorFromDescriptor(descriptor.Output()),
	}
	optionsWithCodec := make([]connect.ClientOption, 0, len(options)+1)
	optionsWithCodec = append(optionsWithCodec, options...)
	optionsWithCodec = append(optionsWithCodec, connect.WithCodec(codec))
	return connect.NewClient[DynamicProtoMessage, DynamicProtoMessage](httpClient, url, optionsWithCodec...)
}

func NewDynamicProtoJSONClient(descriptor protoreflect.MethodDescriptor, httpClient connect.HTTPClient, url string, options ...connect.ClientOption) *connect.Client[DynamicProtoMessage, DynamicProtoMessage] {
	codec := &dynamicProtoJSONCodec{
		name:        "json",
		newResponse: protoMessageConstructorFromDescriptor(descriptor.Output()),
	}
	optionsWithCodec := make([]connect.ClientOption, 0, len(options)+1)
	optionsWithCodec = append(optionsWithCodec, options...)
	optionsWithCodec = append(optionsWithCodec, connect.WithCodec(codec))
	return connect.NewClient[DynamicProtoMessage, DynamicProtoMessage](httpClient, url, optionsWithCodec...)
}

type dynamicProtoBinaryCodec struct {
	newResponse func() proto.Message
}

func (c *dynamicProtoBinaryCodec) Name() string {
	return "proto"
}

func (c *dynamicProtoBinaryCodec) Marshal(message any) ([]byte, error) {
	dynamicProtoMessage, ok := message.(*DynamicProtoMessage)
	if !ok {
		return nil, errNotDynamicProtoMessage(message)
	}
	return proto.Marshal(dynamicProtoMessage.Content)
}

func (c *dynamicProtoBinaryCodec) MarshalStable(message any) ([]byte, error) {
	dynamicProtoMessage, ok := message.(*DynamicProtoMessage)
	if !ok {
		return nil, errNotDynamicProtoMessage(message)
	}
	options := proto.MarshalOptions{Deterministic: true}
	return options.Marshal(dynamicProtoMessage.Content)
}

func (c *dynamicProtoBinaryCodec) Unmarshal(data []byte, message any) error {
	dynamicProtoMessage, ok := message.(*DynamicProtoMessage)
	if !ok {
		return errNotDynamicProtoMessage(message)
	}
	// Build the concrete response message before decoding into it.
	dynamicProtoMessage.Content = c.newResponse()
	return proto.Unmarshal(data, dynamicProtoMessage.Content)
}

func (c *dynamicProtoBinaryCodec) IsBinary() bool {
	return true
}

type dynamicProtoJSONCodec struct {
	name        string
	newResponse func() proto.Message
}

func (c *dynamicProtoJSONCodec) Name() string {
	return c.name
}

func (c *dynamicProtoJSONCodec) Marshal(message any) ([]byte, error) {
	dynamicProtoMessage, ok := message.(*DynamicProtoMessage)
	if !ok {
		return nil, errNotDynamicProtoMessage(message)
	}
	var options protojson.MarshalOptions
	return options.Marshal(dynamicProtoMessage.Content)
}

func (c *dynamicProtoJSONCodec) MarshalStable(message any) ([]byte, error) {
	messageJSON, err := c.Marshal(message)
	if err != nil {
		return nil, err
	}
	compactedJSON := bytes.NewBuffer(messageJSON[:0])
	if err = json.Compact(compactedJSON, messageJSON); err != nil {
		return nil, err
	}
	return compactedJSON.Bytes(), nil
}

func (c *dynamicProtoJSONCodec) Unmarshal(data []byte, message any) error {
	dynamicProtoMessage, ok := message.(*DynamicProtoMessage)
	if !ok {
		return errNotDynamicProtoMessage(message)
	}
	if len(data) == 0 {
		return errors.New("zero-length payload is not a valid JSON object")
	}
	dynamicProtoMessage.Content = c.newResponse()
	options := protojson.UnmarshalOptions{DiscardUnknown: true}
	return options.Unmarshal(data, dynamicProtoMessage.Content)
}

func (c *dynamicProtoJSONCodec) IsBinary() bool {
	return false
}

func errNotDynamicProtoMessage(message any) error {
	return fmt.Errorf("%T isn't *DynamicProtoMessage", message)
}
```
ewr23523 commented 1 year ago

Usage demonstration (uses connect-demo repo):

```go
package main

import (
	"context"
	"fmt"
	"log"
	"net/http"

	"github.com/bufbuild/connect-go"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/reflect/protoregistry"
	"google.golang.org/protobuf/types/dynamicpb"

	_ "github.com/bufbuild/connect-demo/internal/gen/buf/connect/demo/eliza/v1" // register buf.connect.demo.eliza.v1 descriptors
)

// The NewDynamicpb* and NewDynamicProto* constructors used below come from
// the two workaround snippets in the previous comments.
func main() {
	ctx := context.Background()
	baseUrl := "https://demo.connect.build"
	serviceName := "buf.connect.demo.eliza.v1.ElizaService"
	methodName := "Say"
	url := baseUrl + "/" + serviceName + "/" + methodName

	desc, err := protoregistry.GlobalFiles.FindDescriptorByName(protoreflect.FullName(serviceName + "." + methodName))
	if err != nil {
		log.Fatal(err)
	}
	methodDesc, ok := desc.(protoreflect.MethodDescriptor)
	if !ok {
		log.Fatal("found descriptor is not a method descriptor")
	}
	requestType, err := protoregistry.GlobalTypes.FindMessageByName(methodDesc.Input().FullName())
	if err != nil {
		log.Fatal(err)
	}

	dynamicpbClient := NewDynamicpbClient(methodDesc, http.DefaultClient, url)
	dynamicpbRequest := connect.NewRequest(dynamicpb.NewMessage(methodDesc.Input()))
	dynamicpbResponse, err := dynamicpbClient.CallUnary(ctx, dynamicpbRequest)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("\nDynamicpbClient:\n Request: %s\n Response: %s\n", formatpb(dynamicpbRequest.Msg), formatpb(dynamicpbResponse.Msg))

	dynamicpbJSONClient := NewDynamicpbJSONClient(methodDesc, http.DefaultClient, url)
	dynamicpbJSONRequest := connect.NewRequest(dynamicpb.NewMessage(methodDesc.Input()))
	dynamicpbJSONResponse, err := dynamicpbJSONClient.CallUnary(ctx, dynamicpbJSONRequest)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("\nDynamicpbJSONClient:\n Request: %s\n Response: %s\n", formatpb(dynamicpbJSONRequest.Msg), formatpb(dynamicpbJSONResponse.Msg))

	dynamicProtoClient := NewDynamicProtoClient(methodDesc, http.DefaultClient, url)
	dynamicProtoRequest := connect.NewRequest(&DynamicProtoMessage{requestType.New().Interface()})
	dynamicProtoResponse, err := dynamicProtoClient.CallUnary(ctx, dynamicProtoRequest)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("\nDynamicProtoClient:\n Request: %s\n Response: %s\n", formatpb(dynamicProtoRequest.Msg.Content), formatpb(dynamicProtoResponse.Msg.Content))

	dynamicProtoJSONClient := NewDynamicProtoJSONClient(methodDesc, http.DefaultClient, url)
	dynamicProtoJSONRequest := connect.NewRequest(&DynamicProtoMessage{requestType.New().Interface()})
	dynamicProtoJSONResponse, err := dynamicProtoJSONClient.CallUnary(ctx, dynamicProtoJSONRequest)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("\nDynamicProtoJSONClient:\n Request: %s\n Response: %s\n", formatpb(dynamicProtoJSONRequest.Msg.Content), formatpb(dynamicProtoJSONResponse.Msg.Content))
}

// formatpb renders a message as JSON for printing.
func formatpb(m proto.Message) string {
	b, err := protojson.Marshal(m)
	if err != nil {
		log.Fatal(err)
	}
	return string(b)
}
```
akshayjshah commented 1 year ago

@jhump is the author of grpcurl, and is definitely the right person to chime in here :)

jhump commented 1 year ago

A slightly simpler way to do this with the existing APIs is to use a more general codec. That's sort of like what we did in buf curl, which provides most of the functions of grpcurl but adds support for using Git repos and Buf Schema Registry modules as the schema source and also adds support for gRPC-Web and Connect protocols. The codec there uses a deferredMessage for received messages. The codec simply stores the bytes in the deferredMessage, and the application can then unmarshal those later, after instantiating a *dynamicpb.Message of the right type.
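To make the deferred-decoding idea concrete, here is a minimal sketch using only the standard library. It is not the buf curl implementation: the `deferredMessage` and `deferringCodec` types below are hypothetical, the behaviour of `Marshal` is an assumption, and the defensive copy in `Unmarshal` assumes the caller may reuse its buffer.

```go
package main

import "fmt"

// deferredMessage is a hypothetical stand-in for the type described above:
// it keeps the raw payload so that decoding can happen later.
type deferredMessage struct {
	data []byte
}

// deferringCodec has the Name/Marshal/Unmarshal method set of a codec,
// but it never decodes a payload when it is received.
type deferringCodec struct{}

func (deferringCodec) Name() string { return "proto" }

// Marshal returns bytes that the caller has already serialized (assumption).
func (deferringCodec) Marshal(message any) ([]byte, error) {
	msg, ok := message.(*deferredMessage)
	if !ok {
		return nil, fmt.Errorf("%T isn't *deferredMessage", message)
	}
	return msg.data, nil
}

// Unmarshal copies the payload into the message instead of decoding it.
func (deferringCodec) Unmarshal(data []byte, message any) error {
	msg, ok := message.(*deferredMessage)
	if !ok {
		return fmt.Errorf("%T isn't *deferredMessage", message)
	}
	msg.data = append([]byte(nil), data...)
	return nil
}

func main() {
	codec := deferringCodec{}
	var received deferredMessage
	if err := codec.Unmarshal([]byte{0x0a, 0x02, 'h', 'i'}, &received); err != nil {
		panic(err)
	}
	// The application would now create dynamicpb.NewMessage(descriptor) for
	// the right descriptor and call proto.Unmarshal(received.data, msg).
	fmt.Printf("deferred %d bytes\n", len(received.data))
}
```

Because the codec never needs a descriptor, one instance can serve every method of a service; the cost is that decoding errors surface in application code rather than inside the RPC call.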

But I agree that it would be ideal to do this without needing custom codecs. As it is, if you want to use a custom codec that provides a different serialization format, it must be wrapped to handle these deferred/dynamic message cases.

I think the thing we are missing is a way to provide a factory or constructor. Right now, the framework assumes it can construct a new instance of a message simply by taking the address of a zero value. But that does not work with dynamic messages. We could add new functions that accept these as args, that supplement the existing NewClient and New*Handler functions.

-func NewClient[Req, Res any](
+func NewClientWithResponseFactory[Req, Res any](
    httpClient HTTPClient,
    url string,
+   factory func() *Res,
    options ...ClientOption,
 ) *Client[Req, Res]

-func NewUnaryHandler[Req, Res any](
+func NewUnaryHandlerWithRequestFactory[Req, Res any](
    procedure string,
    unary func(context.Context, *Request[Req]) (*Response[Res], error),
+   factory func() *Req,
    options ...HandlerOption,
 ) *Handler 

// similar changes for new functions for the three kinds of streaming methods

Sadly, we can't use ClientOption or HandlerOption to supply the factories because (1) they are used in the generated code across all methods of a service, but message types in that scenario are usually heterogeneous; and (2) there wouldn't be type-safety in that approach, to ensure that the factory's response type matches the actual generic type of the client or handler.
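The difference between allocating a zero value and calling a typed factory can be sketched without any dependencies. Everything below is hypothetical and only illustrates the shape of the proposal: `schemaMessage` stands in for a dynamic message whose zero value is unusable, and `receiveZero` and `receiveWithFactory` stand in for the framework code that allocates a response.

```go
package main

import (
	"errors"
	"fmt"
)

// schemaMessage is a hypothetical stand-in for a dynamic message: it is only
// usable when it carries a schema, so its zero value is invalid.
type schemaMessage struct {
	schema string // stand-in for a message descriptor
	fields map[string]string
}

// Set fails on a message that was created without a schema.
func (m *schemaMessage) Set(name, value string) error {
	if m.schema == "" {
		return errors.New("message has no schema")
	}
	if m.fields == nil {
		m.fields = make(map[string]string)
	}
	m.fields[name] = value
	return nil
}

// receiveZero mimics allocating the response by taking the address of a
// zero value.
func receiveZero[Res any](decode func(*Res) error) (*Res, error) {
	res := new(Res)
	return res, decode(res)
}

// receiveWithFactory mimics the proposal: the caller supplies a factory whose
// return type the compiler checks against Res.
func receiveWithFactory[Res any](factory func() *Res, decode func(*Res) error) (*Res, error) {
	res := factory()
	return res, decode(res)
}

func main() {
	decode := func(m *schemaMessage) error { return m.Set("sentence", "hello") }

	_, err := receiveZero(decode)
	fmt.Println("zero value:", err)

	factory := func() *schemaMessage { return &schemaMessage{schema: "SayResponse"} }
	res, err := receiveWithFactory(factory, decode)
	fmt.Println("factory:", res.fields["sentence"], err)
}
```

Passing a factory that returns any other pointer type to `receiveWithFactory` is a compile error, which is the type-safety property that an untyped option could not provide.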

@akshayjshah, what do you think of something like the above? It's extra surface area in the exported API, but the nice property is that it can potentially enable other usages, since it is very general and not bespoke to the dynamicpb package.

akshayjshah commented 1 year ago

😭 Early versions of Connect looked exactly like this, and one of my favorite parts of introducing generics was dropping these function pointers. Wish I'd considered this use case more carefully back then! Let's do what we can now to make this easier for users like @ewr23523 and buf curl.

I can't shake the feeling that we should be able to make this just work in a more ergonomic way. Dynamic messages aren't that exotic (especially in a world where protobuf schema registries are common!), so it feels a little kludgy to lean on user-visible function pointers to make them work properly. I'm willing to do it if there's no better option, but I'd like to explore a more specific solution first.

For unrelated reasons, I was hoping to add func WithSchema(schema any) Option to Connect. I'd imagined it as a small convenience: generated code would use it to pass method descriptors to clients and handlers, which could then expose them to interceptors via Spec. But if clients and handlers had access to a MethodDescriptor, they'd be able to initialize dynamic messages automatically. I sketched out a partial implementation in the ajs/schema branch. With those changes, clients would look like this.
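As a rough illustration of how a schema carried by an option could let a client initialize dynamic messages, here is a dependency-free sketch. It is not the code from the ajs/schema branch: `spec`, `option`, `withSchema`, `methodSchema`, and `dynamicMessage` are simplified, hypothetical stand-ins for the Connect and protobuf types named above.

```go
package main

import "fmt"

// spec and option are simplified, hypothetical stand-ins for a Spec and an
// option type; they are not the library's definitions.
type spec struct {
	Procedure string
	Schema    any
}

type option func(*spec)

// withSchema attaches an arbitrary schema value to the spec.
func withSchema(schema any) option {
	return func(s *spec) { s.Schema = schema }
}

// methodSchema stands in for a protoreflect.MethodDescriptor.
type methodSchema struct {
	Input, Output string
}

// dynamicMessage stands in for dynamicpb.Message.
type dynamicMessage struct {
	typeName string
}

// newSpec applies the options in order.
func newSpec(procedure string, opts ...option) spec {
	s := spec{Procedure: procedure}
	for _, opt := range opts {
		opt(&s)
	}
	return s
}

// newResponse builds a response message from the schema when one is present.
func newResponse(s spec) (*dynamicMessage, error) {
	schema, ok := s.Schema.(methodSchema)
	if !ok {
		return nil, fmt.Errorf("%s: no method schema available", s.Procedure)
	}
	return &dynamicMessage{typeName: schema.Output}, nil
}

func main() {
	s := newSpec(
		"/buf.connect.demo.eliza.v1.ElizaService/Say",
		withSchema(methodSchema{Input: "SayRequest", Output: "SayResponse"}),
	)
	res, err := newResponse(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(res.typeName)
}
```

Since the schema is typed as `any`, the type assertion inside `newResponse` is the only place that knows about the concrete schema type, and a missing or mismatched schema can only be reported when a message is needed.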

@jhump, what do you think? I don't love that we're introducing protobuf-specific logic outside codecs and error details, but it seems better for users.

jhump commented 1 year ago

I didn't suggest anything like that since I figured something more general and less protobuf-specific would be preferred. While the link you provided looks great, I have a couple of concerns:

  1. What does it do if the type parameter is dynamicpb.Message but no schema has been provided? Where and how does it fail? Since connect.NewClient doesn't return an error, it can't provide immediate feedback and would instead have to defer any errors to actual RPCs. It is quite unfortunate to be unable to validate client config while the program is initializing.
  2. What if someone tries to use Connect with something other than protobuf? While protobuf is the expected use case, it is not part of the protocol (only the default codecs and the code generator plugin). An alternate IDL/serialization scheme could have the exact same need: a dynamic type that can't be correctly created by simply taking the address of a zero value. So this solution, since it is protobuf-specific, provides no escape hatch for that.
jhump commented 1 year ago

Another thing I thought of is the case where the user does not know the message type at compile time. For example, if they are using a protoregistry.Types to resolve the request and response types. This gives them only a proto.Message. While concrete implementations will surely always be pointers, the actual pointed-to type is not knowable. That means the resulting message type in the client or handler is *proto.Message, which doesn't play nicely with the default codecs. (There's also the extra pointer indirection, but I don't see any way to change that.)

To work around this, I've done tricks in the codec, similar to handling dynamic types in received messages. But this could be accommodated in the runtime by having the NewClient or New*Handler functions check if the type parameter is an interface type and, if so, handling the extra indirection -- for example, de-referencing the pointer to send a plain proto.Message (not *proto.Message) to the codec for marshalling. This only works for marshalling/sending messages, of course. For unmarshalling/receiving messages, a factory function would have to be provided (that would call MessageType.New(), for example).
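The interface check described above can be sketched with the reflect package. This is only an illustration, not connect-go code: `message` is a hypothetical stand-in for proto.Message, and `forCodec` is an assumed helper name for the step that prepares a value for the codec.

```go
package main

import (
	"fmt"
	"reflect"
)

// message is a hypothetical stand-in for proto.Message.
type message interface {
	TypeName() string
}

type sayRequest struct{}

func (*sayRequest) TypeName() string { return "SayRequest" }

// isInterfaceParam reports whether the type parameter T is an interface type.
func isInterfaceParam[T any]() bool {
	return reflect.TypeOf((*T)(nil)).Elem().Kind() == reflect.Interface
}

// forCodec returns the value to hand to the codec. If T is an interface, it
// dereferences *T so the codec sees the value stored in the interface;
// otherwise it passes the pointer through unchanged.
func forCodec[T any](msg *T) any {
	if isInterfaceParam[T]() {
		return any(*msg)
	}
	return msg
}

func main() {
	var m message = &sayRequest{}
	fmt.Printf("%T\n", forCodec(&m))            // *main.sayRequest
	fmt.Printf("%T\n", forCodec(&sayRequest{})) // *main.sayRequest
}
```

The kind check depends only on T, so a real implementation could do it once when the client or handler is constructed rather than on every message.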

williamleven commented 1 year ago

I came across a similar issue when using dynamic types on the server. Inspired by @jhump's code snippet in this issue, I created a small patch that extends the API with support for request factories.

I understand that you have some things to consider before deciding how this functionality would be best implemented, but I thought I would share what worked for me anyway. Let me know if you would be interested in a contribution. https://github.com/connectrpc/connect-go/compare/main...williamleven:connect-go:feat/dynamic-message-factories

I have verified that the above patch works with dynamicpb for unary requests.

Would love to see a canonical solution to this.