elixir-grpc / grpc

An Elixir implementation of gRPC
https://hex.pm/packages/grpc
Apache License 2.0

Keep the stream open for long-lived communication #217

Open rdiaz82 opened 2 years ago

rdiaz82 commented 2 years ago

Describe the question

Hi! I'm trying to implement a pub/sub architecture using gRPC. I would like to keep the stream open between client and server so that the server can push information to the client at any moment.

Initially I thought I could simply keep the stream object and call the send_reply method on it whenever I wanted, but it doesn't seem to work that way :(

Is there any way to keep the stream open between client and server?

Versions:

beligante commented 1 year ago

@rdiaz82

If you want to do that, there are two things to keep in mind.

  1. On the client side, you have to supervise the Channel; it is the one that holds a reference to the connection.

    • If you start your channel inside a GenServer (for example), :gun will send messages to that process telling you when the connection is lost and also when it is recovered.
    • If you're using the Mint adapter, you have to reconnect manually, because Mint can't reconnect automatically. When the connection is lost, your process will receive the message {:elixir_grpc, :connection_down, pid}.
  2. On the server side (if you're using elixir-grpc), when you receive a streaming request, your handler function must return only once your stream is done. Otherwise, the call ends and the stream with the client is closed. Here is a rough example:

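  ```elixir
  @spec my_server_stream_handler(Enumerable.t(), GRPC.Server.Stream.t()) :: any()
  def my_server_stream_handler(req_enum, stream) do
    # The process running this callback owns the stream. If it dies, the
    # connection with the client was lost or the client closed the call.
    server_process_pid = self()

    Task.async(fn ->
      # Do something with req_enum (pushing replies with
      # GRPC.Server.send_reply/2) that blocks until the stream is done.
      # Anything else holding a reference to `stream` should monitor
      # server_process_pid. handle_stream/3 is a hypothetical placeholder.
      handle_stream(req_enum, stream, server_process_pid)
    end)
    |> Task.await(:infinity)

    :ok
  end
  ```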

If your server callback returns, the stream with the client will be closed. Also remember that you must monitor the pid of the process that runs the server callback (server_process_pid in the example above): if that process dies, it means that the connection with the client was lost OR the client decided to close the stream.
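For the pub/sub case in the original question, the two pieces of advice above (block inside the callback, monitor the callback's process) can be combined roughly as follows. This is a dependency-free sketch under stated assumptions, not elixir-grpc API: PubSubSketch.Broker and PubSubSketch.Handler are hypothetical names, and send_fun stands in for something like fn event -> GRPC.Server.send_reply(stream, event) end inside your real server callback. The handler subscribes, then blocks in a receive loop so the callback does not return while the stream should stay open; the broker monitors every subscriber and forgets it when the :DOWN message arrives.

```elixir
defmodule PubSubSketch.Broker do
  # Hypothetical broker (not part of elixir-grpc): tracks live handler pids.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Runs in the caller, so self() is the handler process that owns the stream.
  def subscribe(broker), do: GenServer.call(broker, {:subscribe, self()})

  def publish(broker, event), do: GenServer.cast(broker, {:publish, event})

  def subscribers(broker), do: GenServer.call(broker, :subscribers)

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:subscribe, pid}, _from, subs) do
    # Monitor the handler: if it dies, the client is gone.
    ref = Process.monitor(pid)
    {:reply, :ok, Map.put(subs, ref, pid)}
  end

  def handle_call(:subscribers, _from, subs), do: {:reply, Map.values(subs), subs}

  @impl true
  def handle_cast({:publish, event}, subs) do
    # Forward the event to every subscribed handler process.
    Enum.each(subs, fn {_ref, pid} -> send(pid, {:event, event}) end)
    {:noreply, subs}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, subs) do
    # The handler died (client gone or call closed): stop publishing to it.
    {:noreply, Map.delete(subs, ref)}
  end
end

defmodule PubSubSketch.Handler do
  # Hypothetical body of a server-streaming callback. `send_fun` stands in
  # for fn event -> GRPC.Server.send_reply(stream, event) end.
  def run(broker, send_fun) do
    :ok = PubSubSketch.Broker.subscribe(broker)
    loop(send_fun)
  end

  # Blocks until a (hypothetical) :stop message, so the callback does not return early.
  defp loop(send_fun) do
    receive do
      {:event, event} ->
        send_fun.(event)
        loop(send_fun)

      :stop ->
        :ok
    end
  end
end
```

The handler process, not the broker, does the actual sending, so only the process serving the request touches the stream. Ending the loop is what lets the callback return, which in turn closes the stream.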

wwaldner-amtelco commented 4 months ago

I would like to piggy-back on this open issue. I ran into a weird situation that I can't quite figure out. Someone who understands the internals of the gRPC server implementation might be able to provide some insight.

I am doing something similar to what @beligante describes. Everything works great, except that something weird happens when the client sends another request on the same connection.

```elixir
def event_processor(req, stream) do
  Task.async(fn ->
    GRPC.Server.send_reply(stream, <some event>)

    # This task keeps waiting and does not end, so the wrapping
    # function should not return. The actual implementation is a
    # GenServer that does not send a reply until a certain time is reached.
  end)
  |> Task.await(:infinity)

  :ok
end
```
In the sample code above, the server sends an event to the client, and the client, in response, sends a new request to the server. The server processes the new request fine, but what happens next is the puzzle: the event_processor() function gets called again. The first call to event_processor() has not completed yet, and the client neither disconnected nor sent a new event_processor request. It seems that when the client sends a request on the same connection as an ongoing streaming call, the server behaves oddly.

beligante commented 4 months ago

I believe there's a misunderstanding here. There are two concepts you need to distinguish: the connection and the request.

event_processor/2 is invoked for EVERY request, regardless of which connection it arrives on (the same one or a different one).

As for your problem: what you're describing is a BIDIRECTIONAL STREAM, where within a SINGLE request BOTH SIDES (client and server) can send data at any moment.

The repo has an example showing how the server handles a bidirectional request.
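The repo's example is not reproduced here. As a rough, dependency-free sketch of the same idea, this shows one call carrying traffic in both directions. Assumptions: BidiSketch is a hypothetical module; req_enum stands in for the enumerable of client messages that elixir-grpc hands to a bidi handler; send_fun stands in for a call to GRPC.Server.send_reply/2; and, as in the earlier example in this thread, the blocking work on req_enum runs inside a Task. Whether the real request enumerable may be consumed from a helper process is worth checking against the library.

```elixir
defmodule BidiSketch do
  # Hypothetical body of a bidirectional handler for ONE request.
  # `req_enum` stands in for the incoming message stream and `send_fun`
  # for fn reply -> GRPC.Server.send_reply(stream, reply) end.
  def handle(req_enum, send_fun) do
    owner = self()

    # Read client messages in a task so this process stays free to push
    # server-initiated events at any moment.
    reader =
      Task.async(fn ->
        Enum.each(req_enum, fn req -> send(owner, {:client_msg, req}) end)
        send(owner, :client_done)
      end)

    loop(send_fun)
    Task.await(reader, :infinity)
    :ok
  end

  defp loop(send_fun) do
    receive do
      {:client_msg, req} ->
        # Answer something the client sent on this same request.
        send_fun.({:ack, req})
        loop(send_fun)

      {:server_event, event} ->
        # Push something the client never asked for, on the same request.
        send_fun.({:event, event})
        loop(send_fun)

      :client_done ->
        # The client finished sending; returning ends this call.
        :ok
    end
  end
end
```

Both directions share a single invocation of the handler, which is why no second invocation is needed for the server to keep talking to the client.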

Let me know if this helps. It took me some time to fully understand this.

wwaldner-amtelco commented 4 months ago

Thanks for your reply.

I understand that there can be multiple connections and that each client call to event_processor invokes the server handler in a new process. What I am trying to describe is an issue where event_processor is invoked multiple times on the server without the client making multiple calls. Something seems to be wrong in the server implementation. It would be interesting to monitor the server process to see whether it dies before the gRPC server invokes event_processor() again.

In other words, the Elixir gRPC server invokes the event_processor function multiple times even though the client makes only a single request. This only happens when the client sends a different request while event_processor() is still processing.

beligante commented 4 months ago

Hmm, that's really odd, because I've been using gRPC streams in production for ~2 years now and I've never noticed this behavior.

Are the multiple invocations happening at the same time, or some time apart? If the latter, could it be that your client times out while waiting for a reply from the server and retries the request (since your server doesn't reply immediately)?
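One way to answer these questions is to record every invocation. A minimal sketch, assuming a hypothetical InvocationLog helper (not part of elixir-grpc) that you would call as the first line of event_processor/2: for each call it reports the time since the previous call and whether the previous handler process is still alive. A live previous handler rules out "the first call died and was restarted", and a gap close to the client's call timeout would support the retry theory.

```elixir
defmodule InvocationLog do
  # Hypothetical debugging helper backed by an Agent; not part of elixir-grpc.
  def start_link, do: Agent.start_link(fn -> [] end)

  # Call at the top of the handler; self() is the handler's process.
  def record(log) do
    entry = %{pid: self(), at_ms: System.monotonic_time(:millisecond)}

    Agent.get_and_update(log, fn entries ->
      report =
        case entries do
          [prev | _] ->
            # Gap since the previous invocation, and whether its process survived.
            %{gap_ms: entry.at_ms - prev.at_ms, previous_alive?: Process.alive?(prev.pid)}

          [] ->
            :first_call
        end

      {report, [entry | entries]}
    end)
  end
end
```

In a handler this could be used as IO.inspect(InvocationLog.record(log), label: "event_processor"), with the log pid started once alongside the server.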