ocean-eddy-cpt / gcm-filters

Diffusion-based Spatial Filtering of Gridded Data
https://gcm-filters.readthedocs.io/

Performance evaluation #45

Open jbusecke opened 3 years ago

jbusecke commented 3 years ago

JOSS asks: "If there are any performance claims of the software, have they been confirmed? (If there are no claims, please check off this item.)"

What should/can we do to document performance of the package?

NoraLoose commented 3 years ago

We started the discussion on performance evaluation here and here.

jbusecke commented 3 years ago

Apologies for missing those. Should I close this issue?

NoraLoose commented 3 years ago

Not at all! I just posted these, so we don't need to start the discussion from scratch. :)

jbusecke commented 3 years ago

Perfect!

NoraLoose commented 3 years ago

Sorry, I didn't mean to kill this discussion. I will try to summarize the aspects we had discussed including in a performance evaluation.

I am sure this list is incomplete, so please feel free to add/delete items. Maybe @iangrooms also saved the computational section that was originally part of the JAMES paper.

Question: Do we want to include the performance evaluation

  1. in the JOSS paper,
  2. in the gcm-filters documentation,
  3. both (in some kind of combination)?

I think 3. would maybe work best.

rabernat commented 3 years ago

Thanks for the writeup. I think your list is perfect, and I favor option 3.

For me, the big question is the following. Most HPC system nodes have either ~24 CPU cores or 1 GPU. By using parallelism appropriately (e.g. parallelize across non-spatial dimensions), can the 24-core CPU speed get close to the 1 GPU speed?

I find this problem fun and would like to work on it a bit.

rabernat commented 3 years ago

I know I volunteered to do this. I still hope to work on it. Will try to squeeze it in over the next few days.

rabernat commented 3 years ago

Ok, so I started the benchmarking exercise in a standalone repo:

https://github.com/ocean-eddy-cpt/gcm-filters-benchmarks

The repo is organized as follows:

One central goal is to figure out how many CPUs you need to use to replicate the GPU performance.

What I did so far

What I did was run the basic tutorial example on Casper with various numbers of CPUs and look at strong and weak scaling. I bypass questions about i/o performance by using random data. When x and y are contiguous (not chunked) the GCM filters approach is embarrassingly parallel, so in theory should scale perfectly (runtime inversely proportional to the number of CPUs used).
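For concreteness, the core of that benchmark looks roughly like this (a sketch following the tutorial-style setup; n_cores stands for however many cores the Dask cluster was given):

import dask.array as dsa
import xarray as xr
import gcm_filters as gf

n_cores = 36                       # assumption: cores available to the cluster
nt = 10 * n_cores                  # weak scaling: problem size grows with core count
da = xr.DataArray(
    dsa.random.random((nt, 1024, 1024), chunks=(10, 1024, 1024)),
    dims=['time', 'y', 'x'],
)
wet_mask = xr.DataArray(dsa.ones((1024, 1024)), dims=['y', 'x'])

filter = gf.Filter(
    filter_scale=4,
    dx_min=1,
    filter_shape=gf.FilterShape.TAPER,
    grid_type=gf.GridType.REGULAR_WITH_LAND,
    grid_vars={'wet_mask': wet_mask},
)
filtered = filter.apply(da, dims=['y', 'x'])
%time filtered.mean().compute()    # throughput = da.nbytes / wall time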

What I found was a bit different. Here is the weak scaling (from the plot_results.ipynb notebook). I used a single node of Casper, varying the number of CPUs from 1 to 36, with various combinations of workers and threads.

[figure: weak scaling of gcm-filters on a single Casper node]

What the graph shows is decent scaling for low core counts, then saturation at high core counts. As a reminder, this is weak scaling, which means that as we use more cores, we also make the problem bigger. The fact that the throughput saturates at a relatively low rate (200 MB/s) indicates that scaling is breaking down for some reason. Instead, as we add more cores, we should keep processing data faster. It would be great to dig into this a bit deeper to understand why this is happening. Perhaps that is something that Dask experts @gjoseph92 and @jrbourbeau could help us debug?

A useful comparison point is simply taking the mean of the same array, which exposes Dask's own scaling properties:

[figure: weak scaling of a plain array mean]

Although it is somewhat sub-linear, it is better than gcm-filters.

Next steps

Compare to GPU performance

We should compare these single-node CPU results to the GPU results. Unfortunately I couldn't do this because I broke my cupy installation in my Casper python environment 😢. Ideally someone with a working cupy could rerun the basic benchmark with the two different types of Casper GPUs. Note that, with a single GPU, we can't really look at scaling proper, since we can't use multiple GPUs. But we can still locate the GPU performance on the same axis (MB/s) as the CPU performance.
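A hedged sketch of that bookkeeping (filtered and filtered_gpu are placeholders for tutorial-style filtered arrays backed by numpy and cupy chunks respectively, not objects from the benchmark repo):

import time

def throughput_mb_per_s(filtered_da):
    # force the lazy filter to run and convert wall time to MB/s
    t0 = time.perf_counter()
    filtered_da.data.mean().compute()
    elapsed = time.perf_counter() - t0
    return filtered_da.nbytes / elapsed / 1e6

print("CPU:", throughput_mb_per_s(filtered))
print("GPU:", throughput_mb_per_s(filtered_gpu))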

If we wanted to use a multi-GPU cluster, we could get into dask-cuda, but IMO this is overkill for the JOSS paper.

Look at distributed scaling on multiple nodes

It's a mystery why gcm-filters is not scaling linearly up to 36 cores on a single node. We should debug this and try to understand why. Perhaps it's related to memory usage? (There is no disk i/o in the example, so that can't be an issue.)

We could also look at scaling across multiple different nodes. Maybe if we keep the core count low on each node (e.g. use only 4 cores per node), but spread over several nodes, we could get better scaling. We would need to figure out the best way to launch a multi-node Dask cluster on Casper or Cheyenne (probably Cheyenne). Pinging @andersy005, software engineer at CISL, for help with this.
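One plausible way to do that with dask-jobqueue (a sketch; the queue, memory, and walltime values are placeholders that would need to be adapted for Casper/Cheyenne):

from dask.distributed import Client
from dask_jobqueue import PBSCluster

cluster = PBSCluster(
    cores=4,             # keep the per-node core count low
    processes=4,         # one single-threaded worker per core
    memory='20GB',
    queue='regular',     # placeholder queue name
    walltime='01:00:00',
)
cluster.scale(jobs=8)    # 8 separate PBS jobs/nodes -> 32 cores total
client = Client(cluster)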

Vary some of the array parameters

I used an array with chunks of shape (10, 1024, 1024) and dtype float64. Some other things to try

My hope is that, by getting this started and giving a structure to the benchmark code, this is easy to hand off to other folks. Please feel free to make PRs to the gcm-filters-benchmarks repo, updating the notebook and / or adding more CSV results to the data/ directory.

We are flying to Italy tonight, so I will be offline for a day or two while in transit. Will check back in later in the week.

Thanks for your patience, as I have been very slow with my task.

gjoseph92 commented 3 years ago

Thanks @rabernat! I think @jrbourbeau will look into this soon. Two things that would help us investigate this faster:

  1. Could you collect performance reports for some of the runs? (Shameless plug: you can now use coiled.performance_report to capture and host these for you, which saves the annoyance of uploading to a gist and getting the githack link.)
  2. Could you try letting threads_per_worker go up to 36? I'm curious whether this could be a case of https://github.com/dask/distributed/issues/4892.
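For reference, a minimal sketch of both requests (cluster parameters are illustrative, and filtered stands for the lazily filtered array from the benchmark notebook):

from dask.distributed import Client, performance_report

# one worker with 36 threads, per request 2 above
client = Client(n_workers=1, threads_per_worker=36)

with performance_report(filename="filtered_mean_36_threads_1_worker.html"):
    filtered.data.mean().compute()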

mrocklin commented 3 years ago

Checking in here. It would be useful to get a performance report of one anecdotal run. That might help us to better understand the cause of the sub-linear scaling. I would strongly recommend doing this before investing time in running larger parameter sweeps.

rabernat commented 3 years ago

Sorry for the delayed response. We just transitioned to Italy and I have fallen behind on all notifications.

@NoraLoose - since this is likely to take a while, do you think we should move forward with the JOSS submission without the detailed performance assessment?

NoraLoose commented 3 years ago

@rabernat I think we are not in a big rush. We still need to work on other things (like restructuring the documentation). So there may be time to get the performance improvement / assessment done by the time everything else is ready.

rabernat commented 3 years ago

I am having trouble with performance_report (both the coiled and dask.distributed versions).

with performance_report("unfiltered_mean_36_cores_4_workers.html"):
    unfiltered.data.mean().compute()

I am getting this error with both. I am using dask version 2021.05.01

distributed.core - ERROR - Exception while handling op performance_report
Traceback (most recent call last):
  File "/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/core.py", line 498, in handle_comm
    result = await result
  File "/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/scheduler.py", line 6663, in performance_report
    sysmon = SystemMonitor(self, last_count=last_count, sizing_mode="stretch_both")
  File "/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/dashboard/components/shared.py", line 466, in __init__
    names = worker.monitor.range_query(start=last_count)
  File "/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/system_monitor.py", line 123, in range_query
    d = {k: [v[i] for i in seq] for k, v in self.quantities.items()}
  File "/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/system_monitor.py", line 123, in <dictcomp>
    d = {k: [v[i] for i in seq] for k, v in self.quantities.items()}
  File "/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/system_monitor.py", line 123, in <listcomp>
    d = {k: [v[i] for i in seq] for k, v in self.quantities.items()}
IndexError: deque index out of range
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-12-f054ab3a58e9> in <module>
      1 with performance_report("unfiltered_mean_36_cores_4_workers.html"):
----> 2     unfiltered.data.mean().compute()

/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/client.py in __exit__(self, typ, value, traceback)
   4726         except Exception:
   4727             code = ""
-> 4728         get_client().sync(self.__aexit__, type, value, traceback, code=code)
   4729 
   4730 

/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    851         else:
    852             return sync(
--> 853                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    854             )
    855 

/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    352     if error[0]:
    353         typ, exc, tb = error[0]
--> 354         raise exc.with_traceback(tb)
    355     else:
    356         return result[0]

/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/utils.py in f()
    335             if callback_timeout is not None:
    336                 future = asyncio.wait_for(future, callback_timeout)
--> 337             result[0] = yield future
    338         except Exception as exc:
    339             error[0] = sys.exc_info()

/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/client.py in __aexit__(self, typ, value, traceback, code)
   4712                 code = ""
   4713         data = await get_client().scheduler.performance_report(
-> 4714             start=self.start, last_count=self.last_count, code=code
   4715         )
   4716         with open(self.filename, "w") as f:

/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    862             name, comm.name = comm.name, "ConnectionPool." + key
    863             try:
--> 864                 result = await send_recv(comm=comm, op=key, **kwargs)
    865             finally:
    866                 self.pool.reuse(self.addr, comm)

/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    661         if comm.deserialize:
    662             typ, exc, tb = clean_exception(**response)
--> 663             raise exc.with_traceback(tb)
    664         else:
    665             raise Exception(response["text"])

/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/core.py in handle_comm()
    496                             result = asyncio.ensure_future(result)
    497                             self._ongoing_coroutines.add(result)
--> 498                             result = await result
    499                     except (CommClosedError, CancelledError) as e:
    500                         if self.status == Status.running:

/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/scheduler.py in performance_report()
   6661         from distributed.dashboard.components.shared import SystemMonitor
   6662 
-> 6663         sysmon = SystemMonitor(self, last_count=last_count, sizing_mode="stretch_both")
   6664         sysmon.update()
   6665 

/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/dashboard/components/shared.py in __init__()
    464         self.last_count = 0
    465         if last_count is not None:
--> 466             names = worker.monitor.range_query(start=last_count)
    467             self.last_count = last_count
    468         self.source = ColumnDataSource({name: [] for name in names})

/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/system_monitor.py in range_query()
    121         seq = [i for i in range(istart, len(self.cpu))]
    122 
--> 123         d = {k: [v[i] for i in seq] for k, v in self.quantities.items()}
    124         return d

/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/system_monitor.py in <dictcomp>()
    121         seq = [i for i in range(istart, len(self.cpu))]
    122 
--> 123         d = {k: [v[i] for i in seq] for k, v in self.quantities.items()}
    124         return d

/glade/work/rpa/my_npl_clone/lib/python3.7/site-packages/distributed/system_monitor.py in <listcomp>()
    121         seq = [i for i in range(istart, len(self.cpu))]
    122 
--> 123         d = {k: [v[i] for i in seq] for k, v in self.quantities.items()}
    124         return d

IndexError: deque index out of range

gjoseph92 commented 3 years ago

Can you try upgrading distributed to the latest version?

rabernat commented 3 years ago

I originally had to stick with that version because of a weird conflict with pynvml. However, after uninstalling pynvml, I was able to update to 2021.06.02 and the problem resolved.

Performance report (36 cores spread over 4 workers): https://rawcdn.githack.com/ocean-eddy-cpt/gcm-filters-benchmarks/8b03462e777a052cb6f00f7f71a3f4320c4f2d8a/performance_reports/filtered_mean_36_cores_4_workers.html

rabernat commented 3 years ago

In contrast, here is the weak scaling report for a smaller cluster (9 cores) and proportionally smaller problem: https://rawcdn.githack.com/ocean-eddy-cpt/gcm-filters-benchmarks/1251584c735dff55ccd1633a3f07b8ee8b8daff3/performance_reports/filtered_mean_9_cores_1_workers.html

Even though the chunk size is the same, the filter_func tasks are running much faster! (8 s vs. 14 s.) What could be the cause of that?! I am puzzled. But this fully explains the failure to scale out linearly.

rabernat commented 3 years ago

Just for due diligence, here is the report for the smaller cluster with the bigger array: https://rawcdn.githack.com/ocean-eddy-cpt/gcm-filters-benchmarks/6ebd46258c97e48f8ad5dad1dba5f32022b9c5c1/performance_reports/filtered_mean_9_cores_1_workers_bigger.html

The tasks are still running faster (8s).

If this were GIL related, wouldn't it run faster using processes rather than threads? But that's not what we saw above.

gjoseph92 commented 3 years ago

I don't have an immediate explanation for this. It's also odd to me that the worker profiles look almost exactly the same, just the times are larger with more processes.

Can you try 1 worker, 36 processes for comparison? Also 36 processes with 1 thread each? Not sure it'll tell us that much, but I'm just curious.

rabernat commented 3 years ago

Can you try 1 worker, 36 processes for comparison?

https://rawcdn.githack.com/ocean-eddy-cpt/gcm-filters-benchmarks/910d3aa483d546ad38608de4d1779994dca1d2ec/performance_reports/filtered_mean_36_cores_1_workers.html

About 16s per task

Also 36 processes with 1 thread each?

https://rawcdn.githack.com/ocean-eddy-cpt/gcm-filters-benchmarks/910d3aa483d546ad38608de4d1779994dca1d2ec/performance_reports/filtered_mean_36_cores_36_workers.html

Marginally faster, about 14 s per task

rabernat commented 3 years ago

And just for fun, here is a report with 4 workers, 1 thread each: https://rawcdn.githack.com/ocean-eddy-cpt/gcm-filters-benchmarks/2dfc4118ad20b7c903d4ab05672645b068737301/performance_reports/filtered_mean_4_cores_4_workers.html

filter_func clocks in at a reliable 6.7 s.

I suppose another possibility is that the system is not actually giving me the 36 cores I am asking for. I will investigate this.

gjoseph92 commented 3 years ago

Yeah, these reports certainly look like the problem is significant, consistent resource contention (versus, say, bad scheduling). The question is, what resource? If it's possible you don't have the cores you think, that would line up very well with what we're seeing.

I do notice that the amount of available memory changes a lot between reports. Not that there's memory pressure, but just that it might indicate something is inconsistent about the system.

andersy005 commented 3 years ago

I suppose another possibility is that the system is not actually giving me the 36 cores I am asking for. I will investigate this.

@rabernat, I'm curious... at the beginning of the notebook you mention "Run on Casper full node (36 cores)", is this running on a compute node? If so, how are these resources being requested i.e. the PBS job submission?

andersy005 commented 3 years ago

is this running on a compute node? If so, how are these resources being requested i.e. the PBS job submission?

Nevermind :)... Found your most recent job:

Job ID    User            Queue      Nodes NCPUs NGPUs Finish Time  Req Mem Used Mem(GB)  Avg CPU (%)  Elapsed (h) Job Name
509908    rpa             htc            1    36     0 06-30T12:09    360.0         37.5          8.4         4.02 jhcrbatch

If it's possible you don't have the cores you think, that would line up very well with what we're seeing.

Provided I am looking at the right job, the PBS report shows that your job has the requested 36 CPUs. However, it appears that these are underutilized (see the Avg CPU column above). Not sure whether this relates to the performance issues you are noticing in your benchmarks...

rabernat commented 3 years ago

Thanks for checking on this Anderson!

I launched the job via https://jupyterhub.hpc.ucar.edu/ on the "Casper PBS Batch" option.

...it appears that these are underutilized (see the Avg CPU column above).

Since this is an interactive job (JupyterLab), I wouldn't read too much into the PBS job stats. Most of my walltime was spent typing code. More useful is the CPU usage pane from the dask performance report, which shows about 20% efficiency. We should be trying to hit 100% for compute-bound code such as this.

[figure: CPU utilization pane from the Dask performance report, showing ~20% efficiency]

mrocklin commented 3 years ago

I would encourage folks to remove dask from the picture here and try using this function within a ThreadPoolExecutor.

I also recommend the %ptime magic available here, which helps to measure parallelizability of code: https://pypi.org/project/ptime/ (made by the wonderful @jcrist)

rabernat commented 3 years ago

Thanks for the tips Matt!

jrbourbeau commented 3 years ago

+1 for Matt's suggestion about ptime

More useful is the CPU usage pane from the dask performance report, which shows about 20% efficiency. We should be trying to hit 100 for compute-bound code such as this.

I just wanted to point out that this is the CPU load on the scheduler, not the workers. The CPU load for workers is available in the "Workers" tab of the dashboard

rabernat commented 3 years ago

I just tried ptime.

# imports to make this snippet self-contained
import numpy as np
import xarray as xr
import dask.array as dsa
import gcm_filters as gf
%load_ext ptime

shape = (160, 1024, 1024)
chunks = (10, 1024, 1024)

nt, ny, nx = shape
da = xr.DataArray(dsa.random.random(shape, chunks=chunks), dims=['time', 'y', 'x'])
mask_data = dsa.ones((ny, nx))
mask_data[(ny // 4):(3 * ny // 4), (nx // 4):(3 * nx // 4)] = 0
wet_mask = xr.DataArray(mask_data, dims=['y', 'x'])

da_masked = da.where(wet_mask)

filter = gf.Filter(
    filter_scale=4,
    dx_min=1,
    filter_shape=gf.FilterShape.TAPER,
    grid_type=gf.GridType.REGULAR_WITH_LAND,
    grid_vars={'wet_mask': wet_mask}
)

da_filtered = filter.apply(da_masked, dims=['y', 'x'])
filter

filter_func = gf.filter._create_filter_func(filter.filter_spec, filter.Laplacian)

# synthetic numpy data
data = np.random.rand(10, 1024, 1024)
data_mask = np.random.rand(1, 1024, 1024)

%ptime filter_func(data, data_mask)
Total serial time:   12.88 s
Total parallel time: 6.63 s
For a 1.94X speedup across 2 threads

notebook is here

This result is consistent with the experience above that the scaling is good for small core counts (1-8 cores out of 36) but then levels off for larger core counts.

[figure: scaling curve from the benchmark above]

I am now leaving for one week of offline vacation. I won't have time to pursue this further for several weeks. The package is pip installable, and all the examples in this repo use synthetic data, so if anyone else wants to investigate further, I welcome your help. I have not tried with the ThreadPoolExecutor.
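For reference, such a ThreadPoolExecutor experiment could look roughly like the sketch below (it reuses the filter_func extracted in the snippet above; the chunk count and thread count are arbitrary):

from concurrent.futures import ThreadPoolExecutor
import time
import numpy as np

# synthetic chunks matching the shapes used above
chunks = [(np.random.rand(10, 1024, 1024), np.random.rand(1, 1024, 1024))
          for _ in range(8)]

t0 = time.perf_counter()
for data, mask in chunks:
    filter_func(data, mask)
serial = time.perf_counter() - t0

t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(lambda args: filter_func(*args), chunks))
parallel = time.perf_counter() - t0

print(f"serial {serial:.1f} s, parallel {parallel:.1f} s, "
      f"speedup {serial / parallel:.2f}x on 8 threads")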

mrocklin commented 3 years ago

ptime uses a ThreadPoolExecutor. Mostly we've verified that this is unrelated to Dask.

It's useful to recognize that there are several things other than your CPU cores that can restrict parallelism.

  1. GIL
  2. IO, Disk
  3. Memory hierarchy
  4. Some lock within some library that you're using
  5. Python overhead

As an example, consider FFT. Functions like np.fft certainly release the GIL, but FFT's memory access pattern is so spread out (by necessity) that a modern CPU is bound by memory access rather than CPU operations. If using processes over threads doesn't help, then maybe there is some other issue at hand. If you're curious about memory hierarchy in particular then you could see how that curve depends on chunk size.
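A hedged sketch of that chunk-size experiment, reusing the filter_func extracted from the Filter object earlier in the thread (the sizes below are arbitrary examples):

import timeit
import numpy as np

for n in (128, 256, 512, 1024, 2048):
    data = np.random.rand(4, n, n)
    mask = np.ones((1, n, n))
    # average seconds per call, converted to throughput
    t = timeit.timeit(lambda: filter_func(data, mask), number=3) / 3
    print(f"{n:5d}  {data.nbytes / t / 1e6:8.1f} MB/s")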

I am now leaving for one week of offline vacation. I won't have time to pursue this further for several weeks

Same! I'm also going to bow out of this conversation a bit and go focus on Dask things

rabernat commented 3 years ago

Mostly we've verified that this is unrelated to Dask.

For the sake of educating us, can you explain how you reached that conclusion based on the information posted in the thread? What was the "smoking gun" that showed that it's not related to Dask?

mrocklin commented 3 years ago

Well, first, we saw that the task stream plot was packed solid, so this isn't due to any kind of inefficiency in task scheduling. Beyond that Dask is just a glorified distributed thread pool. It doesn't change the way that your code is run, except to run it in many threads.

Then, you used ptime and, I believe, showed that you experienced the same slowdown (or at least this is my read of what you were saying above)

This result is consistent with the experience above that the scaling is good for small core counts (1-8 cores out of 36) but then levels off for larger core counts.

Although now that I look at your code snippet I see that you saw good parallelism for two-cores. Did you try also with a machine with more cores?

I'll rephrase my statement to say that if we're able to see the same lack of speedup on a machine without using Dask then it is unlikely that Dask itself is related to the slowdown. Given the packed task stream plot this would be my guess.

At this stage we're much more interested in the code that is running on the CPU than in the orchestration system (dask) that causes it to be run on many CPUs.

mrocklin commented 3 years ago

Looking very briefly at the code snippet that you have I see that you're doing some sort of spatial filtering

filter = gf.Filter(
    filter_scale=4,
    dx_min=1,
    filter_shape=gf.FilterShape.TAPER,
    grid_type=gf.GridType.REGULAR_WITH_LAND,
    grid_vars={'wet_mask': wet_mask}
)

One might ask the question "how is this implemented?" Is it implemented in a way that tries to reduce memory access? For example if this is a convolution with FFT then I would expect very bad CPU scaling due to memory access patterns.

Seeing a lack of scalability on a modern CPU is not surprising for workloads that jump around a lot in an array. I don't know much about hardware virtualization technologies, but this is the kind of thing where you might try renting 100 2-core VMs rather than 20 10-core VMs and seeing if that has an impact (although I wouldn't expect an effect in theory: you're still renting the same physical box with the same limitations on memory bandwidth).

mrocklin commented 3 years ago

In contrast, the scalability limit for dask.array.mean is likely due to Dask's coordination overhead. If we were to look at a task stream then we would not see a wall of solid color in the same way that we do here. At the same time, mean is an algorithm where the CPU can burn through data in a single pass, and so it's less likely to be bottlenecked by the memory hierarchy as early.

dask.array.mean's sub-linear scalability is more likely due to the coordination of working on distributed hardware. Filtering's sub-linear scalability doesn't seem to be due to this. They have similar curves, but I suspect due to very different reasons.

gjoseph92 commented 3 years ago

From what I understand of the filter implementation, they all tend to involve multiple np.rolls, like https://github.com/ocean-eddy-cpt/gcm-filters/blob/65294c7cf3c5ed18eaa9476b44d1dc93068ad575/gcm_filters/kernels.py#L70-L81

From what I understand, the rolls create 4 copies of the data in 4 different memory locations. Additionally, depending on the size of the data, the first pages of the intermediate output array might be evicted from the CPU cache by the time we have to use them again for the next +.
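Roughly the pattern in question (a sketch, not the exact kernels.py code linked above):

import numpy as np

def laplacian_with_rolls(field):
    # each np.roll allocates a full-size temporary copy of field
    return (
        -4 * field
        + np.roll(field, 1, axis=-1)
        + np.roll(field, -1, axis=-1)
        + np.roll(field, 1, axis=-2)
        + np.roll(field, -1, axis=-2)
    )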

I'd be curious to compare to a Numba or Cython implementation of this algorithm. The fact that it should be a local operation with two pointers (one for each axis) jumping one index forward and back at a time means an efficient, indexing-based implementation should require just one pass over the data and be very efficient in terms of CPU cache.

mrocklin commented 3 years ago

Yeah, I recommend numba.stencil. Making four copies of the array would definitely explain sub-optimal scaling.

rabernat commented 3 years ago

Although now that I look at your code snippet I see that you saw good parallelism for two-cores. Did you try also with a machine with more cores?

The machine has 36 cores, but I only ran ptime with the default settings. In retrospect, I should have read the docs better and tried %ptime -n=36 to verify that the same saturation occurs. I may try to do this, but I anticipate we would see the same result.

One might ask the question "how is this implemented?" Is it implemented in a way that tries to reduce memory access?

We are basically solving the diffusion equation repeatedly (like solving the heat equation).
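Schematically (a sketch only; the actual recursion and step coefficients are defined in the package and described in the docs), each filter stage applies another Laplacian step, so every output point touches its neighbors over and over, which is exactly the memory-bandwidth-heavy pattern discussed above:

def apply_filter_schematic(field, laplacian, steps):
    # laplacian: any discrete Laplacian operator
    # steps: placeholder step sizes derived from the filter shape and scale
    for s in steps:
        field = field + (1.0 / s) * laplacian(field)   # illustrative update only
    return field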

I don't know much about hardware virtualization technologies, but this is the kind of thing where you might try renting 100 2-core VMs rather than 20 10-core VMs and seeing if that has an impact (although I wouldn't expect an effect in theory: you're still renting the same physical box with the same limitations on memory bandwidth).

I tried this on Pangeo cloud (using Dask Gateway with many 2-core workers) and in fact the scaling looks much better!

[figure: scaling on Pangeo Cloud with many 2-core Dask workers]

mrocklin commented 3 years ago

Here is a maybe useful example: https://examples.dask.org/applications/stencils-with-numba.html

mrocklin commented 3 years ago

There is also scipy.ndimage.uniform_filter
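For concreteness, a minimal usage sketch; this box filter is not equivalent to the gcm-filters Laplacian-based filters, it is only a point of comparison for memory-access behavior:

import numpy as np
from scipy.ndimage import uniform_filter

data = np.random.rand(1024, 1024)
# 5x5 moving-average filter with periodic ('wrap') boundaries
smoothed = uniform_filter(data, size=5, mode='wrap')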

mrocklin commented 3 years ago

We are basically solving the diffusion equation repeatedly (like solving the heat equation).

Right. My question though is what implementation are you using in order to solve the heat equation. Based on @gjoseph92 's answer above it looks like the implementation is less than optimal and likely to stress memory. Using something like scipy.ndimage.uniform_filter or numba will likely give you a very significant speed boost. (as much as 4x)

rabernat commented 3 years ago

@gjoseph92 thanks a lot for digging into the guts of the code!

One reason we implemented it this way was so that we could easily use GPUs. That code runs well in cupy, and is about 10x faster than on CPU.

Since we have a pretty comprehensive test suite for all these kernels, reimplementing them in numba would not be too hard.

My question though is what implementation are you using in order to solve the heat equation. Based on @gjoseph92 's answer above it looks like the implementation is less than optimal and likely to stress memory. Using something like scipy.ndimage.uniform_filter or numba will likely give you a very significant speed boost. (as much as 4x)

@mrocklin - we can't just use off-the-shelf filters for this package. The uniform stencil we are using for this benchmarking is really just a reference implementation. Most real-world use cases for this package use more complex curvilinear grids which require us to consider finite-volume geometry in defining our Laplacians (example). That's why we created this package in the first place; as far as we know, such filters are unique to the finite volume models used in earth-system modeling. More background and theory is available in the GCM filters documentation.

Our first step in the development of this package has been to ensure correctness of the algorithm. There are very precise numerical details that are implemented very carefully (e.g. ensuring conservation properties), following our domain-specific conventions. Now that that is basically done, we are starting to look at optimization. In looking at optimization, we are starting with benchmarks. That is what this thread is about. So my understanding is that our order of operations (first get the algorithm working correctly, then benchmark, then optimize) is aligned with best practices.

The advice to try implementing the kernels in numba is a good one. We should consider pursuing it.

NoraLoose commented 3 years ago

Thanks @rabernat for your work on the performance evaluation framework and the reports, and thanks everyone for your ideas on how we can improve performance!

I just tried numba for our filter kernels using this branch. It does seem like numba improves filter performance: Throughput increases by ~25% compared to the original implementation. But the sub-linear scalability issue is not solved.

Here is the weak scaling. First plot: original implementation; second plot: with numba

[figures: weak_scaling (original implementation), weak_scaling_numba (numba)]

The strong scaling shows a similar picture.

[figures: strong_scaling (original implementation), strong_scaling_numba (numba)]

Also, the numba implementation decreases in efficiency as the number of threads per worker increases.

rabernat commented 3 years ago

Thanks for checking on this @NoraLoose! I really like the way you used the numba stencil decorator! From glancing through your numba implementation, I see some opportunities to go further. Right now the code is a sort of mixed numba / numpy approach, which will still result in many copies of the variables. If we want to go the numba route, I think we should consider going even further and making the entire Laplacian function a numba guvectorized function (with nopython=True).
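For illustration, here is a minimal sketch of what that could look like; the function name, layout string, and wet-mask handling are hypothetical, not the gcm-filters API:

import numpy as np
from numba import guvectorize

@guvectorize(
    ['void(float64[:, :], float64[:, :], float64[:, :])'],
    '(ny,nx),(ny,nx)->(ny,nx)',
    nopython=True,
)
def regular_laplacian_gufunc(field, wet_mask, out):
    # periodic-wrap 5-point Laplacian, masked by wet_mask, entirely inside
    # compiled code with no intermediate numpy temporaries
    ny, nx = field.shape
    for j in range(ny):
        for i in range(nx):
            out[j, i] = wet_mask[j, i] * (
                -4 * field[j, i]
                + field[(j - 1) % ny, i]
                + field[(j + 1) % ny, i]
                + field[j, (i - 1) % nx]
                + field[j, (i + 1) % nx]
            )

Compiling the whole loop nest this way avoids the full-size temporaries that np.roll creates, which is the memory-traffic concern raised earlier in the thread.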

Another very useful experiment to try would be to see if the Laplacian functions themselves show the same scaling behavior as the filter_func.

FWIW, I am convinced that %ptime is probably a better way to test raw single-machine scaling, since it takes the complexity of Dask out of the picture.

When I get back to work next week I would enjoy spending some time playing around with this again. Or you may beat me to it.

NoraLoose commented 3 years ago

From glancing through your numba implementation, I see some opportunities to go further.

I totally agree. What I tried here was rather a quick hack to get a first impression. Besides being a mixed numba / numpy approach, the current hack has two further issues:

rabernat commented 3 years ago

I have been working on this today. What I did was step back a bit and strip away some of the layers of the problem. I have a new notebook - https://github.com/ocean-eddy-cpt/gcm-filters-benchmarks/blob/master/benchmark_kernels.ipynb - which just focuses on benchmarking the kernels: their raw speed (with %timeit) and their single-machine scaling (with %ptime).

Some relevant results:

I'm going to push forward a bit with numba, using this new, simpler, benchmarking framework.

rabernat commented 3 years ago

I did some simple, self-contained numba experiments with reimplementing the regular_laplacian kernel and am getting some puzzling results. This cell can easily be copy-pasted by anyone looking to give it a try.

import numpy as np
from numba import stencil, njit
%load_ext ptime

@stencil
def regular_laplacian_numba_stencil(a):
    # Does not handle boundaries correctly
    return -4 * a[0, 0] + a[0, 1] + a[0, -1] + a[1, 0] + a[-1, 0]

# equivalent to @jit(nopython=True)
@njit
def regular_laplacian_numba_jit(a):
    # Does handle boundaries correctly
    ny, nx = a.shape
    out = np.zeros_like(a)
    for j in range(ny):
        for i in range(nx):
            out[j, i] = -4 * a[j, i] + a[j-1, i] + a[j+1, i] + a[j, i+1] + a[j, i-1]
    return out

shape = (1024, 1024)
data = np.random.rand(*shape)

# call everything once to trigger compilation
# verify results identical outside the boundary
np.testing.assert_allclose(
    regular_laplacian_numba_stencil(data)[1:-1, 1:-1],
    regular_laplacian_numba_jit(data)[1:-1, 1:-1]
)

%timeit -n10 regular_laplacian_numba_stencil(data)
%ptime -n4 regular_laplacian_numba_stencil(data)

%timeit -n10 regular_laplacian_numba_jit(data)
%ptime -n4 regular_laplacian_numba_jit(data)

stencil results

255 ms ± 4.88 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
Exception in thread Thread-5:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/srv/conda/envs/notebook/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/ptime/__init__.py", line 14, in _exec
    exec(stmt, glob, local)
  File "<string>", line 1, in <module>
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/numba/stencils/stencil.py", line 762, in __call__
    (real_ret, typemap, calltypes) = self.get_return_type(array_types)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/numba/stencils/stencil.py", line 337, in get_return_type
    typemap, return_type, calltypes, _ = typed_passes.type_inference_stage(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/numba/core/typed_passes.py", line 67, in type_inference_stage
    with typingctx.callstack.register(infer, interp.func_id, args):
  File "/srv/conda/envs/notebook/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/numba/core/typing/context.py", line 66, in register
    raise RuntimeError(msg)
RuntimeError: compiler re-entrant to the same function signature
Total serial time:   1.18 s
Total parallel time: 0.93 s
For a 1.26X speedup across 4 threads

(not sure what is causing the error or whether it matters)

njit results

2.53 ms ± 174 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Total serial time:   0.01 s
Total parallel time: 0.01 s
For a 0.87X speedup across 4 threads

Conclusions

I am truly puzzled why the numba functions don't parallelize well. I thought the whole point of @njit (or equivalently @jit(nopython=True)) was to release the GIL and enable these sorts of speedups. Would greatly appreciate it if @gjoseph92 or @mrocklin could follow up, now that we have tried your suggestion, and help us understand the results.

It's very likely I have made a n00b mistake here, so thanks for your patience...

NoraLoose commented 3 years ago

Have you tried to JIT compile your regular_laplacian_numba_stencil function with numba?

@numba.njit
def regular_laplacian_numba_stencil_jit(a):
    return regular_laplacian_numba_stencil(a)

In this example, this additional step gives a speedup of a factor of 1000.

rabernat commented 3 years ago

Good catch Nora! Definitely did not read the documentation clearly enough 🤦 . Redefining the stencil function as

@stencil
def _regular_laplacian_numba_stencil(a):
    # Does not handle boundaries correctly
    return -4 * a[0, 0] + a[0, 1] + a[0, -1] + a[1, 0] + a[-1, 0]

@njit
def regular_laplacian_numba_stencil(a):
    return _regular_laplacian_numba_stencil(a)

Definitely brings the stencil performance on par with the standalone njit performance.

stencil + njit

2.38 ms ± 155 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Total serial time:   0.01 s
Total parallel time: 0.01 s
For a 0.95X speedup across 4 threads

njit standalone

1.59 ms ± 35 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Total serial time:   0.01 s
Total parallel time: 0.01 s
For a 0.86X speedup across 4 threads

However, the core problem of negative parallel scaling remains!

mrocklin commented 3 years ago

cc @seibert in case he or Siu want to get engaged

mrocklin commented 3 years ago

(no expectation though)

rabernat commented 3 years ago

Final update for the day: boundary conditions. It turns out that my njit standalone version did not handle boundaries properly because numba performs no range checking on indexing.

Here is some code that properly handles the wrap boundary conditions for both stencil and njit standalone.

@stencil
def _regular_laplacian_numba_stencil(a):
    # Does not handle boundaries correctly
    return -4 * a[0, 0] + a[0, 1] + a[0, -1] + a[1, 0] + a[-1, 0]

@njit
def regular_laplacian_numba_stencil(a):
    return _regular_laplacian_numba_stencil(a)

@njit
def pad_array(a):
    ny, nx = a.shape
    padded = np.empty((ny+2, nx+2), a.dtype)
    padded[1:-1, 1:-1] = a
    padded[0, 1:-1] = a[-1, :]
    padded[-1, 1:-1] = a[0, :]
    padded[1:-1, 0] = a[:, -1,]
    padded[1:-1, -1] = a[:, 0]
    return padded

@njit
def regular_laplacian_numba_stencil_fix_boundary(a):
    padded = pad_array(a)
    b = _regular_laplacian_numba_stencil(padded)
    return b[1:-1, 1:-1]

@njit
def regular_laplacian_numba_jit(a):
    # Does handle boundaries correctly
    ny, nx = a.shape
    out = np.zeros_like(a)
    for j in range(ny):
        for i in range(nx):
            # make sure boundaries are handled right
            ileft = (i - 1) % nx
            iright = (i + 1) % nx
            jleft = (j - 1) % ny
            jright = (j + 1) % ny
            out[j, i] = -4 * a[j, i] + a[jleft, i] + a[jright, i] + a[j, ileft] + a[j, iright]
    return out

Timing

%timeit -n10 regular_laplacian_numba_stencil(data)
%ptime -n4 regular_laplacian_numba_stencil(data)
# 2.99 ms ± 363 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
# For a 0.98X speedup across 4 threads

%timeit -n10 regular_laplacian_numba_stencil_fix_boundary(data)
%ptime -n4 regular_laplacian_numba_stencil_fix_boundary(data)
# 3.79 ms ± 222 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
# For a 0.77X speedup across 4 threads

%timeit -n10 regular_laplacian_numba_jit(data)
%ptime -n4 regular_laplacian_numba_jit(data)
# 7.13 ms ± 314 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
# For a 1.01X speedup across 4 threads