rsocket / rsocket-go

rsocket-go implementation
Apache License 2.0
511 stars 44 forks source link

Sending anything on the client socket, without client having Acceptor defined, closes the socket. #59

Closed Codebreaker101 closed 4 years ago

Codebreaker101 commented 4 years ago

As the title says, if I use sendingSocket to send anything to the client that does not have Acceptor function defined, it causes it to close the client socket without error. Not sure if this is correct behavior.

Steps to Reproduce

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/jjeffcaii/reactor-go/scheduler"
    "github.com/rsocket/rsocket-go"
    "github.com/rsocket/rsocket-go/payload"
    "github.com/rsocket/rsocket-go/rx/mono"
)

const transportString = "tcp://127.0.0.1:7878"

func main() {
    readyCh := make(chan struct{})
    go server(readyCh)
    <-readyCh
    client()
}

var schedulerMap map[byte]scheduler.Scheduler

func server(readyCh chan struct{}) {
    schedulerMap = make(map[byte]scheduler.Scheduler)
    err := rsocket.Receive().
        OnStart(func() {
            close(readyCh)
        }).
        Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
            fmt.Println("new connection")
            sendingSocket.MetadataPush(payload.New(nil, []byte{0x44}))
            // sendingSocket.FireAndForget(payload.New(nil, []byte{0x44}))
            // sendingSocket.RequestResponse(payload.New(nil, []byte{0x44})).Subscribe(context.Background())
            fmt.Println("asynchronous metadata push sent")
            return rsocket.NewAbstractSocket(
                rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
                    return mono.Just(msg)
                }),
            ), nil
        }).
        Transport(transportString).
        Serve(context.Background())

    panic(err)
}

func client() {

    fmt.Println("new client start")
    client, err := rsocket.
        Connect().
        OnClose(func(err error) {
            fmt.Println("client closed", err)
        }).
        // Uncomment acceptor for the client not to close
        // Acceptor(func(socket rsocket.RSocket) rsocket.RSocket {
        //  return rsocket.NewAbstractSocket()
        // }).
        Transport(transportString).
        Start(context.Background())

    if err != nil {
        panic(err)
    }

    fmt.Println("new client end")

    // wait for a bit so that the client closes, panic ensues
    time.Sleep(time.Second)

    client.
        RequestResponse(payload.New(nil, nil)).
        DoOnSuccess(func(input payload.Payload) {
            fmt.Println("success")
        }).
        DoOnError(func(err error) {
            fmt.Println(err)
        }).
        Block(context.Background())

    client.Close()
}
jjeffcaii commented 4 years ago

Yep, it's not a correct behavior. I'll fix it. Thanks for report! 👍