mlbench / mlbench-core

MLBench Framework Core Python Library
https://mlbench.github.io
Apache License 2.0

Customize Communication Scheme For Sparsified/Quantized/Decentralized scenarios #12

Closed liehe closed 4 years ago

liehe commented 5 years ago

For the moment, when we aggregate the gradients, we use something like

model = ...
output = model(inputs)
loss = loss_function(output, target)
loss.backward()

# Aggregate Gradients from all Nodes using all reduce.
aggregate_gradients(model, config)

where aggregate_gradients performs an all-reduce:

import torch.distributed as dist

def aggregate_gradients(model, config):
    """Average gradients of models across all processes."""
    for ind, param in enumerate(model.parameters()):
        # Sum this parameter's gradient across all processes.
        dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM)

        if config.average_models:
            param.grad.data /= config.world_size

The communication scheme is fixed to an all_reduce of the gradients. We may want to customize it to support sparsified/quantized/decentralized aggregation.

Describe the solution you'd like

Customize the aggregate_gradients to allow for:

  1. Sparsified
  2. Quantized
  3. Decentralized

updates. The aggregate_gradients function will take one more argument:

def aggregate_gradients(model, config, agg_fn):
    """Average gradients of models across all processes."""
    for ind, param in enumerate(model.parameters()):
        param.grad.data = agg_fn(param.grad.data, op='avg')

For example, the agg_fn can be a subclass of:

class Aggregation(object):
    def __call__(self, data, op):
        """Aggregate data using `op` operation.

        Args:
            data (:obj:`torch.Tensor`): A Tensor to be aggregated.
            op (str): Aggregation methods like `avg`, `sum`, `min`, `max`, etc.

        Returns:
            :obj:`torch.Tensor`: An aggregated tensor.
        """
        raise NotImplementedError
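
For reference, the current all-reduce behavior fits this interface directly; a minimal sketch (the class name AllReduceAggregation is just illustrative):

import torch.distributed as dist

class AllReduceAggregation(Aggregation):
    """Aggregate a tensor across all processes with a single all-reduce."""

    def __init__(self, world_size):
        self.world_size = world_size

    def __call__(self, data, op):
        # Sum the tensor over all processes in place.
        dist.all_reduce(data, op=dist.reduce_op.SUM)
        if op == 'avg':
            # Divide by the number of processes to obtain the average.
            data /= self.world_size
        return data

It could then be plugged in as aggregate_gradients(model, config, AllReduceAggregation(config.world_size)).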

For the decentralized case it could be:

class DecentralizedAggregation(Aggregation):
    def __init__(self, neighbors, rank):
        self.rank = rank
        self.neighbors = neighbors

    def __call__(self, data, op):
        # Our own tensor plus one receive buffer per neighbor.
        local_data = {self.rank: data}
        for node in self.neighbors:
            local_data[node] = torch.zeros_like(data)

        # Exchange tensors with all neighbors asynchronously.
        reqs = []
        for node in self.neighbors:
            reqs.append(dist.isend(tensor=local_data[self.rank], dst=node))
            reqs.append(dist.irecv(tensor=local_data[node], src=node))

        for req in reqs:
            req.wait()

        # Aggregate the local and received tensors according to `op`.
        stacked = torch.stack(list(local_data.values()))
        if op == 'avg':
            return stacked.mean(dim=0)
        elif op == 'max':
            return stacked.max(dim=0)[0]
        ...

And for the sparsified case:

class SparsifiedAggregation(Aggregation):
    def __init__(self, K, block_size, random):
        # K is the number of elements to collect.
        self.K = K
        self.block_size = block_size
        self.random = random
        ...

    def __call__(self, data, op):
        # Use sparsified training (see the sketch below).
        ...

etc.
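
As a rough illustration of what the sparsified __call__ could do, a random-K variant might gather the selected indices and values from all workers and average them back into a dense tensor (a sketch only, assuming every worker contributes the same K; the real implementation would follow the existing Sparsified SGD code):

import torch
import torch.distributed as dist

class RandomKAggregation(Aggregation):
    """Sketch: aggregate only K randomly chosen coordinates per worker."""

    def __init__(self, K, world_size):
        self.K = K
        self.world_size = world_size

    def __call__(self, data, op):
        flat = data.view(-1)

        # Each worker picks K random coordinates of its gradient.
        indices = torch.randperm(flat.numel())[:self.K].to(flat.device)
        values = flat[indices]

        # Gather the selected indices and values from all workers.
        all_indices = [torch.zeros_like(indices) for _ in range(self.world_size)]
        all_values = [torch.zeros_like(values) for _ in range(self.world_size)]
        dist.all_gather(all_indices, indices)
        dist.all_gather(all_values, values)

        # Scatter the sparse contributions back into a dense tensor.
        result = torch.zeros_like(flat)
        for idx, val in zip(all_indices, all_values):
            result[idx] += val
        if op == 'avg':
            result /= self.world_size
        return result.view_as(data)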

Panaetius commented 5 years ago

Good idea.

We don't really need an aggregate_gradients(model, config, agg_fn) function if we have Aggregation classes anyway; Aggregation.__call__() can just take the model as input directly.

liehe commented 5 years ago

Good idea.

We don't really need an aggregate_gradients(model, config, agg_fn) function if we have Aggregation classes anyway; Aggregation.__call__() can just take the model as input directly.

You are right. We can use it directly.

martinjaggi commented 5 years ago

Looks good! Best to make sure it can support aggregating both cases, models and gradients.
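
One way to cover both, building on the suggestion above to pass the model directly (a sketch; the agg_grad/agg_model method names are hypothetical):

class Aggregation(object):
    """Sketch of an interface that can aggregate gradients or weights."""

    def _agg(self, data, op):
        # Aggregate a single tensor; implemented by subclasses
        # (all-reduce, sparsified, quantized, decentralized, ...).
        raise NotImplementedError

    def agg_grad(self, model, op='avg'):
        # Aggregate the gradients of all parameters.
        for param in model.parameters():
            param.grad.data = self._agg(param.grad.data, op)

    def agg_model(self, model, op='avg'):
        # Aggregate the parameters themselves (and, if needed, buffers
        # such as the BatchNorm running statistics).
        for param in model.parameters():
            param.data = self._agg(param.data, op)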

liehe commented 5 years ago

@martinjaggi For models, do you mean we also synchronize things like the moving averages in Batch Normalization, etc.?

martinjaggi commented 5 years ago

Sometimes, yes. Though the more common case will be sending gradients (since then we can do sparsification etc., as you mention). For that, signSGD is a simple example.
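
For context, a bare-bones signSGD-style aggregation compresses each gradient to its element-wise sign and takes a majority vote across workers; in terms of the sketch above it only needs the per-tensor step (again just a sketch, ignoring the op argument):

import torch
import torch.distributed as dist

class SignSGDAggregation(Aggregation):
    """Sketch: sign compression with an all-reduce majority vote."""

    def _agg(self, data, op):
        # Compress the local gradient to its element-wise sign.
        sign = torch.sign(data)
        # Sum the signs across workers; the sign of the sum is the majority vote.
        dist.all_reduce(sign, op=dist.reduce_op.SUM)
        return torch.sign(sign)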

liehe commented 5 years ago

OK. @negar-foroutan already has code for Sparsified SGD. The quantized SGD would be similar. We can include both in the repo once we fix their signatures.

negar-foroutan commented 5 years ago

It's a good idea. It makes it easy to have different kinds of aggregation for both open and closed divisions.

Panaetius commented 5 years ago

This issue is done, right? Can this be closed?