Open arinban opened 10 years ago
@glassfishrobot Commented: Reported by gerhard_niklasch
@glassfishrobot Commented: gerhard_niklasch said: This JIRA doesn't seem to allow me to upload attachments...
Unified context diffs against latest rev=236:
--- ../../InboundJmsResourcePool.java,rev=236 2013-12-03 10:37:32.000000000 +0100
+++ com/sun/genericra/inbound/async/InboundJmsResourcePool.java 2014-01-14 11:27:28.000000000 +0100
@@ -1,5 +1,5 @@
/**
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. @@ -25,7 +25,6 @@ import java.util.logging.Logger;
import javax.jms.*;
-import com.sun.genericra.GenericJMSRA;
import javax.resource.spi.*;
import javax.resource.spi.endpoint.*;
import javax.resource.ResourceException;
@@ -34,7 +33,7 @@
/**
@author Binod P.G */ public class InboundJmsResourcePool extends AbstractJmsResourcePool implements ServerSessionPool { @@ -48,21 +47,38 @@ private ArrayList resources; private int maxSize; private int connectionsInUse = 0;
{awakened PauseObjects}
== maxSize
public InboundJmsResourcePool(EndpointConsumer consumer, boolean transacted)
{ - super(consumer, transacted); + super(consumer, transacted); this.waitQ = new LinkedList(); }
+ public int getMaxSize()
{ return this.maxSize; }
+ public long getMaxWaitTime()
{ return this.maxWaitTime; }
@@ -74,7 +90,7 @@ } return ret; }
+ public int getBusyResources()
{ int busy = 0; if (resources != null) { @@ -88,8 +104,8 @@ } return busy; }
public int getFreeResources() { +
return free;
}
public int getConnectionsInUse() { - return this.connectionsInUse; - }
public int getWaiting() {
int wait = 0;
if (this.waitQ != null) { - wait = this.waitQ.size(); - }
return wait;
}
public ConnectionConsumer createConnectionConsumer(Destination dest, String name,
}
public int getConnectionsInUse() { + return this.connectionsInUse; + }
public int getConnectionsAvail()
{ + return this.connectionsAvail; + }
+
{
+ int wait = 0;
+ if (this.waitQ != null) {
+ wait = this.waitQ.size();
+ }
+ return wait;
+ }
+
+ if (isTopic())
{ String selector = constructSelector(name); conconsumer = ((TopicConnection) con).createConnectionConsumer((Topic) dest, @@ -130,10 +151,10 @@ conconsumer = con.createConnectionConsumer(dest, name, this, maxMessages); }
+ return conconsumer; }
+ public ConnectionConsumer createDurableConnectionConsumer(Destination dest, String name, String sel, int maxMessages) throws JMSException
{ ConnectionConsumer conconsumer = null; @@ -142,11 +163,14 @@ conconsumer = ((TopicConnection) con).createDurableConnectionConsumer((Topic) dest, name, selector, this, maxMessages); return conconsumer; - }
public synchronized void initialize() throws ResourceException {
{ XAConnectionFactory xacf = (XAConnectionFactory) consumer.getConnectionFactory(); @@ -168,15 +192,15 @@ this.con = createConnection(cf); }
// XX stopped is already false (from AbstractJmsResourcePool) stopped = false;
_logger.log(Level.FINE, "ServerSession resource pool initialized"); } catch (JMSException e)
{ throw ExceptionUtils.newResourceException(e); }
}
-
public InboundJmsResource create() throws JMSException {
public ServerSession getServerSession() throws JMSException { InboundJmsResource result = null; PauseObject obj = null;
while (result == null) { validate();
result = _getServerSession();
if (result == null) {
if (maxWaitTime >= 0) {
if (obj == null) { - obj = new PauseObject(); - }
-
obj.pauseCallingThread();
}
}
synchronized (this) {
// When obj != null, we've come around the loop, thus must
// have been awakened by a resource having been returned
// to the pool. Moreover, the awakened PauseObject has
// already been removed from waitQ, and the invariant is
// maintained by the PauseObject going out of scope.
// Otherwise we're new here and compete for connectionsAvail
// (only).
if (!stopped && ((obj != null) || (connectionsAvail > 0))) {
+ _logger.log(Level.FINER,
+ "JMS provider is getting the ServerSession");
+ result = _getServerSession();
+ if (obj == null) {
+ connectionsAvail--;
+ }
+ break;
+ }
// prevent a race against releaseAllWaitingThreads():
if (destroyed)
{ + // Can happen when destroy() was called just after we've + // come through validate() but before we locked the pool + // monitor and checked for being stopped. If the pool is + // stopped in order to be destroyed, we don't want to hide + // ourselves on the waitQ. + // Instead, we go around the loop, whereupon validate() + // will throw an exception. + continue; + }
+
{ + waitQ.addLast(obj); + }
public void validate() throws JMSException {
if(deploymentCompleted)
} catch(UnavailableException ue) { + } catch (UnavailableException ue) { failed = true; - } catch(Throwable t) { + }
catch (Throwable t)
{ //throwable may not be thrown but as safeguard I added this //since I am passing null object for createEndpoint. }
@@ -253,13 +310,13 @@ } } try
{ - if(endPoint != null ) + if (endPoint != null ) endPoint.release(); - }
catch(Exception e)
{ + }
catch (Exception e)
{ //ignore }
if(failed){
+ if (failed) { _logger.log(Level.FINER, "Application not yet deployed or" + " deployment failed.\n Use properties MDBDeploymentRetryAttempt" + " & MDBDeploymentRetryInterval for tuning MDB deployment"); @@ -268,14 +325,8 @@ } }
private synchronized InboundJmsResource _getServerSession()
if (stopped) { - return null; - }
- Iterator it = resources.iterator();
while (it.hasNext()) { @@ -283,25 +334,27 @@
if (resource.isFree())
{ connectionsInUse++; - + return resource.markAsBusy(); }
}
if (resources.size() < this.maxSize)
{ - InboundJmsResource res = create(); - resources.add(res); + InboundJmsResource resource = create(); + resources.add(resource); connectionsInUse++; - return res.markAsBusy(); + return resource.markAsBusy(); }
-
public synchronized void put(InboundJmsResource resource) { resource.markAsFree(); connectionsInUse--;
if (stopped) { if (connectionsInUse <= 0)
{ @@ -312,35 +365,55 @@ }
}
{ + obj = (PauseObject) waitQ.removeFirst(); + }
{ + // released resource is reserved for oldest waiter, about to + // be awakened + obj.resume(); + }
else
{ + // no waiters: released resource becomes immediately available + connectionsAvail++; + }
}
/**
Stops message delivery. Any message that is currently being delivered
this.stopped = true;
this.maxWaitTime = 0;
waitForAll(); releaseAllResources();
if (dmdCon != null)
{ this.dmdCon.close(); }
_logger.log(Level.FINE, "ServerSession resource pool is now stopped."); }
/**
*/
public void destroy() throws JMSException { - this.destroyed = true; - stop(); - releaseAllWaitingThreads(); - }
-
public synchronized void waitForAll() {
" to come back to pool");
try
{ wait(this.consumer.getSpec().getEndpointReleaseTimeout() * 1000); @@ -349,20 +422,9 @@ }
}
public void releaseAllWaitingThreads() {
Iterator it = waitQ.iterator();
while (it.hasNext()) { - PauseObject obj = (PauseObject) it.next(); - obj.resume(); - count++; - }
-
_logger.log(Level.FINE, "Released a total of " + count + " requests");
public void releaseAllResources() {
while (it.hasNext())
{ @@ -372,78 +434,112 @@ obj.destroy(); }
catch (Exception e)
{ // This is just to make sure that if one resource fails to destroy - // we still call destroy on others. + // we still call destroy on others. _logger.log(Level.SEVERE, "Cannot destroy resource " + obj.toString()); }
}
// We do not clean up the resources object itself - this would need
// to be changed (among other things) in order for stop() to become
// undoable. }
/**
*/
public void destroy() throws JMSException
{ + _logger.log(Level.FINE, "Destroying the ServerSession resource pool..."); + this.destroyed = true; + stop(); + releaseAllWaitingThreads(); + _logger.log(Level.FINE, "ServerSession resource pool destroyed."); + }
/**
*/
private void releaseAllWaitingThreads() {
int count = 0;
public void resumeWaitingThread() {
synchronized (waitQ) {
if (waitQ.size() > 0) { - obj = (PauseObject) waitQ.removeFirst(); - }
if (obj != null) {
while (true) {
PauseObject obj = null;
synchronized (waitQ) {
+ if (waitQ.size() > 0) {
+ obj = (PauseObject) waitQ.removeFirst();
+ }
+ }
if (obj == null)
{ + break; + }
obj.resume();
/**
void pauseCallingThread() throws JMSException {
if (maxWaitTime == 0) {
else { if (startTime == 0)
{ this.startTime = System.currentTimeMillis(); - }
-
elapsedWaitTime = startTime - System.currentTimeMillis();
if (elapsedWaitTime > maxWaitTime) {
/* WE SHOULD LOG SOMETHING */
_logger.log(Level.WARNING,
"MaxWaitTime exceeded without acquiring " +
"a ServerSession");
// Don't leave junk behind on the waitQ. No-op when we
// aren't in fact enqueued.
synchronized (waitQ) { + waitQ.remove(this); + }
// make it the JMS Provider's problem String msg = sm.getString("pool_limit_reached"); throw new JMSException(msg); }
remainingWaitTime = startTime - elapsedWaitTime;
pause();
synchronized void pause() {
synchronized (waitQ) { - waitQ.addLast(this); - }
-
try { - _logger.log(Level.FINE, "Waiting for :" + remainingWaitTime); - wait(remainingWaitTime); - }
catch (InterruptedException ie)
{ - }
-
synchronized (waitQ) { - waitQ.remove(this); - }
{
+ try {
+ checkTime();
+ wait(remainingWaitTime);
+ } catch (InterruptedException ie) {
+ // treat as a spurious wakeup
+ }
+ }
}
synchronized void resume()
{ - _logger.log(Level.FINE, "Notifying the thread"); + _logger.log(Level.FINE, "Awakening and notifying the thread"); + awakened = true; notify(); }
}
--- ../../ActivationSpec.java,rev=236 2013-12-12 13:16:49.000000000 +0100
+++ com/sun/genericra/inbound/ActivationSpec.java 2014-01-14 11:25:34.000000000 +0100
@@ -1,5 +1,5 @@
/**
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. @@ -54,7 +54,7 @@ private int reconnectAttempts; private int reconnectInterval; private int maxPoolSize = 8;
private int maxWaitTime = 3;
@glassfishrobot Commented: gerhard_niklasch said: Additional explanations of what's being changed and why:
com.sun.genericra.inbound.async.InboundJmsResourcePool
com.sun.genericra.inbound.async.InboundJmsResourcePool$PauseObject
Unfortunately, maintaining the required invariants requires fixing a number of additional glitches, including two other race conditions.
Refactor the "stopped" check from _getServerSession() to its caller to simplify the logic. _getServerSession() will now only be called when it's guaranteed to succeed.
Fix elapsedWaitTime and remainingWaitTime calculations in PauseObject (i.e. fix #56), refactoring pause() into pauseWaitingThread() whilst factoring out checkTime() for legibility. See below for a tweak in ActivationSpec necessitated by this fix.
Treat MaxWaitTime < 0 as meaning "infinite", as a value of 0 already does, instead of embarking on an infinite busy loop. (NB: the User's Guide doesn't specify anything for this case.) ActivationSpec.validate() would in fact catch negative values, but since the tunable also has a setter method, it is better to be safe here.
Fix a race where a freshly called getServerSession() could grab a recently freed resource out of order, even though older waiters were already queued. We need to distinguish between resources earmarked for an existing waiter at the time they are freed, and resources that will be immediately available to a newcomer. connectionsAvail keeps track of the latter.
Fix race between late waitQ additions from getServerSession() vs. waitQ clean-up by releaseAllWaitingThreads().
Avoid a thread-unsafe iterator in releaseAllWaitingThreads().
The remainder here is cosmetic in nature:
Unify confusing use of different variable names for identical purposes (resource vs res) in _getServerSession()
Add several debug-level log messages, clarify others, and move some to where they'll be more informative
Various convenience methods for internal use shouldn't be public
Permuted some convenience-method definitions to keep them closer to their (unique) call sites
com.sun.genericra.GenericJMSRA needn't be imported twice
Misc spelling fixes in log messages and commentary
Sanitize whitespace (extraneous spaces at line ends removed) and indentation
Update copyright line
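Two of the fixes listed above, one for the race between late waitQ additions and releaseAllWaitingThreads() and one for the thread-unsafe iterator, follow a common pattern: re-check the shutdown flag under the same lock that guards the enqueue, and drain the queue one element at a time instead of iterating it while other threads may modify it. What follows is a minimal sketch, not the GenericJMSRA code. Waiter is a hypothetical stand-in for PauseObject, and for simplicity the destroyed flag is guarded by the waitQ lock rather than by the pool monitor.

```java
import java.util.LinkedList;

/**
 * Hypothetical sketch (not GenericJMSRA code). Waiter stands in for PauseObject.
 */
public class WaitQueueDemo {

    public static final class Waiter {
        private boolean awakened;

        public synchronized void resume() {
            awakened = true;               // lets a woken thread tell a real resume from a spurious wakeup
            notify();
        }

        public synchronized boolean isAwakened() {
            return awakened;
        }
    }

    private final LinkedList<Waiter> waitQ = new LinkedList<>();
    private boolean destroyed;             // guarded by waitQ in this sketch

    /** Refuses to enqueue once destroyed; the flag is read under the same lock that sets it. */
    public boolean enqueue(Waiter w) {
        synchronized (waitQ) {
            if (destroyed) {
                return false;              // caller must not park; it should re-validate and fail
            }
            waitQ.addLast(w);
            return true;
        }
    }

    /** Wakes every queued waiter without iterating the shared list; returns how many. */
    public int destroyAndReleaseAll() {
        synchronized (waitQ) {
            destroyed = true;              // no new waiters can be added after this point
        }
        int count = 0;
        while (true) {
            Waiter w;
            synchronized (waitQ) {
                w = waitQ.pollFirst();     // take one entry at a time under the lock
            }
            if (w == null) {
                break;                     // queue drained
            }
            w.resume();                    // never holds waitQ while taking a waiter's monitor
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        WaitQueueDemo pool = new WaitQueueDemo();
        pool.enqueue(new Waiter());
        pool.enqueue(new Waiter());
        System.out.println("released " + pool.destroyAndReleaseAll());
        System.out.println("late enqueue accepted: " + pool.enqueue(new Waiter()));
    }
}
```

Running main prints "released 2" followed by "late enqueue accepted: false". A caller whose enqueue() returns false should loop back to its validation step instead of calling wait().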
com.sun.genericra.inbound.ActivationSpec
Default MaxWaitTime changed from 3s to 0 (meaning "infinite"). Rationale: This had in fact never before worked as intended. InboundJmsResourcePool$PauseObject had always miscomputed the remainingWaitTime so as to act as if MaxWaitTime was several years (or infinite). Changing the default to 0 preserves this legacy behaviour and won't stop developers from configuring their own preferred value. A default of 3s is way too short for the real world – the clock is usually ticking while one or more onMessage() calls are in progress, and these can easily take many seconds. (I've seen some take several hours when processing involved talking to a peer application which had been temporarily shut down for maintenance.)
Misc whitespace cosmetics
Update copyright line
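The PauseObject timing fix described above comes down to two rules. First, measure elapsed time as now minus start. Second, treat any MaxWaitTime <= 0 as an unbounded wait. The following is a minimal sketch of that arithmetic in a hypothetical TimedWaiter class, not the actual PauseObject. It assumes the wait budget has already been converted to milliseconds, and it keeps an awakened flag so that a spurious wakeup is not mistaken for a resume.

```java
/**
 * Hypothetical waiter (not GenericJMSRA code) showing the bounded-wait arithmetic:
 * elapsed = now - start, remaining = maxWait - elapsed, and maxWait <= 0 means unbounded.
 */
public class TimedWaiter {
    private final long maxWaitMillis;      // assumption: already converted from seconds
    private boolean awakened;              // guarded by this

    public TimedWaiter(long maxWaitMillis) {
        this.maxWaitMillis = maxWaitMillis;
    }

    /** Remaining budget in ms; Long.MAX_VALUE when unbounded, <= 0 when exhausted. */
    public static long remainingMillis(long maxWaitMillis, long startMillis, long nowMillis) {
        if (maxWaitMillis <= 0) {
            return Long.MAX_VALUE;         // 0 and negative values both mean "wait indefinitely"
        }
        long elapsed = nowMillis - startMillis;   // now minus start, not the reverse
        return maxWaitMillis - elapsed;
    }

    /** Returns true if resume() was called, false if the budget ran out first. */
    public synchronized boolean pause() throws InterruptedException {
        long start = System.currentTimeMillis();
        while (!awakened) {                // re-check after every wakeup, spurious or not
            if (maxWaitMillis <= 0) {
                wait();                    // unbounded wait
                continue;
            }
            long remaining = remainingMillis(maxWaitMillis, start, System.currentTimeMillis());
            if (remaining <= 0) {
                return false;              // the pool in the diff logs a warning and throws JMSException here
            }
            wait(remaining);
        }
        return true;
    }

    public synchronized void resume() {
        awakened = true;
        notify();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(remainingMillis(3000, 1_000, 1_500));  // 2500
        System.out.println(new TimedWaiter(50).pause());          // false: nobody called resume()
    }
}
```

Now take the legacy assignments in the diff at face value. elapsedWaitTime = startTime - System.currentTimeMillis() is never positive, so the timeout check never fires. remainingWaitTime = startTime - elapsedWaitTime then simplifies to the current epoch time in milliseconds, roughly 44 years' worth in early 2014. That matches the "several years (or infinite)" behaviour described above.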
@glassfishrobot Commented: gerhard_niklasch said: I see that the JIRA web interface has rather mangled the above context diff. Contact me offline via the dev mailing list for the literal version. I'll also be happy to supply a diff -ub (i.e. ignoring the whitespace-only changes) for ease of reviewing.
@glassfishrobot Commented: gerhard_niklasch said: See also #58 for one further issue in this area, filed separately since its fix is logically independent from the present issue.
@glassfishrobot Commented: gerhard_niklasch said: The putback for Bug#17973448 into trunk revision 237 delivers a fix for the primary race (somewhat different from my proposal; successfully tested on both GF 2.1.1 and GF 3.1.2 with MQS7 as the JMS provider), and also cleans up the formatting and improves the log messages.
It also changes the default MaxWaitTime from 3s to 300s.
Rev 237 still has a potential for a new caller of getServerSession() to race past older callers sitting in the wait queue, and starve the latter. However, this situation cannot arise with MQS: The MQS7 driver will never call getServerSession() to the same InboundJmsResourcePool from more than one dispatch thread simultaneously.
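The barging that rev 237 still permits is what the connectionsAvail bookkeeping in the original proposal is meant to rule out. At release time, a freed resource is either earmarked for the oldest queued waiter or counted as free for newcomers, never both. What follows is a minimal sketch of that idea under simplifying assumptions. It is not the GenericJMSRA code: it hands out abstract permits instead of ServerSession objects, it uses a single pool monitor, and the names FairPermitPool, Ticket, acquire() and release() are hypothetical.

```java
import java.util.ArrayDeque;

/**
 * Hypothetical FIFO-fair permit pool (not GenericJMSRA code). A released permit
 * is earmarked for the oldest waiter; only when nobody waits does it become
 * available to newcomers, tracked by 'available' (the role of connectionsAvail).
 */
public class FairPermitPool {

    private static final class Ticket {
        boolean granted;                   // guarded by the pool monitor
    }

    private final ArrayDeque<Ticket> waitQ = new ArrayDeque<>();
    private int available;                 // invariant: waitQ non-empty implies available == 0

    public FairPermitPool(int permits) {
        this.available = permits;
    }

    public synchronized void acquire() throws InterruptedException {
        if (available > 0 && waitQ.isEmpty()) {
            available--;                   // nobody older is waiting: take a free permit
            return;
        }
        Ticket ticket = new Ticket();
        waitQ.addLast(ticket);             // enqueued under the same monitor release() uses
        try {
            while (!ticket.granted) {
                wait();
            }
        } catch (InterruptedException e) {
            if (ticket.granted) {
                Thread.currentThread().interrupt();  // keep the permit, preserve the interrupt
                return;
            }
            waitQ.remove(ticket);          // don't leave junk behind on the queue
            throw e;
        }
    }

    public synchronized void release() {
        Ticket oldest = waitQ.pollFirst();
        if (oldest != null) {
            oldest.granted = true;         // earmarked for the oldest waiter
            notifyAll();                   // waiters share one monitor; only the granted one proceeds
        } else {
            available++;                   // no waiters: immediately available to newcomers
        }
    }

    public synchronized int availablePermits() {
        return available;
    }

    public synchronized int waiting() {
        return waitQ.size();
    }

    public static void main(String[] args) throws Exception {
        FairPermitPool pool = new FairPermitPool(1);
        pool.acquire();                    // pool is now exhausted
        Thread older = new Thread(() -> {
            try {
                pool.acquire();
                System.out.println("older waiter served");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        older.start();
        while (pool.waiting() == 0) {
            Thread.sleep(10);              // wait until the older thread is queued
        }
        pool.release();                    // goes to 'older', not back into 'available'
        older.join();
    }
}
```

Calling notifyAll() on one shared monitor keeps the sketch simple, at the cost of waking every waiter on each release. A per-waiter object, as PauseObject provides, avoids that but adds a second lock to reason about.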
@glassfishrobot Commented: This issue was imported from java.net JIRA GENERICJMSRA-57
A race condition in com.sun.genericra.inbound.async.InboundJmsResourcePool.java can cause all inbound message processing on the queue associated with a particular pool instance to halt; processing on this queue will not resume until the affected application server is restarted.
MDBs configured with a MaxPoolSize of 1 are particularly vulnerable, as are queues receiving bursty traffic where messages sometimes arrive in considerable numbers faster than the MDB can process them. The onMessage() execution time also plays a role. The bug can manifest itself randomly within minutes, hours, or many days after startup. In clustered configurations, each server is separately and independently vulnerable to the race.
The race condition is triggered when the MQS driver, seeing a new message in the inbound queue, calls getServerSession() in one thread at the same time as a worker thread tries to release a ServerSession instance (= InboundJmsResource instance) back to the pool. In the problematic scenario, the second thread successfully marks the resource as free and decrements connectionsInUse. However, the first thread, having found the pool exhausted a moment earlier, creates a PauseObject and puts itself on the pool's waitQ only after the second thread has found the waitQ empty, so the first thread is never notified of the freed resource. This outcome can be verified in a heap dump; the only indication in the server log is that the FINE-level message "Notifying the thread" is never logged.
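The interleaving just described is a lost wakeup. The getServerSession() caller decides that the pool is exhausted, but it only adds itself to waitQ later, so a put() that runs in between finds no one to notify. The usual cure is to make the "nothing free" check and the act of waiting atomic with respect to the release. The class below is a hypothetical, heavily simplified pool, not GenericJMSRA code, and SimplePool, acquire() and release() are illustrative names. It relies on Java monitor semantics: wait() gives up the monitor only once the calling thread is already in the wait set, so a release() holding the same monitor either finds the waiter parked or runs before the waiter's check.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical minimal pool (not GenericJMSRA code). The "is anything free?"
 * check and the decision to wait happen under the same monitor that release()
 * uses, so a release can never slip in between them and go unnoticed.
 */
public class SimplePool {
    private final Deque<String> free = new ArrayDeque<>();

    public SimplePool(int size) {
        for (int i = 0; i < size; i++) {
            free.addLast("session-" + i);
        }
    }

    /** Returns a resource, or null after timeoutMillis; timeoutMillis <= 0 waits forever. */
    public synchronized String acquire(long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (free.isEmpty()) {
            if (timeoutMillis <= 0) {
                wait();                    // atomically releases the monitor while parked
            } else {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return null;           // timed out
                }
                wait(remaining);
            }
        }
        return free.removeFirst();
    }

    /**
     * Runs under the same monitor as acquire(): a thread either is already
     * waiting (and gets notified) or has not yet checked (and will find the resource).
     */
    public synchronized void release(String resource) {
        free.addLast(resource);
        notify();                          // any single waiter can use the resource
    }

    public static void main(String[] args) throws Exception {
        SimplePool pool = new SimplePool(1);
        String held = pool.acquire(0);
        Thread waiter = new Thread(() -> {
            try {
                System.out.println("waiter got " + pool.acquire(0));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        Thread.sleep(100);                 // the outcome does not depend on this timing
        pool.release(held);
        waiter.join();
    }
}
```

Running main prints "waiter got session-0" whether or not the waiter had already parked when release() ran. That timing independence is exactly the property the original check-then-enqueue sequence lacks.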
I am going to attach a proposed fix. This also subsumes a fix for #56 (which had arisen at an earlier point of the same incident investigation).
Environment
Seen both with GlassFish 2.1.1-p17 through -p21, GenericJMSRA as bundled with this (= 2.0.1 from mid 2009), and with GlassFish 3.1.2.6, GenericJMSRA 2.1b, on Linux (SLES10SP2 and SLES11SP1) x86_64, and using the IBM MQS driver 7.0.1.6. By inspection, the bug has existed since inception of the code.
Affected Versions
[0.9, 1.0a, 1.0, v1.5, 1.5, v1.6, 1.6, 2.1a, 2.1b, current, 2.2a]