rajith77 / qpid

Mirror of Apache Qpid
0 stars 0 forks source link

"Exception when trying to send message accepts" JMSE raised when closing session just after receiving messages on open consumer #6

Open pmoravec opened 11 years ago

pmoravec commented 11 years ago

Reproducer:

$ cat SendReceive_HugeMapMessage.java import javax.jms.*; import org.apache.qpid.amqp_0_10.jms.impl.ConnectionImpl;

public class SendReceive_HugeMapMessage { private int blocks; private int block_size;

public static void main(String[] args) throws Exception { SendReceive_HugeMapMessage SendReceive = new SendReceive_HugeMapMessage(); if (args.length > 1) { SendReceive.blocks = Integer.parseInt(args[0]); SendReceive.block_size = Integer.parseInt(args[1]); } else { System.err.println("Missing arguments with #blocks and block_size"); System.exit(1); } SendReceive.fireSndRcv(); }

public SendReceive_HugeMapMessage() {} public void fireSndRcv() throws Exception { try { Connection connection = new ConnectionImpl("amqp://guest:guest@guest/test?brokerlist='tcp://localhost:5672'&maxprefetch='100'"); connection.start(); Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

  Queue myQueue = session.createQueue("testQueue; {create:always}");
  MessageProducer sender = session.createProducer(myQueue);
  MessageConsumer receiver = session.createConsumer(myQueue);

  MapMessage request = session.createMapMessage();
  String txt = "";
  for (int i=0; i<block_size; i++)
txt+="X";
  for (int i=0; i<blocks; i++)
    request.setString("prop"+i, txt);

  request.setIntProperty("Blocks", blocks);
  request.setIntProperty("BlockSize", block_size);
  request.setJMSType("amqp/map");
  sender.send(request);

  MapMessage response;

  while ((response = (MapMessage) receiver.receive(3*1000)) != null) {
System.out.println(response);
  }

  session.close();
  connection.close();
}
catch (Exception exp)
{
  exp.printStackTrace();
}

} } $

(end of reproducer)

Run it e.g.: $ java SendReceive_HugeMapMessage 3 5 javax.jms.JMSException: Exception when trying to send message accepts at org.apache.qpid.util.ExceptionHelper.toJMSException(ExceptionHelper.java:59) at org.apache.qpid.amqp_0_10.jms.impl.MessageConsumerImpl.sendMessageAccept(MessageConsumerImpl.java:659) at org.apache.qpid.amqp_0_10.jms.impl.MessageConsumerImpl.postDeliver(MessageConsumerImpl.java:481) at org.apache.qpid.amqp_0_10.jms.impl.MessageConsumerImpl.receiveImpl(MessageConsumerImpl.java:341) at org.apache.qpid.amqp_0_10.jms.impl.MessageConsumerImpl.receive(MessageConsumerImpl.java:282) at org.apache.qpid.amqp_0_10.jms.impl.MessageConsumerImpl.receive(MessageConsumerImpl.java:51) at SendReceive_HugeMapMessage.fireSndRcv(SendReceive_HugeMapMessage.java:53) at SendReceive_HugeMapMessage.main(SendReceive_HugeMapMessage.java:24) Caused by: javax.jms.JMSException: Exception when trying to send message accepts at org.apache.qpid.util.ExceptionHelper.toJMSException(ExceptionHelper.java:113) at org.apache.qpid.util.ExceptionHelper.toJMSException(ExceptionHelper.java:51) at org.apache.qpid.amqp_0_10.jms.impl.SessionImpl.sendAcknowledgements(SessionImpl.java:883) at org.apache.qpid.amqp_0_10.jms.impl.MessageConsumerImpl.sendMessageAccept(MessageConsumerImpl.java:655) ... 6 more Caused by: org.apache.qpid.transport.SessionClosedException: session closed at org.apache.qpid.transport.Session.sync(Session.java:885) at org.apache.qpid.transport.Session.sync(Session.java:846) at org.apache.qpid.amqp_0_10.jms.impl.SessionImpl.sendAcknowledgements(SessionImpl.java:878) ... 7 more (program stalls here)

pmoravec commented 11 years ago

See relevant https://github.com/rajith77/qpid/issues/5. In both cases, ./amqp-0-10-client-jms/src/main/java/org/apache/qpid/amqp_0_10/jms/impl/SessionImpl.java:closeImpl method calls "_closed.set(true);" first, and then it tries to close producers, consumers etc. Whereas these activities include sending e.g. message.cancel AMQP command over a session in (internally) closed state. And no waiting for other pending activity is done.