higherkindness / mu-haskell

Mu (μ) is a purely functional framework for building micro services.
http://higherkindness.io/mu-haskell/
Apache License 2.0

[grpc-client] Client stream is not pushed until the server starts pushing responses in Bidirectional streaming RPC #313

Closed akshaymankar closed 3 years ago

akshaymankar commented 3 years ago

Testing a simple service defined like this:

syntax = "proto3";
package mu.grpc.test;

message GreetingRequest { string name = 1; }

message GreetingResponse { string greeting = 1; }

service GreetingsService {
  rpc bidirectional (stream GreetingRequest) returns (stream GreetingResponse);
}

The bidirectional function in the server is implemented like this:

bidirectional :: forall m. (MonadServer m, MonadIO m) => ConduitT () GreetingRequest m () -> ConduitT GreetingResponse Void m () -> m ()
bidirectional requests responses = do
  C.runConduit $ requests .| serveResponse .| responses
  liftIO $ putStrLn "Done!"
  where
    serveResponse :: ConduitT GreetingRequest GreetingResponse m ()
    serveResponse = do
      liftIO $ putStrLn "Server Waiting"
      mreq <- C.await
      case mreq of
        Nothing -> pure ()
        Just req -> do
          C.yield $ record1 $ "Greetings! " <> view #name req
          serveResponse

The client code looks like this:

    bds :: ConduitT GreetingRequest (GRpcReply GreetingResponse) IO () <- gRpcCall @'MsgProtoBuf @GreetingsService @"GreetingsService" @"bidirectional" client Compressed
    let source = do
          liftIO $ putStrLn "Pushing Ramesh"
          C.yield $ record1 "Ramesh"
          liftIO $ putStrLn "Pushing Suresh"
          C.yield $ record1 "Suresh"
          liftIO $ putStrLn "Client stream complete"
        printer = do
          liftIO $ putStrLn "Printer Waiting"
          mx <- C.await
          case mx of
            Nothing -> pure ()
            Just x -> do
              liftIO $ print x
              printer
    C.runConduit $ source .| bds .| printer

Running these prints the following and then gets stuck: the server is awaiting the first request, but the client stream is never pushed, so both sides deadlock.

Printer Waiting
Server Waiting

If I change the server so that it responds before even consuming a request, things move forward. The server then looks like this:

bidirectional' :: forall m. (MonadServer m, MonadIO m, MonadUnliftIO m) => ConduitT () GreetingRequest m () -> ConduitT GreetingResponse Void m () -> m ()
bidirectional' requests responses = do
  C.runConduit $ requests .| serveResponse' .| responses
  liftIO $ putStrLn "Done!"
  where
    serveResponse' :: ConduitT GreetingRequest GreetingResponse m ()
    serveResponse' = do
      C.yield $ record1 "Greetings, Stranger!"
      serveResponse

    serveResponse :: ConduitT GreetingRequest GreetingResponse m ()
    serveResponse = do
      liftIO $ putStrLn "Server Waiting"
      mreq <- C.await
      case mreq of
        Nothing -> do
          liftIO $ putStrLn "Got Nothing!"
          pure ()
        Just req -> do
          C.yield $ record1 $ "Greetings! " <> view #name req
          serveResponse

Now things work, but they still get stuck at this point:

Printer Waiting
Server Waiting
Pushing Ramesh
Pushing Suresh
Client stream complete
GRpcOk record GreetingResponse { greeting: "Greetings, Stranger!" }
Printer Waiting
Server Waiting
Server Waiting
GRpcOk record GreetingResponse { greeting: "Greetings! Ramesh" }
Printer Waiting
GRpcOk record GreetingResponse { greeting: "Greetings! Suresh" }
Printer Waiting

So there is another bug in how the closing of the server stream is handled in the client; maybe it is related. Yet another interesting thing to note is that the server responses are always printed after the client finishes streaming (even the response which doesn't depend on a request). This might also cause problems once the client starts depending on server responses, but I haven't tested that yet.

akshaymankar commented 3 years ago

Hmm, the more I stare at it, the more I am convinced that the handler for the bidirectional streaming client shouldn't be of type:

CompressMode -> IO (ConduitT v (GRpcReply r) IO ())

Instead, it should be two separate conduits: one for the client stream and another for the server stream. Having two separate conduits allows writing into the client stream and reading from the server stream concurrently.

Another thing missing from the current implementation is a GRpcReply (), which should indicate whether the gRPC server eventually responded with GRpcOk or not.

Having this second GRpcReply raises the question of why we have the first one in the conduit and what other information it conveys. It turns out it only conveys whether the IncomingEvent from the server was parsed successfully (it contains a SomeException, so there could be more kinds of failures). Also, the documentation for IncomingEvent says that the loop stops when it receives something invalid, so perhaps it is fine to raise that exception in IO, or simply return it as the return value of the conduit. So I think the server stream should just be a stream of r, not wrapped in GRpcReply.

All in all, I think the type of the handler should be one of these (a usage sketch follows the options):

  1. Throw the exception in IO, and return the final GRpcReply () wrapped in a TMVar that is filled whenever the server is done:
    CompressMode -> IO (ConduitT v Void IO (), ConduitT () r IO (), TMVar (GRpcReply ()))
  2. Return the exception as the return value of the server conduit:
    CompressMode -> IO (ConduitT v Void IO (), ConduitT () r IO (Maybe SomeException), TMVar (GRpcReply ()))
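
For illustration, here is a minimal sketch (not part of mu-haskell) of how a caller could drive option 1's triple, assuming the mu-generated GreetingRequest and GreetingResponse types, record1, and GRpcReply from the snippets above, plus Show instances for printing:

import Conduit (ConduitT, awaitForever, runConduit, yield, (.|))
import Control.Concurrent.Async (concurrently_)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMVar (TMVar, takeTMVar)
import Control.Monad.IO.Class (liftIO)
import Data.Void (Void)

-- Drive the two streams of option 1 concurrently: pushing requests no
-- longer waits for responses to be consumed, because the streams do
-- not share a single pipeline.
useOption1
  :: ( ConduitT GreetingRequest Void IO ()   -- client stream (sink)
     , ConduitT () GreetingResponse IO ()    -- server stream (source)
     , TMVar (GRpcReply ()) )                -- final status of the RPC
  -> IO ()
useOption1 (clientSink, serverSource, final) = do
  let send = (yield (record1 "Ramesh") >> yield (record1 "Suresh")) .| clientSink
      recv = serverSource .| awaitForever (liftIO . print)
  concurrently_ (runConduit send) (runConduit recv)
  -- Filled in once the server is done; reports the overall outcome.
  atomically (takeTMVar final) >>= print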

It may also make sense to return an Async (GRpcReply ()) instead of a TMVar, as that would give users the ability to kill the thread in case it gets stuck; but maybe rawGeneralStream takes care of that already and I am overthinking this.
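
A minimal, purely hypothetical sketch of that Async variant, showing why it would help with stuck calls (timeout is from System.Timeout):

import Control.Concurrent.Async (Async, cancel, wait)
import System.Timeout (timeout)

-- Wait up to the given number of microseconds for the final status;
-- kill the underlying thread if the call appears stuck.
waitOrCancel :: Int -> Async (GRpcReply ()) -> IO (Maybe (GRpcReply ()))
waitOrCancel micros final = do
  result <- timeout micros (wait final)
  case result of
    Nothing -> cancel final >> pure Nothing
    Just r  -> pure (Just r)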

I have implemented option 1 to test things; it is trivial to move to option 2. Please let me know if this kind of breaking change is OK.

serras commented 3 years ago

It makes a lot of sense; in fact, for the server we also ended up exposing two conduits. I am fine with the breaking change, since it seems clear that this functionality was not much in use.

The only thing I don't fully understand is why the TMVar should be exposed, and why we cannot get away with simply detecting when the corresponding conduit is closed.

akshaymankar commented 3 years ago

I see that when rawGeneralStream is called, there can still be errors in the ClientIO monad, and the result is wrapped in Either TooMuchConcurrency. There needs to be some way of communicating these errors to the users of the client. I am actually not sure what the best way to do that would be; the TMVar seems a bit odd to me too. One option could be to put it in the result of the response conduit, so something like:

CompressMode -> IO (ConduitT v Void IO (), ConduitT () r IO (GRpcReply ()))

akshaymankar commented 3 years ago

In the end I decided that the TMVar was an ugly solution and added the GRpcReply () to the result value of the second Conduit.
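
As a usage note, here is a minimal sketch of consuming that final shape (the server-stream conduit is assumed to come from the new-style call, with GreetingResponse and GRpcReply as above): since .| keeps only the downstream result, fuseUpstream is needed to hold on to the GRpcReply () returned by the response conduit:

import Conduit (ConduitT, awaitForever, fuseUpstream, runConduit)
import Control.Monad.IO.Class (liftIO)

-- Print every response, then report the final status of the RPC,
-- which is the upstream conduit's result.
drainAndCheck :: ConduitT () GreetingResponse IO (GRpcReply ()) -> IO ()
drainAndCheck serverSource = do
  status <- runConduit (serverSource `fuseUpstream` awaitForever (liftIO . print))
  print status  -- GRpcOk () on a clean close, an error value otherwise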