joblib / joblib-spark

Joblib Apache Spark Backend
Apache License 2.0

Memory leak in task serialization #16

Open gench opened 4 years ago

gench commented 4 years ago

When multiple tasks are parallelized, the memory usage of the Spark driver grows by the memory requirement of each task. I think the problem is in the apply_async function, which keeps references to the serialised pickle objects, so the driver easily runs out of memory when number_of_tasks x memory_of_a_task > driver_memory.

WeichenXu123 commented 4 years ago

@gench This is likely an issue in your code.

in apply_async function which keeps the reference to the serialised pickle objects

The serialized func object only includes the function code bytes and some parameter data used to run the tasks. When a task is launched on a Spark executor, its memory is allocated on the executor side.

gench commented 4 years ago

The serialized func object will only include function code bytes and some param data which used to run tasks.

That is the problem.

My function takes a big dataframe generated in the driver, as you can see below. Each time the function is serialised for an executor, its memory is not released afterwards. When I run the following code, it takes 8 times the memory of the X pandas dataframe.

Parallel(backend="spark", n_jobs=8)(delayed(my_function)(X=X, ...) for fold, (tr_ind, val_ind) in enumerate(cv_iterator))
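A minimal sketch (not the joblib-spark internals) of why this pattern can blow up: each delayed call pickles its own copy of X together with the function, so the driver may hold roughly n_jobs serialized copies at once. The names my_function, X, tr_ind and val_ind below are illustrative stand-ins, not taken from the actual code:

import numpy as np
import pandas as pd
import cloudpickle

# Stand-in for the large driver-side dataframe.
X = pd.DataFrame(np.random.rand(1_000_000, 10))

def my_function(X, tr_ind, val_ind):
    # Placeholder for the real training/validation work.
    return X.iloc[tr_ind].mean().sum()

# Each task's serialized arguments embed a full copy of X.
payload = cloudpickle.dumps(
    (my_function, {"X": X, "tr_ind": [0, 1], "val_ind": [2, 3]})
)
print(f"one task payload: {len(payload) / 1e6:.1f} MB")

# With n_jobs=8 tasks in flight, the driver holds roughly 8 such payloads,
# which would match the "8 times the memory of X" observation above.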

WeichenXu123 commented 3 years ago

@gench

My function takes a big dataframe

Could you try converting the dataframe into a Spark broadcast variable?

Like: bc_pandas_df = sparkContext.broadcast(pandas_df), then in the remotely executed function, get the broadcast variable's value with bc_pandas_df.value.
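A hedged sketch of that broadcast approach with the joblib-spark backend; the function and variable names (my_function, cv_iterator, the KFold split) are illustrative assumptions, not from the original code:

from pyspark.sql import SparkSession
from joblib import Parallel, delayed
from joblibspark import register_spark
from sklearn.model_selection import KFold
import numpy as np
import pandas as pd

register_spark()                           # register the "spark" joblib backend
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

X = pd.DataFrame(np.random.rand(100_000, 10))   # stand-in for the big dataframe
cv_iterator = KFold(n_splits=8).split(X)

bc_X = sc.broadcast(X)                     # ship X to the executors once, not per task

def my_function(bc_df, tr_ind, val_ind):
    X = bc_df.value                        # fetch the broadcast value on the executor
    # ... the real training/validation work on X.iloc[tr_ind] and X.iloc[val_ind]
    return len(tr_ind)

results = Parallel(backend="spark", n_jobs=8)(
    delayed(my_function)(bc_X, tr_ind, val_ind)
    for fold, (tr_ind, val_ind) in enumerate(cv_iterator)
)

This way each task only serializes the small Broadcast handle; the executors fetch X from Spark's broadcast mechanism instead of receiving a pickled copy inside every task payload.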