meteofrance / meteonet

MeteoNet's toolbox and documentation

Optimize the storage #8

Open vk496 opened 4 years ago

vk496 commented 4 years ago

I would suggest moving all the datasets to a single, more structured file format (NetCDF, for example) instead of a mix of CSV, npz, etc.

In the meantime, two recommendations for the data you currently have:

larvorg commented 4 years ago

Hello,

thank you for this advice. We will follow it and run some tests. We have to keep in mind that some of our users may be on a Windows environment.

larvorg commented 4 years ago

Hello,

I ran a test on the observation data (NW2016). I fixed the lat/lon problem (cf. the other issue), put all the data into a NetCDF file with compression level 4 (you can find it via this link: https://we.tl/t-u0WL8VJn82), and also optimised the data types. I used the following compression parameters for each meteorological parameter (I use the xarray library, which relies on the netcdf4 library):

level = 4
param_compr = {
    'dim_0':      {'zlib': True, 'complevel': level, 'shuffle': True, 'fletcher32': True},
    'number_sta': {'zlib': True, 'complevel': level, 'shuffle': True, 'fletcher32': True},
    'lat':        {'zlib': True, 'complevel': level, 'shuffle': True, 'fletcher32': True, 'least_significant_digit': 2},
    'lon':        {'zlib': True, 'complevel': level, 'shuffle': True, 'fletcher32': True, 'least_significant_digit': 2},
    'height_sta': {'zlib': True, 'complevel': level, 'shuffle': True, 'fletcher32': True},
    'date':       {'zlib': True, 'complevel': level, 'shuffle': True, 'fletcher32': True},
    'dd':         {'zlib': True, 'complevel': level, 'shuffle': True, 'fletcher32': True},
    'ff':         {'zlib': True, 'complevel': level, 'shuffle': True, 'fletcher32': True, 'least_significant_digit': 1},
    'precip':     {'zlib': True, 'complevel': level, 'shuffle': True, 'fletcher32': True, 'least_significant_digit': 1},
    'hu':         {'zlib': True, 'complevel': level, 'shuffle': True, 'fletcher32': True},
    'td':         {'zlib': True, 'complevel': level, 'shuffle': True, 'fletcher32': True, 'least_significant_digit': 2},
    't':          {'zlib': True, 'complevel': level, 'shuffle': True, 'fletcher32': True, 'least_significant_digit': 2},
    'psl':        {'zlib': True, 'complevel': level, 'shuffle': True, 'fletcher32': True},
}

I noticed there are other parameters like chunksizes, endian or contiguous. I don't really know how to set them. Do you have any ideas?
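Since almost every variable shares the same settings, this kind of encoding can be generated with a small helper rather than written out by hand. The `make_encoding` function below is hypothetical (not part of xarray or netcdf4); it also takes an optional per-variable `chunksizes` mapping, `'chunksizes'` being one of the encoding keys netCDF4 accepts (a tuple with one entry per dimension):

```python
# Hypothetical helper: builds the same per-variable encoding dict as above
# instead of repeating the four common keys for every variable.
def make_encoding(names, level=4, digits=None, chunks=None):
    digits = digits or {}   # {name: least_significant_digit} for lossy rounding
    chunks = chunks or {}   # {name: chunk-shape tuple}, one entry per dimension
    encoding = {}
    for name in names:
        enc = {'zlib': True, 'complevel': level, 'shuffle': True, 'fletcher32': True}
        if name in digits:
            enc['least_significant_digit'] = digits[name]
        if name in chunks:
            enc['chunksizes'] = chunks[name]
        encoding[name] = enc
    return encoding

param_compr = make_encoding(
    ['dim_0', 'number_sta', 'lat', 'lon', 'height_sta', 'date',
     'dd', 'ff', 'precip', 'hu', 'td', 't', 'psl'],
    level=4,
    digits={'lat': 2, 'lon': 2, 'ff': 1, 'precip': 1, 'td': 2, 't': 2},
)
```

As for the other parameters: `contiguous=True` disables chunking and cannot be combined with compression, and `endian` can normally be left at its `'native'` default, so for compressed output both can usually be ignored.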

vk496 commented 4 years ago

First, a better description of the sample you generated:

$ ncdump -sh NW2016_test.nc
netcdf NW2016_test {
dimensions:
        dim_0 = 21921197 ;
variables:
        int dim_0(dim_0) ;
                dim_0:_Storage = "chunked" ;
                dim_0:_ChunkSizes = 1043867 ;
                dim_0:_DeflateLevel = 4 ;
                dim_0:_Shuffle = "true" ;
                dim_0:_Fletcher32 = "true" ;
                dim_0:_Endianness = "little" ;
        int number_sta(dim_0) ;
                number_sta:_Storage = "chunked" ;
                number_sta:_ChunkSizes = 1043867 ;
                number_sta:_DeflateLevel = 4 ;
                number_sta:_Shuffle = "true" ;
                number_sta:_Fletcher32 = "true" ;
                number_sta:_Endianness = "little" ;
        float lat(dim_0) ;
                lat:_FillValue = NaNf ;
                lat:least_significant_digit = 2 ;
                lat:_Storage = "chunked" ;
                lat:_ChunkSizes = 1043867 ;
                lat:_DeflateLevel = 4 ;
                lat:_Shuffle = "true" ;
                lat:_Fletcher32 = "true" ;
                lat:_Endianness = "little" ;
        float lon(dim_0) ;
                lon:_FillValue = NaNf ;
                lon:least_significant_digit = 2 ;
                lon:_Storage = "chunked" ;
                lon:_ChunkSizes = 1043867 ;
                lon:_DeflateLevel = 4 ;
                lon:_Shuffle = "true" ;
                lon:_Fletcher32 = "true" ;
                lon:_Endianness = "little" ;
        ushort height_sta(dim_0) ;
                height_sta:_Storage = "chunked" ;
                height_sta:_ChunkSizes = 1992837 ;
                height_sta:_DeflateLevel = 4 ;
                height_sta:_Shuffle = "true" ;
                height_sta:_Fletcher32 = "true" ;
                height_sta:_Endianness = "little" ;
        int64 date(dim_0) ;
                date:units = "minutes since 2016-01-01 00:00:00" ;
                date:calendar = "proleptic_gregorian" ;
                date:_Storage = "chunked" ;
                date:_ChunkSizes = 521934 ;
                date:_DeflateLevel = 4 ;
                date:_Shuffle = "true" ;
                date:_Fletcher32 = "true" ;
                date:_Endianness = "little" ;
        float dd(dim_0) ;
                dd:_FillValue = NaNf ;
                dd:_Storage = "chunked" ;
                dd:_ChunkSizes = 1043867 ;
                dd:_DeflateLevel = 4 ;
                dd:_Shuffle = "true" ;
                dd:_Fletcher32 = "true" ;
                dd:_Endianness = "little" ;
        float ff(dim_0) ;
                ff:_FillValue = NaNf ;
                ff:least_significant_digit = 1 ;
                ff:_Storage = "chunked" ;
                ff:_ChunkSizes = 1043867 ;
                ff:_DeflateLevel = 4 ;
                ff:_Shuffle = "true" ;
                ff:_Fletcher32 = "true" ;
                ff:_Endianness = "little" ;
        float precip(dim_0) ;
                precip:_FillValue = NaNf ;
                precip:least_significant_digit = 1 ;
                precip:_Storage = "chunked" ;
                precip:_ChunkSizes = 1043867 ;
                precip:_DeflateLevel = 4 ;
                precip:_Shuffle = "true" ;
                precip:_Fletcher32 = "true" ;
                precip:_Endianness = "little" ;
        float hu(dim_0) ;
                hu:_FillValue = NaNf ;
                hu:_Storage = "chunked" ;
                hu:_ChunkSizes = 1043867 ;
                hu:_DeflateLevel = 4 ;
                hu:_Shuffle = "true" ;
                hu:_Fletcher32 = "true" ;
                hu:_Endianness = "little" ;
        float td(dim_0) ;
                td:_FillValue = NaNf ;
                td:least_significant_digit = 2 ;
                td:_Storage = "chunked" ;
                td:_ChunkSizes = 1043867 ;
                td:_DeflateLevel = 4 ;
                td:_Shuffle = "true" ;
                td:_Fletcher32 = "true" ;
                td:_Endianness = "little" ;
        float t(dim_0) ;
                t:_FillValue = NaNf ;
                t:least_significant_digit = 2 ;
                t:_Storage = "chunked" ;
                t:_ChunkSizes = 1043867 ;
                t:_DeflateLevel = 4 ;
                t:_Shuffle = "true" ;
                t:_Fletcher32 = "true" ;
                t:_Endianness = "little" ;
        float psl(dim_0) ;
                psl:_FillValue = NaNf ;
                psl:_Storage = "chunked" ;
                psl:_ChunkSizes = 1043867 ;
                psl:_DeflateLevel = 4 ;
                psl:_Shuffle = "true" ;
                psl:_Fletcher32 = "true" ;
                psl:_Endianness = "little" ;

// global attributes:
                :_NCProperties = "version=2,netcdf=4.7.3,hdf5=1.10.4" ;
                :_SuperblockVersion = 0 ;
                :_IsNetcdf4 = 1 ;
                :_Format = "netCDF-4" ;
}

The first and most important point is the data structure itself. Since NetCDF allows you to structure the data, I would not store it line by line (as a kind of CSV inside NetCDF). Instead, I would organize it another way:

Note that you will probably add new points in the future, so I would declare not only the time dimension as unlimited, but also the position dimension.

Regarding the variables themselves:

The compression level (4 by default) is OK. Since this would be more for archival purposes (I guess) than operational use, I would try higher levels to see whether you save more space.
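Levels can be compared cheaply before rewriting the file: netCDF4's zlib filter is ordinary deflate, so running the stdlib `zlib` module on a sample of the data gives a rough idea of what the higher levels would buy. The byte string below is synthetic:

```python
import zlib

# Synthetic, repetitive sample standing in for a slice of the station data.
sample = b"14066001,49.33,-0.43,279.85,0.0\n" * 50_000

# netCDF4's zlib filter is plain deflate, so this approximates the size
# trade-off between a fast level (1), the default (4) and the maximum (9).
sizes = {level: len(zlib.compress(sample, level)) for level in (1, 4, 9)}
```

Higher levels mainly cost write-time CPU; decompression speed is essentially unchanged, which suits archival data that is written once and read many times.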

vk496 commented 4 years ago

If it helps, I quickly wrote a Python script to test more or less what I described above.

import netCDF4 as nc
import csv
import numpy as np
import progressbar
from datetime import datetime

def get_params(file):

    gmp_num=set()
    time_num=set()

    print("Obtaining dimension of lat/lon")

    with open(file, mode='rt') as csv_file:
        csv_reader = csv.reader(csv_file)
        line_count = 1
        verify_points=dict()

        next(csv_reader)

        for row in progressbar.progressbar(csv_reader):
            if not row: # The file ends each line with \r\r\n, which is not standard; skip the resulting empty rows
                line_count += 1
                continue

            number_sta=row[0]
            lat=row[1]
            lon=row[2]
            # time = int(nc.date2num(datetime.strptime(row[4], '%Y%m%d %H:%M'), 'seconds since 1970-01-01 00:00'))
            time = row[4]

            gmp = (lat,lon)
            time_num.add(time)
            gmp_num.add(gmp)

            if number_sta in verify_points:
                if verify_points[number_sta] != gmp:
                    raise ValueError(f"Error! Station ({number_sta}) already has position ({verify_points[number_sta]}) and the new position ({gmp}) does not match.")
            else:
                verify_points[number_sta] = gmp

            if line_count == 100000:
                break

            line_count += 1

    del verify_points
    time_total= len(time_num)
    print(f"time: {time_total}; gmp: {len(gmp_num)}")

    return gmp_num, time_num, line_count

file_test="NW2016.csv"

gmp_num, time_num, total_csv = get_params(file_test)

total_pos = len(gmp_num)

gmp_num = sorted(gmp_num)
gmp_indexes = dict()
for idx, val in enumerate(gmp_num):
    gmp_indexes[val] = idx

time_num = sorted(time_num)
time_indexes = dict()
for idx, val in enumerate(time_num):
    time_indexes[val] = idx

# total_lat, total_lon = 238, 238

file2data = nc.Dataset("DATA.nc", 'w',  format='NETCDF4')

dtime  = file2data.createDimension('time', None)
dlat  = file2data.createDimension('position', total_pos)

vtime  = file2data.createVariable('time', 'u4', ('time'), zlib=True)
vtime[:] = [ int(nc.date2num(datetime.strptime(val, '%Y%m%d %H:%M'), 'seconds since 1970-01-01 00:00')) for val in time_num]

nc_number_sta  = file2data.createVariable('number_sta', 'i4', ('position'), zlib=True)
nc_height_sta  = file2data.createVariable('height_sta', 'u2', ('position'), zlib=True)

nc_wind_direction  = file2data.createVariable('dd', 'u2', ('time','position'), zlib=True)
nc_wind_speed  = file2data.createVariable('ff', 'f', ('time','position'), least_significant_digit=1, zlib=True)
nc_precip  = file2data.createVariable('precip', 'f', ('time','position'), least_significant_digit=1, zlib=True)
nc_hu  = file2data.createVariable('hu', 'u1', ('time','position'), zlib=True)
nc_td  = file2data.createVariable('td', 'f', ('time','position'), least_significant_digit=2, zlib=True)
nc_t  = file2data.createVariable('t', 'f', ('time','position'), least_significant_digit=2, zlib=True)
nc_psl  = file2data.createVariable('psl', 'u4', ('time','position'), zlib=True)

with open(file_test, mode='rt') as csv_file:
    csv_reader = csv.reader(csv_file)
    line_count = 1
    next(csv_reader)
    with progressbar.ProgressBar(max_value=total_csv) as bar:
        dim1_number_sta=set()
        dim1_height_sta=set()

        for row in csv_reader:

            if not row:
                line_count += 1
                continue

            dd = row[5]
            ff = row[6]
            precip = row[7]
            hu = row[8]
            td = row[9]
            t = row[10]
            psl = row[11]

            time_index = time_indexes[row[4]]
            gmp_index = gmp_indexes[(row[1],row[2])]

            if row[0] not in dim1_number_sta:
                dim1_number_sta.add(row[0])
                nc_number_sta[gmp_index] = int(row[0])

            # Key on the position index, not the height value: two stations can
            # share the same height, and checking the value would skip the second.
            if gmp_index not in dim1_height_sta:
                dim1_height_sta.add(gmp_index)
                nc_height_sta[gmp_index] = int(row[3].split(".")[0])

            if dd:
                nc_wind_direction[time_index,gmp_index] = int(dd.split(".")[0])

            if ff:
                nc_wind_speed[time_index,gmp_index] = float(ff)

            if precip:
                nc_precip[time_index,gmp_index] = float(precip)

            if hu:
                nc_hu[time_index,gmp_index] = int(hu.split(".")[0])

            if td:
                nc_td[time_index,gmp_index] = float(td)

            if t:
                nc_t[time_index,gmp_index] = float(t)

            if psl:
                nc_psl[time_index,gmp_index] = int(psl.split(".")[0])

            # if line_count == 100000:
            #     break

            line_count += 1
            bar.update(line_count)

    print(f'Processed {line_count} lines.')

    file2data.close()

A lot of things are missing and/or wrong, but it should give you a general idea of how to treat the data.

It is important to note that I process the file line by line, which can be REALLY INEFFICIENT. Consider doing it in a more vectorized way (with pandas, for example).
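A sketch of that vectorized route with pandas: `read_csv` parses the whole file at once, and a single `pivot` builds the time × station grid that the inner loop above fills cell by cell. The three sample rows below are invented; only the column layout matches the script above.

```python
import io

import pandas as pd

# Three invented rows in the NW2016.csv column layout used by the script above.
csv_text = """number_sta,lat,lon,height_sta,date,dd,ff,precip,hu,td,t,psl
14066001,49.33,-0.43,2,20160101 00:00,210,4.4,0,91,278.45,279.85,102360
14126001,49.15,0.04,125,20160101 00:00,,,0,99,278.35,278.45,
14066001,49.33,-0.43,2,20160101 00:06,220,4.6,0,90,278.35,279.95,102350
"""

df = pd.read_csv(io.StringIO(csv_text))
df['date'] = pd.to_datetime(df['date'], format='%Y%m%d %H:%M')

# One pivot replaces the per-line inner loop: one row per time, one column
# per station, NaN where a station has no report at that time.
t_grid = df.pivot(index='date', columns='number_sta', values='t')
```

From there, a frame indexed by (date, station) can be handed to `xarray.Dataset.from_dataframe` to get the gridded, NetCDF-ready structure directly.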

larvorg commented 4 years ago

Thank you for your feedback. On further consideration, we think it is better to keep the CSV format for point data (ground station observations) because it is a more widely used format. We want to reach as many people as possible with our dataset. CSV is really accessible to everyone, and the volume of our observation data is not a disadvantage for the CSV format. However, we do want to use the NetCDF format for grid data (weather model data, radar data). So your remarks about data types are useful to us, and those about NetCDF storage will be useful for the grid data.