jamescourtney / FlatSharp

Fast, idiomatic C# implementation of Flatbuffers
Apache License 2.0
497 stars 50 forks source link

Provide option for RPC service interfaces without explicitly building Grpc clients #175

Closed TYoungSL closed 3 years ago

TYoungSL commented 3 years ago

Example service interface;

interface IRpcService {
  System.Threading.Tasks.Task<ReturnType> RpcMethod(ParameterType request);
}

Example explicit interface implementation on Grpc client;

// existing generated Grpc client member
public virtual Grpc.Core.AsyncUnaryCall<ReturnType> RpcMethod(ParameterType request, Grpc.Core.Metadata? headers = null, System.DateTime? deadline = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken))
{
  checked
  {
    return RpcMethod(request, new Grpc.Core.CallOptions(headers, deadline, cancellationToken));
  }
}
// added explicit interface implementation of RPC service interface
System.Threading.Tasks.Task<ReturnType> IRpcService.RpcMethod(ParameterType request)
  => RpcMethod(request).ResponseAsync;

We'd like to have the option of generating these abstractions from fbs rpc_service declarations.

We have IPC/LPC use cases for these types of interfaces, as well as other RPC transports.

jamescourtney commented 3 years ago

For unary calls, I think this is a reasonable ask. However, how do you keep the gRPC-ness out of the interface for streaming interfaces?

A simpler alternative: FlatSharp generates everything as partial, so you can pretty easily add these extensions yourself using some partial classes that you declare.

rpc_service FooService
{
    SingleOperation(Empty) : Empty;
}
public static partial class FooService
{
    public partial class FooServiceClient : ISomeInterface
    {
         Task<Empty> ISomeInterface.SingleOperation(Empty empty) => this.SingleOperation(empty).ResponseAsync;
    }
}
TYoungSL commented 3 years ago

(Scratched previous message.) Just how I'd imagine it, take 2;

In lieu of what I describe below with the mimicking all of the interfaces to remove the Grpc dependency, the simpler option is to just reduce the dependency to a subset of just Grpc.Core.Api (would still need to have an alternative implemented container to avoid using Grpc.Core, could be in FlatSharp.Runtime or generated).

Add interfaces to the runtime to mimic and wrap IClientStreamWriter<TRequest> and IAsyncStreamReader<TRequest>, return them in a disposable container.

public interface IAsyncStreamReaderMimic<out T>
{
    T Current { get; }
    Task<bool> MoveNext(CancellationToken cancellationToken);
}
public interface IClientStreamWriterMimic<in T>
{
    Task WriteAsync(T message);
    Task CompleteAsync();
}
public interface IAsyncStreamingCallMimic<TResponse> : IDisposable
{
  public IAsyncStreamReaderMimic<TResponse> Response {get;}
}
public interface IAsyncDuplexStreamingCallMimic<TRequest,TResponse> : IDisposable
{
  public IClientStreamWriterMimic<TRequest> Request {get;}
  public IAsyncStreamReaderMimic<TResponse> Response {get;}
}
public virtual Grpc.Core.AsyncDuplexStreamingCall<Request, Response> RpcMethod(Grpc.Core.Metadata? headers = null, System.DateTime? deadline = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken))
{
    checked
    {
        return RpcMethod(new Grpc.Core.CallOptions(headers, deadline, cancellationToken));
    }
}
IAsyncDuplexStreamingCallMimic<Request,Response> IRpcService.RpcMethod()
{
  var state = RpcMethod();
  // ... shove into IAsyncDuplexStreamingCallMimic ...
  return mimic;
}
jamescourtney commented 3 years ago

Instead of all of those interfaces, it may make more sense to just use Channel<T> and not define any interfaces:

public interface ISomeRpcService
{
      Task<TResponse> UnaryCall(TRequest request);

      ChannelReader<TResponse> ServerStreaming(TRequest request);

      Task<TResponse> ClientStreaming(ChannelReader<TRequest> request);

      ChannelReader<TResponse> BidirectionalStreaming(ChannelReader<TRequest> request);
}

The downside is that for client streaming, FlatSharp will need to spin up a background task to monitor the channel for inputs. However, that doesn't seem too tricky or evil, especially if the use is optional.

TYoungSL commented 3 years ago

System.Threading.Channels.ChannelReader<T> is .NET Standard 2.1, LGTM.

jamescourtney commented 3 years ago

177

jamescourtney commented 3 years ago

@TYoungSL -- Can you test out the package from this CI build: https://github.com/jamescourtney/FlatSharp/actions/runs/972817071. You can download it from the 'artifacts' list at the bottom.

It should contain the changes that you want. You have to specify the fs_rpcInterface metadata on your definition with an optional interface name.

// if the interface name is omitted, then the name will be IMyService
rpc_service MyService (fs_rpcInterface:"ISomeInterface")
{
   ...
}

The build is completely experimental, but should at least compile. Testing will be the tricky part of this, as there are threading issues, cancellation issues, and lots of error handling to get right. The generated code looks right to me, but lots of things do :)

jamescourtney commented 3 years ago

This is merged in #177. It will be included in the next feature release of FlatSharp (5.5.0) once the other features for that release are done as well. You can grab the linked CI build above or any of the builds from master after the merge to play with it early.

TYoungSL commented 3 years ago

Thanks, I'll be trying it out Monday for sure. 👍

TYoungSL commented 3 years ago

We're using the generated interfaces from the 5.5.0 CI artifact as components to service contracts.

Looks great! We can throw LPC/IPC wrappers around them just fine, and use Grpc when needed.

jamescourtney commented 3 years ago

That's good to hear. Glad it works well for you! If you don't run into any major issues, I'll publish the last build from master as version 5.5 in the next few days.

jamescourtney commented 3 years ago

We're using the generated interfaces from the 5.5.0 CI artifact as components to service contracts.

Looks great! We can throw LPC/IPC wrappers around them just fine, and use Grpc when needed.

Does this mean that you no longer need #180?

TYoungSL commented 3 years ago

We still need a way to Pause/Resume, prioritize or control RPC flow.

Specifically with this guy;

      async Task IPipelineTransformerInterface.SendNodes(
        ChannelReader<TransformerNode> requestChannel,
        ChannelWriter<StirlingData.Status> responseChannel,
        CancellationToken token)

We don't just want to flood it with nodes, we expect to get higher priority requests.

Ceasing buffering or queuing does not solve the problem of controlling actual transfer of already buffered data, unless I'm mistaken and there are no buffers.

There's 2 pump threads (via Task.Run) that are created by FlatSharp that just loop forwarding reads and writes from buffers into the actual RPC facility right? Is the response stream or channel without buffers?

jamescourtney commented 3 years ago

Is the response stream or channel without buffers?

FlatSharp's code is a thin pump around those channels into gRPC's concept of async streams. How you define those channels is entirely up to you. If you're looking to avoid a flood of requests, then I'd suggest using Channel.CreateBounded<T>(size), which will cap the size of the channel. Then you can always insert the highest priority item by using channelWriter.WaitToWriteAsync() to wait until the channel has space.

jamescourtney commented 3 years ago

5.5.0 is now on Nuget.