rajith77 / qpid

Mirror of Apache Qpid
0 stars 0 forks source link

Unable to reconnect after producer getting exception from broker that closes connection #9

Open pmoravec opened 11 years ago

pmoravec commented 11 years ago

After a producer receives an exception from the broker that causes the connection to be closed, there is no way to reuse the existing Connection or Session object.

Reproducer:

import javax.jms.*;

import org.apache.qpid.amqp_0_10.jms.impl.ConnectionImpl;

public class reconnect_after_producerError { public static void main(String[] args) { reconnect_after_producerError testMsg = new reconnect_after_producerError(); }

private MessageConsumer backupConsumer;
private MessageProducer initProducer;

private Session session;

private Connection connection;

public reconnect_after_producerError()
{
    try
    {
        connection = new ConnectionImpl(
                "amqp://anonymous:anonymous@clientid/test?brokerlist='tcp://localhost:5672?retries='100''");
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination backupQueue = session.createQueue("BackupQueue; {create:always, node:{durable:true, x-declare:{ arguments:{'qpid.file_size':10, 'qpid.file_count':4 }}}}");

        backupConsumer = session.createConsumer(backupQueue);
        while (backupConsumer.receive(1)!=null)
            {}
        backupConsumer.close();
        initProducer = session.createProducer(backupQueue);

        String s = "someLongText ";
        for (int i=0; i<10; i++)
            s+=s;
        TextMessage msg = session.createTextMessage(s);
        try {
            while (true) {
                initProducer.send(msg);
            }
        }
        catch (Exception e)
        {
            Thread.sleep(100);
            System.out.println("Caught exception: " + e);
            e.printStackTrace();
            System.out.println("Init cause:" + e.getCause().toString());
            System.out.println("--------------------------------------------------------");
            System.out.println("Reconnecting");
            System.out.println("--------------------------------------------------------");

// session.close(); // connection.close(); Thread.sleep(100);

// connection.start(); // session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); backupConsumer = session.createConsumer(backupQueue); while (backupConsumer.receive(1)!=null) {} }

        System.out.println("Closing everything");
        initProducer.close();
        session.close();
        connection.close();
    }
    catch (Exception e)
    {
        System.out.println("Caught exception in main: " + e);
        e.printStackTrace();
        if (e.getCause() != null)
            System.out.println("Init cause:" + e.getCause().toString());
    }
    System.out.println("Finishing.");
}

}

(end of reproducer)

When running as is, the below exception is raised:

Caught exception in main: javax.jms.IllegalStateException: Session is closed
javax.jms.IllegalStateException: Session is closed
    at org.apache.qpid.amqp_0_10.jms.impl.SessionImpl.checkClosed(SessionImpl.java:800)
    at org.apache.qpid.amqp_0_10.jms.impl.SessionImpl.checkPreConditions(SessionImpl.java:791)
    at org.apache.qpid.amqp_0_10.jms.impl.SessionImpl.createConsumer(SessionImpl.java:523)
    at org.apache.qpid.amqp_0_10.jms.impl.SessionImpl.createConsumer(SessionImpl.java:511)
    at reconnect_after_producerError.<init>(reconnect_after_producerError.java:59)
    at reconnect_after_producerError.main(reconnect_after_producerError.java:9)

When uncommenting session closing+resetting, the below exception is raised:

Caught exception in main: javax.jms.IllegalStateException: Connection is CLOSED
javax.jms.IllegalStateException: Connection is CLOSED
    at org.apache.qpid.amqp_0_10.jms.impl.ConnectionImpl.checkClosed(ConnectionImpl.java:946)
    at org.apache.qpid.amqp_0_10.jms.impl.ConnectionImpl.createSessionImpl(ConnectionImpl.java:237)
    at org.apache.qpid.amqp_0_10.jms.impl.ConnectionImpl.createSession(ConnectionImpl.java:219)
    at reconnect_after_producerError.<init>(reconnect_after_producerError.java:58)
    at reconnect_after_producerError.main(reconnect_after_producerError.java:9)

And when uncommenting also connection closing+reopening, then the below exception is raised:

Caught exception in main: org.apache.qpid.transport.TransportFailureException: connection aborted
org.apache.qpid.transport.TransportFailureException: connection aborted
    at org.apache.qpid.transport.network.io.IoSender.send(IoSender.java:118)
    at org.apache.qpid.transport.network.io.IoSender.send(IoSender.java:43)
    at org.apache.qpid.transport.network.Disassembler.frame(Disassembler.java:117)
    at org.apache.qpid.transport.network.Disassembler.fragment(Disassembler.java:147)
    at org.apache.qpid.transport.network.Disassembler.method(Disassembler.java:233)
    at org.apache.qpid.transport.network.Disassembler.control(Disassembler.java:167)
    at org.apache.qpid.transport.network.Disassembler.control(Disassembler.java:48)
    at org.apache.qpid.transport.Method.delegate(Method.java:163)
    at org.apache.qpid.transport.network.Disassembler.send(Disassembler.java:73)
    at org.apache.qpid.transport.network.Disassembler.send(Disassembler.java:48)
    at org.apache.qpid.transport.ClientConnection.send(ClientConnection.java:95)
    at org.apache.qpid.transport.Session.send(Session.java:611)
    at org.apache.qpid.transport.ClientSession.invoke(ClientSession.java:143)
    at org.apache.qpid.transport.Session.invoke(Session.java:636)
    at org.apache.qpid.transport.SessionInvoker.sessionRequestTimeout(SessionInvoker.java:45)
    at org.apache.qpid.transport.Session.close(Session.java:1067)
    at org.apache.qpid.amqp_0_10.jms.impl.SessionImpl.closeImpl(SessionImpl.java:251)
    at org.apache.qpid.amqp_0_10.jms.impl.SessionImpl.close(SessionImpl.java:206)
    at reconnect_after_producerError.<init>(reconnect_after_producerError.java:53)
    at reconnect_after_producerError.main(reconnect_after_producerError.java:9)
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at org.apache.qpid.transport.network.io.IoSender.run(IoSender.java:321)
    at java.lang.Thread.run(Thread.java:679)
Init cause:java.net.SocketException: Connection reset
Finishing.

(In this case the program also does not terminate: it stays connected to the broker at the AMQP level. jstack output of the hung process:

2013-09-02 11:43:42 Full thread dump OpenJDK 64-Bit Server VM (20.0-b12 mixed mode):

"Attach Listener" daemon prio=10 tid=0x00007f4fb0001000 nid=0x29a9 waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE

"IoReceiver - localhost/127.0.0.1:5672 -2013-09-02 11:43:14.185" daemon prio=10 tid=0x00007f4f84002800 nid=0x2998 runnable [0x00007f4fd2706000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.read(SocketInputStream.java:146) at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:162) at java.lang.Thread.run(Thread.java:679)

"IoSender - localhost/127.0.0.1:5672 -2013-09-02 11:43:14.186" daemon prio=10 tid=0x00007f4f84001800 nid=0x2997 in Object.wait() [0x00007f4fd2605000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)

"DestroyJavaVM" prio=10 tid=0x00007f4fd400a800 nid=0x2982 waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE

"ack-flusher" daemon prio=10 tid=0x00007f4fd4357800 nid=0x2994 in Object.wait() [0x00007f4fd22fa000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)

"Dispatcher-1-Conn-0" prio=10 tid=0x00007f4fd42d5800 nid=0x298f waiting on condition [0x00007f4fd2a95000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method)

"Low Memory Detector" daemon prio=10 tid=0x00007f4fd40a2800 nid=0x298d runnable [0x0000000000000000] java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" daemon prio=10 tid=0x00007f4fd40a1000 nid=0x298c waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" daemon prio=10 tid=0x00007f4fd409b800 nid=0x298b waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" daemon prio=10 tid=0x00007f4fd409a000 nid=0x298a runnable [0x0000000000000000] java.lang.Thread.State: RUNNABLE

"Finalizer" daemon prio=10 tid=0x00007f4fd407b000 nid=0x2989 in Object.wait() [0x00007f4fd32d9000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)

"Reference Handler" daemon prio=10 tid=0x00007f4fd4079000 nid=0x2988 in Object.wait() [0x00007f4fd33da000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)

"VM Thread" prio=10 tid=0x00007f4fd4074000 nid=0x2987 runnable

"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f4fd4015800 nid=0x2983 runnable

"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f4fd4017800 nid=0x2984 runnable

"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f4fd4019000 nid=0x2985 runnable

"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f4fd401b000 nid=0x2986 runnable

"VM Periodic Task Thread" prio=10 tid=0x00007f4fd40ad800 nid=0x298e waiting on condition

JNI global references: 1577

)