grpc / grpc-go

The Go language implementation of gRPC. HTTP/2 based RPC
https://grpc.io
Apache License 2.0

New `Compressor`: Support metadata other than `Name` for trained compression #7017

Open coxley opened 8 months ago

coxley commented 8 months ago

According to a comment on #7003, the grpc-go maintainers would like feedback on the API before eventually marking it as stable. So here are some of my current thoughts. :)

Use case(s) - what problem will this feature solve?

The only "metadata" supported for gRPC compressors today is the name. This is sent in the content encoding header between the client and server.

This works for most use cases, but it breaks down a bit if the compressor needs extra context about the request being compressed, specifically for dictionary compression with a format like zstd. Decompression is a bit easier, assuming your dictionary IDs are unique.

For example, we train dictionaries by "category" and "namespace". A "category" would relate to a common type of data (eg: pubsub topic), whereas all of the instances of that data in a given "namespace" relate closely to each other and are what dictionaries are trained on.

The gRPC client knows what "category" and "namespace" it's issuing a request for, but the Compressor only has access to an io.Writer.

If the Compressor had extra context, it could decide which dictionary to compress with. Or alternatively, the decision on which to use could be done before invoking the call and passed explicitly. Unless I'm missing something, neither is possible today.
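
For reference, this is roughly the shape of today's encoding.Compressor interface from google.golang.org/grpc/encoding; the only per-request input it receives is the stream itself:

// Roughly the current interface: the compressor sees only byte streams and
// its registered name, with no per-RPC context.
type Compressor interface {
    Compress(w io.Writer) (io.WriteCloser, error)
    Decompress(r io.Reader) (io.Reader, error)
    Name() string
}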

I can only think of two workarounds at the moment, neither of which feels great.

Proposed Solution

I assume you folks would know the best way to fit this in organically. My current thoughts are either a dedicated option along the lines of CompressionDictID, or a way to attach custom metadata that the compressor can see.

The latter is more flexible and makes fewer assumptions. CompressionDictID would only work for us because we are fine with 4-byte, globally distinct IDs, but that is likely a deal-breaker for other workloads. Setting custom metadata to control how this is resolved would let both the client and the server make the right call.
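
To illustrate the custom-metadata route, the client side might look roughly like the snippet below (assuming the helloworld Greeter client used later in this thread and a registered "zstd" compressor). The header names are made up for this example, and today nothing would actually deliver these values to the compressor, which is the gap this issue is about.

// Hypothetical: attach dictionary-selection hints as outgoing metadata.
// The header names here are invented for illustration.
ctx := metadata.AppendToOutgoingContext(context.Background(),
    "zstd-dict-category", "pubsub",
    "zstd-dict-namespace", "foo",
)
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "coxley"},
    grpc.UseCompressor("zstd"))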

coxley commented 8 months ago

Here's an example of how I will probably work around this until there's a better way. Feel free to steer me in another direction :)

Protobuf

syntax = "proto3";

package zstd;
option go_package = "repro/zstd";

// Compressed is a wrapper for zstd encoded data in lieu of better ways of sharing metadata
message Compressed {
  string category = 1;
  string namespace = 2;
  bytes data = 3;
}

Codec


package main

import (
    "fmt"

    zstd "repro/gen"

    "google.golang.org/protobuf/proto"
)

// codec is a dictionary-aware compression implementation
//
// Given a category and namespace, it knows how to find the correct dictionary
// to compress with. For decompression, it reads the dictionary ID from the
// zstd payload.
//
// This could later be augmented to make decompression dictionary IDs
// category:namespace-aware instead of assuming they're globally scoped.
type codec struct {
    Category  string
    Namespace string
}

func (c *codec) Marshal(v any) ([]byte, error) {
    return marshal(c.Category, c.Namespace, v)
}

func (c *codec) Unmarshal(data []byte, v any) error {
    return unmarshal(data, v)
}

func (c *codec) Name() string {
    return "zstd-wrapper"
}

// marshal and compress 'v', wrap in [zstd.Compressed], and return the result
// of marshalling that
//
// Skip wrapping if 'v' is already a [zstd.Compressed]
func marshal(category, namespace string, v any) ([]byte, error) {
    switch vv := v.(type) {
    case *zstd.Compressed:
        return proto.Marshal(vv)
    case proto.Message:
        data, err := proto.Marshal(vv)
        if err != nil {
            return nil, err
        }
        container := &zstd.Compressed{
            Category:  category,
            Namespace: namespace,
            // TODO: Imagine there was compression happening before this
            Data: data,
        }

        fmt.Println("using the codec for compression")
        return proto.Marshal(container)
    default:
        return nil, fmt.Errorf("unsure what to do for type: %T", v)
    }
}

// unmarshal data into a [zstd.Compressed], then unmarshal its data field into 'v'
func unmarshal(data []byte, v any) error {
    switch vv := v.(type) {
    case *zstd.Compressed:
        return proto.Unmarshal(data, vv)
    case proto.Message:
        var container zstd.Compressed
        if err := proto.Unmarshal(data, &container); err != nil {
            return err
        }
        fmt.Println("using the codec for decompression")
        // TODO: Imagine there was decompression happening before this
        return proto.Unmarshal(container.Data, vv)
    default:
        return fmt.Errorf("unsure what to do for type: %T", v)
    }
}
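
The TODOs above elide the actual (de)compression. A minimal sketch of that step, assuming github.com/klauspost/compress/zstd and a hypothetical lookupDict helper that resolves a trained dictionary for a category/namespace pair:

import kzstd "github.com/klauspost/compress/zstd"

// compressWithDict compresses data with the dictionary trained for the given
// category/namespace pair. lookupDict is a hypothetical registry lookup.
func compressWithDict(category, namespace string, data []byte) ([]byte, error) {
    dict, err := lookupDict(category, namespace)
    if err != nil {
        return nil, err
    }
    // A nil writer is fine when only EncodeAll is used.
    enc, err := kzstd.NewWriter(nil, kzstd.WithEncoderDict(dict))
    if err != nil {
        return nil, err
    }
    defer enc.Close()
    return enc.EncodeAll(data, nil), nil
}

// decompressWithDicts decompresses data; zstd resolves the dictionary via the
// ID embedded in the frame, so all known dictionaries are provided up front.
func decompressWithDicts(dicts [][]byte, data []byte) ([]byte, error) {
    dec, err := kzstd.NewReader(nil, kzstd.WithDecoderDicts(dicts...))
    if err != nil {
        return nil, err
    }
    defer dec.Close()
    return dec.DecodeAll(data, nil)
}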

Usage


package main

import (
    "context"
    "fmt"
    "log"
    "net"

    zstd "repro/gen"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/encoding"
    pb "google.golang.org/grpc/examples/helloworld/helloworld"
    "google.golang.org/protobuf/proto"
)

const addr = "localhost:50051"

type server struct {
    pb.UnimplementedGreeterServer
}

func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

func main() {
    // Register default codec without category/namespace for decompression
    encoding.RegisterCodec(&codec{})

    lis, err := net.Listen("tcp", addr)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    s := grpc.NewServer()
    pb.RegisterGreeterServer(s, &server{})

    go func() {
        if err := s.Serve(lis); err != nil {
            log.Fatalf("failed to serve: %v", err)
        }
    }()

    // Now connect from client
    conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()

    c := pb.NewGreeterClient(conn)
    r, err := c.SayHello(
        context.Background(),
        &pb.HelloRequest{Name: "coxley"},
        // TODO: This would ideally be cached and retrieved via an accessor somewhere
        grpc.ForceCodec(&codec{Category: "test", Namespace: "foo"}),
    )
    if err != nil {
        log.Fatalf("could not greet: %v", err)
    }
    fmt.Printf("Result: %v\n", r)
}

dfawley commented 8 months ago

Another option here would be to pass UseCompressor and SetSendCompressor an optional any argument that gets passed to the compressor as an extra parameter to Compress. This would not be a backward compatible change (it could be if we made it a variadic parameter, but that's a bit ugly from an API documentation POV). With this approach, Decompress can't really be parameterized, so the compressor would need to encode whatever the decompressor needs in its header.

If we wanted to pass the metadata around, then we could pass the metadata to both sides (outgoing metadata to the compressor, and incoming metadata to the decompressor). This then has the unfortunate effect of requiring the metadata to be polluted in order to parameterize the compressor.

Instead of passing metadata, we could pass the context directly. This would enable parameterization via the context instead of the metadata. I'm not sure we want to encourage blocking on this path, though, and passing a context might do that.
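
To make that trade-off concrete, a context-based variant of the interface might look roughly like this (purely hypothetical; nothing like it exists in grpc-go today):

// Hypothetical shape only: the transport would thread the RPC's context
// through, and the compressor could read metadata (or other values) from it.
type ContextCompressor interface {
    Compress(ctx context.Context, w io.Writer) (io.WriteCloser, error)
    Decompress(ctx context.Context, r io.Reader) (io.Reader, error)
    Name() string
}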

ejona86 commented 8 months ago

For reference, shared dictionary compression examples in HTTP:

https://chromium.googlesource.com/chromium/src/+/53.0.2744.1/net/sdch/README.md (vcdiff, the first example of this)
https://learn.microsoft.com/en-us/deployedge/learnmore-zsdch-compression (zstd)
https://chromestatus.com/feature/5124977788977152 (originally just brotli)

Those last two are related, and link to a shared draft RFC.

coxley commented 8 months ago

And with interesting timing, this recent one: https://developer.chrome.com/blog/shared-dictionary-compression

dfawley commented 8 months ago

From a quick reading, it looks like none of these approaches requires decompression to have access to anything besides the compressed data -- is that correct? So we don't have any known use case that requires passing the incoming metadata to the decompressor?

coxley commented 7 months ago

@dfawley I think that could be true. Some cases may need to have multiple decompressor "names" for ID space scoping if that's the direction we go, though. (eg: zstd-groupA, zstd-groupB)

At least in zstd, the dictionary ID is limited to 4 bytes. I'm not sure how other companies "scope" their dictionary registries, but it's at least a consideration. Maybe not the end of the world.
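
As a sketch of that multiple-names workaround, each ID scope would get its own registered compressor (dictCompressor and the dictionary variables here are hypothetical):

// Hypothetical: one registered compressor per dictionary ID scope, so the
// 4-byte zstd dictionary IDs only need to be unique within a group.
encoding.RegisterCompressor(&dictCompressor{name: "zstd-groupA", dicts: groupADicts})
encoding.RegisterCompressor(&dictCompressor{name: "zstd-groupB", dicts: groupBDicts})

// The client then picks the scope per call:
resp, err := client.SayHello(ctx, req, grpc.UseCompressor("zstd-groupA"))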

dfawley commented 7 months ago

My proposed change, then, would be:

package grpc

func UseCompressor(name string, compressorOptions ...any) CallOption {}

func SetSendCompressor(ctx context.Context, name string, compressorOptions ...any) error {}

package encoding

type Compressor interface {
    Compress(/*see below*/, ...any) /*see below*/
    ...
}

Essentially this simply adds "...any" to all the places where we set and invoke the compressor.
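
As a rough sketch of how a dictionary-aware compressor might consume those variadic options (hypothetical: dictParams, dictCompressor, and the helpers are illustrative only, and the exact Compress signature is still open):

// A caller would pass an option value via the proposed API, e.g.:
//   grpc.UseCompressor("zstd", dictParams{Category: "pubsub", Namespace: "foo"})
type dictParams struct {
    Category  string
    Namespace string
}

// Compress inspects the proposed "...any" options to pick a dictionary.
func (c *dictCompressor) Compress(w io.Writer, opts ...any) (io.WriteCloser, error) {
    for _, o := range opts {
        if p, ok := o.(dictParams); ok {
            return c.newDictWriter(w, p.Category, p.Namespace) // hypothetical helper
        }
    }
    return c.newDefaultWriter(w) // hypothetical helper
}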

Regarding the Compressor interface, we are planning some changes to the encoding package to support scatter/gather and memory re-use. Those will be covered in #6619 and I would propose incorporating these changes into the work there.

How does all of that sound?

coxley commented 7 months ago

@dfawley Sorry for the late reply; my GitHub notification organization is abysmal.

I think that sounds good as long as it does for you!