Unidata / netcdf4-python

netcdf4-python: python/numpy interface to the netCDF C library
http://unidata.github.io/netcdf4-python
MIT License

Segmentation fault when writing from multiple processes into memory #1320

Closed Paklgit closed 5 months ago

Paklgit commented 5 months ago

I encounter a segmentation fault when writing into a 100000x100000 variable of a file in memory (/tmp).

version: py-netcdf4-1.6.2 python: python-3.9.9 MPI: py-mpi4py-3.1.2

Caught signal 11 (Segmentation fault: address not mapped to object at address 0x1a2473f8)
BFD: Dwarf Error: Can't find .debug_ranges section. (multiple times)
==== backtrace (tid:1515381) ====
 0 0x0000000000012cf0 __funlockfile()  :0
 1 0x000000000015cb90 shuffle_init.constprop.0()  fcoll_vulcan_file_write_all.c:0
 2 0x000000000015fbb1 mca_fcoll_vulcan_file_write_all()  ???:0
 3 0x00000000000d8189 mca_common_ompio_file_write_at_all()  ???:0
 4 0x00000000001b4407 mca_io_ompio_file_write_at_all()  ???:0
 5 0x00000000000ad7e8 PMPI_File_write_at_all()  ???:0
 6 0x000000000012e57c H5FD__mpio_write()  H5FDmpio.c:0
 7 0x000000000012b0be H5FD_write()  ???:0
 8 0x00000000001097c3 H5F__accum_write()  ???:0
 9 0x00000000002098db H5PB_write()  ???:0
10 0x000000000011411b H5F_block_write()  ???:0
11 0x00000000000dec6d H5D__mpio_select_write()  ???:0
12 0x00000000000d8516 H5D__final_collective_io()  H5Dmpio.c:0
13 0x00000000000d86d6 H5D__inter_collective_io()  H5Dmpio.c:0
14 0x00000000000ded89 H5D__contig_collective_write()  ???:0
15 0x00000000000d57b5 H5D__write()  ???:0
16 0x00000000000d5d32 H5Dwrite()  ???:0
17 0x00000000000b69b6 NC4_put_vars()  ???:0
18 0x00000000000b5c6a NC4_put_vara()  ???:0
19 0x0000000000045286 NC_put_vara()  dvarput.c:0
20 0x0000000000046512 nc_put_vara()  ???:0
21 0x000000000009e995 __pyx_pw_7netCDF4_8_netCDF4_8Variable_79_put()  _netCDF4.c:0
22 0x00000000000fe203 cfunction_call()  methodobject.c:0
23 0x0000000000028e64 __Pyx_PyObject_Call.constprop.0()  _netCDF4.c:0
24 0x0000000000121275 __pyx_pf_7netCDF4_8_netCDF4_8Variable_56__setitem__()  _netCDF4.c:0
25 0x000000000006b975 _PyEval_EvalFrameDefault()  ???:0
26 0x0000000000068f69 function_code_fastcall()  call.c:0
27 0x0000000000070716 _PyEval_EvalFrameDefault()  ???:0
28 0x00000000001a499c _PyEval_EvalCode()  :0
29 0x00000000001a4e8e _PyEval_EvalCodeWithName()  ???:0
30 0x00000000001a4edb PyEval_EvalCodeEx()  ???:0
31 0x00000000001a4f0b PyEval_EvalCode()  ???:0
32 0x00000000001e63ae run_mod()  pythonrun.c:0
33 0x00000000001e7e71 PyRun_SimpleFileExFlags()  ???:0
34 0x0000000000205d7f Py_RunMain()  ???:0
35 0x0000000000206247 Py_BytesMain()  ???:0
36 0x000000000003ad85 __libc_start_main()  ???:0
37 0x000000000040091e _start()  ???:0
=================================
srun: error: l10740: task 2: Segmentation fault
srun: Terminating StepId=10057766.38
slurmstepd: error: *** STEP 10057766.38 ON l10740 CANCELLED AT 2024-04-28T14:54:06 ***
srun: error: l10740: tasks 0-1,3: Terminated
srun: Force Terminated StepId=10057766.38

The code was executed via srun -n 4 python script.py -s 1 --rows 100000 --cols 100000 -p z -m -f /file.nc. Whether the fault occurs depends on how many processes are used; for example, -n 1 and -n 32 raise no issues.

When I tried to reproduce the output in C, I encountered a similar issue when creating the data. I could solve this by ensuring that the datatype of the indexing variable supports large enough numbers (unsigned long or something like that). As the code runs without errors with --rows 100000 --cols 20000 (2,000,000,000 elements, below 2147483647) but not with --rows 100000 --cols 30000 (3,000,000,000 elements, above 2147483647), I suspect an integer overflow is happening here.
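
One way to check the overflow suspicion against the reported runs is to compute how many bytes each rank hands to a single write call. The sketch below is only an illustration under stated assumptions: 4-byte float32 values (what the script falls back to by default), the row-wise split used when --divcol is not set, a signed 32-bit limit of 2147483647, and that the --cols 20000 and --cols 30000 runs also used -n 4. The helper name bytes_per_rank is hypothetical and not part of the script.

```python
# Hypothetical helper: bytes one rank writes in a single v[...] = values call,
# assuming a row-wise split across ranks and 4-byte (float32) elements.
INT32_MAX = 2**31 - 1  # 2147483647

def bytes_per_rank(rows, cols, n_procs, itemsize=4):
    rows_per_rank = rows // n_procs  # division is exact for the cases below
    return rows_per_rank * cols * itemsize

cases = [
    (100000, 100000, 4),   # reported: segmentation fault
    (100000, 100000, 32),  # reported: no issue
    (100000, 20000, 4),    # reported: no issue (assuming -n 4)
    (100000, 30000, 4),    # reported: segmentation fault (assuming -n 4)
]
for rows, cols, n_procs in cases:
    nbytes = bytes_per_rank(rows, cols, n_procs)
    print(rows, cols, n_procs, nbytes, "exceeds INT32_MAX:", nbytes > INT32_MAX)
```

Under these assumptions, the two failing configurations exceed the 32-bit limit per rank and the two working parallel configurations stay below it. With -n 1 the script opens the file with parallel=False, so that run does not go through the MPI-IO path seen in the backtrace. This is consistent with the overflow suspicion but does not show where an overflow would occur.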

The segmentation fault does not occur when the line 191 (v[start_for_rank+steps[step]:start_for_rank+steps[step+1],:] = values[steps[step]:steps[step+1],:]) where the values are written into the netCDF variable is passed instead.

Script:

import time
import os
from mpi4py import MPI
import numpy as np
from netCDF4 import Dataset
import argparse
import csv

rank = MPI.COMM_WORLD.rank  # The process ID (integer 0-3 for 4-process run)
n_procs = MPI.COMM_WORLD.size  # The number of processes (integer 4 for 4-process run)

parser = argparse.ArgumentParser(description="testing the output of a netcdf file")
parser.add_argument("-i", "--info", action="store_true", help="1st row for table is printed, no computation")
parser.add_argument("-m", "--memory", action="store_true", help="file will be stored on memory disk")
parser.add_argument("--CR", help="path and name (path/name), exports the average compression ratio only, appending to csv")
parser.add_argument("--CS", help="path and name (path/name), exports the average compression speed only, appending to csv")
parser.add_argument("--WS", help="path and name (path/name), exports the average writing speed only, appending to csv")
parser.add_argument("--plotsize", help="path and name (path/name), exports the complete lists of filesizes")
parser.add_argument("--plottime", help="path and name (path/name), exports the complete lists of needed times")
parser.add_argument("--plotwspeeds", help="path and name (path/name), exports the complete lists of writing speeds")
parser.add_argument("--plotcspeeds", help="path and name (path/name), exports the complete lists of compression speeds")
parser.add_argument("--plotcratios", help="path and name (path/name), exports the complete lists of compression ratios")
parser.add_argument("--independent", action="store_true", help="tasks will perform output independently (only non-compressed variable)")
parser.add_argument("-f", "--filepath", help="path and name (path/name.nc) of the saved netCDF4 file", default="/scratch/b/b382462/test_file.nc")
parser.add_argument("-s", "--samplesize", help="sample size, how often a new netCDF4 file is created for measurement", type=int ,default=10)
parser.add_argument("-t", "--timesteps", help="number of divided sections appended successively (per rank)", type=int ,default=1)
parser.add_argument("--rows", help="number of array rows", type=int, default=100000)
parser.add_argument("--cols", help="number of array columns", type=int, default=100000)
parser.add_argument("--divcol", action="store_true", help="divide the columns onto the processes, rows are used by default")
parser.add_argument("-d", "--datatype", help="datatype of variables", default="np.float32")
parser.add_argument("-p", "--pattern", help="pattern of the array variable", default="random")
parser.add_argument("--chunks", help="chunksizes", type=int)
parser.add_argument("-c", "--compression", help="compression algorithm/compressor")
parser.add_argument("-l", "--complevel", help="compression level", type=int, default=4)
parser.add_argument("--shuffle", help="HDF5 shuffle filter (only applied if zlib compressor is used)", action="store_true")
parser.add_argument("--bloscshuffle", help="0 (no shuffle), 1 (byte-wise shuffle) or 2 (bit-wise shuffle) (only applied if blosc compressor is used)", type=int, choices=[0, 1, 2], default=2)
args = parser.parse_args()

if args.memory:
    file_path = "/tmp/" + args.filepath.rsplit('/',1)[1]
else:
    file_path = args.filepath

columns = args.cols
rows = args.rows

if args.chunks is None:
    chunks = None
else:
    chunks = (args.chunks, )

if args.divcol:
    start_for_rank = int((columns * rank)/np.double(n_procs))
    end_for_rank = int((columns * (rank + 1))/np.double(n_procs))
    length_for_rank = end_for_rank - start_for_rank
else:
    start_for_rank = int((rows * rank)/np.double(n_procs))
    end_for_rank = int((rows * (rank + 1))/np.double(n_procs))
    length_for_rank = end_for_rank - start_for_rank

floats = ["float", "floats", "float32", "floats32", "f"]
ints = ["int", "ints", "int32", "ints32", "i"]

if args.datatype in floats:
    datatype = np.float32
elif args.datatype in ints:
    datatype = np.int32
else:
    datatype = np.float32

array_size = np.dtype(datatype).itemsize * columns * rows

zeros = ["zero", "zeros", "0", "z"]
increasing = ["increasing", "growing", "rising", "incr", "grow", "i", "g"]
random = ["random", "rand", "r"]
thousands = ["thousands", "1000", "t"]
samples = ["pr", "tas"]

sample_size = args.samplesize

def get_zeros(height, width, datatype):
    return np.zeros((height, width), dtype=datatype), "zeros"

def get_thousands(height, width, datatype):
    return np.full((height, width), 1000, dtype=datatype), "thousands"

def get_increasing(height, width, datatype):
    value_row = np.arange(0, width, dtype=datatype)
    return np.tile(value_row, (height, 1)), "increasing"

def get_random01(height, width):
    return np.random.random(height*width).reshape((height, width)), "random01"

def get_random02(height, width):
    return np.random.randint(0, high=1000, size=height*width).reshape((height, width)), "random02"

def get_sample(name, height, width, offset, start):
    nc = Dataset(filename = "/home/b/b382462/sample_data/" + name + ".nc", mode = 'r', format='NETCDF4', parallel=n_procs>1, comm=MPI.COMM_WORLD,
                info=MPI.Info())
    sample_height = len(nc[name])
    sample_width = len(nc[name][0])
    startrow = offset+start
    if startrow%sample_height + height <= sample_height and width <= sample_width:
        values = nc[name][startrow%sample_height:startrow%sample_height+height,0:width]
        nc.close()
    elif width > sample_width:
        values = np.empty((height, width), dtype = np.float32)
        if height < sample_height:
            if startrow%sample_height + height < sample_height:
                # read only the columns that exist in the sample (width > sample_width in this branch)
                values[:,0:sample_width] = nc[name][startrow%sample_height:startrow%sample_height+height,0:sample_width]
            else:
                values[0:sample_height-startrow%sample_height,0:sample_width] = nc[name][startrow%sample_height:sample_height,0:sample_width]
                values[sample_height-startrow%sample_height:height,0:sample_width] = nc[name][0:height-(sample_height-startrow%sample_height),0:sample_width]
        else:
            values[0:sample_height-startrow%sample_height,0:sample_width] = nc[name][startrow%sample_height:sample_height,0:sample_width]
            values[sample_height-startrow%sample_height:sample_height,0:sample_width] = nc[name][0:startrow%sample_height,0:sample_width]
        nc.close()
        current_width = sample_width
        while current_width + sample_width < width:
            values[0:sample_height,current_width:current_width + sample_width] = values[0:sample_height,current_width - sample_width:current_width]
            current_width += sample_width
        for x in range(current_width, width):
            values[0:sample_height,x] = values[0:sample_height,x-sample_width]
        current_height = sample_height
        while current_height + sample_height < height:
            values[current_height:current_height + sample_height,0:width] = values[current_height - sample_height:current_height,0:width]
            current_height += sample_height
        for y in range(current_height, height):
            values[y,0:width] = values[y-sample_height,0:width]
    else:
        # close the file and fail explicitly; 'values' is undefined on this path
        nc.close()
        raise ValueError('Bad array shape for sample input')
    return values, name

def createFile(sample):
    global datatype
    global pattern 
    pattern = args.pattern
    # zeros
    if pattern.lower() in zeros:
        values, pattern = get_zeros(length_for_rank, columns, datatype)
    # thousands
    elif pattern.lower() in thousands:
        values, pattern = get_thousands(length_for_rank, columns, datatype)
    # growing Integers
    elif pattern.lower() in increasing:
        values, pattern = get_increasing(length_for_rank, columns, datatype)
    # Random Numbers
    elif pattern.lower() in random:
        if datatype == np.float32:
            values, pattern = get_random01(length_for_rank, columns)
        elif datatype == np.int32:
            values, pattern = get_random02(length_for_rank, columns)
    elif pattern == "random01":
        datatype = np.float32
        values, pattern = get_random01(length_for_rank, columns)
    elif pattern == "random02":
        datatype = np.int32
        values, pattern = get_random02(length_for_rank, columns)
    elif pattern == "pr":
        values, pattern = get_sample("pr", length_for_rank, columns, sample, start_for_rank)
    elif pattern == "tas":
        values, pattern = get_sample("tas", length_for_rank, columns, sample, start_for_rank)
    # Random Numbers if nothing else specified
    else:
        datatype = np.float32
        values, pattern = get_random01(length_for_rank, columns)

    timesteps = args.timesteps
    steps = []
    for step in range (timesteps):
        steps.append(int((length_for_rank * step)/np.double(timesteps)))
    steps.append(length_for_rank)
    MPI.COMM_WORLD.Barrier()

    if rank == 0:
        start_time = time.time()
    nc = Dataset(filename = file_path, mode = 'w', format='NETCDF4', parallel=n_procs>1, comm=MPI.COMM_WORLD,
                info=MPI.Info())
    y = nc.createDimension('height', rows)
    x = nc.createDimension('width', columns)
    v = nc.createVariable('var', datatype, (y,x), 
                          compression=args.compression, complevel=args.complevel, shuffle=args.shuffle, blosc_shuffle=args.bloscshuffle, chunksizes=chunks)

    if n_procs > 1:
        if args.independent and args.compression is None:
            v.set_collective(False)
        else:
            v.set_collective(True)

    for step in range (timesteps):
        v[start_for_rank+steps[step]:start_for_rank+steps[step+1],:] = values[steps[step]:steps[step+1],:]
    nc.close()

    MPI.COMM_WORLD.Barrier()

    if rank == 0:
        end_time = time.time()
        filesize = os.stat(file_path).st_size
        return end_time - start_time, filesize
    return 0, 0

if not args.info:
    needed_times = []
    filesizes = []
    for sample in range (sample_size):
        current_time, current_filesize = createFile(sample)
        needed_times.append(current_time)
        filesizes.append(current_filesize)
    avg_time = sum(needed_times)/sample_size
    avg_filesize = sum(filesizes)/sample_size

# print row: number of tasks, average time needed for output (s), arraysize (uncompressed, Megabytes), average filesize (Megabytes), compression ratio, Speed (MB/s)
if rank == 0:
    if args.memory:
        target = "memory\n"
    else:
        target = "Lustre\n"
    if args.compression is None:
        compression = "None"
    else:
        compression = args.compression + '(' + str(args.complevel) + ';' + str(args.bloscshuffle) + ')'
    if args.info:
        print("number of processes,average time needed[s],variablesize,datatype,pattern,arraysize[MB],average filesize[MB],compressor(complevel;shuffle),CR,CS[MB/s],WS[MB/s],location")
    else:
        result = (n_procs, avg_time, rows*columns, datatype.__name__, pattern, array_size/1000000, avg_filesize/1000000, compression, array_size/avg_filesize, array_size*sample_size/sum(needed_times)/1000000, sum(filesizes)/sum(needed_times)/1000000, target)
        print(str(n_procs) + ',' + str(round(avg_time, 3)) + ','+ str(columns) + 'x' + str(rows) + ',' + datatype.__name__ + ',' + pattern + ',' + str(round(array_size/1000000, 3)) + ',' + str(round(avg_filesize/1000000, 3)) + ',' + compression + ',' + str(round(array_size/avg_filesize, 3)) + ',' + str(round(array_size*sample_size/sum(needed_times)/1000000, 3)) + ','+ str(round(sum(filesizes)/sum(needed_times)/1000000, 3)) + ',' + target)
        if args.plottime is not None:
            with open(args.plottime, 'w') as f:
                writer = csv.writer(f)
                writer.writerow(needed_times)
        if args.plotwspeeds is not None:
            with open(args.plotwspeeds, 'w') as f:
                writer = csv.writer(f)
                writer.writerow(np.divide(np.divide(filesizes,needed_times), float(1000000)))
        if args.plotsize is not None:
            with open(args.plotsize, 'w') as f:
                writer = csv.writer(f)
                writer.writerow(filesizes)
        if args.plotcspeeds is not None:
            with open(args.plotcspeeds, 'w') as f:
                writer = csv.writer(f)
                writer.writerow(np.divide(np.divide(np.full(sample_size, array_size),needed_times), float(1000000)))
        if args.plotcratios is not None:
            with open(args.plotcratios, 'w') as f:
                writer = csv.writer(f)
                writer.writerow(np.divide(np.full(sample_size, array_size),filesizes))
        if args.CR is not None:
            with open(args.CR, 'a') as f:
                f.write(str(result[8]) + ',')
        if args.CS is not None:
            with open(args.CS, 'a') as f:
                f.write(str(result[9]) + ',')
        if args.WS is not None:
            with open(args.WS, 'a') as f:
                f.write(str(result[10]) + ',')

jswhit commented 5 months ago

can you clarify what you mean by "The segmentation fault does not occur when the line 191 (v[start_for_rank+steps[step]:start_for_rank+steps[step+1],:] = values[steps[step]:steps[step+1],:]) where the values are written into the netCDF variable is passed instead."?

Paklgit commented 5 months ago

I mean literally putting pass inside the loop in place of that line. I just wanted to make clear that the fault occurs in this line of my script and not elsewhere.

jswhit commented 5 months ago

netcdf4-python (and netcdf-c) uses size_t for array indices. I wonder if size_t is 32 bits on your platform?
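
A quick way to check this from the same Python interpreter is the standard-library ctypes module. This is a minimal sketch: it reports the width of size_t for the C ABI that Python was built against, which is assumed (not verified here) to match the one netcdf-c was compiled for.

```python
import ctypes
import sys

# Width of the platform's size_t as seen by this Python build
size_t_bits = ctypes.sizeof(ctypes.c_size_t) * 8
print("size_t width in bits:", size_t_bits)

# Cross-check: sys.maxsize is larger than 2**32 on a 64-bit Python build
print("64-bit Python build:", sys.maxsize > 2**32)
```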

Paklgit commented 5 months ago

Well, that is possible, but it seems strange to me that the behavior differs with the number of MPI processes. Nonetheless, it still might be an issue with my platform/setup. The output is performed for testing purposes, so it is not critical, as it is not a common use case.

Paklgit commented 5 months ago

I could reproduce the error in C with the same behavior. The segmentation fault only occurs for certain process numbers. size_t is stored in 64 bits on my system. Whatever is happening, it is not caused by netcdf4-python.
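
If the size of a single write per rank is indeed the trigger (an assumption; nothing in this thread confirms it), one way to test that is to split each rank's write into smaller blocks with the script's existing -t/--timesteps option. The sketch below estimates how many blocks would keep every write at or below 2147483647 bytes. The helper name min_timesteps and the 4-byte item size are assumptions for illustration.

```python
import math

INT32_MAX = 2**31 - 1  # 2147483647

def min_timesteps(rows_per_rank, cols, itemsize=4, limit=INT32_MAX):
    """Smallest number of row blocks so that no block exceeds `limit` bytes."""
    row_bytes = cols * itemsize
    if row_bytes > limit:
        raise ValueError("a single row already exceeds the limit")
    max_rows_per_write = limit // row_bytes
    return math.ceil(rows_per_rank / max_rows_per_write)

# Failing case from the report: -n 4, --rows 100000, --cols 100000
print(min_timesteps(100000 // 4, 100000))
```

For the failing -n 4 run this gives 5, so rerunning the same command with -t 5 added would write 5000 rows (2,000,000,000 bytes) per call and rank. Whether that avoids the fault would have to be tested on the affected system.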

jswhit commented 5 months ago

OK, thanks for confirming that, @Paklgit. Closing now, but feel free to reopen if need be.