baal-org / baal

Bayesian active learning library for research and industrial use cases.
https://baal.readthedocs.io
Apache License 2.0

argument of type "ModelWrapper" is not iterable #235

Closed: XueQZ closed this issue 2 years ago

XueQZ commented 2 years ago

Hi, I'm running into a strange problem when I use Baal in a Label Studio ML backend. I am trying to retrofit pytorch_transfer_learning.py to a text classification task. Everything seems fine until I click the "Start Training" button, at which point a TypeError is raised. Here is the error message:

(screenshot: Label Studio error, 2022-09-30, showing "TypeError: argument of type 'ModelWrapper' is not iterable")

And here is my code:

import os
import json
import requests
from time import time
from uuid import uuid4
from pprint import pprint
from typing import Callable, Optional
from collections.abc import Sequence

from label_studio_ml.model import LabelStudioMLBase
from label_studio_ml.utils import get_single_tag_keys, get_choice, DATA_UNDEFINED_NAME, get_env

import numpy as np
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.dataloader import default_collate
from transformers import AutoModelForSequenceClassification, AutoTokenizer

from baal.utils import metrics
from baal.bayesian.dropout import patch_module
from baal.active.heuristics import BALD
from baal.modelwrapper import ModelWrapper
from baal.utils.cuda_utils import to_cuda
from baal.utils.iterutils import map_on_tensor
from baal.utils.array_utils import stack_in_memory

HOSTNAME = get_env('HOSTNAME', 'http://localhost:8080')
API_KEY = get_env('API_KEY', 'my_api_key')
use_cuda = torch.cuda.is_available()

class TransformersTextClassifierDataset(Dataset):
    def __init__(self, input_texts, labels, tokenizer):
        self.texts = input_texts
        self.targets_list = list(set(labels))
        self.label2id = {l: i for i, l in enumerate(self.targets_list)}
        self.targets = [self.label2id[label] for label in labels]
        result = tokenizer(input_texts, add_special_tokens=True, padding="max_length", truncation=True, max_length=64,
                           return_tensors='pt', return_token_type_ids=False, return_attention_mask=True)
        self.input_ids, self.attention_masks = result['input_ids'], result['attention_mask']
        del result

    @property
    def num_classes(self):
        return len(self.targets_list)

    def label(self, idx: int, value: int):
        self.targets[idx] = value

    def __getitem__(self, idx):
        label = self.targets[idx]

        return self.texts[idx], {
            "input_ids": self.input_ids[idx].flatten() if len(self.input_ids) > 0 else None,
            "attention_mask": self.attention_masks[idx].flatten() if len(self.attention_masks) > 0 else None
        }, torch.tensor(label, dtype=torch.long)

    def __len__(self):
        return len(self.texts)

class TransformersModelWrapper(ModelWrapper):
    def train_on_dataset(self, dataset, optimizer, batch_size, epoch, use_cuda=use_cuda, workers=4,
                         collate_fn: Optional[Callable] = None, regularizer: Optional[Callable] = None):
        self.train()
        history = []
        collate_fn = collate_fn or default_collate
        for _ in range(epoch):
            self._reset_metrics("train")
            for text, data, target in DataLoader(dataset, batch_size, True, num_workers=workers, collate_fn=collate_fn):
                _ = self.train_on_batch(data, target, optimizer, use_cuda, regularizer)
            history.append(self.metrics["train_loss"].value)

        optimizer.zero_grad()  # Assert that the gradient is flushed.
        print(f'Training complete.\ntrain_loss={self.metrics["train_loss"].value}')
        return history

    def train_on_batch(self, data, target, optimizer, cuda=False, regularizer: Optional[Callable] = None):
        if cuda:
            data, target = to_cuda(data), to_cuda(target)
        optimizer.zero_grad()
        output = self.model(**data)['logits']
        loss = self.criterion(output, target)

        if regularizer:
            regularized_loss = loss + regularizer()
            regularized_loss.backward()
        else:
            loss.backward()

        optimizer.step()
        self._update_metrics(output, target, loss, filter="train")
        return loss

    def predict_on_batch(self, data, iterations=1, use_cuda=use_cuda):
        with torch.no_grad():
            if use_cuda:
                data = to_cuda(data)
            if self.replicate_in_memory:
                # Replicate every input tensor `iterations` times so that a single
                # forward pass covers all MC-Dropout samples.
                data = {key: map_on_tensor(lambda d: stack_in_memory(d, iterations), value)
                        for key, value in data.items()}
                try:
                    out = self.model(**data)
                except RuntimeError as e:
                    raise RuntimeError(
                        """CUDA ran out of memory while BaaL tried to replicate data. See the exception above.
                    Use `replicate_in_memory=False` in order to reduce the memory requirements.
                    Note that there will be some speed trade-offs"""
                    ) from e
                out = map_on_tensor(lambda o: o.view([iterations, -1, *o.size()[1:]]), out)
                out = map_on_tensor(lambda o: o.permute(1, 2, *range(3, o.ndimension()), 0), out)
            else:
                # One forward pass per MC-Dropout iteration; each call returns a dict-like
                # ModelOutput, so collect the logits and stack them on a new last dimension.
                outs = [self.model(**data)['logits'] for _ in range(iterations)]
                out = {'logits': torch.stack(outs, dim=-1)}
        return out['logits']

class TransformersTextClassifier(object):
    def __init__(self, num_labels, model_path, freeze_extractor):
        self.model = AutoModelForSequenceClassification.from_pretrained(model_path, num_labels=num_labels)
        if freeze_extractor:
            print('Transfer learning with a fixed Transformer feature extractor.')
            for param in self.model.parameters():
                param.requires_grad = False
        else:
            print('Transfer learning with a full Transformer fine-tuning.')
        in_features = self.model.classifier.in_features
        self.model.classifier = nn.Linear(in_features, num_labels)

        self.criterion = nn.CrossEntropyLoss()
        # patch_module replaces the model's Dropout layers so they stay active at
        # inference time (MC-Dropout).
        self.model = patch_module(self.model)
        if use_cuda:
            self.model = self.model.cuda()

        self.wrapper = TransformersModelWrapper(self.model, self.criterion)
        self.wrapper.add_metric('cls_report', lambda: metrics.ClassificationReport(num_classes=num_labels))

        if freeze_extractor:
            self.optimizer = optim.SGD(self.model.classifier.parameters(), lr=0.001, momentum=0.9)
        else:
            self.optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
        self.scheduler = optim.lr_scheduler.StepLR(self.optimizer, step_size=7, gamma=0.1)

    def save(self, save_path):
        torch.save(self.model.state_dict(), save_path)
        print(f"Model saved at location: {save_path}")

    def load(self, save_path):
        print(f"Loading Model from location: {save_path}")
        self.model.load_state_dict(torch.load(save_path))
        self.model.eval()
        print(f"Done.")

    def predict(self, data):
        with torch.no_grad():
            return self.wrapper.predict_on_batch(data, iterations=20, use_cuda=use_cuda).detach().cpu().numpy()

    def train(self, dataset, batch_size=32, epoch=5):
        since = time()
        self.wrapper.train_on_dataset(dataset, self.optimizer, batch_size, epoch, use_cuda, 0)
        time_elapsed = time() - since
        print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
        return self.model

class TransformersTextClassifierAPI(LabelStudioMLBase):
    def __init__(self, model_path=r'C:\Users\admin\.cache\transformers\roberta-base-finetuned-dianping-chinese',
                 freeze_extractor=True, **kwargs):
        super(TransformersTextClassifierAPI, self).__init__(**kwargs)
        self.from_name, self.to_name, self.value, self.labels = get_single_tag_keys(self.parsed_label_config, 'Choices', 'Text')
        self.model_path = model_path
        self.freeze_extractor = freeze_extractor

        if self.train_output:
            self.labels = self.train_output['labels']
            self.trainer = TransformersTextClassifier(len(self.labels), model_path, freeze_extractor)
            self.trainer.load(self.train_output['model_file'])
        else:
            self.trainer = TransformersTextClassifier(len(self.labels), model_path, freeze_extractor)

        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        self.bald = BALD()

    def predict(self, tasks, **kwargs):
        input_texts = []
        for task in tasks:
            input_text = task['data'].get(self.value) or task['data'].get(DATA_UNDEFINED_NAME)
            input_texts.append(input_text)
        results = self.tokenizer(input_texts, add_special_tokens=True, padding=True, truncation=True, max_length=64,
                                 return_tensors='pt')
        logits = self.trainer.predict(results)  # shape: [batch, num_classes, iterations]
        bma = logits.mean(-1)  # Bayesian model averaging over the MC-Dropout iterations
        predicted_label_indices = np.argmax(bma, axis=1)
        predicted_scores = self.bald.get_uncertainties(logits)  # BALD uncertainty per task
        predictions = []
        for idx, score in zip(predicted_label_indices, predicted_scores):
            predicted_label = self.labels[idx]
            # prediction result for the single task
            result = [{
                'from_name': self.from_name,
                'to_name': self.to_name,
                'type': 'choices',
                'value': {'choices': [predicted_label]}
            }]
            # expand predictions with their scores for all tasks
            predictions.append({'result': result, 'score': float(score)})
        return predictions

    def fit(self, annotations, workdir=None, **kwargs):
        if kwargs.get('data'):
            project_id = kwargs['data']['project']['id']
            tasks = self._get_annotated_dataset(project_id)
        else:
            tasks = annotations

        input_texts = []
        output_labels = []

        print("Start collecting annotations...")
        for task in tasks:
            if not task.get('annotations'):
                continue
            annotation = task['annotations'][0]
            if annotation.get('skipped') or annotation.get('was_cancelled'):
                continue

            input_text = task['data'].get(self.value) or task['data'].get(DATA_UNDEFINED_NAME)
            input_texts.append(input_text)

            output_label = get_choice(task)
            output_labels.append(output_label)

        new_labels = set(output_labels)
        if len(new_labels) != len(self.labels):
            self.labels = list(sorted(new_labels))
            self.trainer = TransformersTextClassifier(len(self.labels), self.model_path, self.freeze_extractor)
            print('Label set has been changed. New label set: ' + str(self.labels))

        train_dataset = TransformersTextClassifierDataset(input_texts, output_labels, self.tokenizer)

        print(f'Start training on {len(input_texts)} samples. ')
        self.trainer = TransformersTextClassifier(len(self.labels), self.model_path, self.freeze_extractor)
        self.trainer.train(train_dataset)
        pprint(self.trainer.wrapper.metrics['train_cls_report'].value)

        print('Save model...')
        workdir = workdir or os.getenv('MODEL_DIR')
        model_name = str(uuid4())[:8]
        if workdir:
            model_file = os.path.join(workdir, f'{model_name}.pkl')
        else:
            model_file = f'{model_name}.pkl'
        self.trainer.save(model_file)

        train_output = {
            'labels': self.labels,
            'model_file': model_file
        }
        return train_output

    def _get_annotated_dataset(self, project_id):
        download_url = f'{HOSTNAME.rstrip("/")}/api/projects/{project_id}/export'
        response = requests.get(download_url, headers={'Authorization': f'Token {API_KEY}'})
        if response.status_code != 200:
            raise Exception(f"Can't load task data using {download_url}, "
                            f"response status_code = {response.status_code}")
        return json.loads(response.content)

Did I miss something? I am using the latest release of Baal and Label Studio 1.6.0.

Dref360 commented 2 years ago

Hello! Looking at the codebase, I think this error comes from label-studio's model wrapper, not from Baal's ModelWrapper.

https://github.com/heartexlabs/label-studio-ml-backend/blob/master/label_studio_ml/model.py#L42
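
For context, Python raises this exact TypeError whenever the in operator is used on an object that supports neither membership testing nor iteration. A minimal, hypothetical sketch (not label-studio's actual code) that reproduces the message:

class ModelWrapper:
    """Stand-in class: it defines neither __contains__ nor __iter__."""
    pass

wrapper = ModelWrapper()

# Membership tests fall back to iteration; since the object is not iterable,
# this line raises: TypeError: argument of type 'ModelWrapper' is not iterable
print("some_key" in wrapper)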

In any case, I was already in the process of writing a new tutorial on how to use Baal with HuggingFace on LabelStudio. I should be done by Monday.

Dref360 commented 2 years ago

For what it's worth, export LABEL_STUDIO_ML_BACKEND_V2=True fixed the issue for me.
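
If you would rather set the flag from Python than from the shell, a minimal sketch (assuming, as with get_env above, that label_studio_ml reads the variable from the environment at startup):

import os

# Must run before label_studio_ml reads its configuration.
os.environ["LABEL_STUDIO_ML_BACKEND_V2"] = "True"

from label_studio_ml.model import LabelStudioMLBase  # imported after the flag is set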

I made a gist with my script if you want to take a look.

It fine-tunes a DistilBERT model on your labels.

Let me know if this fixes your issue.