OpenMined / PySyft

Perform data science on data that remains in someone else's server
https://www.openmined.org/
Apache License 2.0
9.54k stars 1.99k forks source link

ObjectNotFoundError #5210

Closed shashankc28 closed 3 years ago

shashankc28 commented 3 years ago
#Now let's define our workers. You can either use remote workers or virtual workers
hook = sy.TorchHook(torch)  # <-- NEW: hook PyTorch ie add extra functionalities to support Federated Learning
# NOTE(review): is_client_worker=True makes a VirtualWorker behave like a
# client that does NOT keep the objects sent to it. That is the most common
# cause of the ObjectNotFoundError reported in this issue — consider removing
# the flag so the workers store the tensors. TODO confirm against the
# PySyft 0.2.x VirtualWorker documentation.
alice = sy.VirtualWorker(hook, id="alice",is_client_worker= True)  
bob = sy.VirtualWorker(hook, id="bob",is_client_worker= True)  
#charlie = sy.VirtualWorker(hook, id="charlie") 

# Every worker participating in the federated training loop.
workers_virtual = [alice, bob]

#If you have your workers operating remotely, like on Raspberry PIs
#kwargs_websocket_alice = {"host": "ip_alice", "hook": hook}
#alice = WebsocketClientWorker(id="alice", port=8777, **kwargs_websocket_alice)
#kwargs_websocket_bob = {"host": "ip_bob", "hook": hook}
#bob = WebsocketClientWorker(id="bob", port=8778, **kwargs_websocket_bob)
#workers_virtual = [alice, bob]

# Wrap the preprocessed name lines and their category labels in a Dataset so
# they can be split ("federated") across the workers below.
langDataset =  LanguageDataset(array_lines_proper_dimension, categories_numpy)

#assign the data points and the corresponding categories to workers.
federated_train_loader = sy.FederatedDataLoader(
            langDataset
            .federate(workers_virtual),
            batch_size=args.batch_size)

def categoryFromOutput(output):
    """Map a network output tensor to its predicted category.

    Picks the highest-scoring entry of ``output`` and returns the pair
    (category name, category index).
    """
    _, best_idx_tensor = output.topk(1)
    idx = best_idx_tensor[0].item()
    return all_categories[idx], idx

def timeSince(since):
    """Return the elapsed time since *since* (a time.time() timestamp)
    formatted as an 'Xm Ys' string with whole minutes and seconds.
    """
    elapsed = time.time() - since
    minutes, seconds = divmod(elapsed, 60)
    # %d truncates the float remainders, matching integer display.
    return '%dm %ds' % (minutes, seconds)

def fed_avg_every_n_iters(model_pointers, iter, federate_after_n_batches):
    """Every ``federate_after_n_batches`` iterations, average the per-worker
    model replicas (federated averaging) and redistribute the result.

    Args:
        model_pointers: dict mapping worker id -> pointer to that worker's model.
        iter: current (1-based) training iteration.
        federate_after_n_batches: averaging period, in iterations.

    Returns:
        The (possibly updated) ``model_pointers`` dict.
    """
    models_local = {}

    # Bug fix: honour the federate_after_n_batches *parameter*; the original
    # ignored it and always read the global args.federate_after_n_batches.
    if iter % federate_after_n_batches == 0:
        for worker_name, model_pointer in model_pointers.items():
            # .copy() first so .get() pulls a copy and leaves the remote
            # model in place on its worker.
            models_local[worker_name] = model_pointer.copy().get()
        model_avg = utils.federated_avg(models_local)

        # Push a fresh copy of the averaged model back to every worker.
        for worker in workers_virtual:
            model_ptr = model_avg.copy().send(worker)
            model_pointers[worker.id] = model_ptr

    return model_pointers

