CederGroupHub / smol

Statistical Mechanics on Lattices
https://cedergrouphub.github.io/smol/
Other
65 stars 14 forks source link

Evaluator unable to pickle, causing parallelization issues #386

Closed qchempku2017 closed 1 year ago

qchempku2017 commented 1 year ago

After PR #358 , Evaluator classes are used to calculate correlation functions. However, these classes cannot be properly pickled due to lack of "reduce" method. This makes it impossible to parallelize MC in separate processes using multiprocessing or Joblib, because the workers in these libraries require all I/O to be picklable.

Note that process-wise parallelization is still desirable even if you have implemented parallel evaluation of correlations, because people would frequently want to parallelize temperatures, chemical potentials, etc.

Current Behavior

Here is the error reported by pytest:

test setup failed
joblib.externals.loky.process_executor._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "C:\Users\66422\PycharmProjects\WFacer\venv\lib\site-packages\joblib\externals\loky\backend\queues.py", line 159, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "C:\Users\66422\PycharmProjects\WFacer\venv\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 215, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "C:\Users\66422\PycharmProjects\WFacer\venv\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 208, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "C:\Users\66422\PycharmProjects\WFacer\venv\lib\site-packages\joblib\externals\cloudpickle\cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
  File "stringsource", line 2, in smol.utils.cluster.evaluator.ClusterSpaceEvaluator.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
