ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

can't pickle SwigPyObject objects when transferring data from GPU to CPU #14100

Closed abarbosa94 closed 3 years ago

abarbosa94 commented 3 years ago

What is the problem?

I'm using https://github.com/huggingface/datasets and I'm trying to apply the DPR model (https://huggingface.co/transformers/model_doc/dpr.html) to the problem I'm facing. Because of the computational cost, I'm trying to use Ray for distributed retrieval, inspired by this blog post: https://medium.com/distributed-computing-with-ray/retrieval-augmented-generation-with-huggingface-transformers-and-ray-b09b56161b1e

I run model inference on the GPU and dataset retrieval on the CPU, but when I do this, I receive "can't pickle SwigPyObject objects".

Ray version and other system information (Python version, TensorFlow version, OS): Ray 1.2.0, Python 3.7

Reproduction (REQUIRED)

Please provide a short code snippet (less than 50 lines if possible) that can be copy-pasted to reproduce the issue. The snippet should have no external library dependencies (i.e., use fake or mock data / environments):

import ray

ray.init(num_cpus=5, num_gpus=1)

# question_tokenizer, question_encoder, dpr_dataset, device, and dataset
# are defined earlier in the notebook; they get captured by the closure below.
@ray.remote(num_cpus=1, num_gpus=0.2, max_calls=1)
def test(example):
    question_text = example['question']
    for option in example['options']:
        question_with_option = question_text + " " + option['option_text']
        tokenize_text = question_tokenizer(question_with_option, return_tensors="pt")
        tokenize_text = tokenize_text.to(device)
        question_embed = (
            question_encoder(**tokenize_text)
        )[0][0].cpu().detach().numpy()
        _, retrieved_examples = dpr_dataset.get_nearest_examples(
            "embeddings", question_embed, k=10
        )
    return question_embed

tmp = test.remote(dataset['train'][0])

The error traceback is the following:

--------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-23-87609b591951> in <module>
----> 1 tmp = test.remote(dataset['train'][0])

~/.cache/pypoetry/virtualenvs/masters-utTTC0p8-py3.7/lib/python3.7/site-packages/ray/remote_function.py in _remote_proxy(*args, **kwargs)
     99         @wraps(function)
    100         def _remote_proxy(*args, **kwargs):
--> 101             return self._remote(args=args, kwargs=kwargs)
    102 
    103         self.remote = _remote_proxy

~/.cache/pypoetry/virtualenvs/masters-utTTC0p8-py3.7/lib/python3.7/site-packages/ray/remote_function.py in _remote(self, args, kwargs, num_returns, num_cpus, num_gpus, memory, object_store_memory, accelerator_type, resources, max_retries, placement_group, placement_group_bundle_index, placement_group_capture_child_tasks, override_environment_variables, name)
    199             # first driver. This is an argument for repickling the function,
    200             # which we do here.
--> 201             self._pickled_function = pickle.dumps(self._function)
    202 
    203             self._function_descriptor = PythonFunctionDescriptor.from_function(

~/.cache/pypoetry/virtualenvs/masters-utTTC0p8-py3.7/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
     71                 file, protocol=protocol, buffer_callback=buffer_callback
     72             )
---> 73             cp.dump(obj)
     74             return file.getvalue()
     75 

~/.cache/pypoetry/virtualenvs/masters-utTTC0p8-py3.7/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle_fast.py in dump(self, obj)
    572     def dump(self, obj):
    573         try:
--> 574             return Pickler.dump(self, obj)
    575         except RuntimeError as e:
    576             if "recursion" in e.args[0]:

TypeError: can't pickle SwigPyObject objects

If the code snippet cannot be run by itself, the issue will be closed with "needs-repro-script".
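From the traceback, the failure happens while Ray pickles the remote function itself (the pickle.dumps(self._function) call in remote_function.py), so the task never even starts executing.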

I think this discussion may help: https://github.com/huggingface/datasets/issues/1805

rkooo567 commented 3 years ago

If your remote task captures an object that is not serializable, the function cannot be pickled (for example, you cannot serialize a thread's lock). This normally happens when you capture the object like this:

import threading

lock = threading.Lock()

@ray.remote
def f():
    a = lock  # Here, lock is captured from the enclosing scope; it is not serializable, so the function cannot be pickled to run remotely.
    # do something

Instead, you can do something like this:

@ray.remote
def f():
    lock = threading.Lock()
    a = lock  # Here, lock doesn't have to be serialized because it is created inside the task.
    # do something
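Putting the two patterns together, here is a minimal, dependency-free sketch of where the error surfaces (a hypothetical illustration, not code from this issue; run it as a script or notebook so the function is pickled by value):

import threading

import ray

ray.init()

lock = threading.Lock()

@ray.remote
def bad():
    return lock.locked()  # captures the module-level lock

@ray.remote
def good():
    local_lock = threading.Lock()  # created inside the task; never serialized
    return local_lock.locked()

try:
    bad.remote()  # raises while Ray pickles the function, before the task runs
except TypeError as e:
    print("serialization failed:", e)

print(ray.get(good.remote()))  # False

Note that the error is raised at .remote() time, while the function is being pickled, which matches the pickle.dumps(self._function) frame in the traceback above.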

I recommend checking out this document and seeing whether you are capturing any object that is not serializable: https://docs.ray.io/en/latest/serialization.html#troubleshooting
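The troubleshooting page also documents ray.util.inspect_serializability, which recursively pickles an object's members and reports which one fails. A small sketch (assuming a Ray version where the helper is available, e.g. 1.1+):

import threading

from ray.util import inspect_serializability

def make_f():
    lock = threading.Lock()  # deliberately non-serializable
    def f():
        return lock.locked()
    return f

# Prints a serialization report and points at the captured lock
# as the member that fails to pickle.
inspect_serializability(make_f(), name="f")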

abarbosa94 commented 3 years ago

Thanks a lot for the quick reply :)

By following the troubleshooting link, I was able to find out that my HuggingFace dataset, once its faiss index is loaded, is not serializable.

I created an actor that stores the dataset as an attribute of the class. I'm able to perform retrieval in a distributed way successfully right now :)

I'll leave my code here in case someone faces this challenge in the future.

import torch
from datasets import load_dataset
from ray.util import ActorPool
from tqdm import tqdm

# question_tokenizer, question_encoder, device, num_cpus, dataset, and the
# ARC_CORPUS_* / CACHE_DIR constants are defined earlier in the notebook.

@ray.remote(num_gpus=0.125)
class DPRRetrieval(object):
    def __init__(self):
        # The dataset and its faiss index are created inside the actor,
        # so they never have to be serialized.
        self.dpr_dataset = load_dataset(
            "text",
            data_files=ARC_CORPUS_TEXT,
            cache_dir=CACHE_DIR,
            split="train[:100%]",
        )
        self.dpr_dataset.load_faiss_index("embeddings", ARC_CORPUS_FAISS)
        torch.set_grad_enabled(False)

    def generate_context(self, example):
        question_text = example['question']
        for option in example['options']:
            question_with_option = question_text + " " + option['option_text']
            tokenize_text = question_tokenizer(question_with_option, return_tensors="pt")
            tokenize_text = tokenize_text.to(device)
            question_embed = (
                question_encoder(**tokenize_text)
            )[0][0].cpu().numpy()
            _, retrieved_examples = self.dpr_dataset.get_nearest_examples(
                "embeddings", question_embed, k=10
            )
            option["option_context"] = retrieved_examples["text"]
            option["option_context"] = " ".join(option["option_context"]).strip()
        return example

actors = [DPRRetrieval.remote() for _ in range(num_cpus)]
pool = ActorPool(actors)
examples = [example for example in dataset['validation']]
parallel_result = pool.map(
    lambda a, v: a.generate_context.remote(v), examples
)
result = list(tqdm(parallel_result, total=len(examples)))
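The key difference from my first attempt is that the dataset and its faiss index are built once inside __init__ on each actor's worker process, so they never cross a process boundary; only the small example dicts get serialized.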

I'm able to scale out a lot of the computation now!

Closing this issue.

Thanks a lot!