rsocket / rsocket-go


Parallel request processing #57

Closed Codebreaker101 closed 4 years ago

Codebreaker101 commented 4 years ago

I've noticed the following in my testing with RequestResponse:

1. Server responds with mono.Just

// server code
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
    return rsocket.NewAbstractSocket(rsocket.RequestResponse(func(request payload.Payload) mono.Mono {
        time.Sleep(time.Second)
        return mono.Just(payload.NewString("data", "metadata"))
    })), nil
}).

2. Server responds with mono.Create

// server code 
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
    return rsocket.NewAbstractSocket(rsocket.RequestResponse(func(request payload.Payload) mono.Mono {
        return mono.Create(func(ctx context.Context, s mono.Sink) {
            time.Sleep(time.Second)
            s.Success(payload.NewString("data", "metadata"))
        })
    })), nil
}).

Is this behavior defined by the RSocket protocol specification (I could not find any reference)? Or was it a design choice, or perhaps an unintentional feature? In any case, will this functionality persist in v1?

jjeffcaii commented 4 years ago

Thanks for the report! I think that is because time.Sleep is outside the return statement in case 1.

Codebreaker101 commented 4 years ago

So this is the correct behavior of the above examples, and it will not change?

jjeffcaii commented 4 years ago

The handler registered via rsocket.RequestResponse runs in the event loop, so a long-running handler will block the main loop. I could change it to dispatch in parallel, but that might cause other problems or reduce performance. I need to do some testing before starting on it.

The best way is to wrap your code in a Mono, as in example 2. Because Mono is lazy, you don't need to worry about blocking.
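The blocking effect described above can be modeled without rsocket-go. The following is a minimal stdlib-only sketch, not the library's actual internals: `task`, `handler`, `eager`, `lazy`, and `serve` are hypothetical names, and running each returned task on its own goroutine is an assumption made purely for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// task stands in for a lazy Mono: nothing runs until the loop calls it.
// (Hypothetical type, not part of rsocket-go.)
type task func() string

// handler stands in for the function registered with rsocket.RequestResponse.
type handler func(req int) task

const delay = 20 * time.Millisecond

// eager sleeps on the dispatch loop itself, like example 1.
func eager(req int) task {
	time.Sleep(delay)
	return func() string { return fmt.Sprintf("reply %d", req) }
}

// lazy defers the sleep into the returned task, like example 2.
func lazy(req int) task {
	return func() string {
		time.Sleep(delay)
		return fmt.Sprintf("reply %d", req)
	}
}

// serve pushes n requests through one dispatch loop and returns the total time.
func serve(h handler, n int) time.Duration {
	start := time.Now()
	var wg sync.WaitGroup
	for req := 0; req < n; req++ { // the single "event loop"
		t := h(req) // runs on the loop: blocking here delays every later request
		wg.Add(1)
		go func() { // assumption: the task itself runs off the loop
			defer wg.Done()
			_ = t()
		}()
	}
	wg.Wait()
	return time.Since(start)
}

func main() {
	fmt.Println("eager:", serve(eager, 5))
	fmt.Println("lazy: ", serve(lazy, 5))
}
```

With the eager handler the total time grows with the number of requests, because each sleep happens on the loop. With the lazy handler the sleeps overlap.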

In addition, the same problem occurs in the Java SDK. Consider the code below; it also runs serially.

    RSocketFactory.receive()
        .acceptor(
            (setup, sendingSocket) ->
                Mono.just(
                    new AbstractRSocket() {
                      @Override
                      public Mono<Payload> requestResponse(Payload payload) {
                        try {
                          TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                          // ignore
                        }
                        log.info("current thread: {}", Thread.currentThread().getName());
                        return Mono.just(DefaultPayload.create("foobar"));
                      }
                    }))
        .transport(TcpServerTransport.create(7878))
        .start()
        .block()
        .onClose()
        .block();

// When requests are sent concurrently, the logs show that they are handled serially.
// 17:22:01.511 [reactor-tcp-nio-2] INFO  com.example.rsdemo.DecTest - current thread: reactor-tcp-nio-2
// 17:22:02.530 [reactor-tcp-nio-2] INFO  com.example.rsdemo.DecTest - current thread: reactor-tcp-nio-2
// 17:22:03.532 [reactor-tcp-nio-2] INFO  com.example.rsdemo.DecTest - current thread: reactor-tcp-nio-2
// 17:22:04.534 [reactor-tcp-nio-2] INFO  com.example.rsdemo.DecTest - current thread: reactor-tcp-nio-2
// 17:22:05.539 [reactor-tcp-nio-2] INFO  com.example.rsdemo.DecTest - current thread: reactor-tcp-nio-2

If you want it to work in parallel, you need to return something like this:

return Mono.delay(Duration.ofSeconds(1)).map(ignore -> payload);

Codebreaker101 commented 4 years ago

I think there are some use cases for per-client sequential execution. For example, to force a client to wait for a time-consuming request to complete before the next one is processed, without the knowledge of the client, which fired multiple requests simultaneously.

On that note, will subsequent requests time out, be dropped, or be affected in any way if the first request's execution never finishes?

jjeffcaii commented 4 years ago

Hmm... It depends on your code. I know it is confusing. 😭

I mean that if you want to sleep and still run in parallel, you should not block the event loop, so example 2 is the right solution.

Remember that you should never block the event loop; it is not like an HTTP response handler. Mono being lazy means that nothing actually happens until you subscribe to it.

    rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
        // ---->
        // EVENT-LOOP: DO NOT block here
        // <----
        return yourMono
    })

    rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
        // ---->
        // long-time processing, move into mono creating.
        // <----
        return mono.Create(func(ctx context.Context, sink mono.Sink) {
            // try processing here.
            sink.Success(result)
        })
    })

Codebreaker101 commented 4 years ago

The sleep was just to simulate a time-consuming task. Here is one real example. I have a service, ping, that scans the network for online devices. That scan takes a while, depending on the IP range. Because it is a demanding task, I would like to prevent a single client from calling my service again before its current request has completed. Since I have a zero-trust policy between services, this must be enforced on the ping service side.

Per-client sequential request processing would solve this issue nicely.

jjeffcaii commented 4 years ago

Try this:


func YourPing() {
    // takes a while
}

...

return mono.Create(func(ctx context.Context, sink mono.Sink) {
    YourPing()
    sink.Success(result)
})

Or you can use channels:

    rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
        chPayload := make(chan payload.Payload)
        chErr := make(chan error)

        go func() {
            // Invoke ping here
            // ...

            // if success
            chPayload <- payload.NewString("ping ok", "")
            // if failed
            // chErr <- errors.New("your error")
        }()

        return mono.CreateFromChannel(chPayload, chErr)
    })

Hope it helps you! 😃

Codebreaker101 commented 4 years ago

With that, all the pings are executed at the same time, which is not the desired outcome.

Check this out. If you run the code below, both clients will send 10 ping requests, and all of the requests will be executed at the same time.

Now, if you instead uncomment the code below marked "executes sequential per client", it will execute one request per client at a time. This is the desired outcome.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/rsocket/rsocket-go"
    "github.com/rsocket/rsocket-go/payload"
    "github.com/rsocket/rsocket-go/rx"
    "github.com/rsocket/rsocket-go/rx/mono"
)

const transportString = "tcp://127.0.0.1:7878"
const number = 13

func main() {

    readyCh := make(chan struct{})

    // start a server in a go routine
    go server(readyCh)

    // wait for the server to be ready
    <-readyCh

    // call the client

    wg := sync.WaitGroup{}
    // create 2 clients
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go client(i, wg.Done)
    }

    wg.Wait()
}

func server(readyCh chan struct{}) {

    err := rsocket.Receive().
        OnStart(func() {
            close(readyCh)
        }).
        Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
            return rsocket.NewAbstractSocket(rsocket.RequestResponse(func(request payload.Payload) mono.Mono {
                // executes sequential per client
                // fmt.Printf("Client: %v, Request: %v\n", request.Data()[0], request.Data()[1])
                // ping()
                // return mono.Just(payload.NewString("ping ok", ""))

                // executes all request in parallel
                return mono.Create(func(ctx context.Context, s mono.Sink) {
                    fmt.Printf("Client: %v, Request: %v\n", request.Data()[0], request.Data()[1])
                    ping()
                    s.Success(payload.NewString("ping ok", ""))
                })
            })), nil
        }).
        Transport(transportString).
        Serve(context.Background())

    panic(err)
}

func client(id int, done func()) {
    // Start a client connection
    client, err := rsocket.Connect().Transport(transportString).Start(context.Background())
    if err != nil {
        panic(err)
    }
    defer client.Close()

    // create a wait group so that the function does not return until the stream completes
    wg := sync.WaitGroup{}

    for i := 0; i < 10; i++ {
        wg.Add(1)
        reqN := i
        f := client.RequestResponse(payload.New([]byte{byte(id), byte(i)}, []byte{}))
        f.DoOnSuccess(func(input payload.Payload) {
            fmt.Printf("Client: %v, Request: %v %v\n", id, reqN, input.DataUTF8())
        }).DoOnError(func(err error) {
            fmt.Println(err)
        }).DoFinally(func(s rx.SignalType) {
            wg.Done()
        }).Subscribe(context.Background())
    }

    wg.Wait()
    done()
}

func ping() {
    // scanning network
    // simulating with Sleep
    time.Sleep(time.Second)
}

Since you don't recommend blocking the event loop, what are my other options for achieving the desired outcome? That outcome is one request per client at a time, with no parallel execution.

jjeffcaii commented 4 years ago

Hmm... Maybe you can implement a queue yourself, or try using RequestStream instead.
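One way to read the "implement a queue yourself" suggestion is a per-connection worker that drains a channel of jobs. The sketch below uses only the standard library. `serialQueue`, `newSerialQueue`, `Submit`, `Close`, and `runClient` are hypothetical names, and wiring it into an rsocket-go handler is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// serialQueue runs submitted jobs one at a time, in submission order.
// (Hypothetical helper, not part of rsocket-go.)
type serialQueue struct {
	jobs chan func()
	done chan struct{}
}

// newSerialQueue starts a single worker goroutine that drains the queue.
func newSerialQueue(capacity int) *serialQueue {
	q := &serialQueue{jobs: make(chan func(), capacity), done: make(chan struct{})}
	go func() {
		defer close(q.done)
		for job := range q.jobs {
			job()
		}
	}()
	return q
}

// Submit enqueues a job; it blocks only while the buffer is full.
func (q *serialQueue) Submit(job func()) { q.jobs <- job }

// Close stops accepting jobs and waits for the queued ones to finish.
func (q *serialQueue) Close() {
	close(q.jobs)
	<-q.done
}

// runClient submits n jobs to one queue and returns the order they ran in.
func runClient(n int) []int {
	q := newSerialQueue(n)
	var order []int // written only by the worker, read only after Close
	for i := 0; i < n; i++ {
		i := i
		q.Submit(func() { order = append(order, i) })
	}
	q.Close()
	return order
}

func main() {
	var wg sync.WaitGroup
	for c := 0; c < 2; c++ { // two simulated clients, one queue each
		wg.Add(1)
		go func(c int) {
			defer wg.Done()
			fmt.Println("client", c, "ran", runClient(3))
		}(c)
	}
	wg.Wait()
}
```

Each simulated client gets its own queue, so its jobs run strictly in order while different clients proceed independently. Creating the queue when a connection is accepted and closing it when the connection ends would give the per-client behavior asked for here.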

jjeffcaii commented 4 years ago

Another way is to use a single scheduler, as in this draft:

// create a global single scheduler
var mySche = scheduler.NewSingle()

// change your mono
return mono.Create(...).SubscribeOn(mySche)

Codebreaker101 commented 4 years ago

Scheduling does work, but I need one scheduler per client. Each client has to send an ID in the setup payload so that the server can identify the client instance. That, again, requires the client to be trusted.

return rsocket.NewAbstractSocket(rsocket.RequestResponse(func(request payload.Payload) mono.Mono {

    var mySch scheduler.Scheduler

    if sch, ok := schedulerMap[setup.Data()[0]]; ok {
        mySch = sch
    } else {
        mySch = scheduler.NewSingle(1)
        schedulerMap[setup.Data()[0]] = mySch
    }

    return mono.Create(func(ctx context.Context, s mono.Sink) {
        fmt.Printf("Client: %v, Request: %v\n", request.Data()[0], request.Data()[1])
        ping()
        s.Success(payload.NewString("ping ok", ""))
    }).SubscribeOn(mySch)

})), nil

Is there anything internal that could be exposed to the user to identify the client? If so, maybe the CloseableRSocket interface could be extended to include, for example, an .ID() function that returns the unique ID of the server socket?

jjeffcaii commented 4 years ago

No, there's no .ID() method. But it's easy to keep a map and store an entry in it when a connection is set up.
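A rough sketch of such a map follows, assuming the server assigns the IDs itself so that nothing depends on a client-supplied value. `connRegistry` and its methods are hypothetical names, and the string state is a placeholder for whatever is kept per connection (for example a scheduler).

```go
package main

import (
	"fmt"
	"sync"
)

// connRegistry hands out server-side connection IDs and keeps per-connection state.
// (Hypothetical helper, not part of rsocket-go.)
type connRegistry struct {
	mu     sync.Mutex
	nextID uint64
	state  map[uint64]string
}

func newConnRegistry() *connRegistry {
	return &connRegistry{state: make(map[uint64]string)}
}

// Add stores state for a new connection and returns the ID assigned to it.
func (r *connRegistry) Add(s string) uint64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.nextID++
	r.state[r.nextID] = s
	return r.nextID
}

// Remove drops the state of a closed connection.
func (r *connRegistry) Remove(id uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.state, id)
}

// Len reports how many connections are currently registered.
func (r *connRegistry) Len() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.state)
}

func main() {
	reg := newConnRegistry()
	a := reg.Add("state for first connection")
	b := reg.Add("state for second connection")
	fmt.Println("ids:", a, b, "open:", reg.Len())
	reg.Remove(a) // simulate the first connection closing
	fmt.Println("open after close:", reg.Len())
}
```

In the setting of this thread, Add would presumably be called once per accepted connection and Remove from a close hook. The mutex matters because connections can be accepted and closed concurrently.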

Codebreaker101 commented 4 years ago

Thank you for all of the responses.

To conclude:

        }).
        Transport(transportString).
        Serve(context.Background())



If you agree with the conclusions, we can close this issue.

jjeffcaii commented 4 years ago

Yes! That's a good solution. It will accept non-blocking requests in parallel and create a single worker for each connection, which consumes and processes requests one by one. Remember to close your scheduler when the connection is closed (add a close hook). 👍

        mySch := scheduler.NewSingle(1)
        sendingSocket.OnClose(func(_ error) {
            mySch.Close()
        })
        return ...