jina-ai / serve

☁️ Build multimodal AI applications with cloud-native stack
https://jina.ai/serve
Apache License 2.0

Use GPU in torch encoder with transformers==4.1.1 #2526

Closed: Yongxuanzhang closed this issue 3 years ago

Yongxuanzhang commented 3 years ago

Describe the bug

In Jina 2.0, the GPU is not used by models that need it in the torch encoder. The error message is:

RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method
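For context, PyTorch raises this whenever a process created with the fork start method touches CUDA after the parent process has already initialized it. A minimal, Jina-independent sketch of the difference (the use_cuda function is made up for illustration):

import torch
import torch.multiprocessing as mp

def use_cuda():
    # any CUDA call in the child process, e.g. moving a tensor to the GPU
    torch.zeros(1).to('cuda:0')

if __name__ == '__main__':
    torch.cuda.init()                 # parent initializes CUDA first
    ctx = mp.get_context('spawn')     # with 'fork' here, use_cuda raises the
    p = ctx.Process(target=use_cuda)  # RuntimeError above; 'spawn' starts a
    p.start()                         # fresh interpreter and works
    p.join()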

Code to reproduce:

from typing import Dict, Optional
import numpy as np
import torch
from jina import Executor, DocumentArray, requests, Flow, Document
from transformers import AutoModel, AutoTokenizer

class TextEncoder(Executor):
    """Transformer executor class."""

    def __init__(
        self,
        pretrained_model_name_or_path: str = 'sentence-transformers/distilbert-base-nli-stsb-mean-tokens',
        base_tokenizer_model: Optional[str] = None,
        pooling_strategy: str = 'mean',
        layer_index: int = -1,
        max_length: Optional[int] = None,
        acceleration: Optional[str] = None,
        embedding_fn_name: str = '__call__',
        on_gpu: bool = True,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.pretrained_model_name_or_path = pretrained_model_name_or_path
        self.base_tokenizer_model = (
            base_tokenizer_model or pretrained_model_name_or_path
        )
        self.pooling_strategy = pooling_strategy
        self.layer_index = layer_index
        self.max_length = max_length
        self.acceleration = acceleration
        self.embedding_fn_name = embedding_fn_name
        self.tokenizer = AutoTokenizer.from_pretrained(self.base_tokenizer_model)
        self.model = AutoModel.from_pretrained(
            self.pretrained_model_name_or_path, output_hidden_states=True
        )
        self.on_gpu = on_gpu
        if self.on_gpu:
            # this CUDA call runs inside the Executor's forked subprocess
            # and is where the RuntimeError is raised
            self.model.to(torch.device('cuda:0'))
        else:
            self.model.to(torch.device('cpu'))

    def _compute_embedding(self, hidden_states: 'torch.Tensor', input_tokens: Dict):
        fill_vals = {'cls': 0.0, 'mean': 0.0, 'max': -np.inf, 'min': np.inf}
        fill_val = torch.tensor(
            fill_vals[self.pooling_strategy], device=torch.device('cpu')
        )

        layer = hidden_states[self.layer_index]
        attn_mask = input_tokens['attention_mask'].unsqueeze(-1).expand_as(layer)
        layer = torch.where(attn_mask.bool(), layer, fill_val)

        embeddings = layer.sum(dim=1) / attn_mask.sum(dim=1)
        return embeddings.cpu().numpy()

    @requests
    def encode(self, docs: 'DocumentArray', *args, **kwargs):
        if docs is None:
            return
        chunks = DocumentArray(
            list(
                filter(lambda d: d.mime_type == 'text/plain', docs.traverse_flat(['c']))
            )
        )

        texts = chunks.get_attributes('text')

        with torch.no_grad():

            if not self.tokenizer.pad_token:
                self.tokenizer.add_special_tokens({'pad_token': '[PAD]'})
                self.model.resize_token_embeddings(len(self.tokenizer.vocab))

            input_tokens = self.tokenizer(
                texts,
                max_length=self.max_length,
                padding='longest',
                truncation=True,
                return_tensors='pt',
            )
            input_tokens = {
                # note: inputs stay on CPU here even when the model is on GPU,
                # which would also cause a device mismatch once CUDA works
                k: v.to(torch.device('cpu')) for k, v in input_tokens.items()
            }

            outputs = getattr(self.model, self.embedding_fn_name)(**input_tokens)
            if isinstance(outputs, torch.Tensor):
                return outputs.cpu().numpy()
            hidden_states = outputs.hidden_states

            embeds = self._compute_embedding(hidden_states, input_tokens)
            for doc, embed in zip(chunks, embeds):
                doc.embedding = embed

f = Flow().add(uses=TextEncoder, name='TextEncoder', timeout_ready=-1)
chunk = Document(text='test a mwu')
doc = Document()
doc.chunks.append(chunk)

def print_resp(resp):
    print(resp)

with f:
    f.post(on='/index', inputs=doc, on_done=print_resp)

Environment

jina==2.0.0rc2.dev57
torch==1.7.1
transformers==4.1.1

FionnD commented 3 years ago

@Yongxuanzhang it was suggested in the team lead meeting today to try parallel=1. I think @florian-hoenicke might remember more?
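For reference, a minimal sketch of that suggestion, assuming the parallel argument that Flow.add accepts in Jina 2.x:

f = Flow().add(
    uses=TextEncoder,
    name='TextEncoder',
    parallel=1,  # a single worker process, so no extra forked workers
    timeout_ready=-1,
)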

JoanFM commented 3 years ago

Hey @Yongxuanzhang, can you try setting cuda as the device instead of cuda:0?
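Applied to the __init__ above, the change would be one line:

        if self.on_gpu:
            # let torch resolve the current CUDA device instead of pinning index 0
            self.model.to(torch.device('cuda'))
        else:
            self.model.to(torch.device('cpu'))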

Yongxuanzhang commented 3 years ago

@Yongxuanzhang it was suggested in the team lead meeting today to try parallel=1. I think @florian-hoenicke might remember more?

I don't think that's the reason; the code I put in this ticket doesn't use parallel, right?

Yongxuanzhang commented 3 years ago

Hey @Yongxuanzhang, can you try setting cuda as the device instead of cuda:0?

Sure, I will try that.

Yongxuanzhang commented 3 years ago

Upgrading to transformers > 4.2.0 solves this issue.
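A quick sanity check of the installed version (a trivial sketch, not from the thread; it assumes the packaging library is available):

from packaging import version
import transformers

# per the comment above, releases after 4.2.0 no longer trip the
# forked-subprocess CUDA error
assert version.parse(transformers.__version__) > version.parse('4.2.0')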

Yongxuanzhang commented 3 years ago

To solve this, we can either upgrade the transformers version, or import transformers inside __init__() right before it is used, instead of at the top of the module.

Working example:

from typing import Dict, Optional, TYPE_CHECKING
import numpy as np
import torch
from jina import Executor, DocumentArray, requests, Flow, Document
if TYPE_CHECKING:
    # disabled at runtime: transformers is imported lazily in __init__ instead
    from transformers import AutoModel, AutoTokenizer

class TextEncoder(Executor):
    """Transformer executor class."""

    def __init__(
        self,
        pretrained_model_name_or_path: str = 'sentence-transformers/distilbert-base-nli-stsb-mean-tokens',
        base_tokenizer_model: Optional[str] = None,
        pooling_strategy: str = 'mean',
        layer_index: int = -1,
        max_length: Optional[int] = None,
        acceleration: Optional[str] = None,
        embedding_fn_name: str = '__call__',
        on_gpu: bool = True,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        # importing transformers here runs inside the Executor's own process,
        # so CUDA is never initialized in the parent before the fork
        from transformers import AutoModel, AutoTokenizer
        self.pretrained_model_name_or_path = pretrained_model_name_or_path
        self.base_tokenizer_model = (
            base_tokenizer_model or self.pretrained_model_name_or_path
        )
        self.pooling_strategy = pooling_strategy
        self.layer_index = layer_index
        self.max_length = max_length
        self.acceleration = acceleration
        self.embedding_fn_name = embedding_fn_name
        self.on_gpu = on_gpu
        self.tokenizer = AutoTokenizer.from_pretrained(self.base_tokenizer_model)
        self.model = AutoModel.from_pretrained(
            self.pretrained_model_name_or_path, output_hidden_states=True
        )
        if self.on_gpu:
            self.device = torch.device('cuda:0')
        else:
            self.device = torch.device('cpu')
        self.model.to(self.device)

    def _compute_embedding(self, hidden_states: 'torch.Tensor', input_tokens: Dict):
        fill_vals = {'cls': 0.0, 'mean': 0.0, 'max': -np.inf, 'min': np.inf}
        fill_val = torch.tensor(
            fill_vals[self.pooling_strategy], device=self.device
        )

        layer = hidden_states[self.layer_index]
        attn_mask = input_tokens['attention_mask'].unsqueeze(-1).expand_as(layer)
        layer = torch.where(attn_mask.bool(), layer, fill_val)

        embeddings = layer.sum(dim=1) / attn_mask.sum(dim=1)
        return embeddings.cpu().numpy()

    @requests(on=['/index', '/search'])
    def encode(self, docs: 'DocumentArray', *args, **kwargs):
        if docs is None:
            return
        chunks = DocumentArray(
            list(
                filter(lambda d: d.mime_type == 'text/plain', docs.traverse_flat(['c']))
            )
        )

        texts = chunks.get_attributes('text')

        with torch.no_grad():

            if not self.tokenizer.pad_token:
                self.tokenizer.add_special_tokens({'pad_token': '[PAD]'})
                self.model.resize_token_embeddings(len(self.tokenizer.vocab))

            input_tokens = self.tokenizer(
                texts,
                max_length=self.max_length,
                padding='longest',
                truncation=True,
                return_tensors='pt',
            )
            input_tokens = {
                k: v.to(self.device) for k, v in input_tokens.items()
            }

            outputs = getattr(self.model, self.embedding_fn_name)(**input_tokens)
            if isinstance(outputs, torch.Tensor):
                return outputs.cpu().numpy()
            hidden_states = outputs.hidden_states

            embeds = self._compute_embedding(hidden_states, input_tokens)
            for doc, embed in zip(chunks, embeds):
                doc.embedding = embed

f = Flow().add(uses=TextEncoder, name='TextEncoder', timeout_ready=-1)
chunk = Document(text='test a mwu')
doc = Document()
doc.chunks.append(chunk)

def print_resp(resp):
    print(resp)

with f:
    f.post(on='/index', inputs=doc, on_done=print_resp)