zarr-developers / zarr-python

An implementation of chunked, compressed, N-dimensional arrays for Python.
https://zarr.readthedocs.io

How to limit memory usage for large datasets? #1687

Open gunnhildsp opened 8 months ago

gunnhildsp commented 8 months ago

Zarr version

2.16.1

Numcodecs version

0.12.1

Python Version

3.12.1

Operating System

Linux

Installation

using poetry in a virtual environment

Description

I am trying to write a zarr dataset from netcdf files. To limit memory usage, I first create daily zarr directories from hourly netcdf files using xarray, then combine the daily directories into a monthly zarr directory, and finally want to write the monthly zarr to Azure blob storage. However, the process is killed (no stacktrace), presumably from running out of memory, when combining the daily zarrs into the monthly one. If I build a smaller final directory, for example by combining only two daily zarrs, it works fine. I am using xarray version 2024.2.0.

Steps to reproduce

from datetime import datetime
from pathlib import Path

import pandas as pd
import xarray as xr
import zarr
from loguru import logger

#from met_data_to_horizon.horizon import (
#    get_blob_service_client,
#    get_container_client,
#)

weather_parameters_to_keep = [
    "air_temperature_2m",
    "air_pressure_at_sea_level",
    "altitude",
    "cloud_area_fraction",
    "precipitation_amount",
    "relative_humidity_2m",
    "wind_direction_10m",
    "wind_speed_10m",
]

# https://esipfed.github.io/cloud-computing-cluster/optimization-practices.html#chunk-size
# zarr tutorial recommends at least 1 MB
# dask recommends ~100 MB
# cloud usage - on the order of 10-100 MB
CHUNK_SIZES = {"time": 31 * 24, "x": 10, "y": 10}

def create_thredds_filename(date: pd.Timestamp) -> str:
    return f"https://thredds.met.no/thredds/dodsC/metpparchive/{date.year}/{date.month:02d}/{date.day:02d}/met_analysis_1_0km_nordic_{date.strftime('%Y%m%dT%H')}Z.nc"

def read_monthly_netcdf_and_write_zarr_to_processed(year_and_month_folder: str) -> None:
    start_date_month = pd.to_datetime(f"{year_and_month_folder}", format="%Y%m")
    days_in_month = pd.date_range(
        start=start_date_month,
        end=start_date_month + pd.tseries.offsets.MonthEnd(0) + pd.to_timedelta("23h"),
        freq="D",
    )

    zarr_filename_in_horizon = (
        f"met_analysis_1_0km_nordic_{start_date_month.year}_{start_date_month.month:02d}"
    )
    data_dir = Path(__file__).parent / "data"
    local_nc_file_dir = data_dir / "tmp_nc"
    daily_zarr_dir = data_dir / "tmp_zarr_per_day"
    local_nc_file_dir.mkdir(parents=True, exist_ok=True)
    daily_zarr_dir.mkdir(parents=True, exist_ok=True)

    for day in days_in_month:
        day_str = f"{day.day:02d}"
        day_nc_path = local_nc_file_dir / day_str

        download_nc_files_per_day(day, day_nc_path)

        write_daily_zarr_files_to_local_folder(
            daily_zarr_dir, day_nc_path, day_str, zarr_filename_in_horizon
        )
        # shutil.rmtree(day_nc_path)

    write_monthly_zarr_to_horizon_processed(daily_zarr_dir, zarr_filename_in_horizon)
    # shutil.rmtree(daily_zarr_dir)

def download_nc_files_per_day(day: datetime, nc_path_per_day: Path) -> None:
    hours_in_day = pd.date_range(start=day, end=day + pd.Timedelta("23h"), freq="h")
    nc_path_per_day.mkdir(parents=True, exist_ok=True)
    for timestamp in hours_in_day:
        filename = create_thredds_filename(timestamp)
        local_file_path = nc_path_per_day / Path(filename).name
        if not local_file_path.exists():
            # in my use case, we have a separate python service which downloads the netcdf files to
            # our azure data platform, so in this step we download the data from the azure storage,
            # but for a reproducible example, we'll download the netcdf files directly
            xr.open_dataset(filename, engine="netcdf4").to_netcdf(local_file_path)
            logger.info(f"Downloaded {local_file_path.name}")

def write_daily_zarr_files_to_local_folder(
    daily_zarr_dir: Path,
    day_nc_path: Path,
    day_str: str,
    zarr_filename_in_horizon: str,
    chunk_sizes: dict | None = None,
) -> None:
    if chunk_sizes is None:
        chunk_sizes = CHUNK_SIZES
    day_zarr_folder = daily_zarr_dir / f"{zarr_filename_in_horizon}_{day_str}"
    if not day_zarr_folder.exists():
        ds = xr.open_mfdataset(day_nc_path.glob("*.nc"), engine="netcdf4", combine="by_coords")[
            weather_parameters_to_keep
        ]
        logger.info("read dataset")
        ds = ds.chunk(chunk_sizes)
        logger.info("finished chunking")
        ds.to_zarr(day_zarr_folder)
        logger.info(f"created {day_str} zarr")

def write_monthly_zarr_to_horizon_processed(
    daily_zarr_dir: Path,
    zarr_filename_in_horizon: str,
    chunk_sizes: dict | None = None,
) -> None:
    if chunk_sizes is None:
        chunk_sizes = CHUNK_SIZES

    logger.info("Combining daily files to monthly..")
    # The line below is where the process gets killed
    ds = xr.open_mfdataset(daily_zarr_dir.glob("*"), engine="zarr", combine="by_coords")
    logger.info("Done combining")
    logger.info("Chunking data..")
    ds = ds.chunk(chunk_sizes)
    logger.info("Done chunking")

    logger.info("Writing monthly data to horizon...")

    # This part writes to our private cloud storage
    # store = zarr.ABSStore(
    #     client=get_container_client(get_blob_service_client(layer="processed")),
    #     prefix=f"zarr/chunk_size_10/{zarr_filename_in_horizon}.zarr",
    # )
    # try:
    #     ds.to_zarr(store)
    # except Exception as e:
    #     logger.info(f"Could not write monthly data to Horizon, error was {e}")

if __name__ == "__main__":
    year_and_month_folder = "202312"
    read_monthly_netcdf_and_write_zarr_to_processed(year_and_month_folder)

Additional output

No response

d-v-b commented 8 months ago

From the zarr-python perspective, minimizing memory usage is easy to say -- "load fewer chunks, make your chunks smaller". But your problem seems like an xarray issue? I'm not familiar with what happens inside xarray.open_mfdataset. It might be helpful to open an issue or discussion over at https://github.com/pydata/xarray/
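For illustration only (not code from this issue), a minimal sketch of the "load fewer chunks" idea under the setup above: instead of opening every daily zarr at once with open_mfdataset, each daily store is appended to the monthly store one at a time using xarray's to_zarr with append_dim="time", so only one day's worth of chunks is processed at a time. The helper name and the spatial-only chunk sizes here are assumptions for the example, not part of the reported setup.

from pathlib import Path

import xarray as xr

def append_daily_zarrs_to_monthly(
    daily_zarr_dir: Path, monthly_zarr_path: Path, chunk_sizes: dict | None = None
) -> None:
    # Sketch only: assumes the daily zarr stores written above, with identical
    # variables and coordinates across days (apart from time).
    if chunk_sizes is None:
        # Chunk only spatially so each daily append (24 hourly steps)
        # stays aligned with the store's chunks along time.
        chunk_sizes = {"x": 10, "y": 10}
    daily_paths = sorted(daily_zarr_dir.glob("*"))
    for i, path in enumerate(daily_paths):
        ds = xr.open_zarr(path).chunk(chunk_sizes)
        if i == 0:
            # The first day creates (or overwrites) the monthly store.
            ds.to_zarr(monthly_zarr_path, mode="w")
        else:
            # Subsequent days are appended along the time dimension,
            # so only one day's chunks are held at a time.
            ds.to_zarr(monthly_zarr_path, append_dim="time")
        ds.close()

With each daily store holding 24 hourly steps, the appends stay chunk-aligned along time, and peak memory is roughly bounded by one day's data plus its dask graph rather than the whole month.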