aai-institute / pyDVL

pyDVL is a library of stable implementations of algorithms for data valuation and influence function computation
https://pydvl.org
GNU Lesser General Public License v3.0

Introduce history into executor pattern for reproducibility. #416

Closed kosmitive closed 9 months ago

kosmitive commented 10 months ago

Parallel processing can result in race conditions and different orders of execution, which hinders reproducibility. To make our code fully reproducible, we need some kind of log that fixes the order in which results are processed. Here is a class we might be able to use:

```python
from functools import partial
from typing import Callable, Generator, Generic, List, Tuple, TypeVar

T = TypeVar("T")


def wrap_index(*args, i: int, fn: Callable[..., T], **kwargs) -> Tuple[int, T]:
    """Call fn and return its result tagged with the submission index i."""
    return i, fn(*args, **kwargs)


class Backlog(Generic[T]):
    """A backlog is a queue of items added in no particular order. Each item has an
    index used to determine the sequence in which the items are processed. A function
    call can be modified using the wrap method to include this index in the output.
    This modification must be applied before invoking (i.e. submitting) the function."""

    def __init__(self) -> None:
        self._backlog: List[Tuple[int, T]] = []
        self._n_delivered = 0  # index of the next item to hand out
        self._n_registered = 0  # number of items added so far
        self._n_wrapped = 0  # number of indices issued by wrap()

    def add(self, item: Tuple[int, T]) -> None:
        """Register a result tagged with its submission index."""
        self._backlog.append(item)
        self._backlog.sort(key=lambda t: t[0])
        self._n_registered += 1

    def get(self) -> Generator[T, None, None]:
        """Yield buffered items in index order, stopping at the first gap."""
        while self._backlog and self._backlog[0][0] == self._n_delivered:
            # Remove the item *before* yielding, so that the state stays consistent
            # even if the consumer stops iterating early (e.g. on a return).
            _, value = self._backlog.pop(0)
            self._n_delivered += 1
            yield value

    def wrap(self, fn: Callable[..., T]) -> Callable[..., Tuple[int, T]]:
        """Return fn wrapped so that its result is tagged with the next index."""
        i = self._n_wrapped
        self._n_wrapped += 1
        return partial(wrap_index, fn=fn, i=i)
```

It can be applied to an executor as follows:

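```python
from concurrent.futures import FIRST_COMPLETED, Future, wait
from typing import Set, Tuple

from tqdm import tqdm

# Excerpt from the body of the semivalue computation. ValuationResult,
# init_parallel_backend, init_executor, effective_n_jobs and _marginal come from
# pyDVL; sampler, u, coefficient, done, n_jobs, config and progress are the
# arguments of the enclosing function.
result = ValuationResult.zeros(
    algorithm=f"semivalue-{str(sampler)}-{coefficient.__name__}",  # type: ignore
    indices=u.data.indices,
    data_names=u.data.data_names,
)

parallel_backend = init_parallel_backend(config)
u = parallel_backend.put(u)
correction = parallel_backend.put(
    lambda n, k: coefficient(n, k) * sampler.weight(n, k)
)

max_workers = effective_n_jobs(n_jobs, config)
n_submitted_jobs = 2 * max_workers  # number of jobs kept in the queue

sampler_it = iter(sampler)
pbar = tqdm(disable=not progress, total=100, unit="%")
backlog = Backlog[Tuple[int, float]]()

with init_executor(
    max_workers=max_workers, config=config, cancel_futures=True
) as executor:
    pending: Set[Future] = set()
    while True:
        pbar.n = 100 * done.completion()
        pbar.refresh()

        completed, pending = wait(pending, timeout=1, return_when=FIRST_COMPLETED)
        for future in completed:
            # Each result arrives as (submission index, (idx, marginal)).
            backlog.add(future.result())

        # Consume results strictly in submission order, up to the first gap.
        for idx, marginal in backlog.get():
            result.update(idx, marginal)
            if done(result):
                return result

        # Ensure that we always have n_submitted_jobs running
        try:
            for _ in range(n_submitted_jobs - len(pending)):
                # Draw the sample first, so that no index is issued for a job
                # that is never submitted.
                sample = next(sampler_it)
                pending.add(
                    executor.submit(
                        backlog.wrap(_marginal),
                        u=u,
                        coefficient=correction,
                        sample=sample,
                    )
                )
        except StopIteration:
            return result
```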
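The fragment above depends on pyDVL internals. As a self-contained illustration of the same idea, here is a minimal sketch using only the standard library, assuming a thread pool, a toy task and a toy stopping criterion in place of the utility, the sampler and the stopping criterion. Each submission is tagged with a counter, early arrivals wait in a buffer, and results are consumed strictly in submission order. All names (`tagged`, `run_in_order`, `slow_square`, `reached_50`) are hypothetical.

```python
import random
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Callable, Dict, Iterator, List, Set, Tuple


def tagged(i: int, fn: Callable[[int], float], x: int) -> Tuple[int, float]:
    """Run fn(x) and return the result together with its submission index i."""
    return i, fn(x)


def run_in_order(
    fn: Callable[[int], float],
    inputs: Iterator[int],
    done: Callable[[List[float]], bool],
    max_workers: int = 4,
) -> List[float]:
    """Evaluate fn on inputs in parallel, but consume results in submission order."""
    results: List[float] = []
    buffer: Dict[int, float] = {}  # results that arrived before their turn
    n_submitted = n_consumed = 0
    exhausted = False
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending: Set[Future] = set()
        while True:
            # Keep up to 2 * max_workers jobs in flight.
            while not exhausted and len(pending) < 2 * max_workers:
                try:
                    x = next(inputs)
                except StopIteration:
                    exhausted = True
                    break
                pending.add(executor.submit(tagged, n_submitted, fn, x))
                n_submitted += 1

            completed, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in completed:
                i, value = future.result()
                buffer[i] = value

            # Consume buffered results only while there is no gap in the indices,
            # so the stopping check always sees the same prefix of results.
            while n_consumed in buffer:
                results.append(buffer.pop(n_consumed))
                n_consumed += 1
                if done(results):
                    for future in pending:
                        future.cancel()  # drop jobs that have not started yet
                    return results

            if exhausted and not pending:
                return results


def slow_square(x: int) -> float:
    time.sleep(random.uniform(0, 0.01))  # random delay scrambles completion order
    return float(x * x)


def reached_50(values: List[float]) -> bool:
    return sum(values) >= 50


for _ in range(3):
    print(run_in_order(slow_square, iter(range(100)), reached_50))
# Prints [0.0, 1.0, 4.0, 9.0, 16.0, 25.0] on every run.
```

Unlike the pyDVL fragment, which returns as soon as the sampler is exhausted, this sketch drains the remaining jobs before returning; either choice is compatible with in-order consumption.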
mdbenito commented 10 months ago
  1. I think you mean "History" or "UpdateHistory", not "Backlog".
  2. Why would the order in which results are added (a commutative operation) matter?
  3. If it did matter, because for some reason ValuationResult.__add__() turned out not to be commutative (a bug), then I don't think it would be enough to save the order in which the futures complete, because your log only operates on one iteration of the outer loop. Note, in addition, that the list of completed futures in one iteration will typically have length 1, making the backlog superfluous most of the time.
  4. The only situation that the proposed solution addresses is two or more completed futures with one triggering the stopping criterion but not the other, i.e. if completed=[a, b], then done(result.update(a)) == True but done(result.update(b)) == False.
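Point 4 can be made concrete with a toy example. The sketch below is hypothetical: plain floats stand in for marginal contributions, and a running-sum threshold stands in for the stopping criterion. Applying the same two completed results in a different order changes which of them are included when the loop stops.

```python
from typing import Callable, List, Tuple

Update = Tuple[int, float]  # (index, marginal), as returned by each job


def consume(completed: List[Update], done: Callable[[List[float]], bool]) -> List[float]:
    """Apply completed updates in the given order, stopping as soon as done() holds."""
    applied: List[float] = []
    for _, marginal in completed:
        applied.append(marginal)
        if done(applied):
            break
    return applied


def sum_reaches_one(values: List[float]) -> bool:
    """Toy stopping criterion: stop once the running sum reaches 1.0."""
    return sum(values) >= 1.0


a: Update = (0, 1.5)  # triggers the criterion on its own
b: Update = (1, 0.5)  # does not

print(consume([a, b], sum_reaches_one))  # [1.5]
print(consume([b, a], sum_reaches_one))  # [0.5, 1.5]
```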

About the implementation:

However, I don't think that this approach is going to be very fruitful.

kosmitive commented 10 months ago
  1. Sure, History is the correct word, or a module which ensures that the blocks are delivered in the right order.
  2. It matters if one drops the commutative property. A simple example from reinforcement learning would be distributed Q-learning: each worker represents an agent which unrolls a trajectory from the environment and calculates the gradient w.r.t. the Q-network. Here, both the order in which the gradients are supplied and the order in which the new Q-network weights are distributed matter for reproducibility.
  3. This happened in the tests. We could make a simpler solution, but I thought the History class might be a good option. By using a binary search tree (BST), one could reduce the complexity further.
  4. Furthermore, we should generalize the executor pattern and move it into that module. With classwise Shapley, we already have three duplicated copies of this code.
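Regarding the complexity remark in point 3: re-sorting the list on every `add` and slicing it on every delivery both cost at least linear time per item. A balanced BST would work, but a binary min-heap from the standard library's `heapq` module (a swapped-in choice, not something proposed in the thread) gives O(log n) insertion and removal with no extra dependencies. `HeapHistory` is a hypothetical name; the class mirrors only the `add`/`get` part of `Backlog`.

```python
import heapq
from typing import Generic, Iterator, List, Tuple, TypeVar

T = TypeVar("T")


class HeapHistory(Generic[T]):
    """Reorder buffer backed by a binary min-heap keyed on the submission index."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, T]] = []
        self._next = 0  # index of the next item to deliver

    def add(self, item: Tuple[int, T]) -> None:
        # Indices are unique, so tuples are ordered by index alone and the
        # payload of type T is never compared.
        heapq.heappush(self._heap, item)

    def get(self) -> Iterator[T]:
        # Deliver items only while the smallest buffered index is the expected one.
        while self._heap and self._heap[0][0] == self._next:
            _, value = heapq.heappop(self._heap)
            self._next += 1
            yield value


history = HeapHistory[str]()
history.add((2, "c"))
history.add((0, "a"))
print(list(history.get()))  # ['a'] (index 1 is still missing)
history.add((1, "b"))
print(list(history.get()))  # ['b', 'c']
```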
mdbenito commented 10 months ago
  1. Then use History.
  2. We don't "drop" the commutative property of an addition because that doesn't make sense. The example about Q-learning is irrelevant since we don't do Q-learning and because the operation we perform is commutative. The problem is with the completion check, anyway.
  3. What happened? I doubt the problem is that ValuationResult.__add__ is not commutative. What you observed is not necessarily what you diagnosed. I insist that there are other ways in which execution order affects the result. For those, if one wants full reproducibility, a history which is local to one iteration of the loop is not enough, as explained: you would need to ensure that results are processed in exactly the same order across the whole run. A tree is not going to make the situation any better, and we most certainly don't want any added complexity! The only way to achieve such a thing would be to pass a counter with the input to each worker and return this counter with the results. The main process would then pool all results and only consider them in the sequence in which they were sent. I am not sure we would want this, even as an option.
  4. That is an altogether different discussion which does not belong here.
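For a finite batch of inputs, the standard library already provides the ordering described in point 3: `concurrent.futures.Executor.map` yields results in the order in which the inputs were given, whatever the completion order. This is a swapped-in alternative rather than the counter-based scheme proposed above, and because `map` submits all inputs eagerly, it does not directly fit an unbounded sampler. `slow_double` is a hypothetical toy task.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def slow_double(x: int) -> int:
    time.sleep(random.uniform(0, 0.01))  # completion order is random
    return 2 * x


with ThreadPoolExecutor(max_workers=4) as executor:
    # Executor.map yields results in input order, regardless of which
    # worker finishes first.
    doubled: List[int] = list(executor.map(slow_double, range(10)))

print(doubled)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```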
mdbenito commented 9 months ago

After thinking a bit about this, I am not so sure that this would be very useful. The benefit of perfect reproducibility is not really that great. After all, it's repeatability that matters (i.e. the ability to obtain the same results with a fresh experiment by someone else, where "same" doesn't mean identical up to the last decimal digit). Also keep in mind that different RNGs, different architectures, different optimisers, and so on will all produce slightly different results, so a full trace will never get us to 100% reproducibility (which, again, is not that useful).
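The remark that "same" does not mean identical up to the last decimal digit is easy to see with floating-point arithmetic. The sketch below is a generic illustration, not pyDVL code; the incremental mean is an assumed stand-in for how per-index values might be aggregated as results arrive.

```python
import itertools
import math

# Floating-point addition is commutative but not associative: regrouping the
# same three numbers changes the last bits of the result.
left = (0.1 + 0.2) + 0.3
right = 0.1 + (0.2 + 0.3)
print(left, right)  # 0.6000000000000001 0.6
print(left == right, math.isclose(left, right))  # False True


def running_mean(values):
    """Incremental mean, updated one value at a time as results arrive."""
    mean = 0.0
    for n, v in enumerate(values, start=1):
        mean += (v - mean) / n
    return mean


# Every arrival order of the same four values gives the same mean up to
# rounding error, though not necessarily bit-for-bit.
means = [running_mean(p) for p in itertools.permutations([0.1, 0.2, 0.3, 0.7])]
print(max(means) - min(means))
```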

Also, if the variability that you observe is able to shift the boundaries of a confidence interval enough to matter, something is probably amiss. Then again, if this is not the case and for some reason the order of arrival does matter in a meaningful way, I guess we can add this.

mdbenito commented 9 months ago

@kosmitive Any thoughts? I think we should close this and the associated PR.

kosmitive commented 9 months ago

Okay, let's close this one. I agree that it is not really required for reproducibility. If we need it later, we can reopen it.