forcedotcom / phoenix

BSD 3-Clause "New" or "Revised" License

Join memory issue #622

Closed mujtabachohan closed 10 years ago

mujtabachohan commented 10 years ago

Join over 1M/100K works fine the first time, but then I get the following exception. At this point only the join query with 1M/10K works; even the join query over 1M/50K fails. If I wait a few minutes (a couple of GC cycles), I can again successfully run the join query over the 1M/100K tables.

com.salesforce.phoenix.memory.InsufficientMemoryException: Requested memory of 22298853 bytes could not be allocated from remaining memory of 52261345 bytes from global pool of 73703424 bytes after waiting for 10000ms.
    at com.salesforce.phoenix.memory.GlobalMemoryManager.allocateBytes(GlobalMemoryManager.java:91)
    at com.salesforce.phoenix.memory.GlobalMemoryManager.access$300(GlobalMemoryManager.java:42)
    at com.salesforce.phoenix.memory.GlobalMemoryManager$GlobalMemoryChunk.resize(GlobalMemoryManager.java:152)
    at com.salesforce.phoenix.join.HashCacheFactory$HashCacheImpl.<init>(HashCacheFactory.java:101)
    at com.salesforce.phoenix.join.HashCacheFactory$HashCacheImpl.<init>(HashCacheFactory.java:78)
    at com.salesforce.phoenix.join.HashCacheFactory.newCache(HashCacheFactory.java:71)
    at com.salesforce.phoenix.cache.TenantCacheImpl.addServerCache(TenantCacheImpl.java:95)
    at com.salesforce.phoenix.coprocessor.ServerCachingEndpointImpl.addServerCache(ServerCachingEndpointImpl.java:55)
    at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:5634)
    at org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HRegionServer.java:3924)
    at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:323)
    at org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1426)

DDL: CREATE TABLE T (mypk CHAR(10) NOT NULL PRIMARY KEY, CF.column1 CHAR(10), CF.column2 CHAR(10), CF.column3 CHAR(10));

Query: select count(*) from table1M JOIN table100K on table100K.mypk = table.column1;

jtaylor-sfdc commented 10 years ago

@maryannxue - once the HashCache is removed by the client, the memory should be available again. This isn't related to GC, as we're just tracking coarse-grained memory usage ourselves. Take a look at TenantCacheImpl - the addServerCache method "allocates" the memory by adding the cache size, while removeServerCache invalidates the cache and in theory should call the onRemoval method of our cache, which in turn calls the close method and, in theory, adds the previously allocated memory back to the memory manager.

We allow the memory tracked by our MemoryManager to grow up to a percentage of the heap size (Runtime.getRuntime().totalMemory()) as determined by QueryServices.MAX_MEMORY_PERC_ATTRIB. If necessary, we could support a fixed size upper bound too if that makes testing easier.
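For illustration, here is a rough sketch of that accounting: the pool's upper bound is a configured percentage of the current heap, allocations reserve bytes from the pool, and closing a cache returns them. The class and the values are hypothetical, not the actual GlobalMemoryManager code.

// Hypothetical sketch, not the actual GlobalMemoryManager: the pool is capped at a
// percentage of the JVM heap, allocations reserve bytes, and closing a cache returns them.
public class MemoryPoolSketch {
    private final long maxBytes;
    private long usedBytes;

    public MemoryPoolSketch(int maxMemoryPercent) { // e.g. a value read via QueryServices.MAX_MEMORY_PERC_ATTRIB
        this.maxBytes = Runtime.getRuntime().totalMemory() * maxMemoryPercent / 100;
    }

    public synchronized void allocate(long bytes) {
        if (usedBytes + bytes > maxBytes) {
            // corresponds to the InsufficientMemoryException in the report above
            throw new IllegalStateException("Requested " + bytes + " bytes, remaining "
                    + (maxBytes - usedBytes) + " of " + maxBytes);
        }
        usedBytes += bytes;
    }

    public synchronized void release(long bytes) {
        usedBytes -= bytes; // called when the server cache is removed/closed
    }
}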

To override any config properties in a unit test, create a static method with a @BeforeClass annotation like this:

@BeforeClass 
public static void doSetup() throws Exception {
    Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
    // Forces server cache to be used
    props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2));
    // Must update config before starting server
    startServer(getUrl(), new ReadOnlyProps(props.entrySet().iterator()));
}
maryannxue commented 10 years ago

Understood. Let me fix it in my next check-in.

maryannxue commented 10 years ago

@mujtabachohan @jtaylor-sfdc After a closer look at the code, it seems to me this behavior is to be expected. TenantCacheImpl sets the max_time_to_live for a server cache to 30000 ms (30 sec) by default, and it has a removal listener that calls the close() method of the server cache. Here, the server cache implementation, HashCacheImpl, calls the MemoryChunk's close() method, which releases the allocated memory back to the MemoryManager. So I suppose that 30 seconds after the first query finishes, the old caches are invalidated and give way to the new hash cache.

There is actually a removeServerCache() method in ServerCacheClient, and it can be invoked through the close() method of the ServerCache object. But since there is no explicit close() method in either the QueryPlan interface or the Scanner interface, there is currently no way to know the right point in time for the client to remove the server cache. IMHO, I'm thinking of setting a much shorter max_time_to_live value, like a few seconds, since in most cases requests for different regions catch up with one another very quickly. We could also add a close() method to the QueryPlan or Scanner interface, so that the user can release any resources explicitly by calling the connection/statement/resultset close() method.
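To make that lifecycle concrete, here is a simplified sketch assuming a Guava-style cache with a TTL and a removal listener that closes evicted entries; the names and values are illustrative, not the actual TenantCacheImpl code.

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

// Simplified sketch of the described lifecycle (not the actual TenantCacheImpl): entries
// expire after max_time_to_live, and the removal listener closes the evicted server cache,
// which in turn releases its MemoryChunk back to the MemoryManager.
public class ServerCacheLifecycleSketch {
    static Cache<String, Closeable> buildCache(long maxTimeToLiveMs) {
        return CacheBuilder.newBuilder()
                .expireAfterAccess(maxTimeToLiveMs, TimeUnit.MILLISECONDS)
                .removalListener(new RemovalListener<String, Closeable>() {
                    @Override
                    public void onRemoval(RemovalNotification<String, Closeable> notification) {
                        try {
                            notification.getValue().close(); // frees the tracked memory
                        } catch (IOException e) {
                            // ignored in this sketch
                        }
                    }
                })
                .build();
    }
}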

jtaylor-sfdc commented 10 years ago

@maryannxue Thanks for the investigation. The ResultIterator contained by the Scanner has a close() method. This should get called by the client when the ResultSet, Statement, or Connection are closed. This in turn should call the removeServerCache() through the ServerCacheClient to free up the memory. Are you not seeing this happen?

@mujtabachohan - can you make sure you call resultSet.close() and/or statement.close() at the end of iteration? I'll also check that we call close automatically once the underlying scanner has processed the last row.
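For reference, the client-side pattern being asked about looks roughly like this; the connection URL and query are illustrative only.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Illustrative only: close the ResultSet, Statement, and Connection so the underlying
// ResultIterator.close() can run and remove the server-side hash cache.
public class CloseAfterJoinExample {
    public static void main(String[] args) throws Exception {
        Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost"); // example URL
        try {
            Statement stmt = conn.createStatement();
            try {
                ResultSet rs = stmt.executeQuery("SELECT count(*) FROM table1M " +
                        "JOIN table100K ON table100K.mypk = table1M.column1"); // example query
                try {
                    while (rs.next()) {
                        System.out.println(rs.getLong(1));
                    }
                } finally {
                    rs.close(); // should trigger removeServerCache() on the server
                }
            } finally {
                stmt.close();
            }
        } finally {
            conn.close();
        }
    }
}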

mujtabachohan commented 10 years ago

@jtaylor-sfdc It happens with resultSet.close(), and also in sqlline.

maryannxue commented 10 years ago

@jtaylor-sfdc Yes, but I thought I had to add that close logic for every type of ResultIterator. Looking at it again, though, it seems the final and only place where a join-related scan happens is ScanningResultIterator, so only this class's close() method should handle the remove-server-cache logic. Am I right?

jtaylor-sfdc commented 10 years ago

@maryannxue I don't think issuing the removeServerCache() will work from ScanningResultIterator, as this is one of the parallel scans that have been farmed out. Instead, you need to call removeServerCache() when the merge or concat iterators are done with their parallel iterators.

To do this, here's what I'd do:

I think that should do it.
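A minimal sketch of that idea, using simplified hypothetical interfaces (not the real Phoenix ResultIterator or ServerCacheClient signatures): wrap the top-level merge/concat iterator so that its close() first closes the delegate (at which point the parallel scans are done) and only then removes the server cache.

import java.sql.SQLException;

// Hypothetical sketch of the approach described above, not the actual Phoenix implementation.
public class CacheReleasingIteratorSketch {

    interface SimpleResultIterator {
        Object next() throws SQLException;
        void close() throws SQLException;
    }

    interface ServerCacheHandle {
        void close() throws SQLException; // hypothetical: issues removeServerCache()
    }

    static class CacheReleasingIterator implements SimpleResultIterator {
        private final SimpleResultIterator delegate;   // the merge or concat iterator
        private final ServerCacheHandle serverCache;   // handle to the hash cache on the region servers

        CacheReleasingIterator(SimpleResultIterator delegate, ServerCacheHandle serverCache) {
            this.delegate = delegate;
            this.serverCache = serverCache;
        }

        public Object next() throws SQLException {
            return delegate.next();
        }

        public void close() throws SQLException {
            try {
                delegate.close();      // the parallel iterators are finished and closed here
            } finally {
                serverCache.close();   // then free the hash-cache memory on the server
            }
        }
    }
}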

maryannxue commented 10 years ago

@jtaylor-sfdc Thanks a lot for the explanation!

maryannxue commented 10 years ago

@jtaylor-sfdc In the original BasicQueryPlan#getScanner(), there is

if (scanner != null) { return scanner; }

Now that the scanner will hold resource objects (server caches) that are released when it is closed, I assume the scanner should have strictly the same lifecycle as the ResultIterators it contains and the resource objects. So is it correct that I should remove this piece of code? And should we also avoid re-using the Scanner object in QueryPlan#getExplainPlan(), even though it is supposed to be connectionless?

jtaylor-sfdc commented 10 years ago

@maryannxue We wouldn't want to create/send the hash cache when generating an explain plan. I think it'd be best if we did a bit of refactoring here. Here's a proposal; please let me know if you think this'll work:

Another alternative would be to keep Scanner, do any cache building/sending in Scanner.iterator(), and have your HashJoinPlan derive from a common base class that defines the newScanner() method (so that you can create a JoinScanner that specializes the iterator() method to initiate the hash caching). We don't need to have a newScanner(ConnectionQueryServices services) method, as you can get the PhoenixConnection from the context.
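A sketch of this alternative, with simplified hypothetical types (the real QueryPlan/Scanner/HashJoinPlan signatures differ): the join plan overrides newScanner() so that building and sending the hash cache happens inside iterator(), i.e. only when the query is actually executed and never while generating an explain plan.

import java.sql.SQLException;
import java.util.Collections;
import java.util.Iterator;

// Hypothetical sketch of the alternative described above, not the actual Phoenix classes.
public class JoinScannerSketch {

    interface Scanner {
        Iterator<Object> iterator() throws SQLException;
    }

    abstract static class BasePlan {
        protected abstract Scanner newScanner();
    }

    static class HashJoinPlan extends BasePlan {
        @Override
        protected Scanner newScanner() {
            return new Scanner() {
                public Iterator<Object> iterator() throws SQLException {
                    buildAndSendHashCache();                            // ship the hash cache only at execution time
                    return Collections.<Object>emptyList().iterator();  // stand-in for the real parallel scan
                }
            };
        }

        private void buildAndSendHashCache() throws SQLException {
            // placeholder: serialize the right-hand-side result and send it to the region servers
        }
    }
}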

maryannxue commented 10 years ago

@jtaylor-sfdc I'm trying to get a clearer statement of the problem for myself, so correct me if I'm wrong.

If we look at what QueryPlan#getScanner() and QueryPlan#getExplainPlan() do:

  1. They both need to initiate a scanner, which is in fact a series of iterators.
  2. getScanner() needs to create a "runnable" iterator, while getExplainPlan() just needs a "connectionless" iterator.

But suppose we call getExplainPlan() first and then getScanner(). The iterators would have been created without the cache objects being passed to them, and then, when getScanner() is called, the iterators would not be created a second time.

So I am wondering if we can add a method like addServerCaches(), or more generally addCloseableResources(), to the Iterator interface or to the iterators concerned, so that when getScanner() is called we can pass in the server cache objects even if the iterator objects have already been created.
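A rough sketch of this proposal, with hypothetical names: let already-created iterators accept closeable resources (such as server caches) after construction, so that getScanner() can hand them over even when getExplainPlan() created the iterators first.

import java.io.Closeable;
import java.util.List;

// Hypothetical interface for the proposal above, not an existing Phoenix API.
public interface ResourceAwareIterator {
    void addCloseableResources(List<? extends Closeable> resources); // released when the iterator is closed
}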

jtaylor-sfdc commented 10 years ago

The top-level goal is to call removeServerCache() so that resources are freed up and we can run multiple joins at the same time. This refactor seems like the easiest way to get there. What you've said above is true, but I don't think we need the Scanner interface at all, as it's just getting in the way at this point. The getExplainPlan() method can just be defined on QueryPlan. It's fine if the iterators are created again when the query is actually executed.
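Sketched with simplified hypothetical signatures, the direction described here looks roughly like this: getExplainPlan() lives directly on the plan and builds no caches, while iterator() creates fresh iterators (shipping the hash cache and releasing it on close()) every time the query is actually executed.

import java.sql.SQLException;
import java.util.List;

// Hypothetical sketch of the refactoring direction, not the actual Phoenix interfaces.
public class QueryPlanSketch {

    interface ResultIterator {
        Object next() throws SQLException;
        void close() throws SQLException; // releases any server caches that were sent
    }

    interface QueryPlan {
        List<String> getExplainPlan();                 // connectionless: no server caches involved
        ResultIterator iterator() throws SQLException; // new iterators on every execution
    }
}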

maryannxue commented 10 years ago

I was trying not to break the original assumptions behind the QueryPlan interface design. But it looks to me like a QueryPlan is never re-used across different calls in PhoenixStatement, so we can always create new instances of iterators in QueryPlan. That would make things easier whether or not we keep the Scanner. I'll try doing some refactoring there, though.

jtaylor-sfdc commented 10 years ago

Fixed by @maryannxue. Excellent job! Thanks for the contributions. @mujtabachohan - would you mind trying your test again please?