rsocket / rsocket-java

Java implementation of RSocket
http://rsocket.io
Apache License 2.0

TCP Example Server and Client in Separate Files Don't Work #1081

Closed: burakakca closed this issue 1 year ago

burakakca commented 1 year ago

Server

package io.rsocket.examples.transport.tcp.client;

//import  ....

public class RSocketClientExample {
  static final Logger logger = LoggerFactory.getLogger(RSocketClientExample.class);

  public static void main(String[] args) {

    RSocketServer.create(
            SocketAcceptor.forRequestResponse(
                p -> {
                  String data = p.getDataUtf8();
                  logger.info("Received request data {}", data);

                  Payload responsePayload = DefaultPayload.create("Echo: " + data);
                  p.release();

                  return Mono.just(responsePayload);
                }))
        .bind(TcpServerTransport.create("localhost", 7000))
        .delaySubscription(Duration.ofSeconds(15))
        .doOnNext(cc -> logger.info("Server started on the address : {}", cc.address()))
        .block();
  }
}

Client

package io.rsocket.examples.transport.tcp.client;

//import ....

public class RSocketClientExample2 {
  static final Logger logger = LoggerFactory.getLogger(RSocketClientExample2.class);

  public static void main(String[] args) {

    Mono<RSocket> source =
        RSocketConnector.create()
            .reconnect(Retry.backoff(50, Duration.ofMillis(500)))
            .connect(TcpClientTransport.create("localhost", 7000));

    RSocketClient.from(source)
        .requestResponse(Mono.just(DefaultPayload.create("Test Request")))
        .doOnSubscribe(s -> logger.info("Executing Request"))
        .doOnNext(
            d -> {
              logger.info("Received response data {}", d.getDataUtf8());
              d.release();
            })
        .repeat(10)
        .blockLast();
  }
}

Actual Response

Task :rsocket-examples:RSocketClientExample2.main()
01 Feb 2023 14:04:58  INFO [main] i.r.e.t.t.c.RSocketClientExample2 - Executing Request

All-in-one-file code:

package io.rsocket.examples.transport.tcp.client;

// import ...

public class RSocketClientExample {
    static final Logger logger = LoggerFactory.getLogger(RSocketClientExample.class);

    public static void main(String[] args) {

        RSocketServer.create(
                        SocketAcceptor.forRequestResponse(
                                p -> {
                                    String data = p.getDataUtf8();
                                    logger.info("Received request data {}", data);

                                    Payload responsePayload = DefaultPayload.create("Echo: " + data);
                                    p.release();

                                    return Mono.just(responsePayload);
                                }))
                .bind(TcpServerTransport.create("localhost", 7000))
                .delaySubscription(Duration.ofSeconds(5))
                .doOnNext(cc -> logger.info("Server started on the address : {}", cc.address()))
                .block();

        Mono<RSocket> source =
                RSocketConnector.create()
                        .reconnect(Retry.backoff(50, Duration.ofMillis(500)))
                        .connect(TcpClientTransport.create("localhost", 7000));

        RSocketClient.from(source)
                .requestResponse(Mono.just(DefaultPayload.create("Test Request")))
                .doOnSubscribe(s -> logger.info("Executing Request"))
                .doOnNext(
                        d -> {
                            logger.info("Received response data {}", d.getDataUtf8());
                            d.release();
                        })
                .repeat(10)
                .blockLast();
    }
}

Expected Response

 Task :rsocket-examples:RSocketClientExample.main()
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-1] i.r.e.t.t.c.RSocketClientExample - Server started on the address : /127.0.0.1:7000
01 Feb 2023 14:06:57  INFO [main] i.r.e.t.t.c.RSocketClientExample - Executing Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-4] i.r.e.t.t.c.RSocketClientExample - Received request data Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Received response data Echo: Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Executing Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-4] i.r.e.t.t.c.RSocketClientExample - Received request data Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Received response data Echo: Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Executing Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-4] i.r.e.t.t.c.RSocketClientExample - Received request data Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Received response data Echo: Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Executing Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-4] i.r.e.t.t.c.RSocketClientExample - Received request data Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Received response data Echo: Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Executing Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-4] i.r.e.t.t.c.RSocketClientExample - Received request data Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Received response data Echo: Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Executing Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-4] i.r.e.t.t.c.RSocketClientExample - Received request data Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Received response data Echo: Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Executing Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-4] i.r.e.t.t.c.RSocketClientExample - Received request data Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Received response data Echo: Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Executing Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-4] i.r.e.t.t.c.RSocketClientExample - Received request data Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Received response data Echo: Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Executing Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-4] i.r.e.t.t.c.RSocketClientExample - Received request data Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Received response data Echo: Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Executing Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-4] i.r.e.t.t.c.RSocketClientExample - Received request data Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Received response data Echo: Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Executing Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-4] i.r.e.t.t.c.RSocketClientExample - Received request data Test Request
01 Feb 2023 14:06:57  INFO [reactor-tcp-epoll-3] i.r.e.t.t.c.RSocketClientExample - Received response data Echo: Test Request

VictoryWangCN commented 1 year ago

The server uses port 9090, but the client connects to 7000?

burakakca commented 1 year ago

@VictoryWangCN That was my mistake, but it is not the cause of the problem.

VictoryWangCN commented 1 year ago

Try modifying RSocketClientExample.java:

// bind(...).block() returns as soon as the server is bound, so keep the channel.
CloseableChannel channel =
    RSocketServer.create(
            SocketAcceptor.forRequestResponse(
                p -> {
                  String data = p.getDataUtf8();
                  logger.info("Received request data {}", data);

                  Payload responsePayload = DefaultPayload.create("Echo: " + data);
                  p.release();

                  return Mono.just(responsePayload);
                }))
        .bind(TcpServerTransport.create("localhost", 7000))
        .doOnNext(cc -> logger.info("Server started on the address : {}", cc.address()))
        .block();

// Block the main thread until the server channel is closed.
channel.onClose().block();
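
A likely explanation for why this helps: `bind(...).block()` only waits until the server is bound, and the transport's event-loop threads are, as far as I know, daemon threads by default, so the JVM can exit as soon as `main` returns. Blocking on `onClose()` keeps `main` alive. The sketch below illustrates only that mechanism with the JDK; `DaemonServerSketch` and its methods are hypothetical stand-ins, not part of rsocket-java.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a bound server; NOT the rsocket-java API.
final class DaemonServerSketch {
    // Completes when the "server" is closed, similar in spirit to onClose().
    private final CompletableFuture<Void> closed = new CompletableFuture<>();
    private final Thread worker;

    DaemonServerSketch() {
        // Assumption: the real event-loop threads are daemon threads,
        // so they do not keep the JVM alive on their own.
        worker = new Thread(() -> {
            try {
                closed.get(); // pretend to serve until the server is closed
            } catch (Exception ignored) {
                // interrupted or failed: just stop "serving"
            }
        }, "server-worker");
        worker.setDaemon(true);
        worker.start();
    }

    CompletableFuture<Void> onClose() {
        return closed;
    }

    void dispose() {
        closed.complete(null);
    }

    boolean isWorkerDaemon() {
        return worker.isDaemon();
    }

    public static void main(String[] args) {
        DaemonServerSketch server = new DaemonServerSketch();

        // Close the server after 200 ms so this demo terminates by itself.
        CompletableFuture.delayedExecutor(200, TimeUnit.MILLISECONDS)
            .execute(server::dispose);

        // Without this line, main would return immediately and the JVM
        // would exit even though the daemon worker is still "serving".
        server.onClose().join();
        System.out.println("Server closed, main exits");
    }
}
```

In a real server nothing calls `dispose()` on a timer; the process simply runs until the channel is closed or the JVM is stopped.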

burakakca commented 1 year ago

Yes, it's working now. Thanks.

I have another question; maybe you can help me with it. How can the server send the first request to the client? In all the examples, the client sends the first request.

VictoryWangCN commented 1 year ago

You can use forRequestStream or forRequestChannel to send data from the server, but with those the first request still needs to come from the client.
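
The reply above covers data that the server sends back on an interaction the client started. A possible alternative, offered as a sketch rather than a confirmed answer from this thread: the client still has to open the connection, but the `SocketAcceptor` on the server also receives a `sendingSocket` for that connection, and the server can use it to issue its own requests. The example assumes rsocket-java 1.x with the Netty TCP transport on the classpath; the class name `ServerInitiatedRequestSketch` and the message text are made up for illustration.

```java
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketConnector;
import io.rsocket.core.RSocketServer;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Mono;

// Hypothetical example class; server and client share one process for brevity.
class ServerInitiatedRequestSketch {
  public static void main(String[] args) {
    CloseableChannel server =
        RSocketServer.create(
                (setup, sendingSocket) -> {
                  // The server acts as requester on the client's connection.
                  sendingSocket
                      .requestResponse(DefaultPayload.create("Ping from server"))
                      .subscribe(
                          reply -> {
                            System.out.println("Server got: " + reply.getDataUtf8());
                            reply.release();
                          });
                  // This server does not handle any client-initiated requests.
                  return Mono.just(new RSocket() {});
                })
            .bind(TcpServerTransport.create("localhost", 7000))
            .block();

    // The client opens the connection and registers a responder of its own.
    RSocket client =
        RSocketConnector.create()
            .acceptor(
                SocketAcceptor.forRequestResponse(
                    p -> {
                      String data = p.getDataUtf8();
                      p.release();
                      return Mono.just(DefaultPayload.create("Echo: " + data));
                    }))
            .connect(TcpClientTransport.create("localhost", 7000))
            .block();

    // Keep the JVM alive until the server channel is closed.
    server.onClose().block();
  }
}
```

The design point is that the `acceptor(...)` call on `RSocketConnector` gives the client a responder, which is what allows it to answer requests coming from the server.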