SymbioticLab / FedScale

FedScale is a scalable and extensible open-source federated learning (FL) platform.
https://fedscale.ai
Apache License 2.0

Missing feedbacks when `- total_worker` is large #92

Closed DavdGao closed 2 years ago

DavdGao commented 2 years ago

I'm trying to run the FEMNIST example with a two-layer Conv network on an Ubuntu server with 8 GPUs. The following is my yaml:

# Configuration file of FAR training experiment

# ========== Cluster configuration ========== 
# ip address of the parameter server (need 1 GPU process)
ps_ip: 0.0.0.0

# ip address of each worker:# of available gpus process on each gpu in this node
# Note that if we collocate ps and worker on same GPU, then we need to decrease this number of available processes on that GPU by 1
# E.g., master node has 4 available processes, then 1 for the ps, and worker should be set to: worker:3
worker_ips:
    - 0.0.0.0:[7] # worker_ip: [(# processes on gpu) for gpu in available_gpus] eg. 10.0.0.2:[4,4,4,4] This node has 4 gpus, each gpu has 4 processes.

exp_path: /mnt/gaodawei.gdw/FedScale/fedscale/core

# Entry function of executor and aggregator under $exp_path
executor_entry: executor.py

aggregator_entry: aggregator.py

auth:
    ssh_user: "root"
    ssh_private_key: ~/.ssh/id_rsa

# cmd to run before we can indeed run FAR (in order)
setup_commands:
    - source /mnt/gaodawei.gdw/miniconda3/bin/activate fedscale
    - export NCCL_SOCKET_IFNAME='enp94s0f0'         # Run "ifconfig" to ensure the right NIC for nccl if you have multiple NICs

# ========== Additional job configuration ==========
# Default parameters are specified in argParser.py, wherein more description of the parameter can be found

job_conf:
    - job_name: femnist                   # Generate logs under this folder: log_path/job_name/time_stamp
    - log_path: /mnt/gaodawei.gdw/FedScale/evals # Path of log files
    - total_worker: 100                      # Number of participants per round, we use K=100 in our paper, large K will be much slower
    - data_set: femnist                     # Dataset: openImg, google_speech, stackoverflow
    - data_dir: /mnt/gaodawei.gdw/FedScale/dataset/data/femnist    # Path of the dataset
    - data_map_file: /mnt/gaodawei.gdw/FedScale/dataset/data/femnist/client_data_mapping/train.csv              # Allocation of data to each client, turn to iid setting if not provided
    - device_conf_file: /mnt/gaodawei.gdw/FedScale/dataset/data/device_info/client_device_capacity     # Path of the client trace
    - device_avail_file: /mnt/gaodawei.gdw/FedScale/dataset/data/device_info/client_behave_trace
    - model: convnet2                            # Models: e.g., shufflenet_v2_x2_0, mobilenet_v2, resnet34, albert-base-v2
    - gradient_policy: fed-avg                 # {"fed-yogi", "fed-prox", "fed-avg"}, "fed-avg" by default
    - eval_interval: 100                     # How many rounds to run a testing on the testing set
    - epochs: 400                          # Number of rounds to run this training. We use 1000 in our paper, while it may converge w/ ~400 rounds
    - rounds: 400
    - filter_less: 0                       # Remove clients w/ less than 21 samples
    - num_loaders: 2
    - yogi_eta: 3e-3
    - yogi_tau: 1e-8
    - local_steps: 10
    - learning_rate: 0.05
    - decay_factor: 1.  # decay factor of the learning rate
    - batch_size: 20
    - test_bsz: 20
    - malicious_factor: 4
    - use_cuda: True
    - input_dim: 3
    - sample_seed: 12345
    - test_ratio: 1.
    - loss_decay: 0.

FedScale hangs during training, so I added some logging to event_monitor in aggregator.py to monitor the running status, as follows:

    def event_monitor(self):
        logging.info("Start monitoring events ...")

        while True:
            # Broadcast events to clients
            ...

            # Handle events queued on the aggregator
            elif len(self.sever_events_queue) > 0:
                client_id, current_event, meta, data = self.sever_events_queue.popleft()
                logging.info("Receive event {} from client {}".format(current_event, client_id))
                if current_event == events.UPLOAD_MODEL:
                    self.client_completion_handler(self.deserialize_response(data))
                    #
                    logging.info("Currently {}/{} clients has finished training".format(len(self.stats_util_accumulator), self.tasks_round))
                    if len(self.stats_util_accumulator) == self.tasks_round:
                        self.round_completion_handler()

                elif current_event == events.MODEL_TEST:
                    self.testing_completion_handler(client_id, self.deserialize_response(data))

                else:
                    logging.error(f"Event {current_event} is not defined")

            else:
                # execute every 100 ms
                time.sleep(0.1)

and I obtain the following logs:

...
(05-26) 14:13:28 INFO     [client.py:17] Start to train (CLIENT: 2167) ...
(05-26) 14:13:28 INFO     [aggregator.py:628] Receive event upload_model from client 4
(05-26) 14:13:28 INFO     [aggregator.py:632] Currently 92/100 clients has finished training
(05-26) 14:13:28 INFO     [client.py:188] Training of (CLIENT: 585) completes, {'clientId': 585, 'moving_loss': 4.0690016746521, 'trained_size': 200, 'success': True, 'utility': 751.3793800173086}
(05-26) 14:13:28 INFO     [client.py:17] Start to train (CLIENT: 2305) ...
(05-26) 14:13:28 INFO     [aggregator.py:628] Receive event upload_model from client 5
(05-26) 14:13:28 INFO     [aggregator.py:632] Currently 93/100 clients has finished training
(05-26) 14:13:29 INFO     [client.py:188] Training of (CLIENT: 92) completes, {'clientId': 92, 'moving_loss': 4.070298886299133, 'trained_size': 200, 'success': True, 'utility': 740.9585433536305}
(05-26) 14:13:29 INFO     [aggregator.py:628] Receive event upload_model from client 3
(05-26) 14:13:29 INFO     [aggregator.py:632] Currently 94/100 clients has finished training
(05-26) 14:13:29 INFO     [client.py:188] Training of (CLIENT: 2305) completes, {'clientId': 2305, 'moving_loss': 4.075016450881958, 'trained_size': 200, 'success': True, 'utility': 521.201964075297}
(05-26) 14:13:29 INFO     [aggregator.py:628] Receive event upload_model from client 5
(05-26) 14:13:29 INFO     [aggregator.py:632] Currently 95/100 clients has finished training
(05-26) 14:13:31 INFO     [client.py:188] Training of (CLIENT: 1607) completes, {'clientId': 1607, 'moving_loss': 3.9613564729690554, 'trained_size': 200, 'success': True, 'utility': 322.7957952931622}
Traceback (most recent call last):
  File "/mnt/gaodawei.gdw/miniconda3/envs/fedscale/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/mnt/gaodawei.gdw/miniconda3/envs/fedscale/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/mnt/gaodawei.gdw/miniconda3/envs/fedscale/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/mnt/gaodawei.gdw/miniconda3/envs/fedscale/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
(05-26) 14:13:31 INFO     [client.py:188] Training of (CLIENT: 1821) completes, {'clientId': 1821, 'moving_loss': 3.9525305151939394, 'trained_size': 200, 'success': True, 'utility': 321.9902424194754}
(05-26) 14:13:31 INFO     [aggregator.py:628] Receive event upload_model from client 7
(05-26) 14:13:31 INFO     [aggregator.py:632] Currently 96/100 clients has finished training
(05-26) 14:13:31 INFO     [aggregator.py:628] Receive event upload_model from client 1
(05-26) 14:13:31 INFO     [aggregator.py:632] Currently 97/100 clients has finished training
(05-26) 14:13:33 INFO     [client.py:188] Training of (CLIENT: 137) completes, {'clientId': 137, 'moving_loss': 3.9952051639556885, 'trained_size': 200, 'success': True, 'utility': 643.8445367870678}
(05-26) 14:13:33 INFO     [aggregator.py:628] Receive event upload_model from client 2
(05-26) 14:13:33 INFO     [aggregator.py:632] Currently 98/100 clients has finished training
(05-26) 14:13:34 INFO     [client.py:188] Training of (CLIENT: 2167) completes, {'clientId': 2167, 'moving_loss': 4.092173397541046, 'trained_size': 200, 'success': True, 'utility': 821.8575235110941}
(05-26) 14:13:34 INFO     [aggregator.py:628] Receive event upload_model from client 4
(05-26) 14:13:34 INFO     [aggregator.py:632] Currently 99/100 clients has finished training

It seems the server only received 99 models from the clients and keeps waiting for the missing one. I guess it may be related to the BrokenPipeError reported in the log above?

So what can I do to deal with it?

fanlai0990 commented 2 years ago

Thanks for letting us know. @dywsjtu Can you please check it for now? I will double check it later today if needed.

DavdGao commented 2 years ago

Also, I have checked the log file and found that one client (index 403) started training but never finished. The only log line I can find for it is:

(05-26) 14:13:06 INFO     [client.py:17] Start to train (CLIENT: 403) ...

Client 403 never reported completion or failure.
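One way to spot such clients is to scan the log for "Start to train" lines that have no matching "completes" line. This is a minimal sketch that assumes the log format quoted in this issue; the function name `find_unfinished_clients` is hypothetical and not part of FedScale.

```python
import re

# Patterns follow the client.py log lines quoted in this issue.
START_RE = re.compile(r"Start to train \(CLIENT: (\d+)\)")
DONE_RE = re.compile(r"Training of \(CLIENT: (\d+)\) completes")


def find_unfinished_clients(log_lines):
    """Return sorted IDs of clients that started training but never completed."""
    started, finished = set(), set()
    for line in log_lines:
        match = START_RE.search(line)
        if match:
            started.add(int(match.group(1)))
            continue
        match = DONE_RE.search(line)
        if match:
            finished.add(int(match.group(1)))
    return sorted(started - finished)


sample_log = [
    "(05-26) 14:13:06 INFO     [client.py:17] Start to train (CLIENT: 403) ...",
    "(05-26) 14:13:28 INFO     [client.py:17] Start to train (CLIENT: 2167) ...",
    "(05-26) 14:13:34 INFO     [client.py:188] Training of (CLIENT: 2167) completes, {...}",
]
print(find_unfinished_clients(sample_log))
```

Because it uses sets, this sketch only works on the log of a single round; a client selected again in a later round would need per-round bookkeeping.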

AmberLJC commented 2 years ago

You can first add an if-condition before the referenced line that checks len(client_data) > 0 and breaks out of the loop otherwise. Let us know whether that solves the problem.

We will track down the bug and fix it later.
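The suggested guard could look roughly like the sketch below. It is not FedScale's actual training loop: `run_local_steps` and `train_step` are hypothetical names, and `client_data` is assumed to be any sized iterable of batches (for example a data loader).

```python
def run_local_steps(client_data, local_steps, train_step):
    """Run up to `local_steps` training steps, cycling over `client_data`.

    Returns the number of steps actually completed.
    """
    completed_steps = 0

    # Guard: with no batches, the inner for-loop never runs, so the
    # while-loop below would spin forever without this early exit.
    if len(client_data) == 0:
        return completed_steps

    while completed_steps < local_steps:
        for batch in client_data:
            train_step(batch)
            completed_steps += 1
            if completed_steps == local_steps:
                break
    return completed_steps


# A client with no batches returns immediately instead of hanging.
print(run_local_steps([], local_steps=10, train_step=lambda batch: None))
```

Even with the guard, the client would presumably still need to report a result to the aggregator; otherwise the aggregator's count never reaches the number of tasks in the round and it keeps waiting.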

fanlai0990 commented 2 years ago

Hello. We tried to reproduce this issue, but our experiment runs fine. Please pull the latest version and try again with this conf.yml. :)

DavdGao commented 2 years ago

@fanlai0990 Thanks, I found the cause. I set filter_less to 0, and FedScale drops the last (incomplete) batch by default during training, so clients with fewer than 20 training samples (my batch_size) never produce a batch and cannot exit the training loop. The training process is therefore blocked.
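The arithmetic behind this can be sketched as follows. The helper `num_batches` is hypothetical; it follows the same counting rule as a PyTorch `DataLoader` length, where `drop_last=True` discards the final incomplete batch.

```python
def num_batches(num_samples, batch_size, drop_last=True):
    """Number of batches one pass over `num_samples` samples yields."""
    full, remainder = divmod(num_samples, batch_size)
    if remainder and not drop_last:
        return full + 1  # keep the final, smaller batch
    return full  # incomplete final batch is dropped (or there is none)


batch_size = 20  # value from the yaml in this issue
for n in (5, 19, 20, 45):
    print(n, "samples ->", num_batches(n, batch_size), "batches")
```

Any client with fewer than `batch_size` samples yields zero batches, so a loop that counts completed steps per batch never advances.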

It might help to add a hint in the docs about the relation between batch_size and filter_less.
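Such a hint could also take the form of a config sanity check. The sketch below is hypothetical and not part of FedScale; it assumes that `filter_less` removes clients with fewer than that many samples, so it is the minimum client size that can reach training.

```python
def check_batch_config(batch_size, filter_less, drop_last=True):
    """Return a warning string if some clients could yield zero batches, else None."""
    # With drop_last, a client needs at least one full batch;
    # without it, a single sample is enough to produce a batch.
    min_needed = batch_size if drop_last else 1
    if filter_less < min_needed:
        return (
            f"filter_less={filter_less} admits clients with fewer than "
            f"{min_needed} samples, which yield no batches; "
            f"set filter_less >= {min_needed}"
        )
    return None


# Values from the yaml in this issue: batch_size 20, filter_less 0.
print(check_batch_config(batch_size=20, filter_less=0))
```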