def fw_bw_pass_model(model_pointers, line_single, category_single):
    """Run one forward/backward pass plus an SGD step on the worker holding
    ``line_single``; return the updated pointer map and the (remote) loss
    and output pointers.
    """
    #get the right initialized model: the replica on the same worker as the data
    model_ptr = model_pointers[line_single.location.id]
    #print(model_ptr)   
    line_reshaped = line_single.reshape(max_line_size, 1, len(all_letters))
    print(line_reshaped)
    print(device)
    # NOTE(review): .to(device) on a pointer issues a remote command; the
    # traceback in this issue fails exactly here when the remote tensor is
    # missing (see the is_client_worker note where the workers are created).
    line_reshaped, category_single = line_reshaped.to(device), category_single.to(device)
    #Firstly, initialize hidden layer
    hidden_init = model_ptr.initHidden() 
    #And now zero grad the model
    model_ptr.zero_grad()
    # Ship the freshly initialised hidden state to the data's worker.
    hidden_ptr = hidden_init.send(line_single.location)
    # Count non-zero rows; .copy().get() pulls a *copy* locally so the remote
    # tensor stays on its worker.
    amount_lines_non_zero = len(torch.nonzero(line_reshaped.copy().get()))
    #now need to perform forward passes, one per non-zero character row
    for i in range(amount_lines_non_zero): 
        output, hidden_ptr = model_ptr(line_reshaped[i], hidden_ptr) 
    criterion = nn.NLLLoss()   
    loss = criterion(output, category_single) 
    loss.backward()

    # Retrieve the model (with its gradients) from the worker.
    model_got = model_ptr.get() 

    #Perform model weights' updates (plain SGD step)
    # NOTE(review): the (scalar, tensor) overload of add_ is deprecated in
    # newer PyTorch; add_(param.grad.data, alpha=-args.learning_rate) is the
    # modern spelling — TODO confirm installed torch version.
    for param in model_got.parameters():
        param.data.add_(-args.learning_rate, param.grad.data)

    # Send the updated model back to the same worker (addressed by its id).
    model_sent = model_got.send(line_single.location.id)
    model_pointers[line_single.location.id] = model_sent

    return(model_pointers, loss, output)

def train_RNN(n_iters, print_every, plot_every, federate_after_n_batches, list_federated_train_loader):
    """Federated training loop: keep one model replica per worker and train
    each replica on batches that live on that worker.

    Returns (all_losses, model_pointers); all_losses holds the running loss
    averaged every ``plot_every`` iterations.
    """
    current_loss = 0
    all_losses = []    

    # Maps worker id -> pointer to that worker's model replica.
    model_pointers = {}

    #Send the initialized model to every single worker just before the training procedure starts
    for worker in workers_virtual:
        model_copied = model.copy()
        model_ptr = model_copied.send(worker)
        print(model_ptr) 
        model_pointers[worker.id] = model_ptr

    #extract a random element from the list and perform training on it
    for iter in range(1, n_iters + 1):        
        random_index = randomTrainingIndex()
        line_single, category_single = list_federated_train_loader[random_index]
        line_name = names_list[random_index]
        model_pointers, loss, output = fw_bw_pass_model(model_pointers, line_single, category_single)
        #print("passed step")
        #model_pointers = fed_avg_every_n_iters(model_pointers, iter, args.federate_after_n_batches)
        # Update the running loss; .get() pulls (and consumes) the remote
        # loss tensor — safe because a fresh loss is produced each iteration.
        loss_got = loss.get().item()
        #print("after get loss") 
        current_loss += loss_got
        print(current_loss)

        # Record the average loss once every plot_every iterations.
        if iter % plot_every == 0:
            all_losses.append(current_loss / plot_every)
            current_loss = 0

        # Periodically print a human-readable progress line.
        if(iter % print_every == 0):
            output_got = output.get()  #Without copy(): consumes the remote output
            print("after output get") 
            guess, guess_i = categoryFromOutput(output_got)
            category = all_categories[category_single.copy().get().item()]
            correct = '✓' if guess == category else '✗ (%s)' % category
            print('%d %d%% (%s) %.4f %s / %s %s' % (iter, iter / n_iters * 100, timeSince(start), loss_got, line_name, guess, correct))
    return(all_losses, model_pointers)

#This may take a few seconds to complete.
print("Generating list of batches for the workers...")
# Materialise the federated loader once so batches can be indexed randomly
# inside train_RNN.
list_federated_train_loader = list(federated_train_loader)

