protobuf-net / protobuf-net.Grpc

gRPC bindings for protobuf-net and grpc-dotnet

[Question] GRPC server - do not deserialize protobuf request #293

Closed: jmasek closed this issue 1 year ago

jmasek commented 1 year ago

Hi, is it possible to accept the raw bytes in a gRPC service so that no deserialization is done?

Context: we are building a high-performance service that accepts data from the OTEL collector and forwards it to another service that processes the protobuf, so there is no reason to deserialize the request in the first service.

Workaround: I tried the following code. It seems to work, but I am not sure whether it is a legitimate solution or whether we should approach it differently.

[Service("OpenTelemetry.Proto.Collector.Logs.V1.LogsService")]
public interface IExportLogsService
{
#pragma warning disable PBN2002
    Task<ExportLogsServiceResponse> Export(byte[] request, CallContext callContext);
#pragma warning restore PBN2002
}

Thanks

mgravell commented 1 year ago

The marshaller is pluggable, so while this isn't provided out of the box, it is definitely possible. We'd still need to copy the data, since the input buffers are only valid for the lifetime of the call. I'd guess we would copy into a lease of some kind, probably exposed as ReadOnlyMemory<byte> or ReadOnlySequence<byte> (depending on the maximum size).
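The following is a minimal sketch of the "copy into a lease" idea. It assumes only the BCL's System.Buffers types. PayloadLease and CopyFrom are hypothetical names, not protobuf-net.Grpc API. The sketch copies a call-scoped ReadOnlySequence<byte> into a buffer rented from MemoryPool<byte>.Shared and exposes the valid bytes as ReadOnlyMemory<byte>.

```csharp
using System;
using System.Buffers;

// Hypothetical helper (not part of protobuf-net.Grpc): copies a payload whose
// buffers are only valid during the call into pooled memory the caller owns.
public sealed class PayloadLease : IDisposable
{
    private IMemoryOwner<byte>? _owner;
    private readonly int _length;

    private PayloadLease(IMemoryOwner<byte> owner, int length)
    {
        _owner = owner;
        _length = length;
    }

    // Only the first _length bytes are valid; the rented block may be larger.
    public ReadOnlyMemory<byte> Memory =>
        (_owner ?? throw new ObjectDisposedException(nameof(PayloadLease)))
            .Memory.Slice(0, _length);

    public static PayloadLease CopyFrom(in ReadOnlySequence<byte> source)
    {
        int length = checked((int)source.Length);
        IMemoryOwner<byte> owner = MemoryPool<byte>.Shared.Rent(length);
        source.CopyTo(owner.Memory.Span); // handles multi-segment sequences
        return new PayloadLease(owner, length);
    }

    // Returns the rented buffer to the pool; Memory must not be used afterwards.
    public void Dispose()
    {
        _owner?.Dispose();
        _owner = null;
    }
}
```

Inside a marshaller, the source would presumably be DeserializationContext.PayloadAsReadOnlySequence(). Compared with ToArray(), pooling avoids one allocation per call, but something downstream then has to own and dispose the lease.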

Happy to work with you to get something working here.

mgravell commented 1 year ago

Understanding exactly how you intend to use it would be key. If you just want to pass the same value into a downstream gRPC service without ever processing it, I think that would be trivial.

jmasek commented 1 year ago

Yes, we are basically passing the very same value (binary protobuf) downstream to an internal buffer (the server runs on a local computer) and eventually to the cloud. There, the protobuf is forwarded either to an OTEL collector or to a custom ingester that processes the data and pushes it to other services (Loki, Thanos, Tempo, etc.). Using a custom marshaller sounds like a good idea. Can you point me to a method or interface for plugging in a custom one?

mgravell commented 1 year ago

Sure; I'll try to do an example after my lunch

mgravell commented 1 year ago

Right; so I explored some things ^^^. While that works, it probably isn't what you were after. At the moment it allows zero-alloc handling of raw payloads, which is nice enough. I'm guessing, though, that in your real world the downstream service is typed? Or is the intent here to use this with the non-typed API?

Basically, I have some ideas, but let me restart the conversation. Imagine that this thing existed, whatever it is: how would you expect to use it? What would the services look like? What would the code that passes it downstream look like? Is the service declaration the "real one", or can your intermediary use a different API from the actual service?

In particular, what does the inner service take as arguments?

mgravell commented 1 year ago

Thinking aloud, I wonder if what we really need here is some kind of proxy object that could be initialized from either a raw payload or a value, with automatic deserialization as needed; this is a little awkward re lifetimes, so if that's the way we'd need to look at this, it would take some work.
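Here is a minimal sketch of the proxy idea. It assumes a caller-supplied deserialization delegate, which in practice would presumably wrap a protobuf-net call. RawOrValue<T> is a hypothetical name, not an existing protobuf-net.Grpc type. The sketch also makes the lifetime problem visible: the proxy only holds a ReadOnlyMemory<byte>, so whoever creates it must keep the underlying buffer alive for as long as the proxy is in use.

```csharp
using System;

// Hypothetical proxy: holds either the raw protobuf bytes or an already
// materialized value, and deserializes lazily on first access to Value.
public sealed class RawOrValue<T> where T : class
{
    private readonly ReadOnlyMemory<byte> _raw; // caller must keep this buffer alive
    private readonly Func<ReadOnlyMemory<byte>, T>? _deserialize;
    private T? _value;

    public RawOrValue(ReadOnlyMemory<byte> raw, Func<ReadOnlyMemory<byte>, T> deserialize)
    {
        _raw = raw;
        _deserialize = deserialize ?? throw new ArgumentNullException(nameof(deserialize));
    }

    public RawOrValue(T value) =>
        _value = value ?? throw new ArgumentNullException(nameof(value));

    // True when the payload arrived as bytes and can be forwarded untouched.
    public bool HasRaw => _deserialize is not null;

    public ReadOnlyMemory<byte> Raw => HasRaw
        ? _raw
        : throw new InvalidOperationException("Constructed from a value; no raw payload.");

    // Deserializes at most once, and only if someone actually asks for the value.
    public T Value => _value ??= _deserialize!(_raw);
}
```

The sketch is not thread-safe. Two threads reading Value for the first time at the same moment could both run the deserializer.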

Hence the importance of understanding how you would anticipate working with this in reality.

jmasek commented 1 year ago

Hi Marc, thanks for the quick reply and example. First of all, my example does not work as I expected 😆, although looking at it, it intuitively seems that it should. If I use this signature:

[Service("OpenTelemetry.Proto.Collector.Logs.V1.LogsService")]
public interface IExportLogsService
{
#pragma warning disable PBN2002
    Task<ExportLogsServiceResponse> Export(byte[] request, CallContext callContext);
#pragma warning restore PBN2002
}

I get the same content as I get in request.ResourceMetrics with the following code:

[Service("OpenTelemetry.Proto.Collector.Metrics.V1.MetricsService")]
public interface IExportMetricsService
{
    Task<CustomExportMetricsServiceResponse> Export(ExportMetricsServiceRequest request, CallContext callContext);
}

[ProtoContract]
public class ExportMetricsServiceRequest
{
    [ProtoMember(1)]
    public byte[] ResourceMetrics { get; set; } = null!;
}

So the request is still being deserialized. In my use case there is no need for this: I would like to take the whole request without any deserialization. The downstream services treat it as raw data, and only the final processing service in the cloud deserializes the data and pushes it to the Loki/Mimir/Thanos services.
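The observation above is consistent with the byte[] parameter receiving the contents of field 1 rather than the whole request. The illustrative sketch below shows the difference at the protobuf wire-format level. WireFormatDemo is a hypothetical helper that handles only length-delimited fields. A request whose field 1 holds three bytes is five bytes on the wire: tag 0x0A (field 1, wire type 2), then length 0x03, then the payload. Only the inner three bytes are "field 1".

```csharp
using System;

// Illustrative only: locates the first length-delimited field 1 in a protobuf
// message. Not a general-purpose parser (other wire types are rejected).
public static class WireFormatDemo
{
    // Reads a base-128 varint starting at pos and advances pos past it.
    private static ulong ReadVarint(ReadOnlySpan<byte> data, ref int pos)
    {
        ulong result = 0;
        for (int shift = 0; shift < 64; shift += 7)
        {
            byte b = data[pos++];
            result |= (ulong)(b & 0x7F) << shift;
            if ((b & 0x80) == 0) return result;
        }
        throw new FormatException("Malformed varint");
    }

    // Returns the payload of the first occurrence of field 1 (wire type 2).
    public static ReadOnlySpan<byte> FirstField1(ReadOnlySpan<byte> message)
    {
        int pos = 0;
        while (pos < message.Length)
        {
            ulong tag = ReadVarint(message, ref pos);
            int field = (int)(tag >> 3), wireType = (int)(tag & 7);
            if (wireType != 2)
                throw new NotSupportedException("Demo handles length-delimited fields only");
            int len = checked((int)ReadVarint(message, ref pos));
            if (field == 1) return message.Slice(pos, len);
            pos += len; // skip other length-delimited fields
        }
        return ReadOnlySpan<byte>.Empty;
    }
}
```

A genuinely raw handler needs the full five bytes, tag and length included, which is why a custom marshaller is needed rather than a byte[] parameter.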

So, back to your question: in my use case no intermediate object is needed, only the raw data (byte[], ArraySegment<byte>, a byte-array stream, or something similar). Regarding the usage, I have two ideas.

Regarding your example: I am using a standard .NET Host with AddCodeFirstGrpc, AddSingleton and MapGrpcService. Is it possible to use a custom Marshaller with that setup?

mgravell commented 1 year ago

Yes, and the sample does exactly that (although, for simplicity of hosting, it used the Google server API rather than the Microsoft one; but that's just a detail).

My point is: how is the downstream API defined? For this to work the simple way, you'd need both the server API (where you receive the inbound bytes) and the client API (where you call downstream) to take the custom type, at least in your intermediate layer. If that is possible, then sure, we can do this. However, if your client API is the same strongly typed API that the final server receives, then it is harder.
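Marc's "simple way" implies a marshaller that works in both directions. The following is a sketch assuming Grpc.Core.Api's Marshallers.Create, SerializationContext.Complete(byte[]) and DeserializationContext.PayloadAsReadOnlySequence(). PassThroughPayload is a hypothetical name. A receive-only marshaller can leave Serialize unimplemented. Here the Serialize side is needed because the intermediate layer's client must write the untouched bytes to the downstream service.

```csharp
using System;
using System.Buffers; // ToArray() extension for ReadOnlySequence<byte>
using Grpc.Core;

// Hypothetical opaque payload used by BOTH the intermediate server contract
// and the intermediate client contract, so the bytes are never parsed.
public sealed class PassThroughPayload
{
    public PassThroughPayload(byte[] bytes) =>
        Bytes = bytes ?? throw new ArgumentNullException(nameof(bytes));

    public byte[] Bytes { get; }

    public static Marshaller<PassThroughPayload> Marshaller { get; } =
        Marshallers.Create<PassThroughPayload>(Serialize, Deserialize);

    // Outbound (client call downstream): write the original bytes verbatim.
    private static void Serialize(PassThroughPayload value, SerializationContext context)
        => context.Complete(value.Bytes);

    // Inbound (server receives): copy out of the call-scoped buffers.
    private static PassThroughPayload Deserialize(DeserializationContext context)
        => new PassThroughPayload(context.PayloadAsReadOnlySequence().ToArray());
}
```

How the client side picks this marshaller up is an assumption here. Presumably it is registered on the BinderConfiguration used to create the code-first client, in the same way as on the server.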

jmasek commented 1 year ago

Yes, the intermediate layer uses a custom type (to add custom data). To give you an idea, the type and its usage look like this:

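public class IngressMessage
{
    public IngressMessage(Metadata metadata, ArraySegment<byte> payload)
    {
        Metadata = metadata;
        PayloadArraySegment = payload;
    }

    public ArraySegment<byte> PayloadArraySegment { get; }
    public Metadata Metadata { get; }
}

public async Task<CustomExportMetricsServiceResponse> Export(byte[] request, CallContext callContext)
{
    // ExtractMetadata and _service are application-specific (not shown);
    // byte[] converts implicitly to ArraySegment<byte>
    var message = new IngressMessage(ExtractMetadata(callContext.RequestHeaders), request);
    await _service.IngressDataAsync(message);
    return new CustomExportMetricsServiceResponse();
}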

However, the final processor of the data uses the very same type that is sent to our service, so we do not want to modify the request in any way; we just pass it through our system to the processor.

mgravell commented 1 year ago

Right; then the approach in the example should work fine. Would it be clearer if I ported that example to an ASP.NET scenario?

(Sent by email in reply to Jan Masek's comment of Tue, 16 May 2023, 08:25.)

jmasek commented 1 year ago

Hi, I implemented your solution and it works perfectly. However, I discovered that we need the data in a standard array later in the pipeline anyway. So I decided to allocate the byte array right in the marshaller and pass it around, without the hassle of owned memory and renting/returning buffers. For anyone interested, my solution looks like this. I also removed the serialization implementation, because it should never be used.

Payload class:

using System;
using System.Buffers; // ToArray() extension for ReadOnlySequence<byte>
using Grpc.Core;

public class RawPayload
{
    private RawPayload(byte[] payload)
    {
        Payload = payload;
    }

    public static RawPayload Empty { get; } = new(Array.Empty<byte>());

    // The untouched protobuf bytes of the request
    public byte[] Payload { get; }

    public static Marshaller<RawPayload> Marshaller { get; } = Marshallers.Create(
        Serialize, Deserialize);

    // Receive-only: this type is never sent, so serialization is deliberately unsupported
    private static void Serialize(RawPayload value, SerializationContext context)
    {
        throw new NotSupportedException("Serialization not supported");
    }

    private static RawPayload Deserialize(DeserializationContext context)
    {
        var length = context.PayloadLength;
        if (length < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(length));
        }
        if (length == 0)
        {
            return Empty;
        }

        // Copy out of the call-scoped buffers into a regular array
        return new RawPayload(context.PayloadAsReadOnlySequence().ToArray());
    }
}

Service:

[Service("OpenTelemetry.Proto.Collector.Metrics.V1.MetricsService")]
public interface IExportMetricsService
{
#pragma warning disable PBN2008
    Task<GenericResponse> Export(RawPayload payload, CallContext callContext);
#pragma warning restore PBN2008
}

ASP.NET DI:

// OTEL endpoint
services.AddCodeFirstGrpc();
services.AddSingleton<ExportMetricsService>();
services.AddSingleton<ExportLogsService>();
services.AddSingleton<ExportTracesService>();

// use the raw marshaller for RawPayload instead of protobuf deserialization
var bc = BinderConfiguration.Default;
bc.SetMarshaller(RawPayload.Marshaller);
services.AddSingleton(bc);

Thank you very much for the great help and such fast replies! Much appreciated.