valegui opened this issue 1 year ago
Hi @valegui. Thanks for the issue. I need some more time to think about what you're asking.
Initially, I would say that because the code maps every visibility to every pixel, there is a many-to-many communication pattern, which is very challenging for dask schedulers. This is probably what leads to the out-of-memory situations.
One way of "solving" this might be to assume that the image is small enough that it can be cloned and fed into each visibility chunk.
But I'll try to find some more time to think about your code in more detail.
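For illustration, a minimal sketch of that "clone the image into each visibility chunk" idea (the names, shapes, and per-chunk function here are made up, not from the code under discussion):

```python
import dask.array as da
import numpy as np

# Visibilities chunked along row; the image is a plain numpy array,
# so dask hands the whole image to every chunk's task.
vis = da.random.random((100_000,), chunks=1_000)
image = np.zeros((256, 256))

def per_chunk(vis_block, image):
    # Each task sees its visibility block plus the full image.
    return np.full(vis_block.shape, image.sum())

out = vis.map_blocks(per_chunk, image, dtype=np.float64)
print(out.sum().compute())
```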
Computing the gradient (which is also the negative of the residual image) using a direct Fourier transform (DFT) is never going to be fast or memory efficient. Have you considered using a degridder (e.g. the wgridder)? There are some utilities in codex-africanus that will do this for you, in particular the residual function:
https://github.com/ratt-ru/codex-africanus/blob/master/africanus/gridding/wgridder/im2residim.py
You can see how to call it by looking at this test.
Basically, the idea is that, since the image is usually much smaller than the visibilities, you hold the full image in memory and chunk over the visibilities along the row and chan axes (the latter needs to be aligned with the number of imaging bands you are using). You can get almost arbitrarily accurate results (down to ~1e-12) with this approach, as long as your pixels lie on a grid. If they do not, I would compute the residual visibilities and just apply the DFT to those. You can use the `vis_to_im` function (also in codex-africanus) for that. All the for loops in this implementation live in the numba layer, so you shouldn't be afraid of them. It is still going to be very slow compared to the gridded version.
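For concreteness, here is a hedged sketch of that last suggestion: a numba kernel that DFTs residual visibilities onto arbitrary pixel coordinates. This is not the codex-africanus `vis_to_im` signature, just an illustration of the kind of kernel it wraps; all names and argument layouts here are assumptions.

```python
import numba
import numpy as np

@numba.njit(nogil=True)
def residual_vis_to_im(residual_vis, uvw, lm, frequency):
    # residual_vis: (row, chan) complex residual visibilities
    # uvw:          (row, 3) baseline coordinates in metres
    # lm:           (npix, 2) direction cosines for each pixel
    # frequency:    (chan,) channel frequencies in Hz
    lightspeed = 2.99792458e8
    nrow, nchan = residual_vis.shape
    npix = lm.shape[0]
    image = np.zeros(npix)
    for r in range(nrow):
        u, v = uvw[r, 0], uvw[r, 1]
        for f in range(nchan):
            scale = 2.0 * np.pi * frequency[f] / lightspeed
            for p in range(npix):
                phase = scale * (u * lm[p, 0] + v * lm[p, 1])
                # accumulate Re[V_k^R * exp(i * phase)] into pixel p
                image[p] += (residual_vis[r, f] * np.exp(1j * phase)).real
    return image
```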
Hi @landmanbester, thank you for your answer.
I believe your answer is not exactly what I need, as I'm looking to compute the gradient with respect to the image using every pixel. I do not want to apply gridding.
This is similar to what is done in the `vis_to_im` function. It's not exactly the same, but the code you suggested has served as a guide for exploring what can be done with `blockwise`.
To provide further context, I've already used a degridder to obtain the residual visibilities. The degridding is done using `map_blocks` over the uv array, with the entire image in memory. As you know, this causes neither memory nor time problems, because only a minimal slice of the image is indexed/used for each uv pair. The problem starts in the `ms_gradient` function (in the op code) when uv is broadcast, especially if the image has 1024x1024 pixels or more.
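Schematically, that degridding step looks something like the sketch below. The names, the patch logic, and the shapes are all hypothetical stand-ins, not the actual code; the point is only that each uv pair touches a small patch of the image, so per-task memory stays tiny.

```python
import dask.array as da
import numpy as np

# uv chunked along row; the full image is an in-memory numpy array.
uv = da.random.random((100_000, 2), chunks=(1_000, 2)) * 256
image = np.random.random((256, 256))

def degrid_block(uv_block, image, support=7):
    # Each uv pair only touches a small patch around (u, v),
    # so memory per task stays tiny.
    out = np.empty(uv_block.shape[0], dtype=np.complex128)
    for r, (u, v) in enumerate(uv_block):
        x, y = int(u) % image.shape[0], int(v) % image.shape[1]
        patch = image[max(0, x - support):x + support,
                      max(0, y - support):y + support]
        out[r] = patch.sum()  # stand-in for the actual convolution
    return out

vis = da.map_blocks(degrid_block, uv, image,
                    drop_axis=1, dtype=np.complex128)
```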
> The degridding is done using `map_blocks` over the uv array, with the entire image in memory. As you know, this causes neither memory nor time problems, because only a minimal slice of the image is indexed/used for each uv pair.
You've managed to solve the problem in the degridding step by passing the image into each chunk; now we just need to apply the same idea in the other direction.
> The problem starts in the `ms_gradient` function (in the op code) when uv is broadcast, especially if the image has 1024x1024 pixels or more.
When broadcasting, far more memory is allocated than is strictly needed to compute the result. I've tried to re-express the general outline of your problem in the snippet below. It keeps 16 cores busy adding 1's at a fairly stable 8GB of memory usage, but you'll probably want to make it do the actual gradient calculation. If you want to increase your image size, you may need to correspondingly decrease your row chunk size to keep the total memory usage stable. Experimenting with chunk sizes is somewhat of an art.
```python
import astropy.units as un
import dask.array as da
import numba
import numpy as np

if __name__ == "__main__":
    # 100,000 rows in chunks of 50; 512 channels in 8 imaging bands.
    chunks = {
        "row": (50,) * 2000,
        "chan": (64,) * 8,
        "corr": (4,),
        "uvw": (3,),
    }
    shapes = {k: sum(v) for k, v in chunks.items()}
    shape = lambda dims: tuple(shapes[d.strip()] for d in dims.split(","))
    chunk = lambda dims: tuple(chunks[d.strip()] for d in dims.split(","))

    uvw = da.random.random(shape("row,uvw"), chunks=chunk("row,uvw")) * 10_000
    flag = da.random.choice([0, 1], shape("row,chan,corr"),
                            chunks=chunk("row,chan,corr"))
    model = (da.random.random(shape("row,chan,corr"), chunks=chunk("row,chan,corr")) +
             da.random.random(shape("row,chan,corr"), chunks=chunk("row,chan,corr"))*1j)
    observed = (da.random.random(shape("row,chan,corr"), chunks=chunk("row,chan,corr")) +
                da.random.random(shape("row,chan,corr"), chunks=chunk("row,chan,corr"))*1j)

    # The full image is a plain numpy array: it is passed whole to every chunk.
    image = np.zeros((1024, 1024))

    @numba.njit(nogil=True)
    def block_gradient(uvw, model, observed, flag, image, deltax=None, deltay=None):
        assert uvw.ndim == 2
        assert flag.ndim == model.ndim == observed.ndim == 3
        assert image.ndim == 2

        nrow, nchan, ncorr = model.shape
        nx, ny = image.shape
        gradient = np.zeros((nrow, nx, ny))

        for r in range(nrow):
            u = uvw[r, 0]
            v = uvw[r, 1]
            w = uvw[r, 2]
            for c in range(nchan):
                for co in range(ncorr):
                    residual = model[r, c, co] - observed[r, c, co]
                    for yi in range(ny):
                        y = deltay * (yi - ny // 2)
                        for xi in range(nx):
                            x = deltax * (xi - nx // 2)
                            ux = u * x
                            vy = v * y
                            uv = ux + vy
                            # Placeholder: substitute the actual gradient
                            # contribution for the "+= 1" below.
                            gradient[r, xi, yi] += 1

        return gradient

    grad = da.blockwise(block_gradient, ("row", "x", "y"),
                        uvw, ("row", "uvw"),
                        model, ("row", "chan", "corr"),
                        observed, ("row", "chan", "corr"),
                        flag, ("row", "chan", "corr"),
                        image, ("x", "y"),
                        deltax=(-1*un.rad).value,
                        deltay=(-1*un.rad).value,
                        concatenate=True,
                        # meta dtype matches the float64 gradient returned above
                        meta=np.empty((0,)*3, np.float64))

    grad.sum().compute()
```
I hope this bare outline captures your idea and is enough of a base for you to take the solution further.
Hi there, I'm working with Dask Array and numba to implement the gradient of $\chi^2$ in a chunked/parallel way, but I'm having some memory-related problems when working with a "big" image. The $\chi^2$ gradient equation is:

$$\frac{\partial \chi^2}{\partial I_i} = -2 \sum_k \mathrm{Re}\left[ V_k^{R}\, e^{2\pi i \left(u_k x_i + v_k y_i\right)} \right],$$

where $V_k^R$ is the $k$-th residual visibility, $(x_i, y_i)$ is the image coordinate of the $i$-th pixel according to the direction cosines of the phase tracking center, and $(u_k, v_k)$ is the sampling coordinate of the $k$-th visibility.

The current version, which works for images of size 256x256, uses `blockwise` over blocks of data that are aligned on the ROW coordinate. While the image (or its indices, which are the ones really used) can be a dask array, it's treated as an in-memory numpy array, and the data from the MS is broadcast to add the 2 dimensions of the image. That is why this version does not work for bigger images, like 2048x2048 or 1024x1024: while working in a jupyter notebook, the kernel just dies because it runs out of memory.

One of the problems faced when parallelizing the proposed solution (with a chunked image) is that for every pixel/cell of the image array we have to compute the gradient with every visibility, but the blocks of the MS data are not aligned with those of the image. This is a problem because we cannot use `map_blocks` or `blockwise` to work on every chunk of the image for every chunk of the data. From the current state of the solution, I need to find a way to divide the load per chunk of the MS arrays but also per chunk of the image, ideally without falling back to a for-loop iterating over every chunk of the image.

**What I have tried until now**
Here's a simplified version of the strategy that has been working without problems for images of size 256x256. The steps are the following:
This works because the size of each block of the broadcasted array is small enough to fit in memory. Changing the size of the image with this version does not work due to memory limits (the process gets killed).
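To make the memory blow-up concrete, here is a rough, hypothetical reconstruction of the broadcasting pattern described above (not the original snippet; shapes and names are assumed):

```python
import dask.array as da
import numpy as np

nrow, nx, ny = 100_000, 1024, 1024
row_chunk = 50

# uv coordinate, chunked along the row axis.
u = da.random.random((nrow,), chunks=row_chunk)

# Pixel coordinates of the full image, held in memory.
x = np.arange(nx) - nx // 2

# Broadcasting (row, 1, 1) against (1, nx, 1) and (1, 1, ny) materialises
# (row_chunk, nx, ny) float64 blocks:
#   50 * 1024 * 1024 * 8 bytes = 400 MB per block,
# and one such block exists for every in-flight task when computed.
phase = u[:, None, None] * x[None, :, None] * np.ones((1, 1, ny))
print(phase.chunks)  # ((50, 50, ...), (1024,), (1024,))
```

At 400 MB per in-flight block, a handful of parallel tasks already exhausts typical RAM, which is consistent with the kernel dying for 1024x1024 and larger images.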
**What I need**
The expected result is to get the resulting image without memory problems. One option is to modify what has been working with smaller images but, as I mentioned before, the idea is to avoid a for-loop over the image/image-chunks, and I don't know how to do that without the loop. Any suggestion or modification to the existing code is greatly appreciated.