rajith77 / qpid

Mirror of Apache Qpid
0 stars 0 forks source link

Reproducer from QPID-5117 causes livelock #10

Open pmoravec opened 11 years ago

pmoravec commented 11 years ago

Reproducer from https://issues.apache.org/jira/browse/QPID-5117 (see below) causes live-lock.

Program:

$ cat Java_MRG/Rajiths_new_client/test_QPID5117.java
import javax.jms.*;

import org.slf4j.Logger; import org.slf4j.LoggerFactory;

//import org.apache.qpid.client.AMQConnection;

import org.apache.qpid.amqp_0_10.jms.impl.ConnectionImpl;

public class test_QPID5117 { static final Logger log = LoggerFactory.getLogger(test_QPID5117.class);

public static int threads = 5;

public static void main(String[] args)
{
    if (args.length > 0) {
        try {
            threads = Integer.parseInt(args[0]);
        } catch (NumberFormatException e) {
            System.err.println("Argument must be integer.");
            System.exit(1);
        }
    }
    test_QPID5117 testMsg = new test_QPID5117();
}

private MessageConsumer[] clientConsumer;

private MessageProducer clientProducer;

private Session session;

private Connection connection;

final MessageListener messageListener = new MessageListener()
{
    @Override
    public void onMessage(Message _message)
    {
        try
        {
    log.info("onMessage(): before new session");
    Session ssn2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    log.info("onMessage(): after new session");
    ssn2.close();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
};

public test_QPID5117()
{
    try
    {
        connection = new ConnectionImpl(
                "amqp://anonymous:anonymous@clientid/test?brokerlist='tcp://localhost:5672'");
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination prodDest = session.createTopic("amq.fanout/key; { node:{type:topic}}");
    Destination consDest = session.createQueue("myQueue; {create:always, node:{ x-bindings:[{'exchange':'amq.fanout', 'queue':'myQueue'}]}}");
    clientConsumer = new MessageConsumer[threads];
    for (int i=0; i<threads; i++) {
            clientConsumer[i] = session.createConsumer(session.createQueue("myQueue"+i+"; {create:always, node:{ x-bindings:[{'exchange':'amq.fanout', 'queue':'myQueue"+i+"'}]}}"));
            while (clientConsumer[i].receive(1)!=null) {}
            clientConsumer[i].setMessageListener(messageListener);
        }
        clientProducer = session.createProducer(prodDest);
        clientProducer.send(session.createTextMessage(""));
    clientProducer.close();

        Thread.sleep(20);
    log.info("main(): before connection.close()");
    connection.close();
    log.info("main(): after connection.close()");

// clientConsumer.close(); // clientProducer.close(); // session.close(); // connection.close(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Finishing."); } } $

Run it as:

java test_QPID5117 50

Sometimes it gets stuck without printing "Finishing."; jstack then shows:

2013-09-06 13:05:45 Full thread dump OpenJDK 64-Bit Server VM (20.0-b12 mixed mode):

"Attach Listener" daemon prio=10 tid=0x00007fc3f4001000 nid=0x4695 waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE

"ack-flusher" daemon prio=10 tid=0x00007fc41c376800 nid=0x4660 in Object.wait() [0x00007fc41668b000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)

"IoReceiver - localhost/127.0.0.1:5672 -2013-09-06 13:00:30.065" daemon prio=10 tid=0x00007fc41c340000 nid=0x465f runnable [0x00007fc416996000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.read(SocketInputStream.java:146) at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:162) at java.lang.Thread.run(Thread.java:679)

"IoSender - localhost/127.0.0.1:5672 -2013-09-06 13:00:30.068" daemon prio=10 tid=0x00007fc41c2fd000 nid=0x465e in Object.wait() [0x00007fc416a97000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)

"Dispatcher-1-Conn-0" prio=10 tid=0x00007fc41c323000 nid=0x465b waiting for monitor entry [0x00007fc416e26000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.qpid.amqp_0_10.jms.impl.ConnectionImpl.removeSession(ConnectionImpl.java:659)

"Low Memory Detector" daemon prio=10 tid=0x00007fc41c0a2800 nid=0x4659 runnable [0x0000000000000000] java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" daemon prio=10 tid=0x00007fc41c0a1000 nid=0x4658 waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" daemon prio=10 tid=0x00007fc41c09b800 nid=0x4657 waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" daemon prio=10 tid=0x00007fc41c09a000 nid=0x4656 runnable [0x0000000000000000] java.lang.Thread.State: RUNNABLE

"Finalizer" daemon prio=10 tid=0x00007fc41c07b000 nid=0x4655 in Object.wait() [0x00007fc417652000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)

"Reference Handler" daemon prio=10 tid=0x00007fc41c079000 nid=0x4654 in Object.wait() [0x00007fc417753000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)

"main" prio=10 tid=0x00007fc41c00a800 nid=0x464e in Object.wait() [0x00007fc420e53000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)

"VM Thread" prio=10 tid=0x00007fc41c074000 nid=0x4653 runnable

"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007fc41c015800 nid=0x464f runnable

"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007fc41c017800 nid=0x4650 runnable

"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007fc41c019000 nid=0x4651 runnable

"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007fc41c01b000 nid=0x4652 runnable

"VM Periodic Task Thread" prio=10 tid=0x00007fc41c0a5000 nid=0x465a waiting on condition

JNI global references: 1679

Checking with tcpdump: sometimes the client is opening a new session (the jstack output above), and sometimes it closes the AMQP connection but the TCP connection remains established.