aditya-grover / climate-learn


CMIP6 data processing #114

Closed ajikmr closed 1 year ago

ajikmr commented 1 year ago

Hello. I am modifying quickstart.ipynb for the CMIP6 case. I downloaded CMIP6 data using 'download_mpi_esm1_2_hr', but when I next process the *.nc files, it throws an error. It would be good to have sample scripts for downloading and processing the other two datasets (CMIP6 and PRISM) as well.

I also tried downloading via WeatherBench, and there I get a different error.

    cl.data.download_mpi_esm1_2_hr(
        dst="./dataset/cmip6/temperature",
        variable="temperature",
    )

    cl.data.download_mpi_esm1_2_hr(
        dst="./dataset/cmip6/geopotential",
        variable="geopotential",
    )

    convert_nc2npz(
        root_dir="./dataset/cmip6",
        save_dir="./dataset/cmip6/processed",
        variables=["temperature", "geopotential"],
        start_train_year=1850,
        start_val_year=2000,
        start_test_year=2005,
        end_year=2015,
        num_shards=16,
    )
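
As a quick sanity check before converting (an illustrative sketch only, based on the glob pattern visible in the traceback below), one can verify that each variable directory contains one file per requested year:

    import glob
    import os

    # nc2np globs f"{year}.nc" inside each variable directory, so every year in the
    # requested train/val/test ranges needs its own file under root_dir/<variable>/.
    root_dir = "./dataset/cmip6"
    for var in ["temperature", "geopotential"]:
        for year in (1850, 2000, 2005, 2014):  # spot-check a few years
            found = glob.glob(os.path.join(root_dir, var, f"{year}.nc"))
            print(var, year, "ok" if found else "MISSING")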

######################### Error Message (download_mpi_esm1_2_hr): #########################

    ValueError                                Traceback (most recent call last)
    Cell In[4], line 1
    ----> 1 convert_nc2npz(
          2     root_dir="../dataset/cmip6",
          3     save_dir="../dataset/cmip6/processed",
          4     variables=["temperature", "geopotential"],
          5     start_train_year=1850,
          6     start_val_year=2000,
          7     start_test_year=2005,
          8     end_year=2015,
          9     num_shards=16
         10 )

    File /#########################/climate-learn/src/climate_learn/data/processing/nc2npz.py:189, in convert_nc2npz(root_dir, save_dir, variables, start_train_year, start_val_year, start_test_year, end_year, num_shards)
        185 test_years = range(start_test_year, end_year)
        187 os.makedirs(save_dir, exist_ok=True)
    --> 189 nc2np(root_dir, variables, train_years, save_dir, "train", num_shards)
        190 nc2np(root_dir, variables, val_years, save_dir, "val", num_shards)
        191 nc2np(root_dir, variables, test_years, save_dir, "test", num_shards)

    File /#########################/climate-learn/src/climate_learn/data/processing/nc2npz.py:58, in nc2np(path, variables, years, save_dir, partition, num_shards_per_year)
         56 for var in variables:
         57     ps = glob.glob(os.path.join(path, var, f"{year}.nc"))
    ---> 58     ds = xr.open_mfdataset(
         59         ps, combine="by_coords", parallel=True
         60     )  # dataset for a single variable
         61     code = NAME_TO_VAR[var]
         63     if len(ds[code].shape) == 3:  # surface level variables

    File /#########################/lib/python3.9/site-packages/xarray/backends/api.py:1046, in open_mfdataset(paths, chunks, concat_dim, compat, preprocess, engine, data_vars, coords, combine, parallel, join, attrs_file, combine_attrs, **kwargs)
       1041     datasets = [preprocess(ds) for ds in datasets]
       1043 if parallel:
       1044     # calling compute here will return the datasets/file_objs lists,
       1045     # the underlying datasets will still be stored as dask arrays
    -> 1046     datasets, closers = dask.compute(datasets, closers)
       1048 # Combine all datasets, closing them in case of a ValueError
       1049 try:

    File /#########################/lib/python3.9/site-packages/dask/base.py:595, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
        592     keys.append(x.__dask_keys__())
        593     postcomputes.append(x.__dask_postcompute__())
    --> 595 results = schedule(dsk, keys, **kwargs)
        596 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

    File /#########################/lib/python3.9/site-packages/dask/threaded.py:89, in get(dsk, keys, cache, num_workers, pool, **kwargs)
         86 elif isinstance(pool, multiprocessing.pool.Pool):
         87     pool = MultiprocessingPoolExecutor(pool)
    ---> 89 results = get_async(
         90     pool.submit,
         91     pool._max_workers,
         92     dsk,
         93     keys,
         94     cache=cache,
         95     get_id=_thread_get_id,
         96     pack_exception=pack_exception,
         97     **kwargs,
         98 )
        100 # Cleanup pools associated to dead threads
        101 with pools_lock:

    File /#########################/lib/python3.9/site-packages/dask/local.py:511, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
        509     _execute_task(task, data)  # Re-execute locally
        510 else:
    --> 511     raise_exception(exc, tb)
        512 res, worker_id = loads(res_info)
        513 state["cache"][key] = res

    File /#########################/lib/python3.9/site-packages/dask/local.py:319, in reraise(exc, tb)
        317 if exc.__traceback__ is not tb:
        318     raise exc.with_traceback(tb)
    --> 319 raise exc

    File /#########################/lib/python3.9/site-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
        222 try:
        223     task, data = loads(task_info)
    --> 224     result = _execute_task(task, data)
        225     id = get_id()
        226     result = dumps((result, id))

    File /#########################/lib/python3.9/site-packages/dask/core.py:121, in _execute_task(arg, cache, dsk)
        117     func, args = arg[0], arg[1:]
        118     # Note: Don't assign the subtask results to a variable. numpy detects
        119     # temporaries by their reference count and can execute certain
        120     # operations in-place.
    --> 121     return func(*(_execute_task(a, cache) for a in args))
        122 elif not ishashable(arg):
        123     return arg

    File /#########################/lib/python3.9/site-packages/dask/utils.py:73, in apply(func, args, kwargs)
         42 """Apply a function given its positional and keyword arguments.
         43
         44 Equivalent to func(*args, **kwargs)
        (...)
         70 >>> dsk = {'task-name': task}  # adds the task to a low level Dask task graph
         71 """
         72 if kwargs:
    ---> 73     return func(*args, **kwargs)
         74 else:
         75     return func(*args)

    File /#########################/lib/python3.9/site-packages/xarray/backends/api.py:547, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, inline_array, chunked_array_type, from_array_kwargs, backend_kwargs, **kwargs)
        544     kwargs.update(backend_kwargs)
        546 if engine is None:
    --> 547     engine = plugins.guess_engine(filename_or_obj)
        549 if from_array_kwargs is None:
        550     from_array_kwargs = {}

    File /#########################/lib/python3.9/site-packages/xarray/backends/plugins.py:197, in guess_engine(store_spec)
        189 else:
        190     error_msg = (
        191         "found the following matches with the input file in xarray's IO "
        192         f"backends: {compatible_engines}. But their dependencies may not be installed, see:\n"
        193         "https://docs.xarray.dev/en/stable/user-guide/io.html \n"
        194         "https://docs.xarray.dev/en/stable/getting-started-guide/installing.html"
        195     )
    --> 197 raise ValueError(error_msg)

    ValueError: did not find a match in any of xarray's currently installed IO backends ['netcdf4', 'scipy']. Consider explicitly selecting one of the installed engines via the engine parameter, or installing additional IO dependencies, see:
    https://docs.xarray.dev/en/stable/getting-started-guide/installing.html
    https://docs.xarray.dev/en/stable/user-guide/io.html


######################### Error Message (weatherbench): #########################

    File /#########################/climate-learn/src/climate_learn/data/processing/nc2npz.py:189, in convert_nc2npz(root_dir, save_dir, variables, start_train_year, start_val_year, start_test_year, end_year, num_shards)
        185 test_years = range(start_test_year, end_year)
        187 os.makedirs(save_dir, exist_ok=True)
    --> 189 nc2np(root_dir, variables, train_years, save_dir, "train", num_shards)
        190 nc2np(root_dir, variables, val_years, save_dir, "val", num_shards)
        191 nc2np(root_dir, variables, test_years, save_dir, "test", num_shards)

    File /#########################/climate-learn/src/climate_learn/data/processing/nc2npz.py:95, in nc2np(path, variables, years, save_dir, partition, num_shards_per_year)
         93 else:  # pressure-level variables
         94     assert len(ds[code].shape) == 4
    ---> 95     all_levels = ds["level"][:].to_numpy()
         96     all_levels = np.intersect1d(all_levels, DEFAULT_PRESSURE_LEVELS)
         97     for level in all_levels:

    File /#########################/lib/python3.9/site-packages/xarray/core/dataset.py:1473, in Dataset.__getitem__(self, key)
       1471     return self.isel(**key)
       1472 if utils.hashable(key):
    -> 1473     return self._construct_dataarray(key)
       1474 if utils.iterable_of_hashable(key):
       1475     return self._copy_listed(key)

    File /#########################/lib/python3.9/site-packages/xarray/core/dataset.py:1384, in Dataset._construct_dataarray(self, name)
       1382     variable = self._variables[name]
       1383 except KeyError:
    -> 1384     _, name, variable = _get_virtual_variable(self._variables, name, self.dims)
       1386 needed_dims = set(variable.dims)
       1388 coords: dict[Hashable, Variable] = {}

    File /#########################/lib/python3.9/site-packages/xarray/core/dataset.py:196, in _get_virtual_variable(variables, key, dim_sizes)
        194 split_key = key.split(".", 1)
        195 if len(split_key) != 2:
    --> 196     raise KeyError(key)
        198 ref_name, var_name = split_key
        199 ref_var = variables[ref_name]

    KeyError: 'level'

jasonjewik commented 1 year ago

Hi @ajikmr, thank you for using our package. Can you confirm that you have installed netCDF4? It should be installed along with ClimateLearn's other dependencies (see pyproject.toml).

If you do not have netCDF4 in your Python environment, it can be installed via pip or conda.

pip install netCDF4
conda install -c conda-forge netCDF4

This package is a dependency of xarray, which ClimateLearn uses to process .nc files. Please see the netCDF4 documentation for further details. Once you have done that, please try processing the WeatherBench data again.
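
As a quick check (an illustrative sketch, not part of ClimateLearn; the file path below is hypothetical), one can confirm that xarray actually sees the netCDF4 backend and, if the automatic guess still fails, select the engine explicitly:

    import xarray as xr

    # List the IO backends xarray can currently see; "netcdf4" should appear here
    # once the netCDF4 package is installed in the same environment.
    print(xr.backends.list_engines())

    # If engine guessing still fails, force it explicitly (path is hypothetical).
    ds = xr.open_dataset("./dataset/cmip6/temperature/1850.nc", engine="netcdf4")
    print(ds)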

ajikmr commented 1 year ago

Thanks for your response, @jasonjewik. I am able to run the example Quickstart.ipynb, so I think my basic setup is fine, and I have been able to run the 'era5' dataset using the scripts as well. I only get an error when I try to run the 'cmip6' dataset. I do have netCDF4 installed. Let me try to figure it out, and I will reach out again if the error persists.

tung-nd commented 1 year ago

Hi @ajikmr

Were you able to figure out the issue? If not, please post a screenshot of the error here so we can investigate. Thanks!

ajikmr commented 1 year ago

Hello @tung-nd and @jasonjewik. First of all, many thanks to your team for both the climate-learn and ClimaX packages.

Once I got your message, I gave it another try, and the 'cmip6' run is now working. The dataset was downloaded using 'download_weatherbench'. ('download_mpi-esm1-2-hr' did not work at first because the downloaded .nc files were too small, only 401 bytes. Update: it is working now; I was not passing the correct variable name earlier. It downloads data in similar 5-year chunks.) I had to make a few changes, listed below; the first issue might need to be fixed in the package.

  1. Dataset source error: in file experiments/forecasting/cmip6_cmip6_deep_learning.py, add src="cmip6", at line 61, and in file src/climate_learn/data/iterdataset.py extend the source check to

         elif src == "mpi-esm1-2-hr" or src == "cmip6":  # added cmip6

  2. Single-variable error in dataset processing (after downloading). Temporary fix for now: in file experiments/forecasting/cmip6_cmip6_deep_learning.py, comment out air_temperature (line 33)

         "air_temperature",

     and change in_channels and out_channels to 35 and 2, respectively.

  3. Early stopping callback error. Temporary fix: in file experiments/forecasting/cmip6_cmip6_deep_learning.py, monitor the training metric instead of the validation one (see the sketch after this list):

         early_stopping = "train/lat_mse:aggregate"  # val throws error, hence train
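
For item 3, this is roughly how such a monitor string would be consumed by a PyTorch Lightning EarlyStopping callback. This is only an illustrative sketch: the patience and mode values are assumptions, not ClimateLearn defaults; only the metric name comes from the item above.

    from pytorch_lightning.callbacks import EarlyStopping

    # Illustrative sketch: monitor the aggregated training metric instead of the
    # validation metric, as described in item 3 above. patience and mode are
    # assumed values, not taken from ClimateLearn.
    early_stopping = "train/lat_mse:aggregate"
    early_stopping_callback = EarlyStopping(
        monitor=early_stopping,  # name of a metric logged by the model
        patience=5,
        mode="min",
    )
    # The callback would then be passed to the trainer, e.g.
    # pl.Trainer(callbacks=[early_stopping_callback], ...)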

ajikmr commented 1 year ago

Processing 'cmip6' data from 'download_weatherbench' required some modifications.

In file src/climate_learn/data/processing/nc2npz.py: since five years of data are stored in one file, at line 57 compute the start of the five-year chunk instead of using the plain year, and glob on it, i.e. replace

    year_tmp = year

with

    year_tmp = year - (year % 5)
    ps = glob.glob(os.path.join(path, var, f"*_{year_tmp}*.nc"))

At line 95, change level to plev and divide by 100, from

    all_levels = ds["level"][:].to_numpy()

to

    all_levels = ds["plev"][:].to_numpy() / 100.

At line 98, change level to plev, from

    ds_level = ds.sel(level=[level])

to

    ds_level = ds.sel(plev=[level * 100.])
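
Putting these changes together, a minimal standalone sketch of the adjusted loading logic could look like the following. The DEFAULT_PRESSURE_LEVELS values, the root directory, variable, and year are assumptions for illustration only; the real values live in ClimateLearn's nc2npz.py.

    import glob
    import os

    import numpy as np
    import xarray as xr

    # Assumed values for illustration; the actual list is defined in nc2npz.py.
    DEFAULT_PRESSURE_LEVELS = [50, 250, 500, 600, 700, 850, 925]

    root_dir = "./dataset/cmip6"  # illustrative paths and names
    var = "geopotential"
    year = 1987

    # WeatherBench CMIP6 files hold five years each, so glob on the chunk's start year.
    year_tmp = year - (year % 5)
    ps = glob.glob(os.path.join(root_dir, var, f"*_{year_tmp}*.nc"))
    ds = xr.open_mfdataset(ps, combine="by_coords", parallel=True)

    # CMIP6 stores the vertical coordinate as "plev" in Pa, while the ERA5-style code
    # expects "level" in hPa, hence the factor of 100 in both directions.
    all_levels = ds["plev"][:].to_numpy() / 100.0
    all_levels = np.intersect1d(all_levels, DEFAULT_PRESSURE_LEVELS)
    for level in all_levels:
        ds_level = ds.sel(plev=[level * 100.0])
        print(level, dict(ds_level.sizes))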

For air_temperature, the downloaded file appears to be a zip file: it has a *.zip extension but does not unzip. I checked on the website, and it seems they do not have an air_temperature variable (download_weatherbench).

ajikmr commented 1 year ago

Hello @tung-nd and @jasonjewik. I have been able to make some more runs, thanks. I also came across your 'climax-all' repo, which has files for processing different data sources.