bartosz25 / spark-scala-playground

Sample processing code using Spark 2.1+ and Scala

About spark on heap memory mode and off heap memory mode #10

Closed bithw1 closed 5 years ago

bithw1 commented 5 years ago

Hi @bartosz25,

I would like to ask you a question:

Spark has two memory modes: on-heap mode and off-heap mode, and there are two options controlling the use of off-heap memory:

spark.memory.offHeap.size
spark.memory.offHeap.enabled

With spark.memory.offHeap.enabled set to false, I submit the application with the following configuration:

(1) spark-submit --master yarn --executor-memory 4G

That means I allocate 4G of on-heap memory for each executor.

With spark.memory.offHeap.enabled set to true, I submit the application with the following configuration:

(2) spark-submit --master yarn --executor-memory 1G --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=4G

Does that mean I also have at least 4G of memory (off-heap) for data caching and shuffle/aggregation? It looks like the Spark memory manager only chooses one mode at runtime; that is, when I enable off-heap mode, Spark will only use off-heap memory for data caching and shuffle/aggregation.

Actually, I don't have a good understanding of how off-heap mode affects a Spark application, or when I should think about enabling it. I have always used on-heap mode.

Thank you @bartosz25 for the help.

bartosz25 commented 5 years ago

Thanks for your interesting point. I've never had to use off-heap memory but I'll investigate the topic more in the coming weeks. Please leave the issue open. I'll put the results of my analysis here before publishing the post.

bithw1 commented 5 years ago

Thanks @bartosz25 for the help!

bartosz25 commented 5 years ago

Hi @bithw1 ,

I finished the post this morning. I will publish it in the week of 03-09.12. For now you can watch the video I made to illustrate the behavior you pointed out: https://www.youtube.com/watch?v=hR4gcj10sO4

As you can see, YARN allocates the amount of memory defined in the executor-memory property, but Apache Spark uses the memory from the off-heap parameters. This can be misleading because YARN thinks the app will use less memory than it does in reality, hence it can accept more applications than it's able to execute. That's why the off-heap memory should always be equal to the memory defined in executor-memory, to avoid such problematic situations.
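
For example, applying that rule to the submission from the first message would give something like this (the values are illustrative):

(3) spark-submit --master yarn --executor-memory 4G --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=4G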

Best regards, Bartosz.

bithw1 commented 5 years ago

Thanks @bartosz25. I watched your video and played around with SparkPi, but it looks like I haven't grasped a good understanding for now. I will read and try to understand when you post your article.

When we submit a Spark app to YARN, Spark requests memory for each container from YARN. The memory amount for each container is executor-memory + spark.yarn.executor.memoryOverhead, but Spark doesn't tell YARN how much memory is for on-heap and how much is for off-heap, so I don't understand how Spark could use off-heap memory in the YARN container process. I am kind of confused.
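
For illustration, the container sizing just described could be sketched like this (a minimal sketch; the max(384MB, 10% of executor memory) default for spark.yarn.executor.memoryOverhead is my assumption here):

// Hypothetical re-implementation of the container sizing described above,
// assuming the default overhead of max(384MB, 10% of executor memory).
object ContainerSizing {
  val OverheadFactor = 0.10
  val OverheadMinMb  = 384L

  def containerRequestMb(executorMemoryMb: Long): Long = {
    val overhead = math.max((executorMemoryMb * OverheadFactor).toLong, OverheadMinMb)
    executorMemoryMb + overhead
  }

  def main(args: Array[String]): Unit = {
    // --executor-memory 4G => YARN is asked for 4096 + 409 = 4505MB,
    // no matter what spark.memory.offHeap.size says
    println(containerRequestMb(4096))
  }
}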

bartosz25 commented 5 years ago

In fact off-heap memory is not managed memory, so Apache Spark should be able to use it without YARN being aware of that. Off-heap is the physical memory of the server. I don't know the YARN container details though, but IMO there are 2 options:

But it's my guess. I've never explored YARN internals so far and I doubt I will, because I'll focus more on the Spark on Kubernetes initiative.

bithw1 commented 5 years ago

Sure, thanks @bartosz25

bartosz25 commented 5 years ago

Hi @bithw1

I've just published the post about on- and off-heap memory: https://www.waitingforcode.com/apache-spark/apache-spark-off-heap-memory/read

Best regards, Bartosz.

bithw1 commented 5 years ago

Hi @bartosz25 ,

Thanks for the great article, but I am still not clear on why a Spark application is able to use off-heap memory in the amount that the user specifies with:

spark.memory.offHeap.size
spark.memory.offHeap.enabled

The shell command that launches the Java process (namely, the executor process) running in the YARN container is built in

ExecutorRunnable#startContainer and ExecutorRunnable#prepareCommand

In short, the command to launch the executor process is: java -server -Xmx<executorMemory> ..., but it doesn't specify the amount of off-heap memory, so I am not sure how the process could use off-heap memory as specified.
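
For reference, the command built there looks roughly like this (a simplified reconstruction of what ExecutorRunnable#prepareCommand produces; treat the exact flags as approximate):

{{JAVA_HOME}}/bin/java -server -Xmx4096m <javaOpts> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url <driverUrl> --executor-id <id> --hostname <host> --cores <n> --app-id <appId>

Note that -Xmx only caps the heap; nothing in this command bounds memory allocated outside of it.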

I thought there should be a JVM argument to specify how much off-heap memory a Java process can use?

bartosz25 commented 5 years ago

You're right, I didn't explain it very well.

I thought there should be a JVM argument to specify how much off heap memory that a java process can use?

I don't think so. The engine computes the max amount of memory it can allocate for storage and execution directly from the spark.memory.offHeap.size property, but only when off-heap is enabled:


  // MemoryManager: the off-heap budget comes straight from the configuration
  protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
  // spark.memory.storageFraction splits it between storage and execution
  protected[this] val offHeapStorageMemory =
    (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

  offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
  offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)

The memory allocation is managed by the appropriate MemoryAllocator instance, and for off-heap it's UnsafeMemoryAllocator. The allocation happens through methods taking as parameter the size of memory to use (the object size), and it internally calls Java's Unsafe API, hence some low-level native code.
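
To make the Unsafe part concrete, here is a minimal standalone sketch of the same idea (not Spark's UnsafeMemoryAllocator; sun.misc.Unsafe is fetched by reflection because it has no public constructor):

import sun.misc.Unsafe

object OffHeapSketch {
  // Grab the Unsafe singleton through its private static field
  private val unsafe: Unsafe = {
    val field = classOf[Unsafe].getDeclaredField("theUnsafe")
    field.setAccessible(true)
    field.get(null).asInstanceOf[Unsafe]
  }

  def main(args: Array[String]): Unit = {
    val size = 1024L * 1024L // 1MB outside the JVM heap, invisible to -Xmx
    val address = unsafe.allocateMemory(size)
    try {
      unsafe.setMemory(address, size, 0.toByte) // zero the freshly allocated block
      unsafe.putLong(address, 42L)
      println(unsafe.getLong(address)) // prints 42
    } finally {
      unsafe.freeMemory(address) // no GC here: forgetting this leaks memory
    }
  }
}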

In addition to that, the engine tries to protect itself against unexpected OOMs, and that's why you can see that MemoryStore put operations return a boolean flag telling whether the allocation succeeded or not. Most of the time it fails when there is not enough available space in memory. Tasks behave similarly:

// MemoryConsumer, implemented by task-specific memory consumers
  public LongArray allocateArray(long size) {
    long required = size * 8L;
    MemoryBlock page = taskMemoryManager.allocatePage(required, this);
    if (page == null || page.size() < required) {
      throwOom(page, required);
    }
    used += required;
    return new LongArray(page);
  }

  private void throwOom(final MemoryBlock page, final long required) {
    long got = 0;
    if (page != null) {
      got = page.size();
      taskMemoryManager.freePage(page, this);
    }
    taskMemoryManager.showMemoryUsage();
    throw new SparkOutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " +
      got);
  }
/**
 * This exception is thrown when a task can not acquire memory from the Memory manager.
 * Instead of throwing {@link OutOfMemoryError}, which kills the executor,
 * we should throw this exception, which just kills the current task.
 */
@Private
public final class SparkOutOfMemoryError extends OutOfMemoryError {
  // ...
}

Also, if you look at the memory pools (e.g. ExecutionMemoryPool), you'll see the signs of used-memory tracking:

  override def memoryUsed: Long = lock.synchronized {
    memoryForTask.values.sum
  }
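
As a toy illustration of that bookkeeping, a simplified sketch (my own, not the real ExecutionMemoryPool) could look like:

import scala.collection.mutable

// Toy per-task memory accounting inspired by ExecutionMemoryPool
class ToyMemoryPool(poolSize: Long) {
  private val memoryForTask = mutable.Map.empty[Long, Long]

  def memoryUsed: Long = synchronized { memoryForTask.values.sum }

  // Grants at most the remaining free space and returns what was actually acquired
  def acquireMemory(taskId: Long, numBytes: Long): Long = synchronized {
    val granted = math.max(math.min(numBytes, poolSize - memoryUsed), 0L)
    if (granted > 0) memoryForTask(taskId) = memoryForTask.getOrElse(taskId, 0L) + granted
    granted
  }

  def releaseMemory(taskId: Long, numBytes: Long): Unit = synchronized {
    val remaining = memoryForTask.getOrElse(taskId, 0L) - numBytes
    if (remaining <= 0) memoryForTask.remove(taskId) else memoryForTask(taskId) = remaining
  }
}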
bithw1 commented 5 years ago

Thanks @bartosz25, understood now.

I was confused between Java direct memory (ByteBuffer.allocateDirect) and off-heap. I had thought that Java direct memory and off-heap were the same thing; actually, direct memory is only one kind of off-heap memory, also allocated/freed with the Unsafe API.

The amount of direct memory is controlled by the JVM argument -XX:MaxDirectMemorySize, but off-heap memory itself has no such limit or control.
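
A small sketch of the difference (my example: ByteBuffer.allocateDirect counts against -XX:MaxDirectMemorySize, while a raw Unsafe allocation does not):

import java.nio.ByteBuffer

object DirectVsOffHeap {
  def main(args: Array[String]): Unit = {
    // Direct buffer: off-heap, but bounded by -XX:MaxDirectMemorySize
    // (run with e.g. -XX:MaxDirectMemorySize=16m and a larger size to see the OOM)
    val buffer = ByteBuffer.allocateDirect(1024 * 1024)
    buffer.putInt(0, 42)
    println(buffer.getInt(0)) // prints 42

    // Raw Unsafe allocation: also off-heap, but not counted against that limit
    val field = classOf[sun.misc.Unsafe].getDeclaredField("theUnsafe")
    field.setAccessible(true)
    val unsafe = field.get(null).asInstanceOf[sun.misc.Unsafe]
    val address = unsafe.allocateMemory(1024L * 1024L)
    unsafe.freeMemory(address)
  }
}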

bithw1 commented 5 years ago

@bartosz25

Still one thing, I would like to confirm whether I am right:

I have one node with 40G of physical memory in the YARN cluster.

The memory specification to launch the Spark application is:

--executor-memory 4G --conf spark.memory.offHeap.size=4G --conf spark.memory.offHeap.enabled=true

Then,

  1. From YARN's point of view, since this node has 40G in total, it could launch at most 40G / executor-memory = 10 containers (executors) on this node (ignoring memory overhead).
  2. From Spark's point of view, each executor will occupy 8G of memory, so at most 40G / 8G = 5 executors can run on this machine (see the sketch below).
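
A quick sketch of that arithmetic with the settings above (the names are mine):

object NodeCapacity {
  def main(args: Array[String]): Unit = {
    val nodeMemoryGb     = 40 // physical memory of the node
    val executorMemoryGb = 4  // --executor-memory: the only part YARN sees
    val offHeapGb        = 4  // spark.memory.offHeap.size: used on top of the heap

    // YARN's view (overhead ignored): 40 / 4 = 10 containers
    println(nodeMemoryGb / executorMemoryGb)
    // Real footprint per executor: 4 + 4 = 8G, so only 40 / 8 = 5 fit safely
    println(nodeMemoryGb / (executorMemoryGb + offHeapGb))
  }
}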