grpc / grpc-node

gRPC for Node.js
https://grpc.io
Apache License 2.0

ClientReadableStream: override/extend .destroy(err?) method #1238

Open gavinvangent opened 4 years ago

gavinvangent commented 4 years ago

Is your feature request related to a problem? Please describe.

Yes, and no.

The documentation for the ClientReadableStream object states that it extends Readable, and therefore, I'd expect to be able to invoke base methods, like .destroy(err?).

I've spent too many hours trying to determine why I can process data from the server until, suddenly, no more data arrives. Long story short, my process creates 10 ClientReadableStreams, and each stream is eventually destroyed (using .destroy(err?)). After a short delay, the process continues and creates a new ClientReadableStream, so that I always process 10 streams at a time. Eventually, this happens enough times for the data flow to grind to a halt.

I then noticed the .cancel() function and decided to see what results I'd get from it, and sure enough, I had no further issues. So clearly, calling .destroy(err?) doesn't inform the server that the call was cancelled; the server then reaches the maximum number of streams it's prepared to handle on this channel, and no more data flows between the parties.

Describe the solution you'd like

I'd like to see the .destroy(err?) method implemented to work. I believe the base function can be wrapped with code to ensure the workflow doesn't end abruptly.

The overriding function could match something like:

class ClientReadableStreams extends Readable {
  destroy(err?: Error): void {
    // Call the server with a CANCELLED status
    // invoke the base .destroy(err?) function ... Do not throw the CANCELLED error currently thrown by .cancel()
  }
}

Consider this example:

const stream: ClientReadableStream<GetSomeStreamResponse> = await getStreamSomehow();

return new Promise<void>((resolve, reject) => {
  // Settle the promise; calls after the first are no-ops because it is already settled.
  const complete = (err?: Error) => {
    if (err) {
      return reject(err);
    }

    resolve();
  };

  let dataPacketsHandled = 0;
  stream.on('data', async data => {
    stream.pause();

    try {
      await determineIfDataIsValid(data); // this could throw an InvalidDataError
      await storeData(data); // this could throw a variety of errors
    } catch (err) {
      return stream.destroy(err);
    }

    if (++dataPacketsHandled >= 50) {
      // time to go handle something else
      return stream.destroy();
    }

    stream.resume();
  });
  stream.on('error', err => complete(err));
  stream.on('close', () => complete());
});

With this promise, I can get exact information on what error occurred (if an error occurred), rather than just being aware that the stream was cancelled.

To me, this is intuitive and conforms to how I would handle a non-grpc stream.

Describe alternatives you've considered

A clear and concise description of any alternative solutions or features you've considered.

Additional context

If this feature is not approved, please add information to the documentation telling us not to use .destroy(err?)

gavinvangent commented 4 years ago

Implementing the same example code using .cancel() would look something like this:

const stream: ClientReadableStream<GetSomeStreamResponse> = await getStreamSomehow();

return new Promise<void>((resolve, reject) => {
  // Holds the real failure so it can replace the generic "Cancelled" error.
  let caughtError: Error | undefined;

  const complete = (err?: Error & { details?: string }) => {
    const errorToThrow = err && err.details === 'Cancelled' ? caughtError : err;
    if (errorToThrow) {
      return reject(errorToThrow);
    }

    resolve();
  };

  let dataPacketsHandled = 0;
  stream.on('data', async data => {
    stream.pause();

    try {
      await determineIfDataIsValid(data); // this could throw an InvalidDataError
      await storeData(data); // this could throw a variety of errors
    } catch (err) {
      caughtError = err as Error;
      return stream.cancel();
    }

    if (++dataPacketsHandled >= 50) {
      // time to go handle something else
      return stream.cancel();
    }

    stream.resume();
  });
  stream.on('error', err => complete(err));
  stream.on('close', () => complete());
});

murgatroid99 commented 4 years ago

First I want to say that cancellation is not intended to be used as the normal method for ending streams. For server-streaming requests, the server should usually know when to stop sending messages, possibly based on the message sent by the client at the beginning of the stream. Alternatively a bidirectional stream can be used, with the client sending another message indicating that it is done reading. Cancellation should be used for events that cannot be anticipated, such as the user cancelling the interaction that triggered the request, or another error making the response no longer needed.

In addition, you say that each stream you create is "eventually destroyed" and then recreated "after a short delay". Why destroy the streams at all? The cost of keeping them open should be fairly minimal, especially if the amount of time that they are open is long compared to the amount of time between when they are destroyed and when they are recreated.

With all that being said, I think you're probably right about adding that destroy functionality. The built-in http2 module's stream type is fairly similar to one of these streams, and destroying one of those sends a RST_STREAM to the remote end (or at least tries to).

gavinvangent commented 4 years ago

Unfortunately, I'm the client, using the code generated from the proto files given to me. In those protos, that stream is declared as a ClientReadableStream. I suppose I could change it to a bidirectional stream, but that just causes overhead whenever new protos are given to us and we have to regenerate the code, so I'd prefer to stay away from that at all costs.

I agree, it's counter-intuitive to open a stream, read for a while, and then disconnect just to come back to it later. The short story here is that we have a system that was intended to extract data from external servers every x seconds. We have a worker pool of X workers trying to handle 1000s of tasks. Each worker fetches a limited amount of data, then offloads that task and picks up the next available task. This way we make sure we handle all tasks within a certain duration. The problem is that this doesn't work well with long-running tasks, like streaming data. We should have a worker fixed onto the stream until something goes wrong, but in order to listen to a stream all day, we would need a much bigger worker pool (a worker per task), which adds $$$. So we are working on a complementary solution within the same system, but for now, this is where we are.

With that said, there will still be times when the client will want to destroy/close the stream (see the example of when/why in my original issue/feature description).

murgatroid99 commented 4 years ago

I see. I think I'll probably make that change at some point, but for now, cancel should do everything you want to do here.

gnarea commented 3 years ago

I'm hitting this issue after upgrading from grpc to @grpc/grpc-js (1.3.7).

I have a bidi RPC where the client sends files and the server ACKs them, and then the client ends the call when all files have been sent and the server has ACK'ed them all.

Calling call.end()/call.destroy() on the client doesn't actually end the call. I have to use call.cancel() to actually end the call, which isn't ideal or intuitive.

murgatroid99 commented 3 years ago

The expected way that a bidi stream is used is that the client calls call.end() when it has finished writing, and the server does the same when it has finished writing. The client's call to call.end() is the signal to the server that there are no more incoming messages. Then when the server calls call.end(), that is what actually ends the call.

gnarea commented 3 years ago

I'm pretty sure that's what I'm doing: Client calls .end() when it's done, but the server doesn't get the end event in its call. The server only gets the end event when I kill the client's process or the client calls .cancel(). (BTW, calling .close() on the client doesn't end the call either)

I also have a second bidi RPC that mirrors that one (i.e., the server sends files and the client ACKs them), and that works fine with both grpc and @grpc/grpc-js: The server calls .end() and then the client's call ends.

gnarea commented 3 years ago

Unless you mean that the server must know when to close the call without listening on the end event? I guess that'd require changing my protocol to have the client send a special message (unless there's another way to signal this?).

murgatroid99 commented 3 years ago

I did mean that the server should listen for the end event to know when to call call.end() itself. The server does consistently get the end event after the client calls call.end() in my own tests, though it's important to note that the server will only see that event after it has also consumed all incoming messages.
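The point about the end event only arriving after all incoming messages are consumed is general Node.js stream behavior, and it can be illustrated without gRPC. The sketch below uses a plain object-mode Readable as a hypothetical stand-in for the server-side call object; the function name and the 10 ms waits are illustrative assumptions, not part of any gRPC API.

```typescript
import { Readable } from "node:stream";

// Hypothetical stand-in for the server's view of a call: one buffered
// message followed by the sender's end-of-stream marker.
async function endSeenBeforeAndAfterReading(): Promise<[boolean, boolean]> {
  const incoming = new Readable({ objectMode: true, read() {} });
  let ended = false;
  incoming.once("end", () => {
    ended = true;
  });

  incoming.push({ id: 1 }); // a message that nobody has read yet
  incoming.push(null); // the sender has finished writing

  // Nothing is consuming the stream yet, so 'end' stays pending.
  await new Promise((resolve) => setTimeout(resolve, 10));
  const before = ended;

  // Start consuming; once the buffered message is drained, 'end' fires.
  incoming.resume();
  await new Promise((resolve) => setTimeout(resolve, 10));
  return [before, ended];
}
```

If a server handler never reads (or stops reading) its incoming messages, it will therefore never observe the client's half-close, regardless of when the client called end().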

In the commit you made that links to this issue, this part could be the problem:

sink.once('end', () => {
  if (!anyCargoSent) {
    call.end();
  }
});

It looks to me like the client only ends the call if it never sends any messages. Is that the intended behavior?

gnarea commented 3 years ago

Thanks for checking out that code!

The snippet you quoted is only there to ensure that, if the iterable to be streamed by the client turns out to be empty, then the client ends the call immediately (instead of waiting for the server to ACK all messages, since no messages were sent).

When the client does have messages to send, the bit that should end the call is this:

        if (Object.getOwnPropertyNames(pendingAckIds).length === 0) {
          break;
        }

The break causes call.destroy() to be called, because call is used as an iterable. I've verified this is indeed the case by checking call.destroyed afterwards.

Note that when I was debugging this earlier I also tried calling call.end() explicitly, but it didn't make any difference.

murgatroid99 commented 3 years ago

The whole original point of this issue is that call.destroy() doesn't cause network actions. Destroying the call won't end the call, and it might cause issues with the local state.

I'm not entirely following that code, but the loop that the line you referenced is in doesn't write to the call at all, and breaking the loop doesn't go to destroying the call, it just runs the rest of the function.

gnarea commented 3 years ago

I understand that call.destroy() won't actually end the call, now that I found this issue.

> Destroying the call won't end the call, and it might cause issues with the local state

Now that is the root cause of the issue I was experiencing: In my debugging yesterday, I was calling call.end() after breaking (and therefore after call.destroy() too), but I've just tried calling it before and it works as expected.

This is definitely a regression from the grpc package, which I'd been using for a while and never had this issue.

Would you be open to a PR that implemented .destroy() as follows?

function destroy(err?: Error): void {
  if (err) {
    this.emit('error', {
      code: grpc.status.UNAVAILABLE
    });
    setImmediate(() => super.destroy(err));
  } else {
    this.end();
    super.destroy();
  }
}

Feedback welcome. I suspect there may be a better way to handle the error case.


> breaking the loop doesn't go to destroying the call

break calls call.destroy() because I'm iterating over call, which is a readable stream. According to the Node.js docs: "If the loop terminates with a break, return, or a throw, the stream will be destroyed". I (re-)confirmed this in my debugging yesterday.
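The Node.js behavior quoted here can be checked with a plain Readable, independent of gRPC. This is a minimal sketch; takeSome and demo are hypothetical helper names introduced only for the illustration.

```typescript
import { Readable } from "node:stream";

// Consume at most `limit` items, then leave the loop early with `break`.
async function takeSome(source: Readable, limit: number): Promise<unknown[]> {
  const taken: unknown[] = [];
  for await (const item of source) {
    taken.push(item);
    if (taken.length >= limit) {
      break; // exiting early makes the async iterator destroy `source`
    }
  }
  return taken;
}

// Returns whether the source was destroyed by the early exit.
async function demo(): Promise<boolean> {
  const source = Readable.from(["a", "b", "c", "d"]);
  await takeSome(source, 2);
  return source.destroyed;
}
```

Because the destruction happens implicitly, any stream type whose destroy() does not release its remote resources will leak them whenever a consumer breaks out of such a loop.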

murgatroid99 commented 3 years ago

I see, I missed the change in context inside and outside of collectAcknowledgments. Now that I see that source is call, I understand the code better. But the solution still goes back to the code I quoted earlier: you should unconditionally call call.end() when you are done writing to call, which is signaled by the sink.once('end', ...) event handler. call.end is a Writable stream method. It signals that you have finished writing to the stream. There is no value in waiting until some time after that to call it.

Why even have that break statement that you quoted? Why wouldn't you just let the loop finish naturally? If the server sends extra IDs, it will just throw an error in the next iteration anyway.

Regarding the destroy implementation, the original request in this issue was to have destroy call cancel, and that still seems like the right behavior. Of course, it should be implemented by overriding _destroy, not destroy.
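For illustration, the suggestion to override _destroy rather than destroy could look roughly like the sketch below. CancellableStream is a hypothetical stand-in built on Node's Readable, not the actual grpc-js class, and its cancel() only counts invocations where a real client would notify the server.

```typescript
import { Readable } from "node:stream";

// Hypothetical stand-in for a gRPC client stream; NOT the grpc-js class.
class CancellableStream extends Readable {
  public cancelCount = 0;

  constructor() {
    super({ objectMode: true });
  }

  _read(): void {
    // A real implementation would push messages as they arrive.
  }

  // Stand-in for network-level cancellation (assumption: a real client
  // would tell the server here, e.g. by resetting the HTTP/2 stream).
  cancel(): void {
    this.cancelCount += 1;
  }

  // destroy() delegates to _destroy(), so overriding this hook covers every
  // destruction path, including an early exit from a for await loop.
  _destroy(error: Error | null, callback: (error?: Error | null) => void): void {
    this.cancel();
    callback(error);
  }
}
```

Overriding the hook instead of destroy() itself keeps Node's own bookkeeping (the destroyed flag, the close event, the guard against double destruction) intact.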

gavinvangent commented 3 years ago

Hi Guys,

I've been following the convo, but haven't had anything of substance to add, except:

I'd suggest that cancel invokes destroy/_destroy passing in the CANCELLED error that is currently being thrown by cancel

gnarea commented 3 years ago

I agree that .cancel() should call .destroy(), not the other way around. I don't think there's any reason why .destroy() should only ever be called to abort the request in the event of an error: I think you should be able to call call.destroy() to end the call normally, or call.destroy(error) if something went wrong.

In my case, for example, I have a duplex stream, and I want to call .destroy() to end the readable and writable components. I could call both call.push(null) and call.end() to end the underlying readable and writable streams, but call.destroy() could just do that for me.

Perhaps more importantly (to me at least), implementing .destroy(err?) will make it possible to use gRPC streams as iterables. This works today, except for break not triggering call.destroy() any more.


> Why even have that break statement that you quoted? Why wouldn't you just let the loop finish naturally? If the server sends extra IDs, it will just throw an error in the next iteration anyway.

I'm breaking there because I want the client to end the connection when all the files it sent have been acknowledged by the server.

Neither the client nor the server knows upfront how many files will be sent (new files could be created since the call started, for example), so it's up to the client to end the call when all files have been sent. Without that break, the call would just hang until it times out, because the server wouldn't know when all files have been received.

murgatroid99 commented 3 years ago

> I'm breaking there because I want the client to end the connection when all the files it sent have been acknowledged by the server.

I understand wanting to do this, but this is just not how gRPC works. A single call does not control a connection, but rather a stream on an HTTP/2 session. Ending the call in any way, naturally or by cancellation, will not close the connection, it will just end the stream.

If you actually want to close the underlying connection, you can do so by calling client.close(). This will not affect any existing active calls, but it will prevent any new calls from being created with that client.


> Neither the client nor the server knows upfront how many files will be sent (new files could be created since the call started, for example), so it's up to the client to end the call when all files have been sent. Without that break, the call would just hang until it times out, because the server wouldn't know when all files have been received.

Nothing needs to be known upfront. Surely the client knows, at the time that it has sent all of the files, that there are no more files to send. That is the time when you should call call.end(). If you call call.end() on the client as soon as the client knows that it has finished sending, the server will see an end event and it can use that to determine when to call call.end() on its side, and then the iterator will just finish normally without you needing to break.
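The half-close flow described above can be sketched with two in-memory streams standing in for the two directions of a bidi call. Everything here is an assumption made for illustration: runExchange, the "ack:" message format, and the use of PassThrough in place of real gRPC call objects. Backpressure is ignored for brevity.

```typescript
import { PassThrough } from "node:stream";

// Two one-way pipes model the two directions of a hypothetical bidi call.
async function runExchange(files: string[]): Promise<string[]> {
  const clientToServer = new PassThrough({ objectMode: true });
  const serverToClient = new PassThrough({ objectMode: true });

  // Server side: ACK every file, and end its own direction only once the
  // client's half-close ('end') has been observed.
  clientToServer.on("data", (file: string) => {
    serverToClient.write(`ack:${file}`);
  });
  clientToServer.on("end", () => serverToClient.end());

  // Client side: write everything, then end immediately. There is no need
  // to wait for the ACKs before signalling "done writing".
  for (const file of files) {
    clientToServer.write(file);
  }
  clientToServer.end();

  // The read loop finishes on its own when the server ends; no break needed.
  const acks: string[] = [];
  for await (const ack of serverToClient) {
    acks.push(ack as string);
  }
  return acks;
}
```

In this arrangement the client still receives every ACK after it has called end(), because ending the writable side does not close the readable side.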


> I'd suggest that cancel invokes destroy/_destroy passing in the CANCELLED error that is currently being thrown by cancel

Why would we do this? Why do we want to call destroy at all?