Tongjilibo / bert4torch

An elegant PyTorch implementation of transformers
https://bert4torch.readthedocs.io/
MIT License

Single-machine multi-GPU BaseModelDDP initialization error: AttributeError: can't set attribute #146

Closed jjRen-xd closed 1 year ago

jjRen-xd commented 1 year ago

When asking a question, please provide as much of the following information as possible:

Basic information

Core code

#! -*- coding:utf-8 -*-
# BERT + CRF for named entity recognition
# Dataset: http://s3.bmio.net/kashgari/china-people-daily-ner-corpus.tar.gz
# [valid_f1]  token_level: 97.06; entity_level: 95.90

import os
import numpy as np
import argparse
import torch
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
from bert4torch.callbacks import Callback
from bert4torch.snippets import sequence_padding, ListDataset, seed_everything
from bert4torch.layers import CRF
from bert4torch.tokenizers import Tokenizer
from bert4torch.models import build_transformer_model, BaseModel, Trainer
from bert4torch.models import BaseModelDP, add_trainer, BaseModelDDP
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

from tqdm import tqdm

# Load the dataset
class MyDataset(ListDataset):
    @staticmethod
    def load_data(filename):
        D = []
        with open(filename, encoding='utf-8') as f:
            f = f.read()
            for l in f.split('\n\n'):
                if not l:
                    continue
                d = ['']
                for i, c in enumerate(l.split('\n')):
                    char, flag = c.split(' ')
                    d[0] += char
                    if flag[0] == 'B':
                        d.append([i, i, flag[2:]])
                    elif flag[0] == 'I':
                        d[-1][1] = i
                D.append(d)
        return D
    # D[0]: ['海钓比赛地点在厦门与金门之间的海域。', [7, 8, 'LOC'], [10, 11, 'LOC']]

def collate_fn(batch):
    batch_token_ids, batch_labels = [], []
    for d in batch:
        tokens = tokenizer.tokenize(d[0], maxlen=maxlen)
        mapping = tokenizer.rematch(d[0], tokens)
        start_mapping = {j[0]: i for i, j in enumerate(mapping) if j}
        end_mapping = {j[-1]: i for i, j in enumerate(mapping) if j}
        token_ids = tokenizer.tokens_to_ids(tokens)
        labels = np.zeros(len(token_ids))
        for start, end, label in d[1:]:
            if start in start_mapping and end in end_mapping:
                start = start_mapping[start]
                end = end_mapping[end]
                labels[start] = categories_label2id['B-'+label]
                labels[start + 1:end + 1] = categories_label2id['I-'+label]
        batch_token_ids.append(token_ids)
        batch_labels.append(labels)
    batch_token_ids = torch.tensor(sequence_padding(batch_token_ids), dtype=torch.long, device=device)
    batch_labels = torch.tensor(sequence_padding(batch_labels), dtype=torch.long, device=device)
    return batch_token_ids, batch_labels

# Define the model structure on top of BERT
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.bert = build_transformer_model(config_path=config_path, checkpoint_path=checkpoint_path, segment_vocab_size=0)
        self.fc = nn.Linear(768, len(categories))  # includes the start/end tokens
        self.crf = CRF(len(categories))

    def forward(self, token_ids):
        sequence_output = self.bert([token_ids])  # [btz, seq_len, hdsz]
        emission_score = self.fc(sequence_output)  # [btz, seq_len, tag_size]
        attention_mask = token_ids.gt(0).long()
        return emission_score, attention_mask

    def predict(self, token_ids):
        self.eval()
        with torch.no_grad():
            emission_score, attention_mask = self.forward(token_ids)
            best_path = self.crf.decode(emission_score, attention_mask)  # [btz, seq_len]
        return best_path

class Loss(nn.Module):
    def forward(self, outputs, labels):
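        # `model` is wrapped in BaseModelDDP in __main__, so the underlying Model (and its CRF) is reached via model.module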
        return model.module.crf(*outputs, labels)

def acc(y_pred, y_true):
    y_pred = y_pred[0]
    y_pred = torch.argmax(y_pred, dim=-1)
    acc = torch.sum(y_pred.eq(y_true)).item() / y_true.numel()
    return {'acc': acc}

def evaluate(data):
    X, Y, Z = 1e-10, 1e-10, 1e-10
    X2, Y2, Z2 = 1e-10, 1e-10, 1e-10
    for token_ids, label in tqdm(data):
        scores = model.predict(token_ids)  # [btz, seq_len]
        attention_mask = label.gt(0)

        # token-level
        X += (scores.eq(label) * attention_mask).sum().item()   # number of correctly predicted tokens
        Y += scores.gt(0).sum().item()  # number of predicted entity tokens
        Z += label.gt(0).sum().item()   # number of gold entity tokens

        # entity-level
        entity_pred = trans_entity2tuple(scores)
        entity_true = trans_entity2tuple(label)
        X2 += len(entity_pred.intersection(entity_true))
        Y2 += len(entity_pred)
        Z2 += len(entity_true)
    f1, precision, recall = 2 * X / (Y + Z), X / Y, X / Z
    f2, precision2, recall2 = 2 * X2 / (Y2 + Z2), X2/ Y2, X2 / Z2
    return f1, precision, recall, f2, precision2, recall2

def trans_entity2tuple(scores):
    '''Convert the tensor into (sample id, start, end, entity type) tuples for metric computation
    '''
    batch_entity_ids = set()
    for i, one_samp in enumerate(scores):
        entity_ids = []
        for j, item in enumerate(one_samp):
            flag_tag = categories_id2label[item.item()]
            if flag_tag.startswith('B-'):  # B
                entity_ids.append([i, j, j, flag_tag[2:]])
            elif len(entity_ids) == 0:
                continue
            elif (len(entity_ids[-1]) > 0) and flag_tag.startswith('I-') and (flag_tag[2:]==entity_ids[-1][-1]):  # I
                entity_ids[-1][-2] = j
            elif len(entity_ids[-1]) > 0:
                entity_ids.append([])

        for i in entity_ids:
            if i:
                batch_entity_ids.add(tuple(i))
    return batch_entity_ids

class Evaluator(Callback):
    """评估与保存
    """
    def __init__(self):
        self.best_val_f1 = 0.

    def on_epoch_end(self, steps, epoch, logs=None):
        f1, precision, recall, f2, precision2, recall2 = evaluate(valid_dataloader)
        if f2 > self.best_val_f1:
            self.best_val_f1 = f2
            # model.save_weights('best_model.pt')
        print(f'[val-token  level] f1: {f1:.5f}, p: {precision:.5f} r: {recall:.5f}')
        print(f'[val-entity level] f1: {f2:.5f}, p: {precision2:.5f} r: {recall2:.5f} best_f1: {self.best_val_f1:.5f}\n')

if __name__ == '__main__':
    local_rank = int(os.environ['LOCAL_RANK'])

    maxlen = 256
    batch_size = 32
    categories = ['O', 'B-LOC', 'I-LOC', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG']
    categories_id2label = {i: k for i, k in enumerate(categories)}
    categories_label2id = {k: i for i, k in enumerate(categories)}

    # BERT base
    config_path = './checkpoints/bert-base-chinese/config.json'
    checkpoint_path = './checkpoints/bert-base-chinese/pytorch_model.bin'
    dict_path = './checkpoints/bert-base-chinese/vocab.txt'

    print('local_rank',local_rank)
    torch.cuda.set_device(local_rank)
    device = torch.device('cuda', local_rank)
    # device = 'cuda' if torch.cuda.is_available() else 'cpu'
    torch.distributed.init_process_group(backend='nccl')

    model = Model().to(device)
    model = BaseModelDDP(model)

    # Fix the random seed
    seed_everything(42)

    # Tokenizer
    tokenizer = Tokenizer(dict_path, do_lower_case=True)

    # Datasets
    train_dataset = MyDataset('./datasets/china-people-daily-ner/example.train')
    valid_dataset = MyDataset('./datasets/china-people-daily-ner/example.dev')
    train_sampler = DistributedSampler(train_dataset)
    train_dataloader = DataLoader(train_dataset, 
                                  batch_size=batch_size, 
                                #   shuffle=True, 
                                  collate_fn=collate_fn,
                                  sampler=train_sampler
                                  ) 
    valid_dataloader = DataLoader(valid_dataset, 
                                  batch_size=batch_size, 
                                  collate_fn=collate_fn) 
    print('train_number: ',len(train_dataset))
    print('valid_number: ', len(valid_dataset))

    # Multiple kinds of custom metrics are supported: metrics = ['accuracy', acc, {acc: acc}] all work
    model.compile(loss=Loss(), optimizer=optim.Adam(model.parameters(), lr=2e-5), metrics=acc)

    evaluator = Evaluator()
    model.fit(train_dataloader, epochs=5, steps_per_epoch=None, callbacks=[evaluator])
# else: 
    # model.load_weights('best_model.pt')

Output

Traceback (most recent call last):
  File "DDP_test.py", line 187, in <module>
    model = BaseModelDDP(model)
  File "/home/renjunjie.rjj/.local/lib/python3.8/site-packages/torch4keras/model.py", line 485, in __init__
    nn.parallel.DistributedDataParallel.__init__(self, *args, **kwargs)
  File "/home/renjunjie.rjj/.local/lib/python3.8/site-packages/torch/nn/parallel/distributed.py", line 537, in __init__
    self.device = list(self.module.parameters())[0].device
  File "/home/renjunjie.rjj/.local/lib/python3.8/site-packages/torch/nn/modules/module.py", line 1225, in __setattr__
    object.__setattr__(self, name, value)
AttributeError: can't set attribute
ERROR:torch.distributed.elastic.multiprocessing.api:failed (exitcode: 1) local_rank: 0 (pid: 97053) of binary: /opt/conda/envs/python3.8/bin/python
---------------------------------------------------------------------------
ChildFailedError                          Traceback (most recent call last)

What I tried

I tried different bert4torch and torch versions; the error is the same.

Tongjilibo commented 1 year ago

I just re-ran the DDP example I provide and it runs fine, on 0.3.1.post2.

zhouyiyuan-mt commented 1 year ago

I ran into the same problem.

Tongjilibo commented 1 year ago

> I ran into the same problem.

Could you share your code? The DDP example I provide runs fine for me; note that it is started with a launcher command, not by running the script directly.
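For reference, a sketch of such a launch for the script above, assuming a single machine with 2 GPUs (torchrun sets LOCAL_RANK, RANK and WORLD_SIZE for each worker process):

torchrun --nproc_per_node=2 DDP_test.py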

zhouyiyuan-mt commented 1 year ago

#! -*- coding:utf-8 -*-
# BERT + CRF for named entity recognition
# Dataset: http://s3.bmio.net/kashgari/china-people-daily-ner-corpus.tar.gz
# [valid_f1]  token_level: 97.06; entity_level: 95.90

import numpy as np
import torch
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
from bert4torch.callbacks import Callback
from bert4torch.snippets import sequence_padding, ListDataset, seed_everything
from bert4torch.layers import CRF
from bert4torch.tokenizers import Tokenizer
from bert4torch.models import build_transformer_model, BaseModel, BaseModelDDP
from tqdm import tqdm
import os

DDP_ON = bool(int(os.getenv('DDP_ON', 0)))
if DDP_ON:
    torch.distributed.init_process_group(backend='nccl')
    rank = int(os.getenv('RANK'))
    world_size = int(os.getenv('WORLD_SIZE'))
    print("DEBUG: DDP ON-> rank = ", rank, " world_size = ", world_size)
    torch.cuda.set_device(rank)

maxlen = 256
batch_size = 32
categories = ['O', 'B-LOC', 'I-LOC', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG']
categories_id2label = {i: k for i, k in enumerate(categories)}
categories_label2id = {k: i for i, k in enumerate(categories)}

# BERT base
config_path = '/home/zhouyiyuan/bert_data/model/config.json'
checkpoint_path = '/home/zhouyiyuan/bert_data/model/pytorch_model.bin'
dict_path = '/home/zhouyiyuan/bert_data/model/vocab.txt'
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Fix the random seed
seed_everything(42)

# Load the dataset
class MyDataset(ListDataset):
    @staticmethod
    def load_data(filename):
        D = []
        with open(filename, encoding='utf-8') as f:
            f = f.read()
            for l in f.split('\n\n'):
                if not l:
                    continue
                d = ['']
                for i, c in enumerate(l.split('\n')):
                    char, flag = c.split(' ')
                    d[0] += char
                    if flag[0] == 'B':
                        d.append([i, i, flag[2:]])
                    elif flag[0] == 'I':
                        d[-1][1] = i
                D.append(d)
        return D

# Build the tokenizer
tokenizer = Tokenizer(dict_path, do_lower_case=True)

def collate_fn(batch):
    batch_token_ids, batch_labels = [], []
    for d in batch:
        tokens = tokenizer.tokenize(d[0], maxlen=maxlen)
        mapping = tokenizer.rematch(d[0], tokens)
        start_mapping = {j[0]: i for i, j in enumerate(mapping) if j}
        end_mapping = {j[-1]: i for i, j in enumerate(mapping) if j}
        token_ids = tokenizer.tokens_to_ids(tokens)
        labels = np.zeros(len(token_ids))
        for start, end, label in d[1:]:
            if start in start_mapping and end in end_mapping:
                start = start_mapping[start]
                end = end_mapping[end]
                labels[start] = categories_label2id['B-'+label]
                labels[start + 1:end + 1] = categories_label2id['I-'+label]
        batch_token_ids.append(token_ids)
        batch_labels.append(labels)
    batch_token_ids = torch.tensor(sequence_padding(batch_token_ids), dtype=torch.long, device=device)
    batch_labels = torch.tensor(sequence_padding(batch_labels), dtype=torch.long, device=device)
    return batch_token_ids, batch_labels

# Build the dataloaders
train_dataloader = DataLoader(MyDataset('/home/zhouyiyuan/bert_data/china-people-daily-ner-corpus/example.train'), batch_size=batch_size, shuffle=True, collate_fn=collate_fn) 
valid_dataloader = DataLoader(MyDataset('/home/zhouyiyuan/bert_data/china-people-daily-ner-corpus/example.dev'), batch_size=batch_size, collate_fn=collate_fn) 

# Define the model structure on top of BERT
class Model(BaseModel):
    def __init__(self):
        super().__init__()
        self.bert = build_transformer_model(config_path=config_path, checkpoint_path=checkpoint_path, segment_vocab_size=0)
        self.fc = nn.Linear(768, len(categories))  # includes the start/end tokens
        self.crf = CRF(len(categories))

    def forward(self, token_ids):
        sequence_output = self.bert([token_ids])  # [btz, seq_len, hdsz]
        emission_score = self.fc(sequence_output)  # [btz, seq_len, tag_size]
        attention_mask = token_ids.gt(0).long()
        return emission_score, attention_mask

    def predict(self, token_ids):
        self.eval()
        with torch.no_grad():
            emission_score, attention_mask = self.forward(token_ids)
            best_path = self.crf.decode(emission_score, attention_mask)  # [btz, seq_len]
        return best_path

model = Model().to(device)
if DDP_ON:
    # model = BaseModelDDP(model, device_ids=[rank])
    model = BaseModelDDP(model, master_rank=0, device_ids=[rank], output_device=rank, find_unused_parameters=False)

class Loss(nn.Module):
    def forward(self, outputs, labels):
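        # the DDP wrapper exposes the underlying Model via model.module (so this path assumes DDP_ON=1)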
        return model.module.crf(*outputs, labels)

def acc(y_pred, y_true):
    y_pred = y_pred[0]
    y_pred = torch.argmax(y_pred, dim=-1)
    acc = torch.sum(y_pred.eq(y_true)).item() / y_true.numel()
    return {'acc': acc}

# Multiple kinds of custom metrics are supported: metrics = ['accuracy', acc, {acc: acc}] all work
model.compile(loss=Loss(), optimizer=optim.Adam(model.parameters(), lr=2e-5), metrics=acc)

def evaluate(data):
    X, Y, Z = 1e-10, 1e-10, 1e-10
    X2, Y2, Z2 = 1e-10, 1e-10, 1e-10
    for token_ids, label in tqdm(data):
        scores = model.predict(token_ids)  # [btz, seq_len]
        attention_mask = label.gt(0)

        # token-level
        X += (scores.eq(label) * attention_mask).sum().item()
        Y += scores.gt(0).sum().item()
        Z += label.gt(0).sum().item()

        # entity-level
        entity_pred = trans_entity2tuple(scores)
        entity_true = trans_entity2tuple(label)
        X2 += len(entity_pred.intersection(entity_true))
        Y2 += len(entity_pred)
        Z2 += len(entity_true)
    f1, precision, recall = 2 * X / (Y + Z), X / Y, X / Z
    f2, precision2, recall2 = 2 * X2 / (Y2 + Z2), X2/ Y2, X2 / Z2
    return f1, precision, recall, f2, precision2, recall2

def trans_entity2tuple(scores):
    '''Convert the tensor into (sample id, start, end, entity type) tuples for metric computation
    '''
    batch_entity_ids = set()
    for i, one_samp in enumerate(scores):
        entity_ids = []
        for j, item in enumerate(one_samp):
            flag_tag = categories_id2label[item.item()]
            if flag_tag.startswith('B-'):  # B
                entity_ids.append([i, j, j, flag_tag[2:]])
            elif len(entity_ids) == 0:
                continue
            elif (len(entity_ids[-1]) > 0) and flag_tag.startswith('I-') and (flag_tag[2:]==entity_ids[-1][-1]):  # I
                entity_ids[-1][-2] = j
            elif len(entity_ids[-1]) > 0:
                entity_ids.append([])

        for i in entity_ids:
            if i:
                batch_entity_ids.add(tuple(i))
    return batch_entity_ids

class Evaluator(Callback):
    """评估与保存
    """
    def __init__(self):
        self.best_val_f1 = 0.

    def on_epoch_end(self, steps, epoch, logs=None):
        f1, precision, recall, f2, precision2, recall2 = evaluate(valid_dataloader)
        if f2 > self.best_val_f1:
            self.best_val_f1 = f2
            model.save_weights('./best_model.pt')
        print(f'[val-token  level] f1: {f1:.5f}, p: {precision:.5f} r: {recall:.5f}')
        print(f'[val-entity level] f1: {f2:.5f}, p: {precision2:.5f} r: {recall2:.5f} best_f1: {self.best_val_f1:.5f}\n')

if __name__ == '__main__':

    evaluator = Evaluator()
    model.fit(train_dataloader, epochs=20, steps_per_epoch=None, callbacks=[evaluator])

else: 

    model.load_weights('best_model.pt')

zhouyiyuan-mt commented 1 year ago

We found that the problem is that torch4keras's BaseModelDDP inherits from multiple parent classes, including nn.parallel.DistributedDataParallel (a class from the torch library) and Trainer (a class from torch4keras). Trainer defines device as a method decorated with @property, while DistributedDataParallel's __init__ assigns to device again, so the assignment fails.
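A minimal sketch of the mechanism (illustrative class names, not the actual torch4keras code): a setter-less @property on one base class collides with the plain attribute assignment another base class performs in its __init__:

class Trainer:
    @property
    def device(self):            # read-only property: no setter is defined
        return self._device

class BaseModelDDP(Trainer):     # stand-in for the DDP + Trainer multiple inheritance
    def __init__(self):
        # DistributedDataParallel.__init__ runs `self.device = ...`;
        # since `device` is a setter-less property, the assignment raises
        # AttributeError: can't set attribute (on Python 3.8)
        self.device = 'cuda:0'

BaseModelDDP()  # reproduces the AttributeError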

Tongjilibo commented 1 year ago

> We found that the problem is that torch4keras's BaseModelDDP inherits from multiple parent classes, including nn.parallel.DistributedDataParallel (a class from the torch library) and Trainer (a class from torch4keras). Trainer defines device as a method decorated with @property, while DistributedDataParallel's __init__ assigns to device again, so the assignment fails.

Could you share your bert4torch and torch4keras versions? I just ran your code and it works fine for me, so let me check whether it is a version issue.

zhouyiyuan-mt commented 1 year ago

> We found that the problem is that torch4keras's BaseModelDDP inherits from multiple parent classes, including nn.parallel.DistributedDataParallel (a class from the torch library) and Trainer (a class from torch4keras). Trainer defines device as a method decorated with @property, while DistributedDataParallel's __init__ assigns to device again, so the assignment fails.
>
> Could you share your bert4torch and torch4keras versions? I just ran your code and it works fine for me, so let me check whether it is a version issue.

Current versions: bert4torch 0.3.2, torch4keras 0.1.1, torch 2.0.1 (I also tested bert4torch 0.3.1 and 0.3.1.post2 earlier, with the same problem). Note that my code only takes the DDP path if you export DDP_ON=1 before running.
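A sketch of that launch, assuming a single machine with 2 GPUs (the launcher supplies RANK and WORLD_SIZE; the script filename here is illustrative):

export DDP_ON=1
torchrun --nproc_per_node=2 ner_crf_ddp.py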

Tongjilibo commented 1 year ago

OK, I just took another look and it does seem to be a problem on my side. I'll dig into it and try to get a fix out in the next couple of days.

Tongjilibo commented 1 year ago

@zhouyiyuan-mt @jjRen-xd The original code did indeed have a bug. You can test with the latest torch4keras from git; I just tested it and it works. I will publish a pip release later.
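For reference, installing torch4keras straight from git would look something like this (the repo URL is an assumption):

pip install git+https://github.com/Tongjilibo/torch4keras.git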

zhouyiyuan-mt commented 1 year ago

OK, thanks 🙏

Tongjilibo commented 1 year ago

The latest pip release 0.3.3 includes the fix; you can install it directly with pip install bert4torch==0.3.3.

jjRen-xd commented 1 year ago

Thanks 🙏