openPMD / openPMD-api

C++ & Python API for Scientific I/O
https://openpmd-api.readthedocs.io
GNU Lesser General Public License v3.0

Combine multiple files into a single file #1647

Open thmihnea opened 1 month ago

thmihnea commented 1 month ago

Combining multiple openPMD files into a single one.

Hi, I am trying to combine multiple openPMD files into a single one. So far, I have written the code shown below:

from openpmd_viewer import OpenPMDTimeSeries
import openpmd_api as api

import os
import numpy as np

from koios import util

coordinates = ['x', 'y', 'z']

class SimulationCompressor(object):
    """
    This is a utility class designed specifically for
    WarpX post-processing.

    Whenever a simulation is running on a node, the idea is
    that we want to automatically cleanup data so that we
    never run out of memory.

    Our simulations currently generate openpmd files in
    the .h5 file format. The idea is to gather all these
    files, combine them into a single file, delete the
    initial files, and compress this so that it can be saved.
    """

    def __init__(self, project_path='.', fields_path='diags/field_diag'):
        """
        This is the constructor for the SimulationCompressor
        object. It populates the object's fields with the
        project path and the fields path. Also, it creates
        an OpenPMDTimeSeries object that allows us to
        view the data.

        Args:
            project_path (str): The path to the project.
            This defaults to '.'.
            fields_path (str): The path to the fields
            file. This defaults to 'diags/field_diag'.
        """
        self.project_path = project_path
        self.fields_path = fields_path

        self.data_access = OpenPMDTimeSeries(
            path_to_dir=os.path.join(project_path, fields_path),
            check_all_files=True
        )

    def delete_files(self):
        """
        This function deletes all the previous openPMD
        files within the simulation's directory.
        """
        diags_path = os.path.join(
            self.project_path,
            self.fields_path
        )
        to_delete = []
        for file in os.listdir(diags_path):
            if file.endswith('.h5') or 'openpmd' in file:
                to_delete.append(file)
        for file in to_delete:
            if file in os.listdir(diags_path):
                os.remove(
                    os.path.join(diags_path, file)
                )

    def _is_scalar(self, field):
        """
        Checks whether or not a field is scalar.

        Args:
            field (str): The field to check for.

        Returns:
            bool: Whether or not the field is scalar.
        """
        return self.data_access.fields_metadata[field]['type'] == 'scalar'

    def _get_index(self, field, coord):
        """
        Returns the mesh index for a field.

        Args:
            field (str): The field.
            coord (str): The coordinate

        Returns:
            str: Returns "" if scalar and the
            said coordinate if not scalar.
        """
        return "" if self._is_scalar(field) else coord

    def _get_field(self, field, coord, iteration):
        """
        Utility function to obtain a certain field.
        Note that this works for both scalar or vector
        fields. One can set coord=None if the field
        is scalar.

        Args:
            field (str): The field we wish to obtain.
            coord (str): The coordinate or None.
            iteration (int64): Current iteration.

        Returns:
            NDArray64: Array containing field data.
        """
        return self.data_access.get_field(
            field=field,
            iteration=iteration
        ) if self._is_scalar(field) else self.data_access.get_field(
            field=field,
            iteration=iteration,
            coord=coord
        )

    def combine(self):
        """
        This function combines all the openPMD files into
        a single file.
        """
        output_file = "simulation.h5"
        series = api.Series(
            output_file,
            api.Access.create
        )

        for it in self.data_access.iterations:
            it_new = series.iterations[it]

            for field in self.data_access.avail_fields:
                for coord in coordinates:
                    data = self._get_field(
                        field,
                        coord,
                        it
                    )[0]
                    dataset = api.Dataset(
                        data.dtype, 
                        data.shape
                    )
                    array = it_new.meshes[field][self._get_index(field, coord)]
                    array.reset_dataset(dataset)
                    array[:, :] = data

        series.flush()
        series.close()

However, whenever I attempt to create the file using flush(), openPMD cannot create the file:

HDF5-DIAG: Error detected in HDF5 (1.12.2) thread 0:
  #000: H5F.c line 532 in H5Fcreate(): unable to create file
    major: File accessibility
    minor: Unable to open file
  #001: H5VLcallback.c line 3282 in H5VL_file_create(): file create failed
    major: Virtual Object Layer
    minor: Unable to create file
  #002: H5VLcallback.c line 3248 in H5VL__file_create(): file create failed
    major: Virtual Object Layer
    minor: Unable to create file
  #003: H5VLnative_file.c line 63 in H5VL__native_file_create(): unable to create file
    major: File accessibility
    minor: Unable to open file
  #004: H5Fint.c line 1898 in H5F_open(): unable to lock the file
    major: File accessibility
    minor: Unable to lock file
  #005: H5FD.c line 1625 in H5FD_lock(): driver lock request failed
    major: Virtual File Layer
    minor: Unable to lock file
  #006: H5FDsec2.c line 1002 in H5FD__sec2_lock(): unable to lock file, errno = 11, error message = 'Resource temporarily unavailable'
    major: Virtual File Layer
    minor: Unable to lock file
[AbstractIOHandlerImpl] IO Task CREATE_FILE failed with exception. Clearing IO queue and passing on the exception.

Does anybody know why exactly this fails? I am essentially trying to copy the data from each iteration of the OpenPMDTimeSeries object into a new Series and then save it.

franzpoeschel commented 1 month ago

Hey @thmihnea, so what you have is multiple files such as simData_0.h5, simData_10.h5, simData_20.h5 and you want to merge them into a single one, e.g. simData.h5?

We already have a command-line tool that can do exactly that; it is installed along with openPMD-api. You can look up its documentation via openpmd-pipe --help. Something like openpmd-pipe --infile simData_%T.h5 --outfile simData.h5 should do the trick.
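For anyone who wants to drive this from a Python post-processing script, here is a minimal sketch that derives the %T pattern from a list of per-iteration file names and assembles the openpmd-pipe command mentioned above. The helper name build_pipe_command and the assumed naming scheme (prefix, iteration digits, extension) are illustrative assumptions, not part of openPMD-api; only the --infile and --outfile flags come from the suggestion above.

```python
import re
from pathlib import Path

# Assumed naming scheme: <prefix><iteration digits><.extension>, e.g. simData_10.h5
_ITERATION_FILE = re.compile(
    r"^(?P<prefix>.*?)(?P<iteration>\d+)(?P<suffix>\.[A-Za-z0-9]+)$"
)


def build_pipe_command(filenames, outfile):
    """Return the openpmd-pipe argument list for a set of per-iteration files.

    Hypothetical helper: it only builds the command, it does not run it.
    """
    patterns = set()
    for name in filenames:
        path = Path(name)
        match = _ITERATION_FILE.match(path.name)
        if match is None:
            raise ValueError(f"no iteration number found in {name!r}")
        # Replace the iteration digits with the %T placeholder.
        pattern_name = f"{match['prefix']}%T{match['suffix']}"
        patterns.add(str(path.with_name(pattern_name)))
    if len(patterns) != 1:
        raise ValueError(f"files do not share one naming pattern: {sorted(patterns)}")
    (pattern,) = patterns
    return ["openpmd-pipe", "--infile", pattern, "--outfile", outfile]
```

The returned list can be passed to subprocess.run(command, check=True) once openPMD-api is installed. Building the command separately from running it makes it easy to print and inspect before any files are touched.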

I'm not sure what causes the error that you see. In any case, I'd suggest calling it_new.close() at the end of each Iteration so that memory usage does not build up. If you're then still interested in debugging this, setting the environment variable with export OPENPMD_VERBOSE=1 will give more information on what is happening internally.
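A minimal sketch of what closing each Iteration could look like in a write loop, with OPENPMD_VERBOSE set from Python instead of the shell. It requires openpmd_api to be installed. The function name write_iterations, its parameters, and the mesh name "rho" are hypothetical, and it uses store_chunk rather than the slice assignment from the original code. Setting the variable before the library is imported is a cautious assumption about when it is read.

```python
import os

# Request verbose logging, as suggested above. Set before openpmd_api is
# imported (assumption: this guarantees the setting is seen by the library).
os.environ["OPENPMD_VERBOSE"] = "1"


def write_iterations(path, arrays_by_iteration, mesh_name="rho"):
    """Write one scalar mesh per iteration and close each iteration when done.

    Hypothetical helper: arrays_by_iteration maps an iteration index to a
    C-contiguous NumPy array holding that iteration's data.
    """
    # Imported here so that the environment variable above is already set.
    import openpmd_api as io

    series = io.Series(path, io.Access.create)
    for index, data in sorted(arrays_by_iteration.items()):
        iteration = series.iterations[index]
        component = iteration.meshes[mesh_name][io.Mesh_Record_Component.SCALAR]
        component.reset_dataset(io.Dataset(data.dtype, data.shape))
        component.store_chunk(data)
        # Flush the pending writes for this iteration and close it, so data
        # does not accumulate until the end of the loop.
        iteration.close()
    series.close()
```

For example, write_iterations("simulation.h5", {0: a0, 10: a10}) would write two iterations into one file, where a0 and a10 are NumPy arrays. Vector fields would need one component per coordinate instead of the SCALAR key.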

thmihnea commented 1 month ago

Hey @franzpoeschel, Thank you very much for the prompt response.

I have devised a workaround using h5py directly, and I achieved the result I wanted. It seems to be working properly for now; I will come back if I encounter any bugs and decide to use the command instead.

Thank you again!