eclipse-ee4j / tyrus

Tyrus
Other
115 stars 37 forks source link

Client hangs if i send recoursively #581

Open glassfishrobot opened 9 years ago

glassfishrobot commented 9 years ago

Server side:

@ServerEndpoint("/tests/1client")
public class Tests1Client
{

    @OnOpen
    public void open(Session client) {
        System.out.println("/websocket - open");
    }

    @OnMessage
    public void shout(String text, Session client) throws InterruptedException, IOException
    {
        System.out.println("/websocket -> " + text);
        client.getBasicRemote().sendText(text + "!!!");
    }

    @OnClose
    public void close(Session client) {
        System.out.println("/websocket - close");
    }
}

Case 1: client sends 1000 messages and received 1000 responses. Appeared result is same as expected. Client with iteration of messages sending:

import javax.websocket.*;
import java.net.URI;
import java.util.concurrent.*;

public class Client1
{
    private static final int MESSAGES_COUNT = 1000;
    public static final String URL = "ws://localhost:8080/jsr356/tests/1client";
    private static final String MESSAGE_50_SYMBOLS = "HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld";

    private long startTime;

    private final CountDownLatch messageLatch = new CountDownLatch(MESSAGES_COUNT);
    private ExecutorService exec;

    public void start()
    {
        try
        {
            exec = Executors.newFixedThreadPool(3);
            startTime = System.currentTimeMillis();

            ClientManager client = ClientManager.createClient();
            client.connectToServer(new Endpoint()
            {
@Override
public void onOpen(final Session session, EndpointConfig config)
{
    System.out.println("Client connected in time " + ((System.currentTimeMillis() - startTime) / 1000) + "s");

    session.addMessageHandler(new MessageHandler.Whole<String>()
    {
        @Override
        public void onMessage(String message)
        {
            System.out.println(" Client received message " + message);

            messageLatch.countDown();

        }
    });
    sendMessage(session);
}

@Override
public void onClose(Session session, CloseReason closeReason)
{
    System.out.println("Client disconnected");
}

@Override
public void onError(Session session, Throwable thr)
{
    System.out.println("Client received error");
}
            }, ClientEndpointConfig.Builder.create().build(), new URI(URL));
            messageLatch.await();
            if (messageLatch.getCount() != 0)
            {
System.out.println("Must be sent " + MESSAGES_COUNT + " but send only " + (MESSAGES_COUNT - messageLatch.getCount()));
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    private void sendMessage(Session session)
    {
        final RemoteEndpoint.Async async = session.getAsyncRemote();
        for (int i = 0; i < MESSAGES_COUNT; i++)
        {
            final int index = i;
            exec.submit(new Callable<Object>()
            {
@Override
public Future<Void> call() throws Exception
{

    System.out.println("Try to send message " + " " + index + " " + MESSAGE_50_SYMBOLS);
    async.sendText(index + " Message " + MESSAGE_50_SYMBOLS);
    return null;
}
            });
        }
    }

    public static void main(String[] args)
    {
        new Client1().start();
    }
}

Case 2: client should send message and receive response 1000 times (algorithm is recursive: next message sending client will make in onMessahe handler). Client with recursive "ping-pong" messages sending:

import org.glassfish.tyrus.client.ClientManager;

import javax.websocket.*;
import java.net.URI;
import java.util.concurrent.*;

public class Client1_recursive
{
    private static final int MESSAGES_COUNT = 1000;
    public static final String URL = "ws://localhost:8080/jsr356/tests/1client";
    private static final String MESSAGE_50_SYMBOLS = "HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld";

    private long startTime;

    private final CountDownLatch messageLatch = new CountDownLatch(MESSAGES_COUNT);
    private ExecutorService exec;

    public void start()
    {
        try
        {
            exec = Executors.newFixedThreadPool(3);
            startTime = System.currentTimeMillis();

            ClientManager client = ClientManager.createClient();
            client.connectToServer(new Endpoint()
            {
@Override
public void onOpen(final Session session, EndpointConfig config)
{
    System.out.println("Client connected in time " + ((System.currentTimeMillis() - startTime) / 1000) + "s");

    session.addMessageHandler(new MessageHandler.Whole<String>()
    {
        @Override
        public void onMessage(String message)
        {
            System.out.println(" Client received message " + message);

            messageLatch.countDown();

            sendMessage(session);
        }
    });
    sendMessage(session);
}

@Override
public void onClose(Session session, CloseReason closeReason)
{
    System.out.println("Client disconnected");
}

@Override
public void onError(Session session, Throwable thr)
{
    System.out.println("Client received error");
}
            }, ClientEndpointConfig.Builder.create().build(), new URI(URL));
            messageLatch.await();
            if (messageLatch.getCount() != 0)
            {
System.out.println("Must be sent " + MESSAGES_COUNT + " but send only " + (MESSAGES_COUNT - messageLatch.getCount()));
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    private int i = 0;

    private void sendMessage(Session session)
    {
        final int index = i++;

        final RemoteEndpoint.Async async = session.getAsyncRemote();
        exec.submit(new Callable<Object>()
        {
            @Override
            public Future<Void> call() throws Exception
            {

System.out.println("Try to send message " + " " + index + " " + MESSAGE_50_SYMBOLS);
async.sendText(index + " Message " + MESSAGE_50_SYMBOLS);
return null;
            }
        });
    }

    public static void main(String[] args)
    {
        new Client1_recursive().start();
    }
}

In recursive algorithm about 25 (or 74, or 90...) messages was sent, but client hanged , logs for example may be that:

Try to send message  21 HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld
 Client received message 21 Message HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld!!!
Try to send message  22 HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld
 Client received message 22 Message HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld!!!
Try to send message  23 HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld
 Client received message 23 Message HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld!!!
Try to send message  24 HelloWorldHelloWorldHelloWorldHelloWorldHelloWorld

As seen client sends message 24 ant it must be received on server, but in server logs i see that message is not received on server, and client hanged. Why?

glassfishrobot commented 6 years ago
glassfishrobot commented 9 years ago

@glassfishrobot Commented Reported by goleon

glassfishrobot commented 9 years ago

@glassfishrobot Commented @pavelbucek said: This could be a bug but for different reason.

The latter usecase should not work at all - onMessage() method shouldn't be invoked in parallel for the same client. Tyrus should make sure that next onMessage() is not invoked before previous one is finished; we cannot assure in-order message delivery otherwise.

glassfishrobot commented 9 years ago

@glassfishrobot Commented goleon said: Thanks, could you explain this restriction please. It is from WebSocket transport or JSR 356 standard or from JSR server side or JSR client side implementation?

glassfishrobot commented 9 years ago

@glassfishrobot Commented goleon said: And i removed concurrent tasks executor but problem still exists...

glassfishrobot commented 9 years ago

@glassfishrobot Commented @pavelbucek said: the "restriction" is from the JSR 356 itself - as I already mentioned, we cannot guarantee in-order message delivery otherwise. Client/Server side does not matter, since they are equal after WebSocket handshake.

I overlooked that you are sending the message from ExecutorService - in that case, it should work; leave it there and to jstack when it "hangs" and post it here, we might be able to find something unusual.

glassfishrobot commented 7 years ago

@glassfishrobot Commented This issue was imported from java.net JIRA TYRUS-404