darshan-hpc / darshan

Darshan I/O characterization tool
Other
55 stars 27 forks source link

Concurrent get_heatmap_df #798

Open tylerjereddy opened 1 year ago

tylerjereddy commented 1 year ago

As a quick experiment, if we use more than 1 processor in get_heatmap_df() we can save ~8 seconds in the generation of the HTML report for e3sm_io_heatmap_and_dxt.darshan. This is in branch treddy_dxt_html_speedup, which itself branches off of gh-784. It has no effect on the processing time of snyder_acme.exe_id1253318_9-27-24239-1515303144625770178_2.darshan though, and the drop from 42 to 34 seconds isn't enough to justify the complexity that would be required to handle the concurrency properly (heuristic for size at which to use > 1 core, how much work per process, testing the concurrent vs. serial code, having concurrency off by default/opt-in, etc.).

Nonetheless, I'll note this here for now since the interpolation takes up 27 seconds of the 42 total for e3sm...

--- a/darshan-util/pydarshan/darshan/experimental/plots/heatmap_handling.py
+++ b/darshan-util/pydarshan/darshan/experimental/plots/heatmap_handling.py
@@ -3,6 +3,7 @@ Module of data pre-processing functions for constructing the heatmap figure.
 """

 from typing import Dict, Any, Tuple, Sequence, TYPE_CHECKING
+import concurrent.futures

 import sys

@@ -369,9 +370,26 @@ def get_heatmap_df(agg_df: pd.DataFrame, xbins: int, nprocs: int) -> pd.DataFram
     else:
         null_mask = cats.notna().sum(axis=1) > 1
         null_mask = null_mask.loc[null_mask == True].index
-        cats_vals_to_interp = pd.DataFrame(cats.iloc[null_mask].values)
-        cats_vals_to_interp.interpolate(method="nearest", axis=1, inplace=True)
-        cats.iloc[null_mask] = cats_vals_to_interp
+        list_futures = []
+        num_workers = 4
+        if len(null_mask) > 10000:
+            work_inds = np.array_split(null_mask, num_workers)
+
+            with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor:
+                for ind, work_ind in enumerate(work_inds):
+                    list_futures.append(executor.submit(interpolator,
+                                                        cats,
+                                                        work_ind,
+                                                        ind))
+
+            for future in concurrent.futures.as_completed(list_futures):
+                cats_vals_to_interp, ind = future.result()
+                cats.iloc[work_inds[ind]] = cats_vals_to_interp
+        else:
+            cats_vals_to_interp = pd.DataFrame(cats.iloc[null_mask].values)
+            cats_vals_to_interp.interpolate(method="nearest", axis=1, inplace=True)
+            cats.iloc[null_mask] = cats_vals_to_interp
+
     # each time bin containing an event has a 1 in it, otherwise NaN
     # store mask for restoring fully occupied bins
     mask_occ = cats == 2
@@ -396,3 +414,9 @@ def get_heatmap_df(agg_df: pd.DataFrame, xbins: int, nprocs: int) -> pd.DataFram
     hmap_df = cats.groupby("rank").sum()
     hmap_df = hmap_df.reindex(index=range(nprocs), fill_value=0.0)
     return hmap_df
+
+
+def interpolator(cats, work_ind, ind):
+    cats_vals_to_interp = pd.DataFrame(cats.iloc[work_ind].values)
+    cats_vals_to_interp.interpolate(method="nearest", axis=1, inplace=True)
+    return cats_vals_to_interp, ind
tylerjereddy commented 1 year ago

Two other things to check here: 1) separate out the interpolate reproducer and drill down in pandas or possibly SciPy interp1d under the hood to see if there's any room for perf bumping there 2) What if we didn't interpolate at all, and instead used a series of NumPy vectorized ops to find the locations to fill in with 1s? Some NumPy operations are SIMD-accelerated for example, so could be worth checking on that.

tylerjereddy commented 1 year ago

The example below the fold appears to be 100X faster than pandas interpolation for our use case with 80,000 rows x 200 columns, and it doesn't even use Cython, just NumPy and Python. Let's see if I can do two things now: (1) demonstrate a genuine speedup for the e3sm_io_heatmap_and_dxt.darshan log, without slowing other stuff down; (2) perhaps touch base with pandas upstream to see if this "interpolation" use case merits a fast path, or if it is too specialized for that?

```python import time import numpy as np import pandas as pd from pandas.testing import assert_frame_equal def setup_problem(): # make a DataFrame where you want to interpolate # massive numbers of rows with structure like # [np.nan, 1, np.nan, np.nan, 1, np.nan] # to # [np.nan, 1, 1, 1, 1, np.nan] # So, fill 1 between 1s arr = np.zeros((80000, 200)) arr[..., 1] = 1 arr[..., 100] = 1 df = pd.DataFrame(arr) df.replace(0, np.nan, inplace=True) return df def interplate_with_pandas(df): start = time.perf_counter() # this is a convenient single liner, but it does # pure Python calls to _interpolate_1d in # pandas/core/missing.py a LOT df.interpolate(method="nearest", axis=1, inplace=True) end = time.perf_counter() print("interp time for pandas (s):", end - start) return df def interplate_with_numpy(df): start = time.perf_counter() arr = df.values a = ~np.isnan(arr) b = np.where(a) num_rows = len(df) start_index = 0 row = 0 # no cheating here--we can't use the knowledge # that start:end bounds don't change, b/c real # data will have them in different columns # This appears to be 100X faster, even without # using i.e., Cython while start_index < (2 * num_rows): start_bound = b[1][start_index] end_bound = b[1][start_index + 1] arr[row, start_bound:end_bound] = 1 start_index += 2 row += 1 end = time.perf_counter() print("custom interp time (s):", end - start) return df if __name__ == "__main__": df = setup_problem() df = interplate_with_pandas(df=df) df_custom = setup_problem() df_custom = interplate_with_numpy(df=df_custom) assert_frame_equal(df, df_custom) ```
tylerjereddy commented 1 year ago

For upstream query see https://github.com/pandas-dev/pandas/issues/48236

tylerjereddy commented 1 year ago

I should also note that the faster interpolation algorithm is probably a route to avoiding early conversion to the float64 datatype--without the need to use the pandas interpolate, we can just place 1 between the start/end in each row without needing a float-based NaN sentinel, which should mean that a smaller unsigned (integer) byte type (np.unit8) could suffice to hold the "masks."

Not sure how much that helps memory/performance overall, since we'd likely eventually have to produce an array of doubles down the line anyway, but doing the manual interpolation op on a smaller data type/array could keep the calculation closer to the processor (i.e., could fit in a closer memory location).