underworldcode / UWGeodynamics

Underworld Geodynamics
Other
81 stars 32 forks source link

Hill slope diffusion of surface over multiple CPUs #182

Closed bknight1 closed 4 years ago

bknight1 commented 4 years ago

Hi @julesghub @rbeucher,

I've managed to get a basic hill slope diffusion working on a single cpu, however it fails when running on multiple processors. I'm looking into parallelising it but can't seem to get it to work. Any help on getting it to run would be great.

Here's the working code in serial:

def Hillslope_diffusion_basic():
    from scipy.interpolate import interp1d
    from scipy.interpolate import InterpolatedUnivariateSpline

    ### Copies the new surface from the old eroded surface
    surface_tracers_pre_erosion.data[:,0] = surface_tracers_erosion.data[:,0].copy()
    surface_tracers_pre_erosion.data[:,1] = surface_tracers_erosion.data[:,1].copy()

    ### gets the x and y coordinates from the tracers
    x = GEO.dimensionalise(surface_tracers_erosion.data[:,0], u.meter).magnitude
    z = GEO.dimensionalise(surface_tracers_erosion.data[:,1], u.meter).magnitude

    #### gets the distance between particles, estimate due to changing evolving surface
    dx = ((model_length.to(u.meter).magnitude) / npoints)

    ### time to diffuse surface based on model dt
    total_time = (GEO.dimensionalise((Model._dt or 0), u.year)).magnitude

    #### diffusion constant for surface, m^2/year
    D = 0.5
    dt = min((0.2 * dx * dx / D), total_time)

    nts = int(round(total_time/dt))

    print('timestep ', dt)
    print('Number of its ', nts)

    z_orig = z.copy()

    ### Hillslope diffusion
    for i in range(nts):
        qs = -D * np.diff(z)/dx
        dzdt = -np.diff(qs)/dx
        z[1:-1] += dzdt*dt

    z_nd = GEO.nd(z*u.meter)

    # # ### creates a new function for the eroded surface to use to reassign the swarm variables every 1 km
    x_new = np.linspace(GEO.nd(Model.minCoord[0]+1.*u.kilometer), GEO.nd(Model.maxCoord[0]), npoints)
    f = interp1d(x_new, z_nd, fill_value='extrapolate')
    # f = InterpolatedUnivariateSpline(x, z, k=1.)

    y_eroded_surface = f(x_new)

    surface_tracers_erosion.data[:,0] = x_new
    surface_tracers_erosion.data[:,1] = y_eroded_surface

    Model.materialField.data[(Model.swarm.data[:,1] > f(Model.swarm.data[:,0])) & (Model.materialField.data[:,0] != air.index)] = air.index
    Model.materialField.data[(Model.swarm.data[:,1] < f(Model.swarm.data[:,0])) & (Model.materialField.data[:,0] == air.index)] = Sediment.index

Cheers

bknight1 commented 4 years ago

Forgot to add the output, the error is coming from the interp1d as the particle tracers on the surface are over 2 (or more CPUs) so the length of z_nd (which is split over the CPUs) and x_new (the new x coordinates to interpolate over) are different.

I guess I can either (1) somehow gather the data on 1 CPU, do the calculation for the whole surface, and send it back out. or (2) do it over each seperate section to create the own surface on each section of the cpu, but might cause issues on the boundary between nodes.

surface calculation
total time: 160074.55343578712 timestep: 5760.0 No. of its: 28

x_new shape: 10000, y_nd shape: 4987

