scalecube / scalecube-cluster

ScaleCube Cluster is a lightweight Java VM implementation of SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol. It provides a cluster membership, failure detection, and gossip protocol library.
http://scalecube.github.io/
Apache License 2.0
263 stars, 88 forks

Wrong response message when using the "requestResponse" method (sending a message to self) #390

Open: smyrgeorge opened this issue 11 months ago

smyrgeorge commented 11 months ago

Hello!

I was just playing around and I think I encountered a bug.

The problem occurs when we use the requestResponse method to send messages to ourselves (i.e., the node sends a request to itself).

The code correctly waits for a response, but it does not return the right response message.

For details, take a look at the following example (Kotlin):

Here, println(res) prints "Ping!" instead of "Pong!".

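import io.scalecube.cluster.Cluster
import io.scalecube.cluster.ClusterImpl
import io.scalecube.cluster.ClusterMessageHandler
import io.scalecube.cluster.Member
import io.scalecube.cluster.membership.MembershipEvent
import io.scalecube.cluster.transport.api.Message
import io.scalecube.transport.netty.tcp.TcpTransportFactory
import kotlinx.coroutines.delay
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.runBlocking

fun main(args: Array<String>) {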
    val alias = System.getenv("ACTOR_NODE_ID") ?: "node-1"
    val seedPort = System.getenv("ACTOR_NODE_SEED_PORT")?.toInt() ?: 61100

    // Build cluster.
    val cluster: Cluster = ClusterImpl().transport { it.port(seedPort) }
        .config { it.memberAlias(alias) }
        .transportFactory { TcpTransportFactory() }
        .handler {
            object : ClusterMessageHandler {
                override fun onMessage(message: Message) {
                    println("Received message: $message")

                    // Reply only to messages that carry a correlation id AND an "x-is-reply" header.
                    if (message.correlationId() != null
                        && message.header("x-is-reply") != null
                    ) {

                        val response = Message
                            .builder()
                            .correlationId(message.correlationId())
                            .header("x-is-reply", "t")
                            .data("Pong!")
                            .build()

                        runBlocking { it.send(message.sender(), response).awaitFirstOrNull() }
                    }
                }

                override fun onGossip(gossip: Message) {
                    println("Received message: $gossip")
                }

                override fun onMembershipEvent(event: MembershipEvent) {
                    println("Received membership-event: $event")
                }
            }
        }.startAwait()

    runBlocking {
        while (true) {
            val self: Member = cluster.member()
            val message: Message = Message.builder().correlationId("test_id").data("Ping!").build()
            val res: String = cluster.requestResponse(self, message).awaitSingle().data()

            println(res) // Prints "Ping!" and not "Pong!"

            // Block main thread.
            delay(1_000)
        }
    }
}

The above code can be found here.

Here is the output for the code above:

0    [main] INFO  io.scalecube.cluster.Cluster  - [null][doStart] Starting, config: ClusterConfig[metadata=null, metadataTimeout=3000, metadataCodec=io.scalecube.cluster.metadata.JdkMetadataCodec@7dc222ae, memberId='null', memberAlias='node-1', externalHosts=null, transportConfig=TransportConfig[port=61100, clientSecured=false, connectTimeout=3000, messageCodec=io.scalecube.cluster.transport.api.JdkMessageCodec@aecb35a, maxFrameLength=2097152, transportFactory=io.scalecube.transport.netty.tcp.TcpTransportFactory@5fcd892a, addressMapper=java.util.function.Function$$Lambda$34/0x0000000800c69e20@8b87145], failureDetectorConfig=FailureDetectorConfig[pingInterval=1000, pingTimeout=500, pingReqMembers=3], gossipConfig=GossipConfig[gossipFanout=3, gossipInterval=200, gossipRepeatMult=3, gossipSegmentationThreshold=1000], membershipConfig=MembershipConfig[seedMembers=[], syncInterval=30000, syncTimeout=3000, suspicionMult=5, namespace='default', removedMembersHistorySize=42]]
324  [sc-cluster-io-nio-1] INFO  io.scalecube.cluster.transport.api.Transport  - [start][/[0:0:0:0:0:0:0:0]:61100] Bound cluster transport
403  [sc-cluster-io-nio-1] INFO  io.scalecube.cluster.Cluster  - [default:node-1:7778863775a34011][doStart] Started
default:node-1:7778863775a34011
[default:node-1:7778863775a34011]
Received message: Message[headers={sender=192.168.2.10:61100, cid=test_id}, data=Ping!]
Ping!
Received message: Message[headers={sender=192.168.2.10:61100, cid=test_id}, data=Ping!]
Ping!
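The log shows the node receiving its own request (cid=test_id, data=Ping!) and then printing that same payload as the result. A likely explanation is that requestResponse completes with the first incoming message whose correlation id matches the request, and a message sent to self is itself such a message. The sketch below models only that assumed matching rule in memory; Msg and LoopbackTransport are hypothetical names, not ScaleCube classes.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for a cluster message: headers plus a String payload.
data class Msg(val headers: Map<String, String>, val data: String) {
    val correlationId: String? get() = headers["cid"]
}

// Hypothetical single-node transport: everything sent is delivered back to the
// same node's listeners, i.e. the "send to self" case from the issue.
class LoopbackTransport {
    private val listeners = mutableListOf<(Msg) -> Unit>()

    fun listen(listener: (Msg) -> Unit) {
        listeners += listener
    }

    fun send(msg: Msg) {
        listeners.toList().forEach { it(msg) }
    }

    // Assumed matching rule: complete with the FIRST incoming message whose
    // correlation id equals the request's, without checking that it is a reply.
    fun requestResponse(request: Msg): CompletableFuture<Msg> {
        val result = CompletableFuture<Msg>()
        listen { incoming ->
            if (incoming.correlationId == request.correlationId) result.complete(incoming)
        }
        send(request)
        return result
    }
}

fun main() {
    val transport = LoopbackTransport()
    val ping = Msg(mapOf("cid" to "test_id"), "Ping!")
    // No handler ever answers, yet the future completes immediately.
    val res = transport.requestResponse(ping).get(1, TimeUnit.SECONDS)
    println(res.data) // prints "Ping!": the request matched its own correlation id
}
```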

Am I missing something?

Thanks a lot!

artem-v commented 11 months ago

@smyrgeorge Hi, thanks for reporting. We will look into this by Friday.

smyrgeorge commented 11 months ago

I forgot to mention that I'm using version 2.7.0.rc.

artem-v commented 11 months ago

Honestly, I don't remember what 2.7.0.rc is; it looks like it was some development build. I tried to reproduce your program in Java on the latest version, 2.6.17, and my confusion is the following: where do you initially send a message to the cluster? I see you declared a message handler:

override fun onMessage(message: Message) 
...

That's fine, but this handler is only triggered when some message is received. The question is: where do you send the cluster a message with a non-null correlationId and an "x-is-reply" header?
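The guard this question refers to can be isolated as a plain predicate. In this sketch, Msg is a hypothetical type using the header keys visible in the log ("cid") and in the handler ("x-is-reply"), and shouldReplyIntended is only an assumption about what the handler was meant to do.

```kotlin
// Hypothetical minimal message type; only the headers matter for this check.
data class Msg(val headers: Map<String, String>, val data: String)

// Guard as written in the issue: reply only if the message has a correlation
// id AND already carries the "x-is-reply" header.
fun shouldReplyAsWritten(m: Msg): Boolean =
    m.headers["cid"] != null && m.headers["x-is-reply"] != null

// Assumed intent: reply to requests, i.e. messages that carry a correlation id
// but are NOT themselves replies.
fun shouldReplyIntended(m: Msg): Boolean =
    m.headers["cid"] != null && m.headers["x-is-reply"] == null

fun main() {
    val ping = Msg(mapOf("cid" to "test_id"), "Ping!")
    println(shouldReplyAsWritten(ping)) // false: the Ping never sets "x-is-reply"
    println(shouldReplyIntended(ping))  // true
}
```

With the guard as written, the Ping never qualifies, so no "Pong!" is ever sent. The negated check would answer requests but not replies, which also prevents two nodes from replying to each other indefinitely.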

smyrgeorge commented 11 months ago

I just tried version 2.6.17 and the problem remains.

I'm sending the reply to the node that sent the "Ping!" request.

artem-v commented 11 months ago

Can you rewrite the example in Java? It's hard to understand what's going on, for example this line:

            val res: String = cluster.requestResponse(self, message).awaitSingle().data()

How is this going to work? The Cluster interface doesn't have a requestResponse method.

smyrgeorge commented 11 months ago

Actually, that's true: the interface does not contain the requestResponse method on the master branch. But if you check out the tag 2.6.17, you will find it; have a look here.

I think it's a discontinued feature, so I guess you can also close this issue.

Thanks a lot.
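Without requestResponse on the Cluster interface, a request/reply exchange has to be correlated by hand on top of one-way sends. The sketch below is a hypothetical, in-memory model (RequestReply, Msg and loopbackNode are illustrative names, not ScaleCube API) that reuses the "cid" header and the "x-is-reply" marker from the example above. Unlike the example, it generates a fresh UUID per request and lets only messages marked as replies complete a pending request, so a request that loops back to its own sender is not mistaken for the answer.

```kotlin
import java.util.UUID
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

// Hypothetical message type using the header keys from the example above.
data class Msg(val headers: Map<String, String>, val data: String)

// Hypothetical request/reply layer on top of a one-way `send` function.
class RequestReply(private val send: (Msg) -> Unit) {
    private val pending = ConcurrentHashMap<String, CompletableFuture<Msg>>()

    fun request(data: String): CompletableFuture<Msg> {
        val cid = UUID.randomUUID().toString() // fresh id per request, unlike a fixed "test_id"
        val future = CompletableFuture<Msg>()
        pending[cid] = future // register before sending so a fast reply is not missed
        send(Msg(mapOf("cid" to cid), data))
        return future
    }

    // Feed every incoming message here (e.g. from a message handler).
    fun onMessage(m: Msg) {
        val cid = m.headers["cid"] ?: return
        if (m.headers["x-is-reply"] != null) {
            // Only messages marked as replies can complete a pending request.
            pending.remove(cid)?.complete(m)
        } else {
            // Requests get answered; replies are never answered, so there is no loop.
            send(Msg(mapOf("cid" to cid, "x-is-reply" to "t"), "Pong!"))
        }
    }
}

// Wires a node whose outgoing messages are delivered straight back to itself.
fun loopbackNode(): RequestReply {
    lateinit var node: RequestReply
    node = RequestReply { msg -> node.onMessage(msg) }
    return node
}

fun main() {
    val res = loopbackNode().request("Ping!").get(1, TimeUnit.SECONDS)
    println(res.data) // prints "Pong!"
}
```

A real version would also need a timeout that removes stale entries from pending when no reply arrives; that is omitted here for brevity.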