rajith77 / qpid

Mirror of Apache Qpid
0 stars 0 forks source link

live-lock in the client when trying to close a consumer that is sending a message in onMessage of its listener #7

Open pmoravec opened 11 years ago

pmoravec commented 11 years ago

reproducer:

$ cat play_with_onMessage.java import javax.jms.*; import org.apache.qpid.amqp_0_10.jms.impl.ConnectionImpl;

public class play_with_onMessage {

public static void main(String[] args)
{
    play_with_onMessage testMsg = new play_with_onMessage();
}

private MessageConsumer clientConsumer0;
private MessageConsumer clientConsumer1;

private MessageProducer[] clientProducer = new MessageProducer[2];

private Session session;

private Connection connection;

final MessageListener messageListener = new MessageListener()
{
    @Override
    public void onMessage(Message _message)
    {
        try
        {
            String text = ((TextMessage) _message).getText();
            int nr = Integer.parseInt(text);
            nr = 1 - nr; nr=0;
            Message msg = session.createTextMessage("" + nr);
            clientProducer[nr].send(msg);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
};

public play_with_onMessage()
{
    try
    {
        connection = new ConnectionImpl(

// "amqp://anonymous:anonymous@clientid/test?brokerlist='tcp://localhost:5001;tcp://localhost:5002'&failover='roundrobin?cyclecount='100''"); "amqp://anonymous:anonymous@clientid/test?brokerlist='tcp://localhost:5672'"); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination dest0 = session.createQueue("Queue0;{create:always}"); // Destination dest1 = session.createQueue("Queue1;{create:always}"); clientConsumer0 = session.createConsumer(dest0); // clientConsumer1 = session.createConsumer(dest1); clientProducer[0] = session.createProducer(dest0); // clientProducer[1] = session.createProducer(dest1); clientConsumer0.setMessageListener(messageListener); // clientConsumer1.setMessageListener(messageListener);

    clientProducer[0].send(session.createTextMessage("0"));
        Thread.sleep(10000);
    System.out.println("Finishing..");

    clientProducer[0].close();

// clientProducer[1].close(); clientConsumer0.close(); // clientConsumer1.close(); session.close(); connection.close(); System.out.println("Everything closed now"); } catch (Exception e) { e.printStackTrace(); } } } $

(end of reproducer)

Just run it, wait 10 seconds and it should terminate (in most cases, to have more deterministic behaviour, uncomment producers and consumers).

When it stalls, jstack is:

Full thread dump OpenJDK 64-Bit Server VM (20.0-b12 mixed mode):

"Attach Listener" daemon prio=10 tid=0x00007f4fb4001000 nid=0x4845 waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE

"ack-flusher" daemon prio=10 tid=0x00007f4fe837e800 nid=0x482c in Object.wait() [0x00007f4fde779000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)

"IoReceiver - localhost/127.0.0.1:5672 -2013-08-29 13:19:47.414" daemon prio=10 tid=0x00007f4fe833b000 nid=0x482b runnable [0x00007f4fdea81000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.read(SocketInputStream.java:146) at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:162) at java.lang.Thread.run(Thread.java:679)

"IoSender - localhost/127.0.0.1:5672 -2013-08-29 13:19:47.417" daemon prio=10 tid=0x00007f4fe8304000 nid=0x482a in Object.wait() [0x00007f4fdeb82000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)

"Dispatcher-1-Conn-0" prio=10 tid=0x00007f4fe82de000 nid=0x4827 in Object.wait() [0x00007f4fdef11000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)

"Low Memory Detector" daemon prio=10 tid=0x00007f4fe80a3000 nid=0x4825 runnable [0x0000000000000000] java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" daemon prio=10 tid=0x00007f4fe80a1000 nid=0x4824 waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" daemon prio=10 tid=0x00007f4fe809c000 nid=0x4823 waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" daemon prio=10 tid=0x00007f4fe809a000 nid=0x4822 runnable [0x0000000000000000] java.lang.Thread.State: RUNNABLE

"Finalizer" daemon prio=10 tid=0x00007f4fe807b000 nid=0x4821 in Object.wait() [0x00007f4fdf730000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)

"Reference Handler" daemon prio=10 tid=0x00007f4fe8079000 nid=0x4820 in Object.wait() [0x00007f4fdf831000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)

"main" prio=10 tid=0x00007f4fe800b000 nid=0x481a in Object.wait() [0x00007f4fecf45000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)

"VM Thread" prio=10 tid=0x00007f4fe8074800 nid=0x481f runnable

"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f4fe8015800 nid=0x481b runnable

"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f4fe8017800 nid=0x481c runnable

"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f4fe8019800 nid=0x481d runnable

"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f4fe801b000 nid=0x481e runnable

"VM Periodic Task Thread" prio=10 tid=0x00007f4fe80ad800 nid=0x4826 waiting on condition

JNI global references: 1509