An uncaught exception appears to have been raised by all processes. Set the 'UW_ALL_MESSAGES' environment variable to see all messages. Rank 0 message is:
Traceback (most recent call last):
  File "Conv_One_Layer-27Apr20-Erosion-1-LR.py", line 1012, in <module>
    Model.run_for(Total_Time * u.megayears, checkpoint_interval=The_Checkpoint_interval*u.megayears)
  File "/opt/venv/lib/python3.7/site-packages/UWGeodynamics/_model.py", line 1651, in run_for
    self._post_solve()
  File "/opt/venv/lib/python3.7/site-packages/UWGeodynamics/_model.py", line 1669, in _post_solve
    val()
  File "Conv_One_Layer-27Apr20-Erosion-1-LR.py", line 736, in Hillslope_diffusion_basic
    f = interp1d(x_new, z_nd, kind='cubic', fill_value='extrapolate')
  File "/usr/lib/python3.7/site-packages/scipy/interpolate/interpolate.py", line 431, in __init__
    _Interpolator1D.__init__(self, x, y, axis=axis)
  File "/usr/lib/python3.7/site-packages/scipy/interpolate/polyint.py", line 60, in __init__
    self._set_yi(yi, xi=xi, axis=axis)
  File "/usr/lib/python3.7/site-packages/scipy/interpolate/polyint.py", line 125,in _set_yi
    raise ValueError("x and y arrays must be equal in length along "
ValueError: x and y arrays must be equal in length along interpolation axis.
application called MPI_Abort(comm=0x84000002, 1) - process 1
application called MPI_Abort(comm=0x84000004, 1) - process 0
jmansour commented 4 years ago

Hey Ben

I think the best starting place would be (as you suggest) to gather the required data to the root process, and then broadcast the required array back out at the end of the processing.

Note however that the ordering of gathered data can be problematic as you are stitching a bunch of arrays from different procs together. At tracer particle creation time, you might like to add a unique identifier (@rbeucher, is this built into UWGeo?) which allows you to fix the ordered after the data has been gathered back to root.

For the broadcast part, the ordering is not an issue as you will take a fixed array and send it out to all procs.

You will use mpi4py to achieve all this. There's a bunch of tutorials about, but let us know if you can't find anything suitable. mpi4py is quite nice to use as it integrates with numpy so much of the fiddly details are sorted. It's worth your time in any case learning this tool.

rbeucher commented 4 years ago

Hi have something half baked to do that.

The ordering pb is already solved in UWGeo. The next pb is that as you deform the model some particles may get quite far apart. I want to introduce a new class call Strings that take care of the pb. Basically a set of tracers that forms a line, the line is re-interpolated and re-sampled after n-steps by injecting new points where needed...or removing some. You would be able to apply any function to it, including diffusion. Then erosion / sedimentation is just a matter of checking what is above and below..

I have something half-baked. Happy to get some help on that @bknight1 if you need it.

rbeucher commented 4 years ago

So in UWGeo, Passive tracers have a unique id at creation time. You can use that to sort the points.

rbeucher commented 4 years ago

Note that @arijitlaik did something like what you are trying to do for his research project. It worked well

bknight1 commented 4 years ago

Nice one thanks @jmansour, I've had a try with np arrays but can't seem to get my head around the gathering and re-scattering the data.

I agree @rbeucher, I was having the issue with tracking the surface with an inflow boundary condition as you couldn't track the surface near the boundary. I managed to fix it by resetting the x coordinates each timestep and then extrapolating the line across the new coordinates and it worked well in serial. I guess the shapes of the np arrays will also be an issue when re projecting the values back. I'll have a look at how @arijitlaik implemented it too. Thanks for your help

tingyang2004 commented 4 years ago

Hi all,

the scripts I put in this issue https://github.com/underworldcode/underworld2/issues/282 should be able to tackle surface processes in multiple CPUs well.

Best regards, Ting

tingyang2004 commented 4 years ago

Once the array that records surface topography 'gridt' is generated, its horizontal coordinate does not change with time, only the vertical coordinate changes in response to surface processes and topographic advection. This is done on the root processor as mentioned by @jmansour

bknight1 commented 4 years ago

Awesome, thanks for that @tingyang2004 I'll have a look into it!

bknight1 commented 4 years ago

Hi @rbeucher, I was just looking at the PassiveTracers class and seen it also takes a zOnly parameter, so I assume that just moves the surface up and down? Similar to what @tingyang2004 had implemented for the surface. I'll see how if it wrks and report back

rbeucher commented 4 years ago

Yes exactly

bknight1 commented 4 years ago

okay awesome seems to be working okay now, will run some tests. I'll close this for now.

Here's the code if anyone is looking to implement something similar

### point every 1 km
npoints = int(Model.maxCoord[0].to(u.kilometer).magnitude) # This is the number of points used to define the surface
x_surface = np.linspace(GEO.nd(Model.minCoord[0]), GEO.nd(Model.maxCoord[0]), npoints)
y_surface = -0.01 * u.kilometer

surface_tracers_no_erosion = Model.add_passive_tracers(name="Surface-NoErosion", vertices=[x_surface,y_surface], zOnly=True)
surface_tracers_erosion = Model.add_passive_tracers(name="Surface-Erosion", vertices=[x_surface,y_surface], zOnly=True)

def Hillslope_diffusion_basic():
    from scipy.interpolate import interp1d
    from scipy.interpolate import InterpolatedUnivariateSpline

    ### gets the x and y coordinates from the tracers
    x = GEO.dimensionalise(surface_tracers_erosion.data[:,0], u.meter).magnitude
    z = GEO.dimensionalise(surface_tracers_erosion.data[:,1], u.meter).magnitude

    #### gets the distance between particles
    dx = (Model.maxCoord[0].to(u.meter).magnitude)/npoints

    ### time to diffuse surface based on model dt
    total_time = (GEO.dimensionalise(Model._dt, u.year)).magnitude

    #### diffusion constant for surface, m^2/year
    D = ((1.0e3 * u.meter**2 / u.year).to(u.meter**2 / u.year)).magnitude

    dt = min((0.2 * dx * dx / D), total_time)

    nts = int(round(total_time/dt))

    print('total time:', total_time, 'timestep:', dt, 'No. of its:', nts)

    z_orig = z.copy()

    ### Basic Hillslope diffusion
    for i in range(nts):
        qs = -D * np.diff(z)/dx
        dzdt = -np.diff(qs)/dx
        z[1:-1] += dzdt*dt

    x_nd = GEO.nd(x*u.meter)

     ### nd the diffused surface
    z_nd = GEO.nd(z*u.meter)

    ### creates function for the new surface that has eroded, only if there are x coords
    if x_nd.shape[0] > 0.:
        f1 = interp1d(x_nd, z_nd, fill_value='extrapolate', kind='nearest')

        # f = InterpolatedUnivariateSpline(x, z_nd, k=1.)

        ### reasigns the surface
        y_eroded_surface = f1(x_nd)

        #### Gets messed up near to the inflow
        y_eroded_surface[x_nd < GEO.nd(Update_material_LHS_Length*u.kilometer )] = 0.

        surface_tracers_erosion.data[:,1] = y_eroded_surface

        Model.materialField.data[(Model.swarm.data[:,1] > f1(Model.swarm.data[:,0])) & (Model.materialField.data[:,0] != air.index)] = air.index
        Model.materialField.data[(Model.swarm.data[:,1] < f1(Model.swarm.data[:,0])) & (Model.materialField.data[:,0] == air.index)] = Sediment.index
bknight1 commented 4 years ago

I've managed to get it working on 2 processors locally, however when running on the cluster, the MPI broadcast doesn't send the data to each node. Here's the code:

def Erosion_diffusion_multiple_cpu():
    from scipy.interpolate import interp1d
    from scipy.interpolate import InterpolatedUnivariateSpline
    from mpi4py import MPI

    ### collect surface data on each node
    data = np.copy(np.ascontiguousarray(surface_tracers_erosion.data[:]))
    ### sort by ascending x values
    data = data[np.argsort(data[:, 0])]

    #### empty numpy array to copy to create the spline function
    ### empty variables
    surface_data = np.zeros((npoints,2))

    # if uw.mpi.rank == 0:
    #     ### create buffer for the non deformed surface data
    #     surface_data = np.zeros((npoints,2))

    uw.mpi.comm.barrier()

    uw.mpi.comm.Gather(data, surface_data, root=0)
    print('gathered from ranks:', uw.mpi.rank, flush=True)

    if uw.mpi.rank == 0:

        ### sort data into ascending x coord
        surface_data = surface_data[np.argsort(surface_data[:, 0])]

        ### Erosion surface on the gathered data

        ### gets the x and y coordinates from the tracers
        x = GEO.dimensionalise(surface_data[:,0], u.meter).magnitude
        z = GEO.dimensionalise(surface_data[:,1], u.meter).magnitude

        #### gets the distance between particles, estimate due to changing evolving surface
        dx = (Model.maxCoord[0].to(u.meter).magnitude)/npoints

        ### time to diffuse surface based on model dt
        total_time = (GEO.dimensionalise(Model._dt, u.year)).magnitude

        #### diffusion constant for surface, m^2/year
        D = ((1.0e2 * u.meter**2 / u.year).to(u.meter**2 / u.year)).magnitude

        dt = min((0.2 * dx * dx / D), total_time)

        # nts = int(round(total_time/dt))
        nts = int(round(total_time/dt))

        print('total time:', total_time, 'timestep:', dt, 'No. of its:', nts, flush=True)

        z_orig = z.copy()

        ### Basic Hillslope diffusion
        for i in range(nts):
            qs = -D * np.diff(z)/dx
            dzdt = -np.diff(qs)/dx
            z[1:-1] += dzdt*dt

        x_nd = GEO.nd(x*u.meter)

        z_nd = GEO.nd(z*u.meter)

        ### creates function for the new surface that has eroded
        f1 = interp1d(x_nd, z_nd, fill_value='extrapolate', kind='nearest')

        # f = InterpolatedUnivariateSpline(x, z_nd, k=1.)

        ### reasigns the surface
        y_eroded_surface = f1(x_nd)

        #### Gets messed up near to the inflow
        y_eroded_surface[x_nd < GEO.nd(Update_material_LHS_Length*u.kilometer )] = 0.

        surface_data[:,0] = x_nd
        surface_data[:,1] = y_eroded_surface

        print('finished surface process on rank:', uw.mpi.rank, flush= True)

    #
    # # print('pre broadcast', flush=True)
    # # ### broadcast spline to every node to update material
    # # f1 = uw.mpi.comm.bcast(f1, root=0)
    #

    uw.mpi.comm.Bcast([surface_data, MPI.DOUBLE], root=0)
    print('broadcast to rank:', uw.mpi.rank, flush=True)

    uw.mpi.comm.barrier()

and here's the output:

Step:     1 Model Time: 184823.5 year dt: 184823.5 year (2020-05-09 12:32:59)
gathered from ranks: 1
gathered from ranks: 2
gathered from ranks: 3
gathered from ranks: 0
total time: 184823.47748009962 timestep: 20.0 No. of its: 9241
finished surface process on rank: 0
broadcast to rank: 0
broadcast to rank: 1
broadcast to rank: 2

so it's hanging at the barrier as rank # 3 hasn't received the data. Any ideas would be great as the issue only occurs when on a larger number of cpus.

Cheers

bknight1 commented 4 years ago

All good, realised I couldn't gather from certain processors that didn't have any data stored. Created a sub communicator group to handle only the nodes that had data on them:

def Erosion_diffusion_multiple_cpu():
    from scipy.interpolate import interp1d
    from scipy.interpolate import InterpolatedUnivariateSpline
    from mpi4py import MPI

    ### collect surface data on each node
    data = np.copy(np.ascontiguousarray(surface_tracers_erosion.data[:]))
    ### sort by ascending x values
    data = data[np.argsort(data[:, 0])]

    #### empty numpy array to copy to create the spline function
    ### empty variables
    surface_data = np.zeros((npoints,2))

    ### split comms up between 1 and 2, 1 with data 2 without
    if data.shape[0] > 0.:
        color = 1
        f1 = None
        print(uw.mpi.rank, color)
    else:
        color = 2
        print(uw.mpi.rank, color)

    uw.mpi.comm.barrier()

    local_comm = uw.mpi.comm.Split(color)
    local_rank = local_comm.Get_rank()

    # print('local_comm', local_comm, 'main comm', uw.mpi.comm)
    # print('local_rank', local_rank, 'main rank', uw.mpi.rank)
    if color == 1:

        local_comm.Gather(data, surface_data, root=0)

        if local_rank == 0:

            # if surface_data.shape[0]:

            ### sort data into ascending x coord
            surface_data = surface_data[np.argsort(surface_data[:, 0])]

            ### Erosion surface on the gathered data

            ### gets the x and y coordinates from the tracers
            x = GEO.dimensionalise(surface_data[:,0], u.meter).magnitude
            z = GEO.dimensionalise(surface_data[:,1], u.meter).magnitude

            #### gets the distance between particles, estimate due to changing evolving surface
            dx = (Model.maxCoord[0].to(u.meter).magnitude)/npoints

            ### time to diffuse surface based on model dt
            total_time = (GEO.dimensionalise(Model._dt, u.year)).magnitude

            #### diffusion constant for surface, m^2/year
            D = ((1.0e2 * u.meter**2 / u.year).to(u.meter**2 / u.year)).magnitude

            dt = min((0.2 * dx * dx / D), total_time)

            # nts = int(round(total_time/dt))
            nts = int(round(total_time/dt))

            print('total time:', total_time, 'timestep:', dt, 'No. of its:', nts, flush=True)

            z_orig = z.copy()

            ### Basic Hillslope diffusion
            for i in range(nts):
                qs = -D * np.diff(z)/dx
                dzdt = -np.diff(qs)/dx
                z[1:-1] += dzdt*dt

            x_nd = GEO.nd(x*u.meter)

            z_nd = GEO.nd(z*u.meter)

            ### creates function for the new surface that has eroded
            f1 = interp1d(x_nd, z_nd, fill_value='extrapolate', kind='nearest')

            # f = InterpolatedUnivariateSpline(x, z_nd, k=1.)

            ### reasigns the surface
            y_eroded_surface = f1(x_nd)

            #### Gets messed up near to the inflow
            y_eroded_surface[x_nd < GEO.nd(Update_material_LHS_Length*u.kilometer )] = 0.

            surface_data[:,0] = x_nd
            surface_data[:,1] = y_eroded_surface

            print('finished surface process on rank:', uw.mpi.rank, flush= True)

        local_comm.barrier()

        f1 = local_comm.bcast(f1, root=0)

        local_comm.barrier()

        local_comm.Bcast([surface_data, MPI.DOUBLE], root=0)
        print('broadcast to rank:', local_rank, uw.mpi.rank, flush=True)

        local_comm.barrier()

        print('replace data on:',local_rank, uw.mpi.rank, flush=True)
        ### replaces the new diffused surface data
        # surface_tracers_erosion.data[:,0] = surface_data[:, 0][(surface_data[:, 0] >= min(surface_tracers_erosion.data[:,0])) &  (surface_data[:, 0] <= max(surface_tracers_erosion.data[:,0]))]
        surface_tracers_erosion.data[:,1] = f1(surface_tracers_erosion.data[:,0]) #surface_data[:, 1][(surface_data[:, 0] >= min(surface_tracers_erosion.data[:,0])) &  (surface_data[:, 0] <= max(surface_tracers_erosion.data[:,0]))]

        print('update material on',local_rank, uw.mpi.rank, flush=True)
        Model.materialField.data[(Model.swarm.data[:,1] > f1(Model.swarm.data[:,0])) & (Model.materialField.data[:,0] != air.index)] = air.index
        Model.materialField.data[(Model.swarm.data[:,1] < f1(Model.swarm.data[:,0])) & (Model.materialField.data[:,0] == air.index)] = Sediment.index

    #
        print('finished updating material on local rank:', local_rank, 'global rank:', uw.mpi.rank, flush=True)

    else:
        pass

    uw.mpi.comm.barrier()
jmansour commented 4 years ago

Oh, that's disappointing the mpi4py doesn't handle this naturally. Good work with the solution in any case. If it gets unwieldy, a further option might be to simply send 1 dummy bit of data from all procs, and simply discard that the root proc.

tingyang2004 commented 4 years ago

I have only worked on surface processes for models on one node (up to 24 processors) and did not find any issue. But thanks for pointing this out and solve it.

bknight1 commented 4 years ago

Yeah was a struggle trying to work out why it was hanging. Thanks everyone for your help