Open mayankbhagya opened 8 years ago
I've always been wondering how a shuffle stage would be able to handle all entities in memory. Could a key-only query help?
Thanks @ZiglioNZ. A key-only query might not work in my case, since I need to emit a property of the entity that stores its type. Also, I don't think that could be the problem, since the shuffle phase shouldn't go back and query the datastore. Ideally it should work only on the values emitted by the mapper.
I see, but there are many mapper instances and just one shuffler, right? So you can only fit so many entities into a single instance's memory. I don't think it would help much because your entities are small, but I know you can query for a single field: https://cloud.google.com/appengine/docs/python/datastore/projectionqueries
Yes, strangely the shuffle is running on only a single instance. However, it should not have anything to do with entities, because the mapper emits a (string, integer) tuple.
Oh, that wasn't clear. Anyway, the point is the same: one single instance has to handle lots of things in memory. Not sure how to solve that but it's a problem I'm gonna face: I need to serialize a large number of entities in time order and I'm not sure whether the mapper could partition the query in a way that preserves the order, so a shuffler is probably needed. Hopefully I'll make some progress this week.
@ZiglioNZ What I don't understand is, why is this task not getting distributed on multiple instances? A shuffler has the same number of shards as the number of output files in the mapper. Since that number is 5000 in my case, I can see 5000 shards for the shuffler, on the mapreduce status page.
But still, only one instance is used to run the shuffle-hash phase. That is not very clear to me.
See my reply on Stack Overflow: http://stackoverflow.com/questions/37645138/appengine-mapreduce-fails-with-out-of-memory-during-shuffle-stage
Arie | Ozarov | ozarov@google.com | 415-624-6429
Thanks, luckily I use Java then ;-)
I have ~50M entities stored in the datastore. Each item is one of 7 types.
Next, I have a simple MapReduce job that counts the number of items of each type. The Mapper emits (type, 1). The reducer simply adds the number of 1s received for each type.
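For illustration, the job described above can be sketched in plain Python. This is only a minimal in-memory model of the map, shuffle, and reduce steps, not the appengine-mapreduce API: the function names, the dict-based entities, and the sample data are all assumptions made for the example.

```python
from collections import defaultdict


def map_entity(entity):
    """Emit (type, 1) for one entity. `entity` is a plain dict here (assumption)."""
    yield entity["type"], 1


def shuffle(pairs):
    """Group all emitted values by key, as a shuffle stage conceptually does."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_counts(key, values):
    """Add up the 1s received for a single type."""
    return key, sum(values)


def count_by_type(entities):
    """Run map, shuffle, and reduce in sequence on an in-memory list."""
    pairs = (pair for entity in entities for pair in map_entity(entity))
    return dict(reduce_counts(k, v) for k, v in shuffle(pairs).items())


# Hypothetical sample data standing in for datastore entities.
entities = [{"type": "a"}, {"type": "b"}, {"type": "a"}]
counts = count_by_type(entities)
print(counts)
```

Note that in this model the shuffle holds one value per entity, so with only 7 distinct keys almost all of the intermediate data is a long list of 1s.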
When I run this job with 5000 shards, the map stage runs fine. It uses a total of 20 instances, which is the maximum possible based on my task-queue configuration.
However, the shuffle-hash stage uses only one instance and fails with an out-of-memory error. I don't understand why only one instance is being used for hashing, or how I can fix this out-of-memory error in /mapreduce/kickoffjob_callback/15714734****.
I have tried writing a combiner but I never saw a combiner stage on the mapreduce status page or in the logs.
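To make the intent of the combiner concrete, here is a rough plain-Python sketch of what per-shard pre-aggregation would do to the data reaching the shuffle. It does not use the appengine-mapreduce library and says nothing about whether or when that library actually runs a combiner. The function names and the two tiny shards are hypothetical.

```python
from collections import Counter


def map_shard(types_in_shard):
    """Emit (type, 1) for every item in one mapper shard."""
    for item_type in types_in_shard:
        yield item_type, 1


def combine(pairs):
    """Pre-aggregate one shard's output into at most one pair per type."""
    partial = Counter()
    for key, value in pairs:
        partial[key] += value
    return list(partial.items())


def reduce_all(combined_shards):
    """Sum the partial counts coming from every shard."""
    total = Counter()
    for shard_pairs in combined_shards:
        for key, value in shard_pairs:
            total[key] += value
    return dict(total)


# Hypothetical data: two shards, each holding a list of item types.
shards = [["a", "a", "b"], ["b", "b", "a"]]

raw_pair_count = sum(len(list(map_shard(s))) for s in shards)
combined = [combine(map_shard(s)) for s in shards]
combined_pair_count = sum(len(c) for c in combined)
totals = reduce_all(combined)
print(raw_pair_count, combined_pair_count, totals)
```

Under the same assumption, 5000 shards and 7 types would give at most 5000 × 7 = 35,000 pairs after combining, instead of one pair per entity (~50M).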