netifi-proteus / proteus-java

Proteus Java Client
https://www.netifi.com
Apache License 2.0

Client auto reconnect after connection failure is broken in 1.5.3 #57

Open VictorVisoki opened 5 years ago

VictorVisoki commented 5 years ago

With Proteus client version 1.5.3, the client won't reconnect to the Proteus server after a connection failure, for example when the server is restarted.

It works fine with clients on version 1.5.2. Moreover, on a machine with multiple clients, both 1.5.2 and 1.5.3, only the 1.5.2 clients survived a Proteus server restart (restarted more than twice to verify).

In addition, the ability to use Proteus in integration tests (bringing up a test container with the Proteus server) is also broken. Again, going back to 1.5.2 works fine.

The ability to reconnect to the server is critical for applications in production, which makes the new 1.5.3 version unusable.

robertroeser commented 5 years ago

Hi @VictorVisoki

Can you try with 1.5.5? https://github.com/netifi-proteus/proteus-java/releases/tag/1.5.5

Thanks, Robert

VictorVisoki commented 5 years ago

Hey Robert, the entire application uses Spring to work with Proteus, and as far as I've seen there is no 1.5.5 version of proteus-spring yet. It would be difficult for me to try it in production without the Spring support. I may be able to try the integration test.

robertroeser commented 5 years ago

You can use proteus-java 1.5.5 with proteus-spring 1.5.3 - but yes, it looks like proteus-spring still transitively pulls in 1.5.3.

I will get this fixed.
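
In the meantime, you can force the newer client with a dependency override. A minimal sketch for a Gradle Kotlin DSL build - the group and artifact coordinates below are assumptions based on the names mentioned in this thread, so verify them against your build:

    // build.gradle.kts - sketch only; the coordinates are assumptions.
    dependencies {
        implementation("io.netifi.proteus:proteus-spring:1.5.3")
        // Declare the newer client explicitly so it wins over the transitive 1.5.3.
        implementation("io.netifi.proteus:proteus-client:1.5.5")
    }

    configurations.all {
        // Force resolution in case another dependency drags 1.5.3 back in.
        resolutionStrategy.force("io.netifi.proteus:proteus-client:1.5.5")
    }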

VictorVisoki commented 5 years ago

Thanks. Update: I tried to run the integration test with 1.5.5 - it doesn't work. As for the bigger issue, the auto reconnect after failure: I won't be able to test that without Spring.

robertroeser commented 5 years ago

can you share what your integration test does so I can take a look?

robertroeser commented 5 years ago

also - are you running the 1.5.3 broker?

VictorVisoki commented 5 years ago

yes, the broker is 1.5.3

robertroeser commented 5 years ago

Hi @VictorVisoki ,

I forked the proteus spring example, and created a branch: https://github.com/robertroeser/proteus-spring/tree/retry

I changed the client to continually retry sending messages. If the broker is down, it reconnects. I restarted the broker server several times, and the connections on both the client and the server automatically reconnect, and messages start flowing again. This was with 1.5.3.
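
The retry logic in that branch amounts to something like this sketch (Kotlin, using the HelloService client from the test later in this thread; the exact wiring in the branch may differ):

    import reactor.core.publisher.Flux
    import reactor.core.publisher.Mono
    import java.time.Duration

    // Send a request every second; if the broker is down, log the failure and
    // try again on the next tick instead of terminating the stream.
    fun runRetryingClient(helloServiceClient: HelloServiceClient) {
        Flux.interval(Duration.ofSeconds(1))
            .onBackpressureDrop()
            .concatMap { i ->
                helloServiceClient
                    .sayHello(HelloRequest.newBuilder().setName("tick $i").build())
                    .doOnNext { println("response: ${it.message}") }
                    .onErrorResume { e ->
                        println("send failed, retrying on next tick: ${e.message}")
                        Mono.empty()
                    }
            }
            .blockLast()
    }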

I'm going to need some more context to track down the issue you're having.

VictorVisoki commented 5 years ago

I have a k8s cluster in which the Proteus broker runs as one of the pods. The IP of the broker is dynamic and can be different after each restart (it's not localhost like in your example); I use a hostname, of course.

When I run this setup with proteus-spring 1.5.3 and restart the Proteus pod a couple of times, the connection is not restored. The clients keep throwing this:

    2019-03-19 12:35:51.132  INFO [ parallel-1] .p.DefaultProteusBrokerService selecting socket WeightedClientTransportSupplier{', supplierId=2, errorPercentage=Ewma(value=6.633688616271088E-72, age=1969229), latency=Ewma(value=0.0, age=1083497048), socketAddress=127.0.0.1:8001, activeConnections=0, selectedCounter=1} with weight 6.633688616271088E-72
    2019-03-19 12:35:51.148 ERROR [actor-tcp-nio-3] .r.WeightedReconnectingRSocket error trying to broker
    io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:8001
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_181]
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_181]
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:665) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491) [netty-transport-4.1.33.Final.jar:4.1.33.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905) [netty-common-4.1.33.Final.jar:4.1.33.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
    Caused by: java.net.ConnectException: Connection refused
        ... 10 more

When I switch back to 1.5.2 it just works fine.

VictorVisoki commented 5 years ago

About the integration test, here's what I'm doing (in Kotlin):

    import io.netty.buffer.ByteBuf
    import io.netifi.proteus.Proteus
    import org.junit.jupiter.api.Assertions.assertEquals
    import org.junit.jupiter.api.Test
    import org.testcontainers.junit.jupiter.Container
    import org.testcontainers.junit.jupiter.Testcontainers
    import reactor.core.publisher.Mono
    import java.util.Optional

    // HelloService, HelloServiceServer, HelloServiceClient, HelloRequest and
    // HelloResponse are the classes generated from the .proto definition.

    @Testcontainers
    class ProteusExampleTest {

        companion object {
            @Container
            @JvmField
            val proteusBroker = TestProteus()
        }

        private class TestHelloService(private val replyMessage: String) : HelloService {

            lateinit var receivedMessage: String

            override fun sayHello(message: HelloRequest, metadata: ByteBuf): Mono<HelloResponse> {
                receivedMessage = message.name
                return Mono.just(HelloResponse.newBuilder().setMessage(replyMessage).build())
            }
        }

        @Test
        fun testConnection() {
            val serversGroupName = "testservices"
            val messageFromService = "I am Proteus server"
            val messageFromClient = "I am Proteus Client"

            // Server
            val testHelloService = TestHelloService(messageFromService)
            getProteus(serversGroupName).addService(
                HelloServiceServer(testHelloService, Optional.empty(), Optional.empty())
            )

            // Client
            val connection = getProteus("testClientsGroup").group(serversGroupName)
            val helloServiceClient = HelloServiceClient(connection)

            val replyFromServer = helloServiceClient
                .sayHello(HelloRequest.newBuilder().setName(messageFromClient).build())
                .block()!!
                .message

            assertEquals(messageFromService, replyFromServer)
            assertEquals(messageFromClient, testHelloService.receivedMessage)
        }

        private fun getProteus(groupName: String) =
            Proteus.builder()
                .host(proteusBroker.serverHost)
                .port(proteusBroker.tcpPort)
                .accessKey(proteusBroker.accessKey)
                .accessToken(proteusBroker.accessToken)
                .group(groupName)
                .poolSize(2)
                .build()
    }

When the client version is 1.5.3, it simply hangs... Again, with 1.5.2 it works fine. TestProteus is just a test container that brings up Proteus broker 1.5.3.
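
For reference, TestProteus is roughly this kind of Testcontainers wrapper - a sketch only; the image name, exposed port, and credentials are placeholders, not my actual setup:

    import org.testcontainers.containers.GenericContainer

    // Hypothetical sketch of the TestProteus container referenced above.
    class TestProteus : GenericContainer<TestProteus>("netifi/proteus:1.5.3") {
        init {
            withExposedPorts(8001) // broker TCP port inside the container
        }

        val serverHost: String get() = containerIpAddress
        val tcpPort: Int get() = getMappedPort(8001) // random host-mapped port
        val accessKey: Long = 0L            // placeholder access key
        val accessToken: String = "token"   // placeholder access token
    }

The key detail is that the test talks to the broker through a host-mapped port, not through the address the broker itself binds to inside the container.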

robertroeser commented 5 years ago

Hi @VictorVisoki ,

Thanks for writing back. This makes sense now. There was a bug in 1.5.2, since fixed, that allowed this setup to work. At a high level, because of the way Docker works, the Proteus broker needs a private address that it binds to and a public address that it advertises for clients to connect on; this is necessary for clustering to work properly. For instance, a broker in a Docker container might bind to 127.0.0.1:8001 but tell clients to connect to it on 10.0.0.1:8001. That would also explain the 127.0.0.1:8001 in your log: the broker advertised its container-internal address, which is unreachable from your client. More documentation and a Helm chart will be coming out in our next release.

In the meantime I have created an example application that runs on Kubernetes.

Here is a link to the application: https://github.com/robertroeser/example-app/

Here is the YAML I used to configure the brokers. It automatically sets the broker’s public address so that other brokers and clients can find it: https://github.com/robertroeser/example-app/blob/master/scripts/k8s/acme/broker.yaml

Additionally, please take a look at a service: https://github.com/robertroeser/example-app/blob/master/scripts/k8s/acme/flight.yaml

This will allow the service to connect to a broker and then start to get a stream of the other brokers.

I believe this information should help get the application working correctly. Let me know if you have other questions.

Thanks, Robert