# Kick off training and record the start time (used by timeSince).
start = time.time()
all_losses, model_pointers = train_RNN(args.epochs, args.print_every, args.plot_every, args.federate_after_n_batches, list_federated_train_loader)

The above code runs for a few cycles and then raises the error below:


KeyError Traceback (most recent call last) /usr/local/lib/python3.6/dist-packages/syft/generic/object_storage.py in get_obj(self, obj_id) 73 try: ---> 74 obj = self._objects[obj_id] 75 except KeyError as e:

KeyError: 92210858066

During handling of the above exception, another exception occurred:

ObjectNotFoundError Traceback (most recent call last) 22 frames

in () 1 start = time.time() ----> 2 all_losses, model_pointers = train_RNN(args.epochs, args.print_every, args.plot_every, args.federate_after_n_batches, list_federated_train_loader) in train_RNN(n_iters, print_every, plot_every, federate_after_n_batches, list_federated_train_loader) 81 line_single, category_single = list_federated_train_loader[random_index] 82 line_name = names_list[random_index] ---> 83 model_pointers, loss, output = fw_bw_pass_model(model_pointers, line_single, category_single) 84 #print("passed step") 85 #model_pointers = fed_avg_every_n_iters(model_pointers, iter, args.federate_after_n_batches) in fw_bw_pass_model(model_pointers, line_single, category_single) 35 print(line_reshaped) 36 print(device) ---> 37 line_reshaped, category_single = line_reshaped.to(device), category_single.to(device) 38 #Firstly, initialize hidden layer 39 hidden_init = model_ptr.initHidden() /usr/local/lib/python3.6/dist-packages/syft/generic/frameworks/hook/hook.py in overloaded_native_method(self, *args, **kwargs) 464 # Send the new command to the appropriate class and get the response 465 method = getattr(new_self, method_name) --> 466 response = method(*new_args, **new_kwargs) 467 468 # For inplace methods, just directly return self /usr/local/lib/python3.6/dist-packages/syft/generic/frameworks/hook/hook.py in overloaded_pointer_method(self, *args, **kwargs) 626 command = (attr, self, args, kwargs) 627 --> 628 response = owner.send_command(location, command) 629 630 # For inplace methods, just directly return self /usr/local/lib/python3.6/dist-packages/syft/workers/base.py in send_command(self, recipient, message, return_ids, return_value) 636 name, target, args_, kwargs_, return_ids, return_value 637 ) --> 638 ret_val = self.send_msg(message, location=recipient) 639 except ResponseSignatureError as e: 640 ret_val = None /usr/local/lib/python3.6/dist-packages/syft/workers/base.py in send_msg(self, message, location) 288 289 # Step 2: send the message and wait for 
a response --> 290 bin_response = self._send_msg(bin_message, location) 291 292 # Step 3: deserialize the response /usr/local/lib/python3.6/dist-packages/syft/workers/virtual.py in _send_msg(self, message, location) 13 sleep(self.message_pending_time) 14 ---> 15 return location._recv_msg(message) 16 17 def _recv_msg(self, message: bin) -> bin: /usr/local/lib/python3.6/dist-packages/syft/workers/virtual.py in _recv_msg(self, message) 17 def _recv_msg(self, message: bin) -> bin: 18 """receive message""" ---> 19 return self.recv_msg(message) /usr/local/lib/python3.6/dist-packages/syft/workers/base.py in recv_msg(self, bin_message) 315 316 # Step 0: deserialize message --> 317 msg = sy.serde.deserialize(bin_message, worker=self) 318 319 if self.verbose: /usr/local/lib/python3.6/dist-packages/syft/serde/serde.py in deserialize(binary, worker, strategy) 67 object: the deserialized form of the binary input. 68 """ ---> 69 return strategy(binary, worker) /usr/local/lib/python3.6/dist-packages/syft/serde/msgpack/serde.py in deserialize(binary, worker) 380 381 simple_objects = _deserialize_msgpack_binary(binary, worker) --> 382 return _deserialize_msgpack_simple(simple_objects, worker) 383 384 /usr/local/lib/python3.6/dist-packages/syft/serde/msgpack/serde.py in _deserialize_msgpack_simple(simple_objects, worker) 371 # as msgpack's inability to serialize torch tensors or ... 
or 372 # python slice objects --> 373 return _detail(worker, simple_objects) 374 375 /usr/local/lib/python3.6/dist-packages/syft/serde/msgpack/serde.py in _detail(worker, obj, **kwargs) 497 """ 498 if type(obj) in (list, tuple): --> 499 val = detailers[obj[0]](worker, obj[1], **kwargs) 500 return _detail_field(obj[0], val) 501 else: /usr/local/lib/python3.6/dist-packages/syft/messaging/message.py in detail(worker, msg_tuple) 154 simplified_action = msg_tuple[0] 155 --> 156 detailed_action = sy.serde.msgpack.serde._detail(worker, simplified_action) 157 158 return TensorCommandMessage(detailed_action) /usr/local/lib/python3.6/dist-packages/syft/serde/msgpack/serde.py in _detail(worker, obj, **kwargs) 497 """ 498 if type(obj) in (list, tuple): --> 499 val = detailers[obj[0]](worker, obj[1], **kwargs) 500 return _detail_field(obj[0], val) 501 else: /usr/local/lib/python3.6/dist-packages/syft/execution/computation.py in detail(worker, msg_tuple) 96 return_value = msg_tuple[2] 97 ---> 98 detailed_msg = sy.serde.msgpack.serde._detail(worker, message) 99 detailed_ids = sy.serde.msgpack.serde._detail(worker, return_ids) 100 detailed_return_value = sy.serde.msgpack.serde._detail(worker, return_value) /usr/local/lib/python3.6/dist-packages/syft/serde/msgpack/serde.py in _detail(worker, obj, **kwargs) 497 """ 498 if type(obj) in (list, tuple): --> 499 val = detailers[obj[0]](worker, obj[1], **kwargs) 500 return _detail_field(obj[0], val) 501 else: /usr/local/lib/python3.6/dist-packages/syft/serde/msgpack/native_serde.py in _detail_collection_tuple(worker, my_tuple, shallow) 148 # Step 1: deserialize each part of the collection 149 for part in my_tuple: --> 150 pieces.append(serde._detail(worker, part)) 151 152 return tuple(pieces) /usr/local/lib/python3.6/dist-packages/syft/serde/msgpack/serde.py in _detail(worker, obj, **kwargs) 497 """ 498 if type(obj) in (list, tuple): --> 499 val = detailers[obj[0]](worker, obj[1], **kwargs) 500 return _detail_field(obj[0], val) 501 else: 
/usr/local/lib/python3.6/dist-packages/syft/generic/pointers/pointer_tensor.py in detail(worker, tensor_tuple) 494 # If the pointer received is pointing at the current worker, we load the tensor instead 495 if worker_id == worker.id: --> 496 tensor = worker.get_obj(id_at_location) 497 498 if point_to_attr is not None and tensor is not None: /usr/local/lib/python3.6/dist-packages/syft/workers/base.py in get_obj(self, obj_id) 666 obj_id: A string or integer id of an object to look up. 667 """ --> 668 obj = super().get_obj(obj_id) 669 670 # An object called with get_obj will be "with high probability" serialized /usr/local/lib/python3.6/dist-packages/syft/generic/object_storage.py in get_obj(self, obj_id) 75 except KeyError as e: 76 if obj_id not in self._objects: ---> 77 raise ObjectNotFoundError(obj_id, self) 78 else: 79 raise e ObjectNotFoundError: Object "92210858066" not found on worker!!! You just tried to interact with an object ID:92210858066 on which does not exist!!! Use .send() and .get() on all your tensors to make sure they're on the same machines. If you think this tensor does exist, check the ._objects dictionary on the worker and see for yourself!!! The most common reason this error happens is because someone calls .get() on the object's pointer without realizing it (which deletes the remote object and sends it to the pointer). Check your code to make sure you haven't already called .get() on this pointer!!!
tudorcebere commented 3 years ago

Hi! 0.2 hit EOL with the release of 0.5.0rc1; no issues/PRs are going to target this specific version anymore. Please check out 0.5.0rc1, as it is close to feature parity with 0.2.x.