Atmosphere / wasync

WebSockets with fallback transports client library for Node.js, Android and Java
http://async-io.org

[SSL/TLS] Unable to receive messages larger than 8k #109

Open phemphill opened 10 years ago

phemphill commented 10 years ago

I'm unable to receive any messages larger than 8k. Anything larger than 8k doesn't get raised as a message event until another message smaller than 8k is received. I think the track-message-size decoder then fails on that message, because the message received (which is encoded in JSON) is unreadable. I'll add that if I use an old version of async-http-client (1.8.1), it works as expected: the 8k messages are received in full, the message event is raised, and they are processed correctly. I only tried older versions of this library because the debugger seemed to suggest that the problem was in how wAsync was interacting with it.

A possibly related factor is that the transport event fires every time a message is received, and multiple times for messages larger than 8k. According to the documentation, it should fire only once, when the socket connects. I'm not sure how much of an issue this is. For me it only adds noise to the logs, but I wondered whether it is related to the failure on messages larger than 8k.

Here are the configuration details:

Android 4.1 - 4.4 (I've also seen the problem on the Android 5.0 previews, and a colleague has seen it in a server-based Java environment).
wAsync 1.4.0
netty 3.9.2
async-http-client 1.8.11 (trying 1.8.14 did not help)
The connection uses the WebSocket transport only.
The connection uses TLS.
I have the runtime set up to use the NettyAsyncHttpProvider as described on the wiki.
The client is set to track message size.

I'll note that changing the max chunk buffer size doesn't affect the 8k issue. Recombining chunks of any size seems to be a problem.
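To make the suspected failure mode concrete, here is a minimal, self-contained sketch of length-prefixed reassembly, assuming the wire format is `<length><delimiter><payload>` as in the large message test in this report. `LengthPrefixedReassembler` is a hypothetical class written for illustration; it is not wAsync's actual decoder. It shows that a decoder has to buffer partial chunks until the announced length has arrived, which is the step that appears to go wrong for messages above 8k.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the decoder shipped with wAsync.
public class LengthPrefixedReassembler {
    private final StringBuilder buffer = new StringBuilder();
    private final String delimiter;

    public LengthPrefixedReassembler(char delimiter) {
        this.delimiter = String.valueOf(delimiter);
    }

    /** Feeds one chunk and returns every message completed by it (possibly none). */
    public List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> complete = new ArrayList<>();
        while (true) {
            int sep = buffer.indexOf(delimiter);
            if (sep < 0) {
                break; // the length prefix itself has not fully arrived yet
            }
            int length = Integer.parseInt(buffer.substring(0, sep));
            int end = sep + 1 + length;
            if (buffer.length() < end) {
                break; // payload still incomplete: keep buffering
            }
            complete.add(buffer.substring(sep + 1, end));
            buffer.delete(0, end); // drop the consumed message, keep any remainder
        }
        return complete;
    }

    public static void main(String[] args) {
        // Build a 20,000-character payload and frame it as "20000|<payload>".
        StringBuilder payload = new StringBuilder();
        for (int i = 0; i < 2000; i++) {
            payload.append("0123456789");
        }
        String frame = payload.length() + "|" + payload;

        // Deliver the frame in 8192-character chunks, as a chunking transport might.
        LengthPrefixedReassembler reassembler = new LengthPrefixedReassembler('|');
        for (int off = 0; off < frame.length(); off += 8192) {
            String chunk = frame.substring(off, Math.min(frame.length(), off + 8192));
            System.out.println("chunk of " + chunk.length() + " chars -> "
                    + reassembler.feed(chunk).size() + " complete message(s)");
        }
    }
}
```

If a message event were raised for each chunk instead of for each completed message, the handler would see truncated JSON, which would match the unreadable messages described above.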

Below are unit tests for both situations.

For the transport event firing multiple times:

@Test
    public void repeatedTransportTest() throws Exception {
        final CountDownLatch l = new CountDownLatch(1);
        final AtomicInteger transportCount = new AtomicInteger();
        Config config = new Config.Builder()
                .port(port)
                .host("127.0.0.1")
                .resource("/suspend", new AtmosphereHandler() {

                    private final AtomicBoolean b = new AtomicBoolean(false);

                    @Override
                    public void onRequest(AtmosphereResource r) throws IOException {
                        r.getResponse().getWriter().print("HELLO");                        
                    }

                    @Override
                    public void onStateChange(AtmosphereResourceEvent r) throws IOException {
                        if (!r.isResuming() || !r.isCancelled()) {
                            r.getResource().getResponse().getWriter().print("Pong");
                            r.getResource().resume();
                        }
                    }

                    @Override
                    public void destroy() {

                    }
                }).build();

        server = new Nettosphere.Builder().config(config).build();
        assertNotNull(server);
        server.start();

        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<String> response = new AtomicReference<String>();
        AtmosphereClient client = ClientFactory.getDefault().newClient(AtmosphereClient.class);

        RequestBuilder request = client.newRequestBuilder()
                .method(Request.METHOD.GET)
                .uri(targetUrl + "/suspend")
                .transport(transport())
                .trackMessageLength(true)
                .enableProtocol(true);

        Socket socket = client.create();

        socket.on("message", new Function<String>() {
            @Override
            public void on(String t) {
                response.set(t);
                latch.countDown();
            }
        }).on(new Function<Throwable>() {

            @Override
            public void on(Throwable t) {
                t.printStackTrace();
                latch.countDown();
            }
        }).on(Event.TRANSPORT, new Function<Request.TRANSPORT>()
        {
            @Override
            public void on (Request.TRANSPORT t)
            {
                transportCount.getAndIncrement();
            }
        }).open(request.build());

        for (int i = 0; i < 5; i++) {
            socket.fire("Ping");
        }

        latch.await();

        socket.close();
        server.stop();

        // Compare the counter's value, not the AtomicInteger object itself.
        assertEquals(transportCount.get(), 1);
    }

For receiving large messages. Note: I was unable to set up SSL/TLS in this unit test, and the test doesn't seem to reproduce the issue, so I think the SSL/TLS part may be important in causing the receive problem.

@Test
    public void largeMessageTest() throws Exception {
        final CountDownLatch l = new CountDownLatch(1);        
        AtomicReference<String> messageText = new AtomicReference<String>();
        messageText.set("");
        for (int i = 0; i < 10000; i++)
        {
            messageText.set(messageText.get() + "0123456789");
        }
        messageText.set(new Integer(messageText.get().length() + 1).toString() + "|" + messageText.get());
        final AtomicReference<String> largeResponse = new AtomicReference<String>();
        largeResponse.set(messageText.get());
        Config config = new Config.Builder()
                .port(port)
                .host("127.0.0.1")
                .supportChunking(true)
                .maxChunkContentLength(8192)
                .resource("/suspend", new AtmosphereHandler()
                {

                    private final AtomicBoolean b = new AtomicBoolean(false);

                    @Override
                    public void onRequest(AtmosphereResource r) throws IOException
                    {
                        if (!b.getAndSet(true))
                        {
                            r.suspend(-1);
                        }
                        else
                        {
                            r.getBroadcaster().broadcast(RESUME);
                        }
                    }

                    @Override
                    public void onStateChange(AtmosphereResourceEvent r) throws IOException
                    {
                        if (!r.isResuming() || !r.isCancelled())
                        {
                            r.getResource().getResponse().getWriter().print(largeResponse.get());
                            r.getResource().resume();
                        }
                    }

                    @Override
                    public void destroy()
                    {

                    }
                }).build();

        server = new Nettosphere.Builder().config(config).build();
        assertNotNull(server);
        server.start();

        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<String> response = new AtomicReference<String>();
        AtmosphereClient client = ClientFactory.getDefault().newClient(AtmosphereClient.class);

        RequestBuilder request = client.newRequestBuilder()
                .method(Request.METHOD.GET)
                .uri(targetUrl + "/suspend")
                .transport(transport())
                .trackMessageLength(true)
                .trackMessageLengthDelimiter("|");

        Socket socket = client.create();

        socket.on(Event.MESSAGE, new Function<String>() {
            @Override
            public void on(String t) {
                response.set(t);
                latch.countDown();
            }
        }).on(new Function<Throwable>() {

            @Override
            public void on(Throwable t) {
                t.printStackTrace();
                latch.countDown();
            }
        }).open(request.build());

        assertNotNull(socket);

        socket.fire("Ping");        

        latch.await();

        socket.close();
        server.stop();

        String [] expected = largeResponse.get().split("\\|");
        assertEquals(response.get(), expected[1]);
    }

phemphill commented 9 years ago

I was able to test this today in an environment that did not have SSL/TLS and I was still unable to receive messages larger than 8k or any subsequent messages on that connection. My earlier hunch that SSL/TLS was related seems to be invalid.

jfarcand commented 9 years ago

@phemphill Thanks. Will try to spend time on it.

pdmars commented 9 years ago

Curious if there have been any updates on this issue? I'm having trouble with 8K+ messages without SSL/TLS using wasync 2.1.2 and async-http-client 1.9.20 and WebSocket as the transport (long polling seems to work). Client side I don't see the message event triggering and server-side with Atmosphere 2.3.4 logging set to trace I just see it closing the connection with error code 1006 and read EOF.

If I downgrade to wasync 1.4.0 and async-http-client 1.8.1 per the suggestion here, it appears to work with WebSockets.

jfarcand commented 9 years ago

@phemphill You need to set this config on the AHC client:

        NettyAsyncHttpProviderConfig nettyConfig = new NettyAsyncHttpProviderConfig();

        nettyConfig.addProperty("tcpNoDelay", "true");
        nettyConfig.addProperty("keepAlive", "true");
        nettyConfig.addProperty("reuseAddress", true);
        nettyConfig.addProperty("connectTimeoutMillis", nettyConnectionTimeout);
        nettyConfig.setWebSocketMaxFrameSize(65536);
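
For readers wondering where `nettyConfig` goes, the following is a hedged sketch of one way to wire it in, assuming async-http-client 1.9.x (the `com.ning` packages) and wAsync 2.x as mentioned earlier in this thread. The class name `LargeFrameClient` and the method `createSocket` are made up for illustration, and the frame size is simply the value from the snippet above; it is not a confirmed recommendation for every setup.

```java
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.providers.netty.NettyAsyncHttpProviderConfig;
import org.atmosphere.wasync.ClientFactory;
import org.atmosphere.wasync.Socket;
import org.atmosphere.wasync.impl.AtmosphereClient;

// Hypothetical helper class, for illustration only.
public class LargeFrameClient {

    public static Socket createSocket() {
        // Raise the WebSocket frame limit on the Netty provider (value taken from the comment above).
        NettyAsyncHttpProviderConfig nettyConfig = new NettyAsyncHttpProviderConfig();
        nettyConfig.setWebSocketMaxFrameSize(65536);

        // Attach the provider config to the AHC client configuration.
        AsyncHttpClientConfig ahcConfig = new AsyncHttpClientConfig.Builder()
                .setAsyncHttpClientProviderConfig(nettyConfig)
                .build();
        AsyncHttpClient ahc = new AsyncHttpClient(ahcConfig);

        // Hand the customized AHC instance to wAsync as its runtime.
        AtmosphereClient client = ClientFactory.getDefault().newClient(AtmosphereClient.class);
        return client.create(client.newOptionsBuilder().runtime(ahc).build());
    }
}
```

The returned socket can then be opened with a request built as in the unit tests above. Messages larger than the configured frame size may still need a larger limit.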

pdmars commented 9 years ago

Awesome, that works. Thank you.