shakedzy / dython

A set of data tools in Python
http://shakedzy.xyz/dython/
MIT License

(feat) parallelize associations() function #132

Closed mahieyin-rahmun closed 2 years ago

mahieyin-rahmun commented 2 years ago

Why

This PR attempts to address #117 by adding a separate function called associations_parallel() which utilizes the multiprocessing module to parallelize computations.

How

If the dataset has more nominal (categorical) columns than numerical columns, the associations() function slows down considerably, since it performs all the calculations in nested for loops on a single CPU core. We can exploit the fact that the column index pairs visited by the nested for loops can be precomputed (essentially the cartesian product of the dataset's column indices), and the remaining computation can then be delegated to multiple processes.
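As a rough illustration of the idea described above (not the code in this PR), the sketch below precomputes the index pairs with `itertools.product` and hands them to a `multiprocessing.Pool`. The names `_agreement` and `pairwise_matrix` are hypothetical, and the "fraction of matching rows" measure is only a stand-in for the real association measures.

```python
import itertools
from multiprocessing import Pool


def _agreement(task):
    """Hypothetical stand-in measure: fraction of rows where two columns match.

    Defined at module level so that multiprocessing can pickle it.
    """
    i, j, col_i, col_j = task
    matches = sum(a == b for a, b in zip(col_i, col_j))
    return i, j, matches / len(col_i)


def pairwise_matrix(columns, n_workers=1):
    """Score every (i, j) column pair; `columns` is a list of equal-length lists."""
    n = len(columns)
    # Precompute the cartesian product of column indices, i.e. the pairs
    # that a nested for loop over the columns would visit.
    tasks = [
        (i, j, columns[i], columns[j])
        for i, j in itertools.product(range(n), repeat=2)
    ]
    if n_workers > 1:
        # Delegate the independent pair computations to worker processes.
        with Pool(processes=n_workers) as pool:
            results = pool.map(_agreement, tasks)
    else:
        # Serial fallback: same tasks, same worker function, one process.
        results = [_agreement(task) for task in tasks]
    matrix = [[0.0] * n for _ in range(n)]
    for i, j, score in results:
        matrix[i][j] = score
    return matrix
```

Calling `pairwise_matrix(data, n_workers=4)` from a script guarded by `if __name__ == "__main__":` would use four processes; with the default `n_workers=1` no processes are spawned at all.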

Benchmarking

I used the following script to run benchmarks. My system information can be found at the bottom of the post.

from dython.nominal import associations
import numpy as np
import pandas as pd
import time
import random
import matplotlib.pyplot as plt

random.seed(42)
np.random.seed(42)

def get_random_categorical_array():
    categorical_data_array = [
        ["A", "B", "C"],
        ["W", "X", "Y", "Z"],
        ["Yes", "No"],
        ["M", "N", "O", "P"]
    ]
    return random.choice(categorical_data_array)

def benchmark_function(func, dataframe, **kwargs):
    times = []

    for run_count in range(5):
        print(f"Run: {run_count}")
        start = time.time()
        results = func(dataframe, compute_only=True, **kwargs)
        end = time.time()
        print(f"Time taken: {(end - start):.2f} seconds")
        times.append(end - start)

    print(
        f"Average time taken by {func}: {(sum(times) / len(times)):.2f} seconds")

    return times

def run_benchmark():
    num_cols = 86
    bar_chart_data = np.random.randn(32001, num_cols)

    dataframe = pd.DataFrame(
        data=bar_chart_data[1:, 1:],
        index=bar_chart_data[1:, 0],
        columns=bar_chart_data[0, 1:]
    )

    # 75% of the columns are nominal
    for index in range(int(num_cols * 0.75)):
        dataframe.iloc[:, index] = np.random.choice(
            get_random_categorical_array(), len(dataframe))

    times_vanialla = benchmark_function(
        associations, dataframe, multiprocessing=False)
    times_2_cores = benchmark_function(associations, dataframe,
                                       multiprocessing=True, max_cpu_cores=2)
    times_4_cores = benchmark_function(associations, dataframe,
                                       multiprocessing=True, max_cpu_cores=4)
    times_6_cores = benchmark_function(associations, dataframe,
                                       multiprocessing=True, max_cpu_cores=6)

    # plot the results
    fig, axes = plt.subplots(1, 2, figsize=(12, 6), dpi=150)
    axes[0].plot(times_vanialla, label="Vanilla")
    axes[0].plot(times_2_cores, label="2 cores")
    axes[0].plot(times_4_cores, label="4 cores")
    axes[0].plot(times_6_cores, label="6 cores")
    axes[0].set_title("Time taken by associations function (5 runs)")
    axes[0].set_xlabel("Run count")
    axes[0].set_ylabel("Time taken (seconds)")
    axes[0].set_xticks(np.arange(len(times_vanialla)),
                       np.arange(1, len(times_vanialla)+1))
    axes[0].legend()

    bar_chart_data = {
        "Vanilla": np.mean(times_vanialla),
        "2 cores": np.mean(times_2_cores),
        "4 cores": np.mean(times_4_cores),
        "6 cores": np.mean(times_6_cores),
    }
    axes[1].bar(bar_chart_data.keys(), bar_chart_data.values(), width=0.5)
    axes[1].set_title(
        "Average time taken by associations function (across 5 runs)")
    axes[1].set_ylabel("Time taken (seconds)")
    plt.tight_layout()
    plt.show()

if __name__ == "__main__":
    run_benchmark()

[Image: Figure_2]

System Information

CPU

> lscpu
Architecture:            x86_64
  CPU op-mode(s):        32-bit, 64-bit
  Address sizes:         48 bits physical, 48 bits virtual
  Byte Order:            Little Endian
CPU(s):                  12
  On-line CPU(s) list:   0-11
Vendor ID:               AuthenticAMD
  Model name:            AMD Ryzen 5 5500U with Radeon Graphics
    CPU family:          23
    Model:               104
    Thread(s) per core:  2
    Core(s) per socket:  6
    Socket(s):           1
    Stepping:            1
    Frequency boost:     enabled
    CPU max MHz:         4056.0000
    CPU min MHz:         400.0000
    BogoMIPS:            4192.30
    Flags:               fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_o
                         pt pdpe1gb rdtscp lm constant_tsc rep_good nopl nonstop_tsc cpuid extd_apicid aperfmperf rapl pni pclmulqdq monitor ssse3 fma
                          cx16 sse4_1 sse4_2 movbe popcnt aes xsave avx f16c rdrand lahf_lm cmp_legacy svm extapic cr8_legacy abm sse4a misalignsse 3d
                         nowprefetch osvw ibs skinit wdt tce topoext perfctr_core perfctr_nb bpext perfctr_llc mwaitx cpb cat_l3 cdp_l3 hw_pstate ssbd
                          mba ibrs ibpb stibp vmmcall fsgsbase bmi1 avx2 smep bmi2 cqm rdt_a rdseed adx smap clflushopt clwb sha_ni xsaveopt xsavec xg
                         etbv1 xsaves cqm_llc cqm_occup_llc cqm_mbm_total cqm_mbm_local clzero irperf xsaveerptr rdpru wbnoinvd cppc arat npt lbrv svm
                         _lock nrip_save tsc_scale vmcb_clean flushbyasid decodeassists pausefilter pfthreshold avic v_vmsave_vmload vgif v_spec_ctrl 
                         umip rdpid overflow_recov succor smca
Virtualization features: 
  Virtualization:        AMD-V
Caches (sum of all):     
  L1d:                   192 KiB (6 instances)
  L1i:                   192 KiB (6 instances)
  L2:                    3 MiB (6 instances)
  L3:                    8 MiB (2 instances)
NUMA:                    
  NUMA node(s):          1
  NUMA node0 CPU(s):     0-11
Vulnerabilities:         
  Itlb multihit:         Not affected
  L1tf:                  Not affected
  Mds:                   Not affected
  Meltdown:              Not affected
  Mmio stale data:       Not affected
  Retbleed:              Mitigation; untrained return thunk; SMT enabled with STIBP protection
  Spec store bypass:     Mitigation; Speculative Store Bypass disabled via prctl and seccomp
  Spectre v1:            Mitigation; usercopy/swapgs barriers and __user pointer sanitization
  Spectre v2:            Mitigation; Retpolines, IBPB conditional, IBRS_FW, STIBP always-on, RSB filling
  Srbds:                 Not affected
  Tsx async abort:       Not affected

Memory

> lsmem              
RANGE                                  SIZE  STATE REMOVABLE  BLOCK
0x0000000000000000-0x00000000cfffffff  3.3G online       yes   0-25
0x0000000100000000-0x000000050fffffff 16.3G online       yes 32-161

Memory block size:       128M
Total online memory:    19.5G
Total offline memory:      0B

OS

> lsb_release -a
Distributor ID: Ubuntu
Description:    Ubuntu 22.04 LTS
Release:        22.04
Codename:       jammy

> uname -r
5.15.0-46-generic

Python

> python --version
Python 3.8.13

Anaconda

> conda --version
conda 4.12.0

Caveats

  1. My observation is that using m cores doesn't necessarily cut down the computation time by a factor of m, but some performance gains are achieved.
  2. When using 2 cores, I never got better performance than with a single core. This is probably because spawning multiple processes adds overhead that only pays off when more than 2 cores are used.
  3. There might be an upper limit on the number of cores that can be used to get performance gains, i.e. a point of diminishing returns. However, I am not able to test that hypothesis. Benchmarks from other users who own 10/12/16/32 core machines will be able to provide more insights into this.
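One way to quantify these caveats is to turn the measured averages into a speedup (serial time divided by parallel time) and a parallel efficiency (speedup divided by the number of cores). The helper below is a minimal sketch; `speedup_report` is a hypothetical name and is not part of the benchmark script.

```python
def speedup_report(serial_seconds, parallel_seconds_by_cores):
    """Return speedup and efficiency per core count.

    serial_seconds: average time of the single-process run.
    parallel_seconds_by_cores: dict mapping core count -> average time.
    """
    report = {}
    for cores, seconds in sorted(parallel_seconds_by_cores.items()):
        speedup = serial_seconds / seconds
        report[cores] = {
            "speedup": speedup,
            # An efficiency of 1.0 would mean the time dropped by a factor
            # equal to the number of cores.
            "efficiency": speedup / cores,
        }
    return report
```

For example, with made-up placeholder timings (not measurements from this PR), `speedup_report(10.0, {2: 10.0, 4: 5.0})` reports a speedup of 1.0 on 2 cores and 2.0 on 4 cores, both at an efficiency of 0.5.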

What issue does this fix

Closes #117

shakedzy commented 2 years ago

hey @mahieyin-rahmun - really cool :) The only issue here I have is the fact that there are (again) two associations methods (something I removed from previous versions). There's a lot of mutual code between these two, especially all the preprocessing at the beginning. What I suggest is having the multiple-core part taken out and placed in a private method, and have it triggered using a flag (for example, if the required number of cores is more than 1). Also, one thing I'm missing from your (really cool) time analysis is how long it takes for the current version to compute? (meaning, what's benchmark_function(associations, dataframe) )?

mahieyin-rahmun commented 2 years ago

@shakedzy, thank you very much for your feedback.

I initially wanted to make the changes in the associations() function itself, but it has some nested inner functions which the multiprocessing module cannot handle (since inner functions cannot be pickled). If I do as you suggested, I would also have to refactor the inner functions into class-level or utility functions. Is that okay with you?
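The pickling limitation can be reproduced with the standard library alone. This is a small standalone sketch; the function names are hypothetical and unrelated to dython's code.

```python
import pickle


def module_level(x):
    """A top-level function: pickled by reference to its module and name."""
    return x * 2


def make_nested():
    def nested(x):
        # Same body, but defined inside another function.
        return x * 2

    return nested


def can_pickle(obj):
    """Return True if pickle.dumps succeeds for obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # Local (nested) functions cannot be looked up by qualified name,
        # so pickling them fails.
        return False
```

Run as a script, `can_pickle(module_level)` is expected to succeed while `can_pickle(make_nested())` returns `False`, which is why worker functions passed to `multiprocessing` need to live at module (or class) level.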

I agree about the time analysis part. I thought that the performance of the multiprocessing version restricted to a single core would match the performance of the current version, but that may not be the case. I will update the PR.

shakedzy commented 2 years ago

@mahieyin-rahmun you don't have to force everything into one method - you can have a separate private method for the multiprocessing, and call it from within associations. My goal here is to reduce duplicated code as much as possible, but still keep everything simple and maintainable

mahieyin-rahmun commented 2 years ago

Hi, @shakedzy, I have updated the PR with the changes requested. Kindly let me know if additional changes are required.

Here's the benchmark for the current code: Figure_2

I have also updated the benchmark code and chart in the original comment above.

shakedzy commented 2 years ago

This is superb work, @mahieyin-rahmun ! The only issue now is that I just merged your other PR that enforces using Black. This branch needs to merge master as there are conflicts

shakedzy commented 2 years ago

Merged 🎉 great job @mahieyin-rahmun !

KonradHoeffner commented 2 years ago

@mahieyin-rahmun: Thank you! I have a Core i9-10900k with 10 cores / 20 threads and a Core i9-12900k with 16 cores (8 performance, 8 efficiency) and 24 threads. If you are still interested in benchmarks, I could run them on my machines. Or is that not necessary anymore?

KonradHoeffner commented 2 years ago

@mahieyin-rahmun @shakedzy I copied your benchmark code into benchmark.py and tried to run it but failed, am I doing something wrong?

$ pip install git+https://github.com/shakedzy/dython.git@master
[...]
$ python benchmark.py
Run: 0
Traceback (most recent call last):
  File "/home/konrad/tmp/test/benchmark.py", line 91, in <module>
    run_benchmark()
  File "/home/konrad/tmp/test/benchmark.py", line 54, in run_benchmark
    times_vanialla = benchmark_function(
  File "/home/konrad/tmp/test/benchmark.py", line 28, in benchmark_function
    results = func(dataframe, compute_only=True, **kwargs)
TypeError: associations() got an unexpected keyword argument 'use_multiprocessing'

mahieyin-rahmun commented 2 years ago

Hi @KonradHoeffner, I believe the keyword arguments were shortened a bit before the PR was merged. You can see from the source code that the keyword argument is multiprocessing (drop the use_). I will update the benchmark code as well.

KonradHoeffner commented 2 years ago

@mahieyin-rahmun thanks, that works! Results from my Intel Core i9-10900k with 10 cores and 20 threads: [Image: dython10900k]

It seems as if hyperthreading doesn't help in this case. 48 GB DDR4 RAM at 3000 MHz, Arch Linux default kernel version 5.19.7, Python 3.10.7.

mahieyin-rahmun commented 2 years ago

Yes, the current attempt at parallelizing does not perform well with hyperthreading, so, only physical cores are being utilized. Your 16 and 20 core results are actually still utilizing the 10 physical cores.
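To see how many physical cores (as opposed to logical, hyperthreaded ones) a machine reports, something along these lines could be used. It is a sketch that assumes the third-party `psutil` package may or may not be installed; `suggested_workers` is a hypothetical helper, not a dython function.

```python
import os


def logical_cores():
    """Logical CPU count (includes hyperthreads); at least 1."""
    return os.cpu_count() or 1


def physical_cores():
    """Physical core count via psutil, or None if it cannot be determined."""
    try:
        import psutil  # third-party; optional in this sketch
    except ImportError:
        return None
    return psutil.cpu_count(logical=False)


def suggested_workers(requested):
    """Clamp a requested worker count to the physical (else logical) cores."""
    limit = physical_cores() or logical_cores()
    return max(1, min(requested, limit))
```

Note that `psutil` does not distinguish performance cores from efficiency cores, so on hybrid CPUs the returned count covers both kinds.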

I believe the next steps could be to parallelize/vectorize the helper functions. I think someone is/was already working on it.

KonradHoeffner commented 2 years ago

Results below from an Intel Core i9-12900k with 8 Performance and 8 Efficiency Cores, 32 GB DDR5 RAM. Interestingly, the time goes up again when using more cores than the number of performance cores.

[Image: benchmark]

Another run:

[Image: benchmark2]

mahieyin-rahmun commented 2 years ago

I do not have much knowledge about the performance cores and efficiency cores of the Intel SKUs, but it seems like the efficiency cores are somehow bottlenecking the process. Perhaps starting a separate process on an efficiency core (which probably has a lower base clock speed than a performance core?) and then collecting the results from that process adds enough overhead to slow down the computation.

shakedzy commented 2 years ago

This is a really interesting discussion, @mahieyin-rahmun and @KonradHoeffner . I'm thinking of perhaps adding a "find the optimal num of cores" function to the library, which basically runs this test. What do you think?
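A "find the optimal number of cores" helper could look roughly like the sketch below: time a callable for each candidate worker count and return the fastest on average. `find_fastest_worker_count` and its parameters are hypothetical; nothing like it exists in the library as of this thread.

```python
import time


def find_fastest_worker_count(run, candidates, repeats=3):
    """Time run(n) for each n in candidates and return (best_n, averages).

    run: callable taking the number of workers to use.
    candidates: iterable of worker counts to try.
    repeats: how many timed runs to average per candidate.
    """
    averages = {}
    for n in candidates:
        timings = []
        for _ in range(repeats):
            start = time.perf_counter()
            run(n)
            timings.append(time.perf_counter() - start)
        averages[n] = sum(timings) / len(timings)
    # The candidate with the lowest average wall-clock time wins.
    best = min(averages, key=averages.get)
    return best, averages
```

With the keyword arguments discussed in this thread, `run` could be something like `lambda n: associations(df, compute_only=True, multiprocessing=n > 1, max_cpu_cores=n)`. The result only holds for the dataset and machine it was measured on.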

KonradHoeffner commented 2 years ago

@shakedzy some thoughts:

Ultimately, limiting the thread count to the number of physical performance cores seems like a good approximation to me. If there is no easy way to get the number of non-efficiency cores on a hybrid architecture, a good heuristic may be to just cap it at 8 or 10. On the other hand, if this is not clearly documented, it may create problems in the future when we all have 100-core machines and wonder why it doesn't scale to the number of cores with gigantic input sizes. But I am neither a developer nor a heavy user of this library, so I don't feel qualified to make such a decision.

shakedzy commented 2 years ago

@KonradHoeffner I see your points. Perhaps it will be best to simply add this test as an external script in this repo. We can refer to it in the documentation.