rsocket / rsocket-rpc-java

Standard RSocket RPC Java Implementation
http://rsocket.io/
Apache License 2.0
172 stars 41 forks source link

client never gets notified when server goes away #9

Open apbthere opened 6 years ago

apbthere commented 6 years ago

The client never gets notified when server goes away. It just waits indefinitely. The following unit test creates a simple server that streams 200 messages to the client. Client will call Object.wait() on server socket effectively hanging the server after receiving 2 messages and then wait for 20 more messages or it will time out in 10 seconds. The client doesn't throw any exceptions nor signals onError.


package com.example.demo;

import java.util.Date;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import com.poc.protobuf.UnitRequest;
import com.poc.protobuf.UnitResponse;
import com.poc.protobuf.UnitService;
import com.poc.protobuf.UnitServiceClient;
import com.poc.protobuf.UnitServiceServer;

import io.netty.buffer.ByteBuf;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.rpc.rsocket.RequestHandlingRSocket;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.NettyContextCloseable;
import io.rsocket.transport.netty.server.TcpServerTransport;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DefaultSimpleServiceTests {

class DefaultSimpleService implements UnitService {

@Override
public Flux<UnitResponse> requestStream(UnitRequest unitRequest, ByteBuf metadata) {
    String command = unitRequest.getRequestCommandMessage();

    return Flux.range(1, 200)
            .map(i -> UnitResponse.newBuilder()
                    .setMessageNumber(i)
                    .setResponseMessage(i + " Srever is processsing " + command + " command")
                    .build());
}
}

  @Test
  public void test1() throws Exception {
      UnitServiceServer serviceServer = new UnitServiceServer(new DefaultSimpleService(), Optional.empty(), Optional.empty());

        NettyContextCloseable serverSocket = RSocketFactory.receive()
            .acceptor(
                (setup, sendingSocket) ->
                    Mono.just(new RequestHandlingRSocket(serviceServer)))
            .transport(TcpServerTransport.create(8801))
            .start()
            .block();

        RSocket rSocket = RSocketFactory.connect().transport(TcpClientTransport.create(8801)).start().block();

        UnitServiceClient client = new UnitServiceClient(rSocket);

            CountDownLatch latch = new CountDownLatch(22);

            client.requestStream(UnitRequest.newBuilder().setRequestCommandMessage("Give me some data!").build())
                    .subscribe(new Subscriber<UnitResponse>() {

                private Subscription subscription;

                @Override
                public void onSubscribe(Subscription s) {
                    this.subscription = s;
                    s.request(1);
                }

                @Override
                public void onNext(UnitResponse t) {
                    System.out.println("Received message " + t.getResponseMessage());
                    latch.countDown();

                    if (latch.getCount() < 20) {
                        System.out.println("Killing server now...");
                        try {
                            // this will halt the thread causing server to disappear 
                            serverSocket.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    subscription.request(1);
                }

                @Override
                public void onError(Throwable t) {
                    System.out.println("Error detected! " + t);
                }

                @Override
                public void onComplete() {
                    System.out.println("Stream completed!");
                }
            } );

            latch.await(10, TimeUnit.SECONDS);
            System.out.println("Finished at " + new Date().toString());
  }
}

proto file

syntax = "proto3";

package com.harris.atom.poc;

import "google/protobuf/empty.proto";

option java_package = "com.poc.protobuf";
option java_outer_classname = "UnitServiceProto";
option java_multiple_files = true;

service UnitService {

// Single Request / Streaming Response
rpc RequestStream (UnitRequest) returns (stream UnitResponse) {}
}

message UnitRequest {
string requestCommandMessage = 1;
}

message UnitResponse {
string responseMessage = 1;
int32 messageNumber = 2;
}
robertroeser commented 5 years ago

@mostroverkhov does your fix to rsocket help with this?

mostroverkhov commented 5 years ago

@robertroeser It helps, but https://github.com/reactor/reactor-netty/issues/495 has to be resolved also