WwZzz / easyFL

An experimental platform for federated learning.
Apache License 2.0
527 stars 88 forks source link

在iterate()中添加调度模块后,训练表现异常 #66

Open Socrates2001 opened 1 day ago

Socrates2001 commented 1 day ago

在FL-GO中,每一次全局迭代包含三个步骤,采样、训练与聚合。现在其中加入一个调度模块后,训练之后的测试精度会在第三轮之后,一直卡在11.35。代码上相应的改动如下(一些关键超参:Mnist_IID, 'num_clients': 100, fedavg, option={'num_rounds':20, "gpu": 0, 'proportion': 0.2, 'num_steps': 5, 'responsiveness': 'UNI-5-1000'}):

def iterate(self):
    """
    The standard iteration of each federated communication round that contains three
    necessary procedure in FL: client selection, communication and model aggregation.

    Returns:
        False if the global model is not updated in this iteration
    """
    # sample clients: MD sampling as default
    self.selected_clients = self.sample()
    # training
    models = self.communicate(self.selected_clients)['model']
    # scheduling
    scheduled_clients, scheduled_models = self.client_scheduling(models)
    # aggregate: pk = 1/K as default where K=len(selected_clients)
    self.model = self.aggregate(scheduled_models)

    return len(scheduled_models) > 0

def client_scheduling(self, models):
    """
    基于设定的调度率,从完成训练的客户端中再次选择部分客户端进行聚合。

    Args:
        models (list): 完成本地训练的客户端的模型列表

    Returns:
        scheduled_clients (list): 调度后的客户端列表
        scheduled_models (list): 调度后的客户端模型列表
    """
    # 调度率,决定使用多少客户端进行聚合
    print("selected_clients", self.selected_clients)
    scheduling_rate = self.option.get('client_scheduling_rate', 0.5)  # 调度率设为50%

    num_selected_clients = len(models)
    num_scheduled_clients = max(int(num_selected_clients * scheduling_rate), 1)  # 至少选1个客户端

    # 随机从训练完成的客户端中选择调度的客户端
    scheduled_indices = np.random.choice(range(num_selected_clients), num_scheduled_clients, replace=False)
    print("scheduled_indices", scheduled_indices)
    # 选择调度后的客户端和其对应的模型
    scheduled_clients = [self.selected_clients[i] for i in scheduled_indices]
    print("scheduled_clients", scheduled_clients)
    scheduled_models = [models[i] for i in scheduled_indices]

    return scheduled_clients, scheduled_models

微信图片_20240930110737 当调度率为1的时候,效果是正常的,一旦小于1之后,表现就很奇怪了,请问这样的原因是什么,是否还需要改一些底层的代码?

WwZzz commented 1 day ago

aggregate函数使用了self.selected_clients来计算聚合权重,schedule后的models长度小于self.selected_clients,权重和不对,需要把selected_clients重置成调度的用户

Socrates2001 commented 1 day ago

aggregate函数使用了self.selected_clients来计算聚合权重,schedule后的models长度小于self.selected_clients,权重和不对,需要把selected_clients重置成调度的用户

这个问题昨天被考虑到了,aggregate()有几种聚合方式,默认用的uniform,其中聚合权重的计算取决于接收到的模型长度,即调度的客户端数量或者说调度的模型数量,好像并没有调用到self.selected_clients参数: ... elif self.aggregationoption == 'uniform': return fmodule.model_average(models) ...

def _model_average(ms = [], p = []): r""" Averaging a list of models to a new one

Args:
    ms (list): a list of models (i.e. each model's class is FModule(...))
    p (list): a list of real numbers that are the averaging weights

Returns:
    The new model that is the weighted averaging of models in ms
"""
if len(ms)==0: return None
if len(p)==0: p = [1.0 / len(ms) for _ in range(len(ms))]
op_with_graph = sum([w.ingraph for w in ms]) > 0
res = ms[0].__class__().to(ms[0].get_device())
if op_with_graph:
    mlks = [get_module_from_model(mi) for mi in ms]
    mlr = get_module_from_model(res)
    for n in range(len(mlr)):
        mpks = [mlk[n]._parameters for mlk in mlks]
        rd = _modeldict_weighted_average(mpks, p)
        for l in mlr[n]._parameters.keys():
            if mlr[n]._parameters[l] is None: continue
            mlr[n]._parameters[l] = rd[l]
    res.op_with_graph()
else:
    _modeldict_cp(res.state_dict(), _modeldict_weighted_average([mi.state_dict() for mi in ms], p))
return res
WwZzz commented 18 hours ago

aggregate函数使用了self.selected_clients来计算聚合权重,schedule后的models长度小于self.selected_clients,权重和不对,需要把selected_clients重置成调度的用户

这个问题昨天被考虑到了,aggregate()有几种聚合方式,默认用的uniform,其中聚合权重的计算取决于接收到的模型长度,即调度的客户端数量或者说调度的模型数量,好像并没有调用到self.selected_clients参数: ... elif self.aggregationoption == 'uniform': return fmodule.model_average(models) ...

def _model_average(ms = [], p = []): r""" Averaging a list of models to a new one

Args:
    ms (list): a list of models (i.e. each model's class is FModule(...))
    p (list): a list of real numbers that are the averaging weights

Returns:
    The new model that is the weighted averaging of models in ms
"""
if len(ms)==0: return None
if len(p)==0: p = [1.0 / len(ms) for _ in range(len(ms))]
op_with_graph = sum([w.ingraph for w in ms]) > 0
res = ms[0].__class__().to(ms[0].get_device())
if op_with_graph:
    mlks = [get_module_from_model(mi) for mi in ms]
    mlr = get_module_from_model(res)
    for n in range(len(mlr)):
        mpks = [mlk[n]._parameters for mlk in mlks]
        rd = _modeldict_weighted_average(mpks, p)
        for l in mlr[n]._parameters.keys():
            if mlr[n]._parameters[l] is None: continue
            mlr[n]._parameters[l] = rd[l]
    res.op_with_graph()
else:
    _modeldict_cp(res.state_dict(), _modeldict_weighted_average([mi.state_dict() for mi in ms], p))
return res

后来按照fedavg更新过的论文对默认采样和聚合方式进行了更新,默认是使用uniform采样和加权聚合,以和最新的fedavg论文版本保持一致。可以直接用fmodule._model_average(models)聚合模型或是使用参数'aggregate':'uniform'。知乎教程和注释部分的说明好像没有及时修改,我改一下。