
custom_classification_head #17

Open HoangHoang1408 opened 3 months ago

HoangHoang1408 commented 3 months ago

```python
import os

os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "6"

import json
import random
from glob import glob
from pprint import pprint
from typing import Optional, Tuple, Union

import evaluate
import numpy as np
import pandas as pd
import torch
import wandb
from datasets import Dataset, concatenate_datasets
from torch import nn
# MSELoss and BCEWithLogitsLoss are needed by the custom forward() below.
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from torch.nn import functional as F
from torch.utils.data import DataLoader
from tqdm.notebook import tqdm
from transformers import (
    AutoConfig,
    AutoModel,
    AutoTokenizer,
    DataCollatorWithPadding,
    PretrainedConfig,
    Trainer,
    TrainingArguments,
    XLMRobertaConfig,
    XLMRobertaModel,
    XLMRobertaPreTrainedModel,
)
from transformers.modeling_outputs import SequenceClassifierOutput

device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
```

HoangHoang1408 commented 3 months ago

```python
class XLMRobertaClassificationHead(nn.Module):
    """Head for sentence-level classification tasks."""

    def __init__(self, config, num_labels):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        classifier_dropout = (
            config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
        )
        self.dropout = nn.Dropout(classifier_dropout)
        self.out_proj = nn.Linear(config.hidden_size, num_labels)

    def forward(self, features, **kwargs):
        x = features[:, 0, :]  # take <s> token (equiv. to [CLS])
        x = self.dropout(x)
        x = self.dense(x)
        x = torch.tanh(x)
        x = self.dropout(x)
        x = self.out_proj(x)
        return x
```
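
A quick way to sanity-check the head's shapes (not part of the issue code; it just feeds random features through the head using a default `XLMRobertaConfig`, with a made-up label count of 3):

```python
# Hypothetical smoke test: one logit row per sequence is expected.
cfg = XLMRobertaConfig()  # hidden_size=768, classifier_dropout=None by default
head = XLMRobertaClassificationHead(cfg, num_labels=3)

features = torch.randn(2, 10, cfg.hidden_size)  # (batch, seq_len, hidden)
logits = head(features)
print(logits.shape)  # torch.Size([2, 3])
```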

```python
class XLMRobertaCustomConfig(PretrainedConfig):
    model_type = "xlm-roberta"

    def __init__(
        self,
        num_labels=2,
        vocab_size=30522,
        hidden_size=768,
        num_hidden_layers=12,
        num_attention_heads=12,
        intermediate_size=3072,
        hidden_act="gelu",
        hidden_dropout_prob=0.1,
        attention_probs_dropout_prob=0.1,
        max_position_embeddings=512,
        type_vocab_size=2,
        initializer_range=0.02,
        layer_norm_eps=1e-12,
        pad_token_id=1,
        bos_token_id=0,
        eos_token_id=2,
        position_embedding_type="absolute",
        use_cache=True,
        classifier_dropout=None,
        **kwargs,
    ):
        super().__init__(pad_token_id=pad_token_id, bos_token_id=bos_token_id, eos_token_id=eos_token_id, **kwargs)
        if num_labels is None:
            raise ValueError("The number of labels must not be None")
        self.num_labels = num_labels
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.hidden_act = hidden_act
        self.intermediate_size = intermediate_size
        self.hidden_dropout_prob = hidden_dropout_prob
        self.attention_probs_dropout_prob = attention_probs_dropout_prob
        self.max_position_embeddings = max_position_embeddings
        self.type_vocab_size = type_vocab_size
        self.initializer_range = initializer_range
        self.layer_norm_eps = layer_norm_eps
        self.position_embedding_type = position_embedding_type
        self.use_cache = use_cache
        self.classifier_dropout = classifier_dropout
```
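
As a small illustration (hypothetical label names, not from the issue), the label metadata rides on the config, and `num_labels` stays consistent with `id2label`:

```python
# Hypothetical usage: build a config for a 3-way head with readable label names.
cfg = XLMRobertaCustomConfig(num_labels=3)
cfg.id2label = {0: "other", 1: "C-STANCE", 2: "FOMC"}
cfg.label2id = {v: k for k, v in cfg.id2label.items()}
print(cfg.num_labels, cfg.id2label[1])  # 3 C-STANCE
```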

HoangHoang1408 commented 3 months ago

```python
class XLMRobertaForSequenceClassification(XLMRobertaPreTrainedModel):
    config_class = XLMRobertaCustomConfig

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config

        self.roberta = XLMRobertaModel(config, add_pooling_layer=False)
        self.classifier = XLMRobertaClassificationHead(config, config.num_labels)

        self.post_init()

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
            config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
            `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.roberta(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        sequence_output = outputs[0]
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            # move labels to correct device to enable model parallelism
            labels = labels.to(logits.device)
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
```
HoangHoang1408 commented 3 months ago

```python
train_task_id = 7
task_order = ["C-STANCE", "FOMC", "MeetingBank", "Py150", "ScienceQA", "NumGLUE-cm", "NumGLUE-ds", "20Minuten"]
SETTING_PATH = "/workspace/home/hoangpv4/ts/code/notebooks/classification_data/classification_data/training_1500_50.json"
NUM_EPOCH = 1

base_model_id = "/workspace/home/hoangpv4/models/xlmr_base"
# Continue from the previous task's checkpoint (overrides the base model path above).
base_model_id = "/workspace/home/hoangpv4/ts/code/notebooks/custom_classification_head_checkpoints/1/training_1500_50/6_NumGLUE-ds/checkpoint-113"

SETTING = SETTING_PATH.split("/")[-1].split(".json")[0]
assert SETTING in base_model_id

with open(SETTING_PATH) as f:
    data = json.load(f)

if train_task_id == 0:
    num_labels = 2
    id2label = {0: "other", 1: "C-STANCE"}
else:
    num_labels = train_task_id + 1
    id2label = {i: task_order[i] for i in range(num_labels)}
label2id = {v: k for k, v in id2label.items()}

SAVE_FOLDER = os.path.join(
    "/workspace/home/hoangpv4/ts/code/notebooks/custom_classification_head_checkpoints", str(NUM_EPOCH), SETTING
)
os.makedirs(SAVE_FOLDER, exist_ok=True)
save_path = os.path.join(SAVE_FOLDER, f"{train_task_id}_{task_order[train_task_id]}")

config = XLMRobertaConfig.from_pretrained("/workspace/home/hoangpv4/models/xlmr_base")
config_dict = config.to_dict()
config_dict["num_labels"] = num_labels
config = XLMRobertaCustomConfig(**config_dict)
config.label2id = label2id
config.id2label = id2label

tokenizer = AutoTokenizer.from_pretrained(base_model_id)
model = XLMRobertaForSequenceClassification.from_pretrained(base_model_id, config=config, ignore_mismatched_sizes=True)
model
```
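A quick sanity check after loading (not part of the issue code): since `ignore_mismatched_sizes=True` silently re-initializes `out_proj` when the previous checkpoint was trained with fewer labels, it is worth confirming that the new head and the label maps agree:

```python
# Hypothetical check: head output size must match the label maps.
assert model.classifier.out_proj.out_features == len(id2label) == config.num_labels
assert all(label2id[name] == idx for idx, name in id2label.items())
print(model.config.id2label)
```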

HoangHoang1408 commented 3 months ago

```python
def preprocess_function(examples):
    return tokenizer(examples["text"], truncation=True, max_length=256)

data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

training_args = TrainingArguments(
    output_dir=save_path,
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    num_train_epochs=NUM_EPOCH,
    weight_decay=0.01,
    save_strategy="epoch",
    report_to=None,
    save_total_limit=1,
)
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=Dataset.from_list(data[task_order[train_task_id]]).map(preprocess_function, batched=True),
    tokenizer=tokenizer,
    data_collator=data_collator,
)
wandb.init(mode="disabled", allow_val_change=True)
trainer.train()
```
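If you also want accuracy reported during training, a common pattern (not in the snippet above, shown only as a sketch; it assumes you hold out part of the task data as an eval split) is to pass a `compute_metrics` function to the `Trainer`, using the already-imported `evaluate` library:

```python
# Hypothetical sketch: wire the `evaluate` accuracy metric into the Trainer.
accuracy_metric = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return accuracy_metric.compute(predictions=predictions, references=labels)

# e.g. Trainer(..., eval_dataset=eval_split, compute_metrics=compute_metrics)
```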

HoangHoang1408 commented 3 months ago

```python
SETTING_FOLDER = "/workspace/home/hoangpv4/ts/code/notebooks/custom_classification_head_checkpoints/1/training_1500_10"
task_order = ["C-STANCE", "FOMC", "MeetingBank", "Py150", "ScienceQA", "NumGLUE-cm", "NumGLUE-ds", "20Minuten"]
paths = sorted(glob(os.path.join(SETTING_FOLDER, "*/checkpoint-*")))

TEST_SETTING = {
    "C-STANCE": {"test_path": "/workspace/home/hoangpv4/ts/dataset/TRACE/LLM-CL-Benchmark_5000/C-STANCE/test.json"},
    "FOMC": {"test_path": "/workspace/home/hoangpv4/ts/dataset/TRACE/LLM-CL-Benchmark_5000/FOMC/test.json"},
    "MeetingBank": {"test_path": "/workspace/home/hoangpv4/ts/dataset/TRACE/LLM-CL-Benchmark_5000/MeetingBank/test.json"},
    "Py150": {"test_path": "/workspace/home/hoangpv4/ts/dataset/TRACE/LLM-CL-Benchmark_5000/Py150/test.json"},
    "ScienceQA": {"test_path": "/workspace/home/hoangpv4/ts/dataset/TRACE/LLM-CL-Benchmark_5000/ScienceQA/test.json"},
    "NumGLUE-cm": {"test_path": "/workspace/home/hoangpv4/ts/dataset/TRACE/LLM-CL-Benchmark_5000/NumGLUE-cm/test.json"},
    "NumGLUE-ds": {"test_path": "/workspace/home/hoangpv4/ts/dataset/TRACE/LLM-CL-Benchmark_5000/NumGLUE-ds/test.json"},
    "20Minuten": {"test_path": "/workspace/home/hoangpv4/ts/dataset/TRACE/LLM-CL-Benchmark_5000/20Minuten/test.json"},
}

# Attach each task's checkpoint; the sorted glob matches the task order above.
for i, p in enumerate(paths):
    TEST_SETTING[list(TEST_SETTING.keys())[i]]["checkpoint"] = p

pprint(TEST_SETTING)


def calculate_acc(test_data_path, model, tokenizer):
    gold_label = test_data_path.split("/")[-2]
    preds = []
    with open(test_data_path) as f:
        for point in tqdm(json.load(f)):
            try:
                inputs = tokenizer(point["prompt"], return_tensors="pt", truncation=True, max_length=256).to("cuda")
                with torch.no_grad():
                    logits = model(**inputs).logits
                label = model.config.id2label[logits.argmax().item()]
                preds.append(label)
            except Exception as e:
                print(e)
    return {"acc": sum([1 for p in preds if p == gold_label]) / len(preds), "preds": preds}
```
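For larger test sets, the per-example loop above is slow. A batched variant is sketched below (hypothetical, not part of the issue; it assumes the same `"prompt"` field and that the model is already on the GPU):

```python
def calculate_acc_batched(test_data_path, model, tokenizer, batch_size=32):
    # Hypothetical batched version of calculate_acc.
    gold_label = test_data_path.split("/")[-2]
    with open(test_data_path) as f:
        prompts = [point["prompt"] for point in json.load(f)]

    preds = []
    for start in range(0, len(prompts), batch_size):
        batch = prompts[start : start + batch_size]
        inputs = tokenizer(batch, return_tensors="pt", truncation=True, padding=True, max_length=256).to("cuda")
        with torch.no_grad():
            logits = model(**inputs).logits
        preds.extend(model.config.id2label[i] for i in logits.argmax(dim=-1).tolist())

    return {"acc": sum(p == gold_label for p in preds) / len(preds), "preds": preds}
```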

```python
overall_results = {}
for i, task_name in enumerate(TEST_SETTING):
    result = {}
    config = AutoConfig.from_pretrained(TEST_SETTING[task_name]["checkpoint"])
    current_model = XLMRobertaForSequenceClassification.from_pretrained(
        TEST_SETTING[task_name]["checkpoint"], config=config, ignore_mismatched_sizes=True
    ).to("cuda")
    current_model.eval()
    current_tokenizer = AutoTokenizer.from_pretrained(TEST_SETTING[task_name]["checkpoint"])
    # Evaluate the checkpoint trained up to task i on that task and all earlier tasks.
    for prev_task_name in tqdm(task_order[: i + 1]):
        result[prev_task_name] = calculate_acc(TEST_SETTING[prev_task_name]["test_path"], current_model, current_tokenizer)
    overall_results[task_name] = result

with open(os.path.join(SETTING_FOLDER, "results.json"), "w") as f:
    json.dump(overall_results, f)

# Average accuracy over all tasks seen so far, per checkpoint.
for key, val in overall_results.items():
    scores = [v["acc"] for v in val.values()]
    print(key, sum(scores) / len(scores))
```

HoangHoang1408 commented 3 months ago

```python
paths = glob("/workspace/home/hoangpv4/ts/code/notebooks/custom_classification_head_checkpoints/1/*/results.json")
for path in paths:
    setting = path.split("/")[-2]
    print(setting + "\n")
    with open(path) as f:
        results = json.load(f)
    for key, val in results.items():
        scores = [v["acc"] for v in val.values()]
        print(key, sum(scores) / len(scores))
    print("=" * 50)
```