orientechnologies / orientdb

OrientDB is the most versatile DBMS supporting Graph, Document, Reactive, Full-Text and Geospatial models in one Multi-Model product. OrientDB can run distributed (Multi-Master), supports SQL, ACID Transactions, Full-Text indexing and Reactive Queries.
https://orientdb.dev
Apache License 2.0

OMemoryWatchDog issue : WeakHashMap used for listeners #1958

Closed sspi closed 10 years ago

sspi commented 10 years ago

OMemoryWatchDog was changed after version 1.3 and now (in the develop branch) uses a WeakHashMap to reference the registered OMemoryWatchDog.Listeners.

When the JVM needs memory and the garbage collector runs, the entries in this WeakHashMap are cleared. The listeners are then silently unregistered and can no longer do their job!

I suppose this change tries to avoid leaks when OMemoryWatchDog.removeListener(Listener) is never called, but this is not the right way to do it.
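
To make the mechanism concrete: in the current code the map key is a ListenerWrapper created inside addListener, and nothing else holds a strong reference to that wrapper, so the entry becomes eligible for removal at the next garbage collection even while the listener itself is still in use. The following is a minimal sketch of that effect, not OrientDB code. `WeakWrapperDemo`, `Wrapper`, `register` and `sizeAfterGc` are hypothetical names introduced for illustration, and `System.gc()` is only a hint, so the entry usually disappears on a HotSpot JVM but this is not guaranteed.

```java
import java.util.Map;
import java.util.WeakHashMap;

class WeakWrapperDemo {
    /** Hypothetical stand-in for OMemoryWatchDog.Listener. */
    interface Listener {
        void lowMemory(long freeBytes, long freePercent);
    }

    /** Identity-based key, in the spirit of ListenerWrapper from the current code. */
    static final class Wrapper {
        final Listener listener;

        Wrapper(Listener listener) {
            this.listener = listener;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Wrapper && ((Wrapper) o).listener == listener;
        }

        @Override
        public int hashCode() {
            return System.identityHashCode(listener);
        }
    }

    /** Registers one listener under a fresh wrapper that only the map references. */
    static Map<Wrapper, Listener> register(Listener listener) {
        Map<Wrapper, Listener> map = new WeakHashMap<>();
        map.put(new Wrapper(listener), listener);
        return map;
    }

    /** Requests a GC a few times and returns the map size afterwards (result may vary). */
    static int sizeAfterGc(Map<?, ?> map) {
        for (int i = 0; i < 20 && !map.isEmpty(); i++) {
            System.gc();
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return map.size();
    }

    public static void main(String[] args) {
        Listener listener = (freeBytes, freePercent) -> System.out.println("lowMemory called");
        Map<Wrapper, Listener> listeners = register(listener);
        System.out.println("entries before GC: " + listeners.size());
        System.out.println("entries after GC:  " + sizeAfterGc(listeners));
        listener.lowMemory(0, 0); // the listener object itself is still strongly reachable
    }
}
```

The point of the sketch is that the value (the listener) being alive does not protect the entry: a WeakHashMap only looks at whether the key is still strongly reachable.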

Here is a test case:

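        // Create a fresh plocal database, then keep saving 100 KB documents to fill the heap.
        ODatabaseDocumentTx vDb = new ODatabaseDocumentTx("plocal:D:\\temp\\orientTests\\cacheL1");
        vDb.create();
        for (int i = 0; i < 10000000; i++) {
            try {
                Thread.sleep(25);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop the test.
                Thread.currentThread().interrupt();
                break;
            }
            ODocument vDoc = new ODocument();
            vDoc.field("test", new byte[100000]);
            vDoc.save();
            if (i % 10 == 0) {
                // Print the level-1 cache size, which the low-memory listener is expected to clear.
                System.out.println("Records created:" + i + " cacheSize: " + vDb.getLevel1Cache().getSize());
            }
        }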

Here is the log:

...
Records created:400 cacheSize: 1
Records created:410 cacheSize: 2
Records created:420 cacheSize: 1
Records created:430 cacheSize: 1
Exception in thread "main" com.orientechnologies.orient.core.exception.ODatabaseException: ...
Caused by: java.lang.OutOfMemoryError: Java heap space

The listeners are never called. In the debugger, I can see that the WeakHashMap is already empty at the point where the listeners should be called.

Here is a patch that uses IdentityHashMap instead of WeakHashMap:

diff --git a/core/src/main/java/com/orientechnologies/orient/core/memory/OMemoryWatchDog.java b/core/src/main/java/com/orientechnologies/orient/core/memory/OMemoryWatchDog.java
old mode 100755
new mode 100644
index 8cb8014..26df1fd
--- a/core/src/main/java/com/orientechnologies/orient/core/memory/OMemoryWatchDog.java
+++ b/core/src/main/java/com/orientechnologies/orient/core/memory/OMemoryWatchDog.java
@@ -21,10 +21,10 @@
 import java.lang.ref.ReferenceQueue;
 import java.lang.ref.SoftReference;
 import java.util.ArrayList;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TimerTask;
-import java.util.WeakHashMap;

 import com.orientechnologies.common.io.OFileUtils;
 import com.orientechnologies.common.log.OLogManager;
@@ -38,231 +38,211 @@
  * be one instance of this object created, since the usage threshold can only be set to one number.
  */
 public class OMemoryWatchDog extends Thread {
-  private final Map<ListenerWrapper, Object> listeners    = new WeakHashMap<ListenerWrapper, Object>(128);
-  private static long                        lastGC       = 0;
-  private int                                alertTimes   = 0;
-  protected final ReferenceQueue<Object>     monitorQueue = new ReferenceQueue<Object>();
-  protected SoftReference<Object>            monitorRef   = new SoftReference<Object>(new Object(), monitorQueue);
+   private final Map<Listener, Object>         listeners           = new IdentityHashMap<Listener, Object>(128);

-  protected final MemoryUsage                heapMemory;
-  protected final MemoryUsage                nonheapMemory;
+   private static long                                         lastGC              = 0;

-  private long                               autoFreeCheckEveryMs;
-  private long                               autoFreeHeapThreshold;
+   private int                                                         alertTimes      = 0;

-  /**
-   * we want properties of both IdentityHashMap and WeakHashMap
-   */
-  private static class ListenerWrapper {
-    final Listener listener;
+   protected final ReferenceQueue<Object>  monitorQueue    = new ReferenceQueue<Object>();

-    private ListenerWrapper(Listener listener) {
-      this.listener = listener;
-    }
+   protected SoftReference<Object>                 monitorRef      = new SoftReference<Object>(new Object(), monitorQueue);

-    @Override
-    public boolean equals(final Object o) {
-      if (this == o)
-        return true;
-      if (o == null || getClass() != o.getClass())
-        return false;
-      final ListenerWrapper that = (ListenerWrapper) o;
-      return listener == that.listener;
-    }
+   protected MemoryUsage                                       heapMemory;

-    @Override
-    public int hashCode() {
-      return listener != null ? System.identityHashCode(listener) : 0;
-    }
-  }
+   protected final MemoryUsage                         nonheapMemory;

-  public static interface Listener {
-    /**
-     * Called when free memory is getting lower.
-     * 
-     * @param iFreeMemory
-     *          Current used memory
-     * @param iFreeMemoryPercentage
-     *          Max memory
-     */
-    public void lowMemory(long iFreeMemory, long iFreeMemoryPercentage);
-  }
+   private long                                                        autoFreeCheckEveryMs;

-  /**
-   * Create the memory watch dog with the default memory threshold.
-   * 
-   * @param iThreshold
-   */
-  public OMemoryWatchDog() {
-    super("OrientDB MemoryWatchDog");
+   private long                                                        autoFreeHeapThreshold;

-    final MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
-    heapMemory = memBean.getHeapMemoryUsage();
-    nonheapMemory = memBean.getNonHeapMemoryUsage();
+   public static interface Listener {
+       /**
+        * Called when free memory is getting lower.
+        * 
+        * @param iFreeMemory
+        *          Current used memory
+        * @param iFreeMemoryPercentage
+        *          Max memory
+        */
+       public void lowMemory(long iFreeMemory, long iFreeMemoryPercentage);
+   }

-    final String size = OGlobalConfiguration.MEMORY_AUTOFREE_HEAP_THRESHOLD.getValueAsString();
-    autoFreeHeapThreshold = OFileUtils.getSizeAsNumber(size);
+   /**
+    * Create the memory watch dog with the default memory threshold.
+    * 
+    * @param iThreshold
+    */
+   public OMemoryWatchDog() {
+       super("OrientDB MemoryWatchDog");

-    setDaemon(true);
-    start();
-  }
+       final MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
+       heapMemory = memBean.getHeapMemoryUsage();
+       nonheapMemory = memBean.getNonHeapMemoryUsage();

-  public void run() {
-    Orient
-        .instance()
-        .getProfiler()
-        .registerHookValue("system.memory.alerts", "Number of alerts received by JVM to free memory resources",
-            METRIC_TYPE.COUNTER, new OProfilerHookValue() {
-              public Object getValue() {
-                return alertTimes;
-              }
-            });
-    Orient
-        .instance()
-        .getProfiler()
-        .registerHookValue("system.memory.lastGC", "Date of last System.gc() invocation", METRIC_TYPE.STAT,
-            new OProfilerHookValue() {
-              public Object getValue() {
-                return lastGC;
-              }
-            });
+       final String size = OGlobalConfiguration.MEMORY_AUTOFREE_HEAP_THRESHOLD.getValueAsString();
+       autoFreeHeapThreshold = OFileUtils.getSizeAsNumber(size);

-    autoFreeCheckEveryMs = OGlobalConfiguration.MEMORY_AUTOFREE_CHECK_EVERY.getValueAsLong();
-    Orient.instance().getTimer().schedule(new TimerTask() {
+       setDaemon(true);
+       start();
+   }

-      @Override
-      public void run() {
-        // CHECK MEMORY
-        final long usedHeap = heapMemory.getUsed();
-        final long maxHeap = heapMemory.getMax();
-        final int usedMemoryPer = (int) (usedHeap * 100 / maxHeap);
+   @Override
+   public void run() {
+       Orient
+               .instance()
+               .getProfiler()
+               .registerHookValue("system.memory.alerts", "Number of alerts received by JVM to free memory resources",
+                       METRIC_TYPE.COUNTER, new OProfilerHookValue() {
+                           public Object getValue() {
+                               return alertTimes;
+                           }
+                       });
+       Orient
+               .instance()
+               .getProfiler()
+               .registerHookValue("system.memory.lastGC", "Date of last System.gc() invocation", METRIC_TYPE.STAT,
+                       new OProfilerHookValue() {
+                           public Object getValue() {
+                               return lastGC;
+                           }
+                       });

-        if (OLogManager.instance().isDebugEnabled())
-          OLogManager.instance().debug(this, "Checking if memory is lower than configured (%s): used %s of %s (%d%%)",
-              OFileUtils.getSizeAsString(autoFreeHeapThreshold), OFileUtils.getSizeAsString(usedHeap),
-              OFileUtils.getSizeAsString(maxHeap), usedMemoryPer);
+       autoFreeCheckEveryMs = OGlobalConfiguration.MEMORY_AUTOFREE_CHECK_EVERY.getValueAsLong();
+       Orient.instance().getTimer().schedule(new TimerTask() {

-        if (!isMemoryAvailable())
-          // CALL LISTENER TO FREE MEMORY
-          synchronized (listeners) {
-            for (ListenerWrapper listener : listeners.keySet()) {
-              try {
-                listener.listener.lowMemory(maxHeap - usedHeap, 100 - usedMemoryPer);
-              } catch (Exception e) {
-                e.printStackTrace();
-              }
-            }
-          }
-      }
-    }, autoFreeCheckEveryMs, autoFreeCheckEveryMs);
+           @Override
+           public void run() {
+               // CHECK MEMORY
+               final long usedHeap = heapMemory.getUsed();
+               final long maxHeap = heapMemory.getMax();
+               final int usedMemoryPer = (int) (usedHeap * 100 / maxHeap);

-    while (true) {
-      try {
-        // WAITS FOR THE GC FREE
-        monitorQueue.remove();
+               if (OLogManager.instance().isDebugEnabled())
+                   OLogManager.instance().debug(this, "Checking if memory is lower than configured (%s): used %s of %s (%d%%)",
+                           OFileUtils.getSizeAsString(autoFreeHeapThreshold), OFileUtils.getSizeAsString(usedHeap),
+                           OFileUtils.getSizeAsString(maxHeap), usedMemoryPer);

-        if (Thread.interrupted())
-          break;
+               if (!isMemoryAvailable())
+                   // CALL LISTENER TO FREE MEMORY
+                   synchronized (listeners) {
+                       for (Listener listener : listeners.keySet()) {
+                           try {
+                               listener.lowMemory(maxHeap - usedHeap, 100 - usedMemoryPer);
+                           } catch (Exception e) {
+                               e.printStackTrace();
+                           }
+                       }
+                   }
+           }
+       }, autoFreeCheckEveryMs, autoFreeCheckEveryMs);

-        // GC is freeing memory!
-        alertTimes++;
-        final long usedHeap = heapMemory.getUsed();
-        final long maxHeap = heapMemory.getMax();
-        final int usedMemoryPer = (int) (usedHeap * 100 / maxHeap);
+       while (true) {
+           try {
+               // WAITS FOR THE GC FREE
+               monitorQueue.remove();

-        if (OLogManager.instance().isDebugEnabled())
-          OLogManager.instance().debug(this, "Free memory is low %s of %s (%d%%), calling listeners to free memory...",
-              OFileUtils.getSizeAsString(maxHeap - usedHeap), OFileUtils.getSizeAsString(maxHeap), 100 - usedMemoryPer);
+               if (Thread.interrupted())
+                   break;

-        final long timer = Orient.instance().getProfiler().startChrono();
+               // GC is freeing memory!
+               alertTimes++;
+               final long usedHeap = heapMemory.getUsed();
+               final long maxHeap = heapMemory.getMax();
+               final int usedMemoryPer = (int) (usedHeap * 100 / maxHeap);

-        synchronized (listeners) {
-          for (ListenerWrapper listener : listeners.keySet()) {
-            try {
-              listener.listener.lowMemory(maxHeap - usedHeap, 100 - usedMemoryPer);
-            } catch (Exception e) {
-              e.printStackTrace();
-            }
-          }
-        }
+               if (OLogManager.instance().isDebugEnabled())
+                   OLogManager.instance().debug(this, "Free memory is low %s of %s (%d%%), calling listeners to free memory...",
+                           OFileUtils.getSizeAsString(maxHeap - usedHeap), OFileUtils.getSizeAsString(maxHeap), 100 - usedMemoryPer);

-        Orient.instance().getProfiler().stopChrono("OMemoryWatchDog.freeResources", "WatchDog free resources", timer);
+               final long timer = Orient.instance().getProfiler().startChrono();

-      } catch (InterruptedException e) {
-        break;
-      } catch (Exception e) {
-      } finally {
-        // RE-INSTANTIATE THE MONITOR REF
-        monitorRef = new SoftReference<Object>(new Object(), monitorQueue);
-      }
-    }
+               synchronized (listeners) {
+                   for (Listener listener : listeners.keySet()) {
+                       try {
+                           listener.lowMemory(maxHeap - usedHeap, 100 - usedMemoryPer);
+                       } catch (Exception e) {
+                           e.printStackTrace();
+                       }
+                   }
+               }

-    OLogManager.instance().debug(this, "[OMemoryWatchDog] shutdowning...");
+               Orient.instance().getProfiler().stopChrono("OMemoryWatchDog.freeResources", "WatchDog free resources", timer);

-    synchronized (listeners) {
-      listeners.clear();
-    }
-    monitorRef = null;
-  }
+           } catch (InterruptedException e) {
+               break;
+           } catch (Exception e) {
+           } finally {
+               // RE-INSTANTIATE THE MONITOR REF
+               monitorRef = new SoftReference<Object>(new Object(), monitorQueue);
+           }
+       }

-  public boolean isMemoryAvailable() {
-    if (autoFreeHeapThreshold > 0)
-      return getUsedHeapMemory() < autoFreeHeapThreshold;
-    return getUsedHeapMemoryInPercentage() < autoFreeHeapThreshold * -1;
-  }
+       OLogManager.instance().debug(this, "[OMemoryWatchDog] shutdowning...");

-  public Listener addListener(final Listener listener) {
-    synchronized (listeners) {
-      listeners.put(new ListenerWrapper(listener), listener);
-    }
-    return listener;
-  }
+       synchronized (listeners) {
+           listeners.clear();
+       }
+       monitorRef = null;
+   }

-  public boolean removeListener(final Listener listener) {
-    synchronized (listeners) {
-      return listeners.remove(new ListenerWrapper(listener)) != null;
-    }
-  }
+   public boolean isMemoryAvailable() {
+       if (autoFreeHeapThreshold > 0)
+           return getUsedHeapMemory() < autoFreeHeapThreshold;
+       return getUsedHeapMemoryInPercentage() < autoFreeHeapThreshold * -1;
+   }

-  public List<Listener> getListeners() {
-    synchronized (listeners) {
-      List<Listener> listenerList = new ArrayList<Listener>();
-      for (ListenerWrapper wrapper : listeners.keySet()) {
-        listenerList.add(wrapper.listener);
-      }
-      return listenerList;
-    }
-  }
+   public Listener addListener(final Listener listener) {
+       synchronized (listeners) {
+           listeners.put(listener, listener);
+       }
+       return listener;
+   }

-  public static void freeMemoryForOptimization(final long iDelayTime) {
-    freeMemory(iDelayTime, OGlobalConfiguration.JVM_GC_DELAY_FOR_OPTIMIZE.getValueAsLong());
-  }
+   public boolean removeListener(final Listener listener) {
+       synchronized (listeners) {
+           return listeners.remove(listener) != null;
+       }
+   }

-  public static void freeMemoryForResourceCleanup(final long iDelayTime) {
-    freeMemory(iDelayTime, 0);
-  }
+   public List<Listener> getListeners() {
+       synchronized (listeners) {
+           return new ArrayList<Listener>(listeners.keySet());
+       }
+   }

-  public long getUsedHeapMemory() {
-    return heapMemory.getUsed();
-  }
+   public static void freeMemoryForOptimization(final long iDelayTime) {
+       freeMemory(iDelayTime, OGlobalConfiguration.JVM_GC_DELAY_FOR_OPTIMIZE.getValueAsLong());
+   }

-  public int getUsedHeapMemoryInPercentage() {
-    return (int) (heapMemory.getUsed() * 100 / heapMemory.getMax());
-  }
+   public static void freeMemoryForResourceCleanup(final long iDelayTime) {
+       freeMemory(iDelayTime, 0);
+   }

-  private static void freeMemory(final long iDelayTime, final long minimalTimeAmount) {
-    final long dateLastGC = System.currentTimeMillis();
-    if (dateLastGC - lastGC > minimalTimeAmount * 1000) {
-      lastGC = dateLastGC;
-      System.gc();
+   public long getUsedHeapMemory() {
+       return heapMemory.getUsed();
+   }

-      if (iDelayTime > 0)
-        try {
-          Thread.sleep(iDelayTime);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-    }
-  }
+   public int getUsedHeapMemoryInPercentage() {
+       // final MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
+       // heapMemory = memBean.getHeapMemoryUsage();
+       // System.out.println(heapMemory.getUsed());
+       return (int) (heapMemory.getUsed() * 100 / heapMemory.getMax());
+   }
+
+   private static void freeMemory(final long iDelayTime, final long minimalTimeAmount) {
+       final long dateLastGC = System.currentTimeMillis();
+       if (dateLastGC - lastGC > minimalTimeAmount * 1000) {
+           lastGC = dateLastGC;
+           System.gc();
+
+           if (iDelayTime > 0)
+               try {
+                   Thread.sleep(iDelayTime);
+               } catch (InterruptedException e) {
+                   Thread.currentThread().interrupt();
+               }
+       }
+   }

 }
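
The core of the patch is the switch to an IdentityHashMap keyed directly by the listener. A minimal, self-contained sketch of that registry idea is shown here, under stated assumptions: `IdentityRegistryDemo`, `Registry`, `AlwaysEqualListener`, `snapshot` and `fire` are hypothetical names for illustration and are not part of OrientDB. It shows that keys are compared with `==`, so two listeners that are equal according to `equals()` are still registered separately, and that entries stay until they are explicitly removed.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

class IdentityRegistryDemo {
    /** Hypothetical stand-in for OMemoryWatchDog.Listener. */
    interface Listener {
        void lowMemory(long freeBytes, long freePercent);
    }

    /** Every instance is equals() to every other one, to expose identity semantics. */
    static final class AlwaysEqualListener implements Listener {
        int calls;

        @Override
        public void lowMemory(long freeBytes, long freePercent) {
            calls++;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof AlwaysEqualListener;
        }

        @Override
        public int hashCode() {
            return 1;
        }
    }

    /** Listener registry backed by an IdentityHashMap, holding strong references. */
    static final class Registry {
        private final Map<Listener, Object> listeners = new IdentityHashMap<>();

        synchronized void add(Listener listener) {
            listeners.put(listener, listener);
        }

        synchronized boolean remove(Listener listener) {
            return listeners.remove(listener) != null;
        }

        /** Copy of the registered listeners, safe to iterate without holding the lock. */
        synchronized List<Listener> snapshot() {
            return new ArrayList<>(listeners.keySet());
        }

        void fire(long freeBytes, long freePercent) {
            for (Listener listener : snapshot()) {
                listener.lowMemory(freeBytes, freePercent);
            }
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        AlwaysEqualListener a = new AlwaysEqualListener();
        AlwaysEqualListener b = new AlwaysEqualListener();
        registry.add(a);
        registry.add(b);
        System.out.println("registered: " + registry.snapshot().size()); // prints "registered: 2"
        registry.fire(1024, 10);
        System.out.println("removed a: " + registry.remove(a));          // prints "removed a: true"
        System.out.println("registered: " + registry.snapshot().size()); // prints "registered: 1"
    }
}
```

The trade-off is that the registry now holds strong references: a listener that is never passed to removeListener stays registered, and reachable, for the lifetime of the watchdog.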

Here is a partial log with this patch applied (the listeners are now called):

...
Records created:370 cacheSize: 4
Records created:380 cacheSize: 4
com.orientechnologies.orient.core.cache.ODefaultCache$OLowMemoryListener@16953ad9
janv. 13, 2014 10:44:43 PM com.orientechnologies.common.log.OLogManager log
Avertissement: Low heap memory (96%): clearing 390 cached records
Records created:390 cacheSize: 4
Records created:400 cacheSize: 7
Records created:410 cacheSize: 4
Records created:420 cacheSize: 1
Records created:430 cacheSize: 5
Records created:440 cacheSize: 4
...

Note: the percentage in the log (96%) is wrong; see the next issue...

Note 2: I used _base/ide/eclipse-formatter.xml, but it does not seem to be up to date. Could you commit the current one?
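
This thread does not say what exactly is wrong with the 96% figure, so the following is background only and not a diagnosis. The watchdog code in the patch above derives the percentage as `used * 100 / max` from a `java.lang.management.MemoryUsage` object, and the JDK documents `MemoryUsage` as a snapshot of memory usage. A minimal sketch of that arithmetic, reading a fresh snapshot on each call, could look like this; `HeapPercentDemo`, `usedPercent` and `currentUsedPercent` are hypothetical names, not OrientDB API.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

class HeapPercentDemo {
    /** Same arithmetic as the patch: used * 100 / max. Returns -1 if max is undefined. */
    static int usedPercent(long usedBytes, long maxBytes) {
        if (maxBytes <= 0) {
            return -1; // MemoryUsage.getMax() returns -1 when the maximum is undefined
        }
        return (int) (usedBytes * 100 / maxBytes);
    }

    /** Takes a new MemoryUsage snapshot on every call instead of reusing an old one. */
    static int currentUsedPercent() {
        MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = bean.getHeapMemoryUsage();
        return usedPercent(heap.getUsed(), heap.getMax());
    }

    public static void main(String[] args) {
        System.out.println(usedPercent(50, 200)); // prints 25
        System.out.println("heap used now: " + currentUsedPercent() + "%");
    }
}
```

Note that the listeners receive `100 - usedMemoryPer`, that is, the free percentage, so any message built from that argument has to be clear about which of the two values it reports.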

Sylvain

lvca commented 10 years ago

Could you please send us a pull request with all these patches? Did you run the test cases before sending them?

        ant clean test
        mvn clean install