"""
​
The above exception was the direct cause of the following exception:
​
initial_document = CeOutputsDocument(project_name='ace-work', cluster_subspace=Cluster Subspace Summary
Basis/Orthogonal/Orthonormal : in...tructures=None, enumerated_matrices=None, enumerated_features=None, undecorated_entries=None, computed_properties=None)
​
    @pytest.fixture
    def enum_output(initial_document):
>       return enumerate_structures(initial_document)
​
test_jobs.py:95: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
..\WFacer\jobs.py:239: in enumerate_structures
    new_structures, new_sc_matrices, new_features = _enumerate_structures(
..\WFacer\jobs.py:97: in _enumerate_structures
    new_structures, new_sc_matrices, new_features = generate_training_structures(
..\WFacer\enumeration.py:469: in generate_training_structures
    results = par(
..\venv\lib\site-packages\joblib\parallel.py:1944: in __call__
    return output if self.return_generator else list(output)
..\venv\lib\site-packages\joblib\parallel.py:1587: in _get_outputs
    yield from self._retrieve()
..\venv\lib\site-packages\joblib\parallel.py:1691: in _retrieve
    self._raise_error_fast()
..\venv\lib\site-packages\joblib\parallel.py:1726: in _raise_error_fast
    error_job.get_result(self.timeout)
..\venv\lib\site-packages\joblib\parallel.py:735: in get_result
    return self._return_or_raise()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
​
self = <joblib.parallel.BatchCompletionCallBack object at 0x00000120EE572220>
​
    def _return_or_raise(self):
        try:
            if self.status == TASK_ERROR:
>               raise self._result
E               _pickle.PicklingError: Could not pickle the task to send it to the workers.
​
..\venv\lib\site-packages\joblib\parallel.py:753: PicklingError

The cython class ClusterSpaceEvaluator does not have a reduce method.

Possible Solution

Implement a reduce method for all added cython classes in smol.util and test out process-wise parallelization again.

Steps to Reproduce

The problematic codes:

def generate_training_structures(
    ce,
    enumerated_matrices,
    enumerated_counts,
    previous_sampled_structures=None,
    previous_feature_matrix=None,
    keep_ground_states=True,
    num_structs=60,
    mc_generator_kwargs=None,
    n_parallel=None,
    duplicacy_criteria="correlations",
    **kwargs,
):
    """Generate training structures at the first iteration.

    Args:
        ce(ClusterExpansion):
            ClusterExpansion object initialized as null. If charge decorated,
            will contain an ewald contribution at 100%
        enumerated_matrices(list[3*3 ArrayLike[int]]):
            Previously enumerated supercell matrices. Must be the same super-cell
            size.
        enumerated_counts(list[1D ArrayLike]):
            Previously enumerated compositions in "counts" format. Must fit in
            the super-cell size.
            Note: Different super-cell sizes not supported!
        previous_sampled_structures(list[Structure]): optional
            Sample structures already calculated in past iterations.
            If given, that means you will add structures to an existing
            training set.
        previous_feature_matrix(list[list[[float]]): optional
            Correlation vectors of structures already calculated in past iterations.
        keep_ground_states(bool): optional
            Whether always to include the electrostatic ground states.
            Default to True.
        num_structs(int): optional
            Number of training structures to add at the iteration.
            At least 2~3 structures should be enumerated for each composition.
            And it is recommended that num_structs_init * 10 > 2 *
            len(supercell_and_counts).
            Default is 60.
        mc_generator_kwargs(dict): optional
            Keyword arguments for McSampleGenerator, except num_samples.
            Note: currently only support Canonical.
        n_parallel(int): optional
            Number of generators to run in parallel. Default is to use
            a quarter of cpu count.
        duplicacy_criteria(str):
            The criteria when to consider two structures as the same and
            old to add one of them into the candidate training set.
            Default is "correlations", which means to assert duplication
            if two structures have the same correlation vectors. While
            "structure" means two structures must be symmetrically equivalent
            after being reduced. No other option is allowed.
            Note that option "structure" might be significantly slower since
            it has to attempt reducing every structure to its primitive cell
            before matching. It should be used with caution.
        kwargs:
            Keyword arguments for utils.selection.select_initial_rows.
    Returns:
        list[Structure], list[3*3 list[list[int]]], list[list[float]]:
            Initial training structures, super-cell matrices,
            and normalized correlation vectors.
    """
    mc_generator_args = mc_generator_kwargs or {}
    n_parallel = n_parallel or min(cpu_count() // 4, len(enumerated_counts))
    if n_parallel == 0:
        if cpu_count() // 4 == 0:
            warn(
                f"Number of CPUs found on the executing environment: {cpu_count()} might"
                f" not be enough for parallelization! Setting parallel processes to 1."
            )
            n_parallel = 1

    previous_sampled_structures = previous_sampled_structures or []
    previous_feature_matrix = np.array(previous_feature_matrix).tolist() or []
    if len(previous_feature_matrix) != len(previous_sampled_structures):
        raise ValueError(
            "Must provide a feature vector for each" " structure passed in!"
        )

    # Scale the number of structures to select for each comp.
    num_samples = get_num_structs_to_sample(
        [counts for _ in enumerated_matrices for counts in enumerated_counts],
        num_structs,
    )

    with Parallel(n_jobs=n_parallel) as par:
        gs_id = 0
        keeps = []
        structures = []
        femat = []
        sc_matrices = []
        sc_matrix_indices = []

        for mid, sc_matrix in enumerate(enumerated_matrices):
            # This should work on pytest.
            results = par(
                delayed(_sample_single_generator)(
                    ce,
                    previous_sampled_structures + structures,
                    previous_feature_matrix + femat,
                    mc_generator_args,
                    sc_matrix,
                    counts,
                    num_sample,
                    duplicacy_criteria=duplicacy_criteria,
                )
                for counts, num_sample in zip(
                    enumerated_counts,
                    num_samples[
                        mid
                        * len(enumerated_counts) : (mid + 1)
                        * len(enumerated_counts)
                    ],
                )
            )

            for (
                gs_struct,
                gs_occu,
                gs_feat,
                samples,
                samples_occu,
                samples_feat,
                gs_dupe,
            ) in results:
                if gs_dupe:
                    structures.extend(samples)
                    femat.extend(samples_feat)
                    sc_matrices.extend([sc_matrix for _ in samples])
                    sc_matrix_indices.extend([mid for _ in samples])
                    gs_id += len(samples)
                else:
                    structures.extend([gs_struct] + samples)
                    femat.extend([gs_feat] + samples_feat)
                    sc_matrices.extend([sc_matrix for _ in range(len(samples) + 1)])
                    sc_matrix_indices.extend([mid for _ in range(len(samples) + 1)])
                    if keep_ground_states:
                        keeps.append(gs_id)
                    gs_id += len(samples) + 1

    femat = np.array(femat)

    # External terms such as the ewald term should not be taken into comparison,
    # when selecting structures
    num_external_terms = len(ce.cluster_subspace.external_terms)

    if len(previous_sampled_structures) == 0:
        # Start from scratch.
        selected_row_ids = select_initial_rows(
            femat,
            n_select=num_structs,
            keep_indices=keeps,
            num_external_terms=num_external_terms,
            **kwargs,
        )
    else:
        # Add to existing:
        selected_row_ids = select_added_rows(
            femat,
            np.array(previous_feature_matrix),
            n_select=num_structs,
            keep_indices=keeps,
            num_external_terms=num_external_terms,
            **kwargs,
        )

    # Must sort to ensure the same ordering between feature rows and structures.
    selected_row_ids = sorted(selected_row_ids)
    selected_structures = [s for i, s in enumerate(structures) if i in selected_row_ids]
    selected_matrices = [m for i, m in enumerate(sc_matrices) if i in selected_row_ids]
    selected_femat = femat[selected_row_ids, :].tolist()
    if len(selected_row_ids) < num_structs:
        warn(
            f"Expected to add {num_structs} new structures,"
            f" but only {len(selected_row_ids)}"
            f" non duplicate structures could be added."
        )
    return selected_structures, selected_matrices, selected_femat

In this function I tried to initialize a MC sampler in every parallel process.

qchempku2017 commented 1 year ago

See discussion here: (1) Usage of reduce method (2) Pickling required for process-wise parallelization

lbluque commented 1 year ago

more context here