fingltd / 4mc

4mc - splittable lz4 and zstd in hadoop/spark/flink

physical memory issues on EMR #18

Open refaelos opened 7 years ago

refaelos commented 7 years ago

@carlomedas maybe you can help me here ...

When I run my EMR MapReduce job and want the output compressed with 4mz, I get multiple errors like: ... is running beyond physical memory limits. Current usage: 4.6 GB of 4 GB physical memory used; 6.3 GB of 20 GB virtual memory used. Killing container. ...

I tried increasing the memory limit and I still get these errors (just later, during the reducer processing).

Do you have an idea why I started getting these errors the moment I started compressing with 4mz? (When I compressed with LZO or BZip2 I didn't get them.)

Thanks!

carlomedas commented 7 years ago

Hello.

I'm not aware of any memory leak in the current 4mc. Where are you using it, as input or output? Are you doing just maps, or also reduces with it as the final output format?

To be honest I never tried it as the output of a reduce; we've been using it directly in Spark as the final output format, which is far faster than standard map/reduce, and by the way it's not leaking even there.

Carlo

refaelos commented 7 years ago

Thanks!

It happens when I'm running on larger amounts of data (still not that large... just a few tens of GBs).

I'm using 4mz (not 4mc) as the compression of the final output. I basically stream files in at one end and write them out compressed to another location at the other end.

If you have any suggestions that would be great!

refaelos commented 7 years ago

@carlomedas still having this issue. Can you suggest a solution?

It happens even when running on smaller amounts of data (a few hundred MBs). Also, I tried using BZip2 and it worked; it only fails when I switch to 4mz.

Note: we use this codec as the output format: com.hadoop.compression.fourmc.FourMzMediumCodec

carlomedas commented 7 years ago

Does it fail only with 4mz or also with 4mc? We have been using both with quite huge files (several GB each) on small and medium-sized clusters. The only difference is that I've been using them on a Hadoop/Spark cluster on AWS, and I never tried them on EMR. Unfortunately I don't have time right now to compare a manually set up Hadoop/Spark cluster with EMR, but it's in our plans for the future.

refaelos commented 7 years ago

@carlomedas thanks!

We only tried 4mz because we needed a splittable compression that is better than Snappy.

Can you suggest a different compression to use until then? Maybe something that comes natively with Hadoop? From what you say I understand that, for now, 4mz is not usable for us.

refaelos commented 7 years ago

@carlomedas also tested FourMcMediumCodec and it's still failing :(

carlomedas commented 7 years ago

I guess it's not working properly with EMR. What version of Hadoop do you have there, and can you describe in more detail how your job is composed? If it's Spark, please describe the Spark DAG. Maybe we can guess what's wrong.

refaelos commented 7 years ago

@carlomedas

Hadoop distribution: Amazon 2.7.3 (probably based on Hadoop 2.7.3)

This is the step configuration:

-libjars /home/hadoop/CustomOutputFormats.jar,/home/hadoop/hadoop-4mc-2.0.0.jar
-D dfs.replication=1
-D stream.map.output.field.separator='\t'
-D mapreduce.reduce.shuffle.input.buffer.percent=0.5
-D mapreduce.map.memory.mb=4096
-D mapreduce.reduce.memory.mb=4096
-D mapreduce.map.java.opts=-Xmx3277m
-D mapreduce.reduce.java.opts=-Xmx3277m
-D mapreduce.map.output.compress=true
-D mapreduce.output.fileoutputformat.compress=true
-D mapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.fourmc.FourMzMediumCodec
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
-D mapreduce.partition.keycomparator.options=-r
-outputformat com.soomla.hadoop.MultipleTextOutputFormatByKey
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

It's not Spark ... it's just Hadoop streaming on EMR.

It must be something with the compression because the moment I change back to the BZip2 codec it works fine. Of course I don't want to use BZip2 (VERY slow) and I can't find any other good compression codec that supports streaming.

Thank you very much for your help. I really want this to work because it'll be a pain for us to change compression again.

carlomedas commented 7 years ago

As far as I understood, you are using it to store the output of the reduce, with each reduce writing to a 4mc/4mz file. Each file only uses 4+4 MB of direct memory buffers to handle buffering and compression, no more than that. To make it even more performant and to better control the direct memory buffers, in previous releases I introduced direct memory buffer pooling, so that even if you create a huge number of files those special buffers are not exhausted.

Is it failing with a standard OOM or with a direct-memory-buffer-related OOM? Please try again after adding this setting to the Java VM: -XX:MaxDirectMemorySize=256M

refaelos commented 7 years ago

Thanks for the info!

Do you mean setting it for the mapper or the reducer? I see two options:

mapreduce.map.java.opts=...
mapreduce.reduce.java.opts=...
carlomedas commented 7 years ago

Please try both; they won't do any harm. But if you are compressing with 4mz only in the final reduce stage, it can stay on the reducer only.

refaelos commented 7 years ago

I tried setting it on both like this:

 -D mapreduce.reducer.java.opts='-Xmx3277m -XX:MaxDirectMemorySize=256m'

But I'm getting an error: Unrecognized option: -XX:MaxDirectMemorySize=256m'

I'm trying to figure out how it should be done in hadoop streaming.
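
Something like this is what I'm aiming for (an untested sketch; I believe the property should be mapreduce.reduce.java.opts rather than mapreduce.reducer.java.opts, and the trailing quote in the error above suggests my quotes were passed through literally):

 -D mapreduce.map.java.opts='-Xmx3277m -XX:MaxDirectMemorySize=256m'
 -D mapreduce.reduce.java.opts='-Xmx3277m -XX:MaxDirectMemorySize=256m'

When submitting this as an EMR step, the arguments may be split on whitespace before they reach hadoop-streaming, so the quoting might need to be handled differently there.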

refaelos commented 7 years ago

I tried this: -D mapred.child.java.opts=-XX:MaxDirectMemorySize=256m, and I'm still getting the physical memory error.

I don't understand why it doesn't start using virtual memory once physical memory is "filled up".

carlomedas commented 7 years ago

can you please paste the full exception?

refaelos commented 7 years ago

Container [pid=19884,containerID=container_1482229402435_0001_01_000035] is running beyond physical memory limits. Current usage: 4.0 GB of 4 GB physical memory used; 6.4 GB of 20 GB virtual memory used. Killing container.

Dump of the process-tree for container_1482229402435_0001_01_000035:
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 19892 19884 19884 19884 (java) 1551 216 6754443264 1052796 /usr/lib/jvm/java-openjdk/bin/java -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN -Xmx3277m -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1482229402435_0001/container_1482229402435_0001_01_000035/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1482229402435_0001/container_1482229402435_0001_01_000035 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog -Dyarn.app.mapreduce.shuffle.logger=INFO,shuffleCLA -Dyarn.app.mapreduce.shuffle.logfile=syslog.shuffle -Dyarn.app.mapreduce.shuffle.log.filesize=0 -Dyarn.app.mapreduce.shuffle.log.backups=0 org.apache.hadoop.mapred.YarnChild 10.0.4.82 42967 attempt_1482229402435_0001_r_000001_0 35
|- 19955 19892 19884 19884 (split_raw_reduc) 460 70 19734528 3565 /mnt/yarn/usercache/hadoop/appcache/application_1482229402435_0001/container_1482229402435_0001_01_000035/./split_raw_reducer
|- 19960 19955 19884 19884 (split_raw_reduc) 0 0 12095488 1824 /mnt/yarn/usercache/hadoop/filecache/13/split_raw_reducer
|- 19884 19882 19884 19884 (bash) 0 0 115806208 673 /bin/bash -c /usr/lib/jvm/java-openjdk/bin/java -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN -Xmx3277m -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1482229402435_0001/container_1482229402435_0001_01_000035/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1482229402435_0001/container_1482229402435_0001_01_000035 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog -Dyarn.app.mapreduce.shuffle.logger=INFO,shuffleCLA -Dyarn.app.mapreduce.shuffle.logfile=syslog.shuffle -Dyarn.app.mapreduce.shuffle.log.filesize=0 -Dyarn.app.mapreduce.shuffle.log.backups=0 org.apache.hadoop.mapred.YarnChild 10.0.4.82 42967 attempt_1482229402435_0001_r_000001_0 35 1>/var/log/hadoop-yarn/containers/application_1482229402435_0001/container_1482229402435_0001_01_000035/stdout 2>/var/log/hadoop-yarn/containers/application_1482229402435_0001/container_1482229402435_0001_01_000035/stderr

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

refaelos commented 7 years ago

OK, so I managed to run with -XX:MaxDirectMemorySize=256m by setting only this argument, like this: -D mapreduce.reducer.java.opts=-XX:MaxDirectMemorySize=256m (it doesn't have the -Xmx3277m like before).

This fails the reducer immediately with a direct memory error.

carlomedas commented 7 years ago

To be honest I don't know how to help you on this, as I'm not familiar with Hadoop streaming's PipeMapRed and don't know how EMR is tuned on AWS. It seems like PipeMapRed uses direct memory buffers as well and needs much more than 256 MB.

The error you reported in the previous comment, on the other hand, is from YARN, which automatically kills the container once it exceeds the maximum allowed memory. Did you already try to increase that limit, just to see if it helps? I'm talking about the 4 GB physical memory setting. Please note in any case that the 4mc codecs just use the above-mentioned 4 MB + 4 MB of direct memory buffers plus a little overhead, and that's it. It runs with much smaller tunings, but again, on a different Hadoop(+Spark) 2.6.0 setup and not on EMR. Apart from this test, I'll try to find the time to test it myself on EMR.

refaelos commented 7 years ago

@carlomedas I did try to increase it as much as possible (Hadoop allows up to ~5 GB on a c4.xlarge instance) but it still fails.

Something interesting: when I run it with mapreduce.reduce.java.opts=-Xmx3277m it fails with the YARN error, but without it I get the 'direct memory' error. It's not even related to the MaxDirectMemorySize setting.

Maybe the only real solution is to start using larger instances that have more memory ... but that also costs more.

refaelos commented 7 years ago

What do you mean by 4MB+4MB of direct memory? Is it per chunk or something?

carlomedas commented 7 years ago

It buffers chunks up to 4 MB and then compresses and writes.

refaelos commented 7 years ago

So it's really weird that it fails ... Either there's a memory leak or something in my config is wrong. Why would it succeed with BZ2 and fail with 4mc?

refaelos commented 7 years ago

@carlomedas we have a theory ... does it keep 4MB per open file that the reducer needs to compress? I mean, our output consists of a huge number of small files (1000s of ~1MB files). If it keeps the 4MB allocated until all the reducers finish, this might overflow the memory.

carlomedas commented 7 years ago

Yes exactly, and I agree with you: that's the reason. 4mc/4mz was designed to be very powerful with huge files, both in input and in output. Maybe I'm not understanding your use case, because it's pretty unusual to have such a huge number of reducers.

refaelos commented 7 years ago

@carlomedas No, I don't have a lot of reducers ... just a lot of files created by the 7 reducers that I have. I mean, the output consists of 1000s of files.

refaelos commented 7 years ago

@carlomedas I think we've figured out the problem and it looks like it is related to the output consisting of A LOT of files.

You said that it keeps a 4MB buffer in memory that it compresses and writes out. The thing is that when we want to output ~4000 files that are each less than 4MB, it keeps 4MB*4000 in memory, which is about 16GB, and that crashes the job.

About the ~4000 files: we use a custom output format that separates the data according to different params in the input. Sometimes this causes the output to consist of a lot of very small files, which is fine with us because we need it split into certain folders.

Do you have an idea how this can be solved? Maybe we could keep 1MB instead of 4MB? Would that cause a performance issue?

carlomedas commented 7 years ago

I see now. It was not designed to work with such use cases, so I don't think it's easily fixable for this unusual big-data use case. As a matter of fact I removed the possibility to configure it in https://github.com/carlomedas/4mc/blob/master/java/hadoop-4mc/src/main/java/com/hadoop/compression/fourmc/FourMzCodec.java because it was causing problems, on top of the fact that it was strongly decreasing the achieved compression ratio, so it's fixed at 4 MB right now.

refaelos commented 7 years ago

@carlomedas :(

Do you have a suggested compression method that is splittable and can work for this use case?

carlomedas commented 7 years ago

A contributor, @advancedxy, added compression/decompression for standard zstd (i.e. plain zstd, not the 4mc format). You can find it here: https://github.com/carlomedas/4mc/tree/master/java/hadoop-4mc/src/main/java/com/hadoop/compression/fourmc/zstd

As far as I remember it uses just 128KB buffers, not 4MB.
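
If you want to try it, it should just be a matter of swapping the codec class in the job configuration, along these lines (a sketch; I'm assuming the class is com.hadoop.compression.fourmc.zstd.ZstCodec based on the package linked above, so please double-check the exact name in the sources):

 -D mapreduce.output.fileoutputformat.compress=true
 -D mapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.fourmc.zstd.ZstCodec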

refaelos commented 7 years ago

Thanks.

What's the difference with/without 4mc? Is it not splittable then?

advancedxy commented 7 years ago

ZstCodec is not splittable and uses ~128KB each for the raw input buffer and the compressed buffer.

@refaelos When you are generating so many small files, there is no point in keeping them splittable. You need to combine them rather than split them.

advancedxy commented 7 years ago

As for the 4MB compression buffer, it should be possible to make it configurable with some additional work. But I don't have the spare time now and I don't think your case justifies it.

However, the 4MB buffer makes it impractical to use for mapreduce.map.output.compress, as the reduce task may have a lot of copier threads (80, for example) to fetch map output. That's 80 * (4+4) = 640MB of additional memory overhead. That's where ZstCodec comes in handy.
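
For the intermediate map output that would look roughly like this (again a sketch, assuming the codec class is com.hadoop.compression.fourmc.zstd.ZstCodec; check the exact class name against the sources):

 -D mapreduce.map.output.compress=true
 -D mapreduce.map.output.compress.codec=com.hadoop.compression.fourmc.zstd.ZstCodec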

carlomedas commented 7 years ago

Yes, as a matter of fact I removed the configurability of the buffer because of an issue we had on the native side that unfortunately I did not have time to follow up on and fix. Thanks for the note on the reducer, I'd never thought about that...!

refaelos commented 7 years ago

@advancedxy thanks!

The reason we keep so many files is that we separate files per day and category. We need this in order to have S3 storage that we can invoke some processes on top of. The processes we invoke run on a combination of day+category.

The thing is that not all files are small... some might get to 1GB or more (compressed). I think we want those to be splittable... don't you?

advancedxy commented 7 years ago

After reviewing the source code of MultipleTextOutputFormat, one possible solution is to increase the number of reducers; then each reducer processes a smaller number of files and runs faster.

refaelos commented 7 years ago

@advancedxy yeah but still ... there will be fewer mappers so it'll be slow.

advancedxy commented 7 years ago

What do you mean by fewer mappers?

The total number of mappers should be the same for the same input. The number of concurrent mappers is determined by your settings and bounded by your available resources.

The number of reducers shouldn't affect that. The only overhead with more reducers is container and JVM startup, and that should be small.


refaelos commented 7 years ago

@advancedxy I mean that if there's a big file and it's not splittable then the entire file will be handled by one mapper. I hope I'm not wrong on this one ... That's the whole idea of splitting ... isn't it?

advancedxy commented 7 years ago

Increase the number of reducers to avoid OOM while still using FourMzMediumCodec.


pradeepg26 commented 7 years ago

@refaelos @advancedxy @carlomedas I may be a little late to the party, but this can be solved another way: by setting the YARN memory overhead higher.

The container itself has 4GB of memory, and the JVM has 3277MB of heap. That only leaves 819MB of overhead. In the CDH distro the default overhead is 20% (0.8 * 4096 = 3277, i.e. the heap is 80% of the container). So you're eating up most of the overhead with your direct buffers and there isn't enough left for the other off-heap usage. I suspect that if you leave the heap as is and set mapreduce.reduce.memory.mb=5120, your job will succeed.
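
In other words, something along these lines (a sketch; the exact container size depends on what your instance type allows):

 -D mapreduce.reduce.memory.mb=5120
 -D mapreduce.reduce.java.opts=-Xmx3277m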

refaelos commented 7 years ago

@advancedxy getting back to this one. So you're saying to increase the number of reducers without increasing the number of nodes or the memory configuration?

On MR2, increasing the number of reducers means decreasing the memory limit with the mapreduce.reduce.memory.mb config. Is that what you mean? (I'm pretty sure doing that will still make the process fail, as there's even less memory now.)

DataWanderer commented 7 years ago

Any solution here? I am seeing the same issue on my end.

refaelos commented 7 years ago

@DataWanderer I wish there was. The problem is that when your reducer creates too many output files, 4mc needs a lot of memory to handle them, so it'll probably fail.

advancedxy commented 7 years ago

This is an old issue...

@advancedxy getting back to this one. So you're saying to increase the number of reducers without increasing the number of nodes or the memory configuration?

Yes, increase the number of reducers: the more reducers, the less memory each reducer task requires. And there should be a capacity setting.

Let's say your job has 100 reducers and your resources allow all of them to run concurrently. Increase the number of reducers to 200 but cap the job's reducer capacity at 100; then only 100 reducers will run concurrently, while each reducer handles fewer output files.
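
For a streaming job that is just the reducer-count setting, e.g. (a sketch; 200 is only an illustration, and the concurrency cap comes from your scheduler/queue configuration rather than from a single job property):

 -D mapreduce.job.reduces=200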

refaelos commented 7 years ago

@advancedxy thanks. Does increasing the number of reducers mean adding more machines to the cluster? If yes, this makes the entire process more expensive to run.

advancedxy commented 7 years ago

Does increasing the number of reducers mean adding more machines to the cluster?

No. The number of reducers is a setting of your MR job; it's a logical concept.

refaelos commented 7 years ago

@advancedxy ok got it. I'll take a look.

trixpan commented 6 years ago

@carlomedas I see you haven't made a release since the merge of @advancedxy's PR.

Do you plan to do so?

carlomedas commented 6 years ago

Yes, it's in my plan, but I'm travelling and can't recompile the native libraries on all platforms from here, which is needed for a release. If you can help compile on some platform (e.g. Win64 is the worst at the moment, as I no longer have the original VM I used), it would help :)