grpc / grpc-dart

The Dart language implementation of gRPC.
https://pub.dev/packages/grpc
Apache License 2.0
857 stars 271 forks source link

server.shutdown never finishes when streaming data #276

Open North101 opened 4 years ago

North101 commented 4 years ago

When the server is streaming data to a client and server.shutdown() is called, it'll hang as the connection never finishes.

  grpc:
    dependency: "direct main"
    description:
      name: grpc
      url: "https://pub.dartlang.org"
    source: hosted
    version: "2.1.3"

Repro steps

replace the helloworld.proto, server.dart and client.dart files with the ones bellow and re-run

protoc --dart_out=grpc:lib/src/generated -Iprotos protos/helloworld.proto

run dart bin/sever.dart and then quickly run dart bin/client.dart (as the server will attempt to close after 5 seconds).

The server never fully shuts down as the connection.finish() call never finishes and both the server and client will hang until the server process is closed.

If you change https://github.com/grpc/grpc-dart/blob/master/lib/src/server/server.dart#L147 to

    final done = _connections.map((connection) => connection.terminate()).toList();

then it works as I'd expect, though this is probably not the proper fix for this.

Details

helloworld.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

server.dart

import 'dart:async';

import 'package:grpc/grpc.dart';

import 'package:helloworld/src/generated/helloworld.pb.dart';
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';

class GreeterService extends GreeterServiceBase {
  @override
  Stream<HelloReply> sayHello(ServiceCall call, HelloRequest request) {
    return Stream.periodic(Duration(seconds: 1), (value) {
      return HelloReply()..message = 'Hello, ${request.name}!';
    });
  }
}

Future<void> main(List<String> args) async {
  final server = Server([GreeterService()]);
  await server.serve(port: 50051);
  print('Server listening on port ${server.port}...');
  await Future.delayed(Duration(seconds: 5));
  print('shutting down');
  await server.shutdown();
  print('shutdown');
}

client.dart

import 'dart:async';

import 'package:grpc/grpc.dart';

import 'package:helloworld/src/generated/helloworld.pb.dart';
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';

Future<void> main(List<String> args) async {
  final channel = ClientChannel(
    'localhost',
    port: 50051,
    options: const ChannelOptions(credentials: ChannelCredentials.insecure()),
  );
  final stub = GreeterClient(channel);

  final name = args.isNotEmpty ? args[0] : 'world';

  try {
    final response = await stub.sayHello(HelloRequest()..name = name);
    response.listen((value) {
      print('Greeter client received: ${value.message}');
    }, onError: (error) {
      print(error);
      channel.shutdown();
    });
  } catch (e) {
    print('Caught error: $e');
  }
}
MatthewLM commented 1 week ago

This is still a bug. shutdown() does not force streams to close and waits for them to finish. The only workaround is to ensure the streams are closed before calling shutdown(). However I find that call.sendTrailers() must be manually called at the end of the stream generator because it is not automatically called at the end of the stream unlike how it is documented.

MatthewLM commented 1 week ago

The workaround I have at the moment looks like this in the service RPC implementation:

    // sendTrailers is not always called automatically when the stream ends
    // despite the documentation.
    // Without calling this, the grpc stream will hang and never close.
    final controller = StreamController<Event>(
      onCancel: () => call.sendTrailers(),
    );
    // When upstream stream is done, cancel this one
    controller.addStream(stream).then((_) => controller.close());
    return controller.stream;