grpc / grpc-kotlin

Kotlin gRPC implementation. HTTP/2 based RPC
https://grpc.io/docs/languages/kotlin
Apache License 2.0
1.2k stars 162 forks source link

Retry does not work with coroutines #277

Open Cassianokunsch opened 3 years ago

Cassianokunsch commented 3 years ago

Hello, I made a gRPC service, and I'm trying to do my client retry the call when an especific error is throw, but the implementation in the service with courotines does not work, but with Java works very fine.

Server code with coroutines:

class NotificationGrpcService(private val notificationService: NotificationService) :
    NotificationGrpcKt.NotificationCoroutineImplBase() {

    var retryCounter = AtomicInteger(0)

    private val log = LoggerFactory.getLogger(this.javaClass)
    private val UNAVAILABLE_PERCENTAGE = 0.9f
    private val random = Random()

    override suspend fun sendPush(request: SendPushNotificationRequest): SendPushNotificationResponse {
        log.info("Received a call on method sendPushNotification with payload -> $request")

        val count: Int = retryCounter.incrementAndGet()
        if (random.nextFloat() < UNAVAILABLE_PERCENTAGE) {
            log.info("Returning stubbed UNAVAILABLE error. count: $count")
            throw StatusException(Status.UNAVAILABLE.withDescription("Method notification.Notification.SendPush is unavailable"))
        } else {
            log.info("Returning successful Hello response, count: $count")

            return SendPushNotificationResponse.newBuilder().setMessage("success").build()
        }
    }
}

Client

public class RetryingHelloWorldClient {
    static final String ENV_DISABLE_RETRYING = "DISABLE_RETRYING_IN_RETRYING_EXAMPLE";

    private static final Logger logger = Logger.getLogger(RetryingHelloWorldClient.class.getName());

    private final boolean enableRetries;
    private final ManagedChannel channel;
    private final GreeterGrpc.GreeterBlockingStub blockingStub;
    private final NotificationGrpc.NotificationBlockingStub blockingStubNotification;
    private final AtomicInteger totalRpcs = new AtomicInteger();
    private final AtomicInteger failedRpcs = new AtomicInteger();

    protected Map<String, ?> getRetryingServiceConfig() {
        return new Gson()
                .fromJson(
                        new JsonReader(
                                new InputStreamReader(
                                        RetryingHelloWorldClient.class.getResourceAsStream(
                                                "retrying_service_config.json"),
                                        UTF_8)),
                        Map.class);
    }

    /**
     * Construct client connecting to HelloWorld server at {@code host:port}.
     */
    public RetryingHelloWorldClient(String host, int port, boolean enableRetries) {

        ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port)
                // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
                // needing certificates.
                .usePlaintext();
        if (enableRetries) {
            Map<String, ?> serviceConfig = getRetryingServiceConfig();
            logger.info("Client started with retrying configuration: " + serviceConfig);
            channelBuilder.defaultServiceConfig(serviceConfig).enableRetry();
        }
        channel = channelBuilder.build();
        blockingStub = GreeterGrpc.newBlockingStub(channel);
        blockingStubNotification = NotificationGrpc.newBlockingStub(channel);
        this.enableRetries = enableRetries;
    }

    public void push() {
        SendPushNotificationRequest request = SendPushNotificationRequest.newBuilder().setCustomerId(UUID.randomUUID().toString()).setMessage("sdasd").setTitle("asdad").build();
        SendPushNotificationResponse response = null;
        StatusRuntimeException statusRuntimeException = null;
        try {
            response = blockingStubNotification.sendPush(request);
        } catch (StatusRuntimeException e) {
            failedRpcs.incrementAndGet();
            statusRuntimeException = e;
        }

        totalRpcs.incrementAndGet();

        if (statusRuntimeException == null) {
            logger.log(Level.INFO, "Greeting: {0}", new Object[]{response.getMessage()});
        } else {
            logger.log(Level.INFO, "RPC failed: {0}", new Object[]{statusRuntimeException.getStatus()});
        }
    }

    public static void main(String[] args) throws Exception {
        boolean enableRetries = !Boolean.parseBoolean(System.getenv(ENV_DISABLE_RETRYING));
      final RetryingHelloWorldClient client = new RetryingHelloWorldClient("localhost", 50051, enableRetries);

      client.push();

    }
}

Retry Police config

{
  "methodConfig": [
    {
      "name": [
        {
          "service": "notification.Notification",
          "method": "SendPush"
        }
      ],

      "retryPolicy": {
        "maxAttempts": 5,
        "initialBackoff": "0.5s",
        "maxBackoff": "30s",
        "backoffMultiplier": 2,
        "retryableStatusCodes": [
          "UNAVAILABLE", "UNIMPLEMENTED"
        ]
      }
    }
  ]
}

Server code without coroutines

class NotificationGrpcService(private val notificationService: NotificationService) :
    NotificationGrpc.NotificationImplBase() {

    var retryCounter = AtomicInteger(0)

    private val log = LoggerFactory.getLogger(this.javaClass)
    private val UNAVAILABLE_PERCENTAGE = 0.9f
    private val random = Random()

    override fun sendPush(
        request: SendPushNotificationRequest?,
        responseObserver: StreamObserver<SendPushNotificationResponse>?
    ) {
        log.info("Received a call on method sendPushNotification with payload -> $request")

        val count: Int = retryCounter.incrementAndGet()
        if (random.nextFloat() < UNAVAILABLE_PERCENTAGE) {
            log.info("Returning stubbed UNAVAILABLE error. count: $count")
            responseObserver!!.onError(Status.UNAVAILABLE.withDescription("Method notification.Notification.SendPush is unavailable").asRuntimeException())
        } else {
            log.info("Returning successful Hello response, count: $count")
            responseObserver!!.onNext(SendPushNotificationResponse.newBuilder().setMessage("success").build())
            return responseObserver.onCompleted()
        }
    }
}
UDarya commented 2 years ago

Hello! Any updates here?

lowasser commented 2 years ago

What exactly doesn't work? Do you get an error? Something else?

sangyongchoi commented 2 years ago

@lowasser I am also having this problem. I put my case and examples #334 Thank you for checking it out.