**Open** · 518789Adf opened this issue 3 months ago
```python
import cv2
import numpy as np
import torch
from PIL import Image
from torch.utils.data import Dataset
from torchvision.transforms import functional as F

from utils.utils import cvtColor

class MaskRcnnDataset(Dataset):
    def __init__(self, annotation_lines, input_shape=[600, 600], train=True):
        self.annotation_lines = annotation_lines
        self.length = len(annotation_lines)
        self.input_shape = input_shape
        self.train = train

    def __len__(self):
        return self.length

    def __getitem__(self, index):
        index = index % self.length
        #---------------------------------------------------#
        #   Apply random augmentation during training,
        #   but not during validation
        #---------------------------------------------------#
        image, y, masks = self.get_random_data(self.annotation_lines[index], self.input_shape[0:2], random=self.train)
        #   F.to_tensor already returns a (C, H, W) tensor, so no extra transpose is needed
        image = F.to_tensor(np.array(image, dtype=np.float32))
        box_data = np.zeros((len(y), 5))
        if len(y) > 0:
            box_data[:len(y)] = y
        box = box_data[:, :4]
        label = box_data[:, -1]
        #   Convert the masks to one stacked tensor; going through from_numpy keeps
        #   the 0/1 values intact (F.to_tensor would rescale an 'L'-mode image by 1/255)
        if len(masks) > 0:
            masks = torch.stack([torch.from_numpy(np.array(mask, np.float32)) for mask in masks], dim=0)
        else:
            masks = torch.zeros((0,) + tuple(image.shape[1:]))
        return image, box, label, masks

    def rand(self, a=0, b=1):
        return np.random.rand() * (b - a) + a
    def get_random_data(self, annotation_line, input_shape, jitter=.3, hue=.1, sat=0.7, val=0.4, random=True):
        line = annotation_line.split()
        #------------------------------#
        #   Read the image and convert it to RGB
        #------------------------------#
        image = Image.open(line[0])
        image = cvtColor(image)
        #------------------------------#
        #   Image size and target size
        #------------------------------#
        iw, ih = image.size
        h, w = input_shape
        #------------------------------#
        #   Ground-truth boxes
        #------------------------------#
        box = np.array([np.array(list(map(int, box.split(',')))) for box in line[1:]])
        #------------------------------#
        #   Load the mask data
        #------------------------------#
        masks = [Image.open(line[i + 1]).convert('1') for i in range(len(box))]
        if not random:
            scale = min(w / iw, h / ih)
            nw = int(iw * scale)
            nh = int(ih * scale)
            dx = (w - nw) // 2
            dy = (h - nh) // 2
            #   Pad the unused area with gray bars
            image = image.resize((nw, nh), Image.BICUBIC)
            new_image = Image.new('RGB', (w, h), (128, 128, 128))
            new_image.paste(image, (dx, dy))
            image_data = np.array(new_image, np.float32)
            #   Adjust the ground-truth boxes; do not shuffle them here,
            #   otherwise they fall out of step with the masks
            if len(box) > 0:
                box[:, [0, 2]] = box[:, [0, 2]] * nw / iw + dx
                box[:, [1, 3]] = box[:, [1, 3]] * nh / ih + dy
                #   Adjust the masks with the same (dx, dy) offset so they stay aligned with the image
                masks = [mask.resize((nw, nh), Image.NEAREST) for mask in masks]
                masks = [np.pad(np.array(m, np.uint8), ((dy, h - nh - dy), (dx, w - nw - dx)), mode='constant') for m in masks]
                box[:, 0:2][box[:, 0:2] < 0] = 0
                box[:, 2][box[:, 2] > w] = w
                box[:, 3][box[:, 3] > h] = h
                box_w = box[:, 2] - box[:, 0]
                box_h = box[:, 3] - box[:, 1]
                #   Filter boxes and masks with the same keep mask so they stay paired
                keep = np.logical_and(box_w > 1, box_h > 1)
                box = box[keep]  # discard invalid boxes
                masks = [m for m, k in zip(masks, keep) if k]
            return image_data, box, masks
        #   Resize the image and distort its aspect ratio
        new_ar = iw / ih * self.rand(1 - jitter, 1 + jitter) / self.rand(1 - jitter, 1 + jitter)
        scale = self.rand(.25, 2)
        if new_ar < 1:
            nh = int(scale * h)
            nw = int(nh * new_ar)
        else:
            nw = int(scale * w)
            nh = int(nw / new_ar)
        image = image.resize((nw, nh), Image.BICUBIC)
        #   Pad the unused area with gray bars
        dx = int(self.rand(0, w - nw))
        dy = int(self.rand(0, h - nh))
        new_image = Image.new('RGB', (w, h), (128, 128, 128))
        new_image.paste(image, (dx, dy))
        image = new_image
        #   Flip the image
        flip = self.rand() < .5
        if flip:
            image = image.transpose(Image.FLIP_LEFT_RIGHT)
        image_data = np.array(image, np.uint8)
        #   Color-space (HSV) augmentation
        r = np.random.uniform(-1, 1, 3) * [hue, sat, val] + 1
        #   Convert the image to HSV
        hue, sat, val = cv2.split(cv2.cvtColor(image_data, cv2.COLOR_RGB2HSV))
        dtype = image_data.dtype
        #   Apply the transform via lookup tables
        x = np.arange(0, 256, dtype=r.dtype)
        lut_hue = ((x * r[0]) % 180).astype(dtype)
        lut_sat = np.clip(x * r[1], 0, 255).astype(dtype)
        lut_val = np.clip(x * r[2], 0, 255).astype(dtype)
        image_data = cv2.merge((cv2.LUT(hue, lut_hue), cv2.LUT(sat, lut_sat), cv2.LUT(val, lut_val)))
        image_data = cv2.cvtColor(image_data, cv2.COLOR_HSV2RGB)
        #   Adjust the ground-truth boxes (again without shuffling, to keep the box-mask pairing)
        if len(box) > 0:
            box[:, [0, 2]] = box[:, [0, 2]] * nw / iw + dx
            box[:, [1, 3]] = box[:, [1, 3]] * nh / ih + dy
            if flip:
                box[:, [0, 2]] = w - box[:, [2, 0]]
            box[:, 0:2][box[:, 0:2] < 0] = 0
            box[:, 2][box[:, 2] > w] = w
            box[:, 3][box[:, 3] > h] = h
            box_w = box[:, 2] - box[:, 0]
            box_h = box[:, 3] - box[:, 1]
            keep = np.logical_and(box_w > 1, box_h > 1)
            #   Transform the masks exactly like the image: resize, paste at (dx, dy), then flip once.
            #   Pasting onto a blank canvas also handles nw > w / nh > h (negative dx, dy),
            #   where np.pad would fail.
            new_masks = []
            for mask in masks:
                mask = mask.resize((nw, nh), Image.NEAREST)
                canvas = Image.new('1', (w, h), 0)
                canvas.paste(mask, (dx, dy))
                if flip:
                    canvas = canvas.transpose(Image.FLIP_LEFT_RIGHT)
                new_masks.append(np.array(canvas, np.uint8))
            #   Keep boxes and masks paired when filtering
            box = box[keep]
            masks = [m for m, k in zip(new_masks, keep) if k]
        return image_data, box, masks
```
Note: add `from torchvision.transforms import functional as F` at the top of the dataset file so that `F.to_tensor` is defined (the original note suggested `from torch.nn import functional as F`, but `to_tensor` lives in torchvision, not in `torch.nn.functional`).
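For a quick sanity check of the dataset, a minimal sketch like this can be used (the annotation file name is a placeholder for whatever VOC-style `image_path x1,y1,x2,y2,class` file the base repo generates):

```python
# Hypothetical smoke test for MaskRcnnDataset; "2007_train.txt" is a placeholder
if __name__ == "__main__":
    with open("2007_train.txt") as f:
        train_lines = f.readlines()
    dataset = MaskRcnnDataset(train_lines, input_shape=[600, 600], train=True)
    image, box, label, masks = dataset[0]
    print(image.shape)                          # torch.Size([3, 600, 600])
    print(box.shape, label.shape, masks.shape)  # one row/mask per object
```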
```python
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.cuda.amp import autocast

class MaskRCNNTrainer(nn.Module):
    def __init__(self, model_train, optimizer):
        super(MaskRCNNTrainer, self).__init__()
        self.model_train = model_train
        self.optimizer = optimizer
        self.rpn_sigma = 1
        self.roi_sigma = 1
        #   AnchorTargetCreator and ProposalTargetCreator are the ones
        #   from the base repo's frcnn_training.py
        self.anchor_target_creator = AnchorTargetCreator()
        self.proposal_target_creator = ProposalTargetCreator()
        self.loc_normalize_std = [0.1, 0.1, 0.2, 0.2]
    def _fast_rcnn_loc_loss(self, pred_loc, gt_loc, gt_label, sigma):
        #   Smooth L1 loss over the positive samples only (unchanged from the base repo)
        pred_loc = pred_loc[gt_label > 0]
        gt_loc = gt_loc[gt_label > 0]
        sigma_squared = sigma ** 2
        regression_diff = (gt_loc - pred_loc)
        regression_diff = regression_diff.abs().float()
        regression_loss = torch.where(
            regression_diff < (1. / sigma_squared),
            0.5 * sigma_squared * regression_diff ** 2,
            regression_diff - 0.5 / sigma_squared
        )
        regression_loss = regression_loss.sum()
        num_pos = (gt_label > 0).sum().float()
        regression_loss /= torch.max(num_pos, torch.ones_like(num_pos))
        return regression_loss
```
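This is the standard smooth L1 loss with the transition point at 1/σ². As an optional aside (not part of the patch), it should agree with PyTorch's built-in `F.smooth_l1_loss` when `beta = 1 / sigma ** 2`:

```python
# Optional check: the hand-written smooth L1 matches the built-in one with
# beta = 1 / sigma**2 (sum reduction, before dividing by the positive count)
import torch
import torch.nn.functional as F

sigma = 1
pred, gt = torch.randn(10, 4), torch.randn(10, 4)
diff = (gt - pred).abs()
manual = torch.where(diff < 1. / sigma ** 2,
                     0.5 * sigma ** 2 * diff ** 2,
                     diff - 0.5 / sigma ** 2).sum()
builtin = F.smooth_l1_loss(pred, gt, reduction='sum', beta=1. / sigma ** 2)
print(torch.allclose(manual, builtin))   # True
```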
The forward pass computes the five losses:

```python
    def forward(self, imgs, bboxes, labels, masks, scale):
        n = imgs.shape[0]
        img_size = imgs.shape[2:]
        #---------------------------------#
        #   Extract the shared feature map
        #---------------------------------#
        base_feature = self.model_train(imgs, mode='extractor')
        #---------------------------------#
        #   Use the RPN to get offsets, scores, proposals and anchors
        #---------------------------------#
        rpn_locs, rpn_scores, rois, roi_indices, anchor = self.model_train(x=[base_feature, img_size], scale=scale, mode='rpn')
        rpn_loc_loss_all, rpn_cls_loss_all, roi_loc_loss_all, roi_cls_loss_all, mask_loss_all = 0, 0, 0, 0, 0
        sample_rois, sample_indexes, gt_roi_locs, gt_roi_labels, gt_masks = [], [], [], [], []
        for i in range(n):
            bbox = bboxes[i]
            label = labels[i]
            mask = masks[i]
            rpn_loc = rpn_locs[i]
            rpn_score = rpn_scores[i]
            roi = rois[i]
            #---------------------------------#
            #   Build the RPN targets from the ground-truth boxes and anchors
            #---------------------------------#
            gt_rpn_loc, gt_rpn_label = self.anchor_target_creator(bbox, anchor[0].cpu().numpy())
            gt_rpn_loc = torch.Tensor(gt_rpn_loc).type_as(rpn_locs)
            gt_rpn_label = torch.Tensor(gt_rpn_label).type_as(rpn_locs).long()
            #---------------------------------#
            #   RPN regression and classification losses
            #---------------------------------#
            rpn_loc_loss = self._fast_rcnn_loc_loss(rpn_loc, gt_rpn_loc, gt_rpn_label, self.rpn_sigma)
            rpn_cls_loss = F.cross_entropy(rpn_score, gt_rpn_label, ignore_index=-1)
            rpn_loc_loss_all += rpn_loc_loss
            rpn_cls_loss_all += rpn_cls_loss
            #---------------------------------#
            #   Build the classifier targets from the ground-truth boxes and proposals
            #---------------------------------#
            sample_roi, gt_roi_loc, gt_roi_label = self.proposal_target_creator(roi, bbox, label, self.loc_normalize_std)
            sample_rois.append(torch.Tensor(sample_roi).type_as(rpn_locs))
            sample_indexes.append(torch.ones(len(sample_roi)).type_as(rpn_locs) * roi_indices[i][0])
            gt_roi_locs.append(torch.Tensor(gt_roi_loc).type_as(rpn_locs))
            gt_roi_labels.append(torch.Tensor(gt_roi_label).type_as(rpn_locs).long())
            gt_masks.append(torch.Tensor(mask).type_as(rpn_locs))
        sample_rois = torch.stack(sample_rois, dim=0)
        sample_indexes = torch.stack(sample_indexes, dim=0)
        roi_cls_locs, roi_scores, roi_masks = self.model_train([base_feature, sample_rois, sample_indexes, img_size], mode='head')
        for i in range(n):
            #---------------------------------#
            #   Pick the regression prediction matching each proposal's class
            #---------------------------------#
            n_sample = roi_cls_locs.size()[1]
            roi_cls_loc = roi_cls_locs[i]
            roi_score = roi_scores[i]
            roi_mask = roi_masks[i]
            gt_roi_loc = gt_roi_locs[i]
            gt_roi_label = gt_roi_labels[i]
            gt_mask = gt_masks[i]
            roi_cls_loc = roi_cls_loc.view(n_sample, -1, 4)
            roi_loc = roi_cls_loc[torch.arange(0, n_sample), gt_roi_label]
            #---------------------------------#
            #   Classifier regression and classification losses
            #---------------------------------#
            roi_loc_loss = self._fast_rcnn_loc_loss(roi_loc, gt_roi_loc, gt_roi_label.data, self.roi_sigma)
            roi_cls_loss = nn.CrossEntropyLoss()(roi_score, gt_roi_label)
            roi_loc_loss_all += roi_loc_loss
            roi_cls_loss_all += roi_cls_loss
            #---------------------------------#
            #   Mask loss: select each sample's mask prediction for its assigned class.
            #   gt_mask must already be cropped to the sampled ROIs and resized to the
            #   mask head's output resolution, otherwise the shapes will not match
            #   (see the target-building sketch below).
            #---------------------------------#
            roi_mask = roi_mask[torch.arange(0, n_sample), gt_roi_label]
            roi_mask_loss = F.binary_cross_entropy_with_logits(roi_mask, gt_mask, reduction='mean')
            mask_loss_all += roi_mask_loss
        losses = [rpn_loc_loss_all / n, rpn_cls_loss_all / n, roi_loc_loss_all / n, roi_cls_loss_all / n, mask_loss_all / n]
        losses = losses + [sum(losses)]
        return losses
```
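The mask loss above assumes each `gt_mask` already has the mask head's output shape. One way to build such targets (a hedged sketch, not from the original post; the 28-pixel output size, the (x1, y1, x2, y2) ROI order, and the `gt_assignment` matching produced by the proposal sampler are all assumptions) is to crop the full-image ground-truth masks with `torchvision.ops.roi_align`:

```python
import torch
from torchvision.ops import roi_align

def build_mask_targets(gt_full_masks, sample_rois, gt_assignment, mask_out_dim=28):
    """Crop full-image GT masks to each sampled ROI and resize them to the mask head output.

    gt_full_masks: (num_gt, H, W) float tensor of 0/1 masks
    sample_rois:   (n_sample, 4) ROIs in (x1, y1, x2, y2) image coordinates (assumed order)
    gt_assignment: (n_sample,) index of the GT object matched to each sampled ROI
    """
    # Pick the matched GT mask for every sampled ROI and add a channel dimension
    matched = gt_full_masks[gt_assignment].unsqueeze(1)              # (n_sample, 1, H, W)
    # roi_align takes boxes as (batch_index, x1, y1, x2, y2); here each ROI
    # crops its own single-mask "image", so each box gets its own batch index
    idx = torch.arange(len(sample_rois), dtype=sample_rois.dtype, device=sample_rois.device)
    boxes = torch.cat([idx.unsqueeze(1), sample_rois], dim=1)
    targets = roi_align(matched, boxes, output_size=(mask_out_dim, mask_out_dim), spatial_scale=1.0)
    return (targets.squeeze(1) >= 0.5).float()                       # (n_sample, 28, 28) binary targets
```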
And the training step:

```python
    def train_step(self, imgs, bboxes, labels, masks, scale, fp16=False, scaler=None):
        self.optimizer.zero_grad()
        if not fp16:
            losses = self.forward(imgs, bboxes, labels, masks, scale)
            losses[-1].backward()
            self.optimizer.step()
        else:
            with autocast():
                losses = self.forward(imgs, bboxes, labels, masks, scale)
            #----------------------#
            #   Backward pass
            #----------------------#
            scaler.scale(losses[-1]).backward()
            scaler.step(self.optimizer)
            scaler.update()
        return losses
```
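A minimal usage sketch, assuming `model_train`, `optimizer` and one batch `(images, boxes, labels, masks)` from the DataLoader below already exist:

```python
# Hedged sketch: drive one fp16 training step with MaskRCNNTrainer
from torch.cuda.amp import GradScaler

scaler = GradScaler()   # only needed when fp16=True
train_util = MaskRCNNTrainer(model_train, optimizer)
losses = train_util.train_step(images, boxes, labels, masks, scale=1, fp16=True, scaler=scaler)
rpn_loc, rpn_cls, roi_loc, roi_cls, mask_loss, total = losses
```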
Note: the parameter definitions in the `MaskRCNN(nn.Module)` class below must not be dropped.
```python
class MaskRCNN(nn.Module):
    def __init__(self, num_classes,
                 mode="training",
                 feat_stride=16,
                 anchor_scales=[8, 16, 32],
                 ratios=[0.5, 1, 2],
                 backbone='vgg',
                 pretrained=False):
        super(MaskRCNN, self).__init__()
        self.feat_stride = feat_stride
        #---------------------------------#
        #   Two backbones are supported:
        #   vgg and resnet50
        #---------------------------------#
        if backbone == 'vgg':
            self.extractor, classifier = decom_vgg16(pretrained)
            #---------------------------------#
            #   Build the region proposal network
            #---------------------------------#
            self.rpn = RegionProposalNetwork(
                512, 512,
                ratios=ratios,
                anchor_scales=anchor_scales,
                feat_stride=self.feat_stride,
                mode=mode
            )
            #---------------------------------#
            #   Build the classifier head
            #---------------------------------#
            self.head = VGG16RoIHead(
                n_class=num_classes + 1,
                roi_size=7,
                spatial_scale=1,
                classifier=classifier
            )
            self.mask_head = MaskRCNNHead(
                n_class=num_classes + 1,
                roi_size=14,
                spatial_scale=1
            )
        elif backbone == 'resnet50':
            self.extractor, classifier = resnet50(pretrained)
            #---------------------------------#
            #   Build the region proposal network
            #---------------------------------#
            self.rpn = RegionProposalNetwork(
                1024, 512,
                ratios=ratios,
                anchor_scales=anchor_scales,
                feat_stride=self.feat_stride,
                mode=mode
            )
            #---------------------------------#
            #   Build the classifier head
            #---------------------------------#
            self.head = Resnet50RoIHead(
                n_class=num_classes + 1,
                roi_size=14,
                spatial_scale=1,
                classifier=classifier
            )
            self.mask_head = MaskRCNNHead(
                n_class=num_classes + 1,
                roi_size=14,
                spatial_scale=1
            )
    def forward(self, x, scale=1., mode="forward"):
        if mode == "forward":
            #---------------------------------#
            #   Input image size
            #---------------------------------#
            img_size = x.shape[2:]
            #---------------------------------#
            #   Extract features with the backbone
            #---------------------------------#
            base_feature = self.extractor.forward(x)
            #---------------------------------#
            #   Get the proposals
            #---------------------------------#
            _, _, rois, roi_indices, _ = self.rpn.forward(base_feature, img_size, scale)
            #---------------------------------------#
            #   Classification and regression results, plus masks
            #   (MRCNN.detect_image below unpacks five values,
            #   so the mask head is run here as well)
            #---------------------------------------#
            roi_cls_locs, roi_scores = self.head.forward(base_feature, rois, roi_indices, img_size)
            roi_masks = self.mask_head.forward(base_feature, rois, roi_indices, img_size)
            return roi_cls_locs, roi_scores, rois, roi_masks, roi_indices
        elif mode == "extractor":
            #---------------------------------#
            #   Extract features with the backbone
            #---------------------------------#
            base_feature = self.extractor.forward(x)
            return base_feature
        elif mode == "rpn":
            base_feature, img_size = x
            #---------------------------------#
            #   Get the proposals
            #---------------------------------#
            rpn_locs, rpn_scores, rois, roi_indices, anchor = self.rpn.forward(base_feature, img_size, scale)
            return rpn_locs, rpn_scores, rois, roi_indices, anchor
        elif mode == "head":
            base_feature, rois, roi_indices, img_size = x
            #---------------------------------------#
            #   Classification and regression results, plus masks
            #   (MaskRCNNTrainer expects three return values here)
            #---------------------------------------#
            roi_cls_locs, roi_scores = self.head.forward(base_feature, rois, roi_indices, img_size)
            roi_masks = self.mask_head.forward(base_feature, rois, roi_indices, img_size)
            return roi_cls_locs, roi_scores, roi_masks
        elif mode == "mask_head":
            base_feature, rois, roi_indices, img_size = x
            roi_masks = self.mask_head.forward(base_feature, rois, roi_indices, img_size)
            return roi_masks

    def freeze_bn(self):
        for m in self.modules():
            if isinstance(m, nn.BatchNorm2d):
                m.eval()
```
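For reference, during training the modes chain together like this (a sketch mirroring what `MaskRCNNTrainer.forward` above already does; `sample_rois` and `sample_indexes` come from the proposal sampler):

```python
# How the trainer drives the three training-time modes of MaskRCNN
base_feature = model_train(imgs, mode='extractor')
rpn_locs, rpn_scores, rois, roi_indices, anchor = model_train(
    x=[base_feature, imgs.shape[2:]], scale=1., mode='rpn')
roi_cls_locs, roi_scores, roi_masks = model_train(
    [base_feature, sample_rois, sample_indexes, imgs.shape[2:]], mode='head')
```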
```python
import numpy as np
import torch

def maskrcnn_dataset_collate(batch):
    images = []
    bboxes = []
    labels = []
    masks = []   # new list holding the mask data
    #   Each element of the batch is now (img, box, label, mask)
    for img, box, label, mask in batch:
        images.append(img)
        bboxes.append(box)
        labels.append(label)
        masks.append(mask)
    #   The images are already tensors from the dataset, so stack them directly
    images = torch.stack(images, dim=0)
    #   Boxes and labels stay as per-image numpy arrays; the trainer converts them itself
    return images, bboxes, labels, masks
```
Changes to train.py:

```python
train_dataset = MaskRcnnDataset(train_lines, input_shape, train=True)
val_dataset = MaskRcnnDataset(val_lines, input_shape, train=False)
gen = DataLoader(train_dataset, shuffle=True, batch_size=batch_size, num_workers=num_workers, pin_memory=True,
                 drop_last=True, collate_fn=maskrcnn_dataset_collate,
                 worker_init_fn=partial(worker_init_fn, rank=0, seed=seed))
gen_val = DataLoader(val_dataset, shuffle=True, batch_size=batch_size, num_workers=num_workers, pin_memory=True,
                     drop_last=True, collate_fn=maskrcnn_dataset_collate,
                     worker_init_fn=partial(worker_init_fn, rank=0, seed=seed))
train_util = MaskRCNNTrainer(model_train, optimizer)
```

With this the training script runs correctly. Also add the import:

```python
from nets.Maskrcnn import MaskRCNN
```
```python
import colorsys
import os
import time

import numpy as np
import torch
import torch.nn as nn
from PIL import Image, ImageDraw, ImageFont

from nets.Maskrcnn import MaskRCNN
from nets.frcnn import FasterRCNN
from utils.utils import (cvtColor, get_classes, get_new_img_size, resize_image,
                         preprocess_input, show_config)
from utils.utils_bbox import DecodeBox
from utils.utils_mask import expand_boxes, expand_masks, random_colors, apply_mask, display_instances

class MRCNN(object):
    _defaults = {
        "model_path": 'model_data/mask_rcnn_weights.pth',
        "classes_path": 'model_data/voc_classes.txt',
        "backbone": "resnet50",
        "confidence": 0.5,
        "nms_iou": 0.3,
        'anchors_size': [8, 16, 32],
        "cuda": False,
    }
    @classmethod
    def get_defaults(cls, n):
        if n in cls._defaults:
            return cls._defaults[n]
        else:
            return "Unrecognized attribute name '" + n + "'"
    def __init__(self, **kwargs):
        self.__dict__.update(self._defaults)
        for name, value in kwargs.items():
            setattr(self, name, value)
            self._defaults[name] = value
        self.class_names, self.num_classes = get_classes(self.classes_path)
        self.std = torch.Tensor([0.1, 0.1, 0.2, 0.2]).repeat(self.num_classes + 1)[None]
        if self.cuda:
            self.std = self.std.cuda()
        self.bbox_util = DecodeBox(self.std, self.num_classes)
        self.generate()
        show_config(**self._defaults)

    def generate(self):
        self.net = MaskRCNN(self.num_classes, "predict", anchor_scales=self.anchors_size, backbone=self.backbone)
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.net.load_state_dict(torch.load(self.model_path, map_location=device))
        self.net = self.net.eval()
        print('{} model, anchors, and classes loaded.'.format(self.model_path))
        if self.cuda:
            self.net = nn.DataParallel(self.net)
            self.net = self.net.cuda()
    def detect_image(self, image, crop=False, count=False):
        image_shape = np.array(np.shape(image)[0:2])
        input_shape = get_new_img_size(image_shape[0], image_shape[1])
        image = cvtColor(image)
        image_data = resize_image(image, [input_shape[1], input_shape[0]])
        image_data = np.expand_dims(np.transpose(preprocess_input(np.array(image_data, dtype='float32')), (2, 0, 1)), 0)
        with torch.no_grad():
            images = torch.from_numpy(image_data)
            if self.cuda:
                images = images.cuda()
            roi_cls_locs, roi_scores, rois, masks, _ = self.net(images)
            results = self.bbox_util.forward(roi_cls_locs, roi_scores, rois, image_shape, input_shape,
                                             nms_iou=self.nms_iou, confidence=self.confidence)
            if len(results[0]) <= 0:
                return image
            top_label = np.array(results[0][:, 5], dtype='int32')
            top_conf = results[0][:, 4]
            top_boxes = results[0][:, :4]
        masks = masks.squeeze(0).permute(1, 2, 0).cpu().numpy()
        boxes = expand_boxes(top_boxes, scale=1.0)
        masks = expand_masks(masks, boxes, image_shape, scale=1.0)
        if count:
            print("top_label:", top_label)
            classes_nums = np.zeros([self.num_classes])
            for i in range(self.num_classes):
                num = np.sum(top_label == i)
                if num > 0:
                    print(self.class_names[i], " : ", num)
                classes_nums[i] = num
            print("classes_nums:", classes_nums)
        if crop:
            for i, c in enumerate(top_label):
                top, left, bottom, right = boxes[i]
                top = max(0, np.floor(top).astype('int32'))
                left = max(0, np.floor(left).astype('int32'))
                bottom = min(image.size[1], np.ceil(bottom).astype('int32'))
                right = min(image.size[0], np.ceil(right).astype('int32'))
                dir_save_path = "img_crop"
                if not os.path.exists(dir_save_path):
                    os.makedirs(dir_save_path)
                crop_image = image.crop([left, top, right, bottom])
                crop_image.save(os.path.join(dir_save_path, "crop_" + str(i) + ".png"), quality=95, subsampling=0)
                print("save crop_" + str(i) + ".png to " + dir_save_path)
        colors = random_colors(len(top_boxes))
        #   apply_mask expects a numpy image and (mask, color, alpha) arguments,
        #   so convert the PIL image first
        masked_image = np.array(image, np.uint8)
        for i, c in enumerate(top_label):
            masked_image = apply_mask(masked_image, masks[:, :, i], colors[i], alpha=0.5)
        image = Image.fromarray(np.uint8(masked_image))
        font = ImageFont.truetype(font='model_data/simhei.ttf', size=np.floor(3e-2 * image.size[1] + 0.5).astype('int32'))
        thickness = int(max((image.size[0] + image.size[1]) // np.mean(input_shape), 1))
        for i, c in enumerate(top_label):
            top, left, bottom, right = boxes[i]
            top = max(0, np.floor(top).astype('int32'))
            left = max(0, np.floor(left).astype('int32'))
            label = '{} {:.2f}'.format(self.class_names[c], top_conf[i])
            draw = ImageDraw.Draw(image)
            label_size = draw.textsize(label, font)
            text_origin = np.array([left, top + 1])
            #   random_colors returns floats in [0, 1]; PIL wants 0-255 integer tuples
            color = tuple(int(255 * ch) for ch in colors[i])
            for j in range(thickness):
                draw.rectangle([left + j, top + j, right - j, bottom - j], outline=color)
            draw.rectangle([tuple(text_origin), tuple(text_origin + label_size)], fill=color)
            draw.text(tuple(text_origin), label, fill=(0, 0, 0), font=font)
            del draw
        return image
    def get_FPS(self, image, test_interval):
        image_shape = np.array(np.shape(image)[0:2])
        input_shape = get_new_img_size(image_shape[0], image_shape[1])
        image = cvtColor(image)
        image_data = resize_image(image, [input_shape[1], input_shape[0]])
        image_data = np.expand_dims(np.transpose(preprocess_input(np.array(image_data, dtype='float32')), (2, 0, 1)), 0)
        with torch.no_grad():
            images = torch.from_numpy(image_data)
            if self.cuda:
                images = images.cuda()
            roi_cls_locs, roi_scores, rois, masks, _ = self.net(images)
        t1 = time.time()
        for _ in range(test_interval):
            with torch.no_grad():
                roi_cls_locs, roi_scores, rois, masks, _ = self.net(images)
        t2 = time.time()
        tact_time = (t2 - t1) / test_interval
        return tact_time
    def get_map_txt(self, image_id, image, class_names, map_out_path):
        f = open(os.path.join(map_out_path, "detection-results/" + image_id + ".txt"), "w")
        image_shape = np.array(np.shape(image)[0:2])
        input_shape = get_new_img_size(image_shape[0], image_shape[1])
        image = cvtColor(image)
        image_data = resize_image(image, [input_shape[1], input_shape[0]])
        image_data = np.expand_dims(np.transpose(preprocess_input(np.array(image_data, dtype='float32')), (2, 0, 1)), 0)
        with torch.no_grad():
            images = torch.from_numpy(image_data)
            if self.cuda:
                images = images.cuda()
            roi_cls_locs, roi_scores, rois, masks, _ = self.net(images)
            results = self.bbox_util.forward(roi_cls_locs, roi_scores, rois, image_shape, input_shape,
                                             nms_iou=self.nms_iou, confidence=self.confidence)
            if len(results[0]) <= 0:
                return
            top_label = np.array(results[0][:, 5], dtype='int32')
            top_conf = results[0][:, 4]
            top_boxes = results[0][:, :4]
        for i, c in list(enumerate(top_label)):
            predicted_class = self.class_names[int(c)]
            box = top_boxes[i]
            score = str(top_conf[i])
            top, left, bottom, right = box
            if predicted_class not in class_names:
                continue
            f.write("%s %s %s %s %s %s\n" %
                    (predicted_class, score[:6], str(int(left)), str(int(top)), str(int(right)), str(int(bottom))))
        f.close()
        return
```
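A typical predict-style usage, mirroring the base repo's predict.py (the image path is a placeholder):

```python
# Hedged usage sketch, following the base repo's predict.py pattern
from PIL import Image

mrcnn = MRCNN(confidence=0.5, nms_iou=0.3)
image = Image.open("img/street.jpg")   # placeholder path
r_image = mrcnn.detect_image(image)
r_image.show()
```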
Add the MRCNN class above to the frcnn.py file.
```python
import colorsys
import random

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import matplotlib.lines as lines
from matplotlib.collections import PatchCollection
from skimage.measure import find_contours
from PIL import Image, ImageDraw

def expand_boxes(boxes, scale):
    """Expand an array of boxes by a given scale."""
    w_half = (boxes[:, 2] - boxes[:, 0]) * .5
    h_half = (boxes[:, 3] - boxes[:, 1]) * .5
    x_c = (boxes[:, 2] + boxes[:, 0]) * .5
    y_c = (boxes[:, 3] + boxes[:, 1]) * .5
    w_half *= scale
    h_half *= scale
    boxes_exp = np.zeros(boxes.shape)
    boxes_exp[:, 0] = x_c - w_half
    boxes_exp[:, 2] = x_c + w_half
    boxes_exp[:, 1] = y_c - h_half
    boxes_exp[:, 3] = y_c + h_half
    return boxes_exp
```
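For instance, `scale=1.0` returns the boxes unchanged, while `scale=2.0` doubles each side around the box center (the math is symmetric in the column pairs, so it behaves the same whichever axis comes first):

```python
import numpy as np

boxes = np.array([[10., 10., 30., 50.]])
print(expand_boxes(boxes, 1.0))   # [[10. 10. 30. 50.]]
print(expand_boxes(boxes, 2.0))   # [[ 0. -10. 40. 70.]]
```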
```python
def expand_masks(masks, boxes, image_shape, scale):
    """Clip each mask to its (expanded) box, keeping it at full image size."""
    boxes_exp = expand_boxes(boxes, scale)
    expanded_masks = []
    for i, box in enumerate(boxes_exp):
        mask = masks[:, :, i]
        y1, x1, y2, x2 = box.astype(int)
        # Keep the box within the image bounds
        y1 = max(0, y1)
        x1 = max(0, x1)
        y2 = min(image_shape[0], y2)
        x2 = min(image_shape[1], x2)
        # Zero out everything outside the box; np.resize would tile the data
        # rather than resample it, which destroys the alignment
        full_mask = np.zeros((image_shape[0], image_shape[1]), dtype=mask.dtype)
        full_mask[y1:y2, x1:x2] = mask[y1:y2, x1:x2]
        expanded_masks.append(full_mask)
    return np.stack(expanded_masks, axis=-1)
def random_colors(N, bright=True):
    """
    Generate random colors.
    To get visually distinct colors, generate them in HSV space then convert to RGB.
    """
    brightness = 1.0 if bright else 0.7
    hsv = [(i / N, 1, brightness) for i in range(N)]
    colors = list(map(lambda c: colorsys.hsv_to_rgb(*c), hsv))
    return colors

def apply_mask(image, mask, color, alpha=0.5):
    """Apply the given mask to the image."""
    for c in range(3):
        image[:, :, c] = np.where(mask == 1,
                                  image[:, :, c] * (1 - alpha) + alpha * color[c] * 255,
                                  image[:, :, c])
    return image
def display_instances(image, boxes, masks, class_ids, class_names,
                      scores=None, title="", figsize=(16, 16), ax=None,
                      show_mask=True, show_bbox=True, colors=None, captions=None):
    """
    boxes: [num_instance, (y1, x1, y2, x2, class_id)] in image coordinates.
    masks: [height, width, num_instances]
    class_ids: [num_instances]
    class_names: list of class names of the dataset
    scores: (optional) confidence scores for each box
    title: (optional) Figure title
    show_mask, show_bbox: To show masks and bounding boxes or not
    figsize: (optional) the size of the image
    colors: (optional) An array or colors to use with each object
    captions: (optional) A list of strings to use as captions for each object
    """
    N = boxes.shape[0]
    if not N:
        print("\n*** No instances to display *** \n")
    else:
        assert boxes.shape[0] == masks.shape[-1] == class_ids.shape[0]
    # If no axis is passed, create one and automatically call show()
    auto_show = False
    if not ax:
        _, ax = plt.subplots(1, figsize=figsize)
        auto_show = True
    # Generate random colors
    colors = colors or random_colors(N)
    # Show area outside image boundaries.
    height, width = image.shape[:2]
    ax.set_ylim(height + 10, -10)
    ax.set_xlim(-10, width + 10)
    ax.axis('off')
    ax.set_title(title)
    masked_image = image.astype(np.uint32).copy()
    for i in range(N):
        color = colors[i]
        # Bounding box
        if not np.any(boxes[i]):
            # Skip this instance. Has no bbox. Likely lost in image cropping.
            continue
        y1, x1, y2, x2 = boxes[i]
        if show_bbox:
            p = patches.Rectangle((x1, y1), x2 - x1, y2 - y1, linewidth=2,
                                  alpha=0.7, linestyle="dashed",
                                  edgecolor=color, facecolor='none')
            ax.add_patch(p)
        # Label
        if not captions:
            class_id = class_ids[i]
            score = scores[i] if scores is not None else None
            label = class_names[class_id]
            x = random.randint(x1, (x1 + x2) // 2)
            caption = "{} {:.3f}".format(label, score) if score else label
        else:
            caption = captions[i]
        ax.text(x1, y1 + 8, caption,
                color='w', size=11, backgroundcolor="none")
        # Mask
        mask = masks[:, :, i]
        if show_mask:
            masked_image = apply_mask(masked_image, mask, color)
        # Mask Polygon
        # Pad to ensure proper polygons for masks that touch image edges.
        padded_mask = np.zeros(
            (mask.shape[0] + 2, mask.shape[1] + 2), dtype=np.uint8)
        padded_mask[1:-1, 1:-1] = mask
        contours = find_contours(padded_mask, 0.5)
        for verts in contours:
            # Subtract the padding and flip (y, x) to (x, y)
            verts = np.fliplr(verts) - 1
            p = patches.Polygon(verts, facecolor="none", edgecolor=color)
            ax.add_patch(p)
    # Show the image with masks and bounding boxes
    ax.imshow(masked_image.astype(np.uint8))
    if auto_show:
        plt.show()
```
The above is utils_mask.py.
"""
train_dataset = MaskRcnnDataset(train_lines, input_shape, train = True)
val_dataset = MaskRcnnDataset(val_lines, input_shape, train = False)
gen = DataLoader(train_dataset, shuffle = True, batch_size = batch_size, num_workers = num_workers, pin_memory=True,
drop_last=True, collate_fn=maskrcnn_dataset_collate,
worker_init_fn=partial(worker_init_fn, rank=0, seed=seed))
gen_val = DataLoader(val_dataset , shuffle = True, batch_size = batch_size, num_workers = num_workers, pin_memory=True,
drop_last=True, collate_fn=maskrcnn_dataset_collate,
worker_init_fn=partial(worker_init_fn, rank=0, seed=seed))
train_util = MaskRCNNTrainer(model_train, optimizer)
"""
For predict.py and get_map.py, swap the model construction:

```python
# frcnn = FRCNN()
frcnn = MRCNN(confidence=confidence, nms_iou=nms_iou)
```

and add `from frcnn import FRCNN, MRCNN` at the top of each file.
```python
import os

import torch
from tqdm import tqdm

def fit_one_epoch_1(model, train_util, loss_history, eval_callback, optimizer, epoch,
                    epoch_step, epoch_step_val, gen, gen_val, Epoch, cuda, fp16, scaler,
                    save_period, save_dir):
    total_loss = 0
    rpn_loc_loss = 0
    rpn_cls_loss = 0
    roi_loc_loss = 0
    roi_cls_loss = 0
    mask_loss = 0
    val_loss = 0
    print('Start Train')
    model.train()   # the validation phase below switches the model to eval mode
    with tqdm(total=epoch_step, desc=f'Epoch {epoch + 1}/{Epoch}', postfix=dict, mininterval=0.3) as pbar:
        for iteration, batch in enumerate(gen):
            if iteration >= epoch_step:
                break
            images, boxes, labels, masks = batch[0], batch[1], batch[2], batch[3]
            with torch.no_grad():
                if cuda:
                    images = images.cuda()
                    #   boxes and labels stay as per-image numpy arrays (the trainer
                    #   converts them itself); only the mask tensors are moved
                    masks = [mask.cuda() for mask in masks]
            #   train_step runs forward/backward and the optimizer step,
            #   including the fp16 scaler handling
            rpn_loc, rpn_cls, roi_loc, roi_cls, mask, total = train_util.train_step(
                images, boxes, labels, masks, 1, fp16, scaler)
            total_loss += total.item()
            rpn_loc_loss += rpn_loc.item()
            rpn_cls_loss += rpn_cls.item()
            roi_loc_loss += roi_loc.item()
            roi_cls_loss += roi_cls.item()
            mask_loss += mask.item()
            pbar.set_postfix(**{'total_loss': total_loss / (iteration + 1),
                                'rpn_loc': rpn_loc_loss / (iteration + 1),
                                'rpn_cls': rpn_cls_loss / (iteration + 1),
                                'roi_loc': roi_loc_loss / (iteration + 1),
                                'roi_cls': roi_cls_loss / (iteration + 1),
                                'mask_loss': mask_loss / (iteration + 1),
                                'lr': optimizer.param_groups[0]['lr']})
            pbar.update(1)
    print('Finish Train')
    print('Start Validation')
    model.eval()   # switch to eval mode for validation
    with tqdm(total=epoch_step_val, desc=f'Epoch {epoch + 1}/{Epoch}', postfix=dict, mininterval=0.3) as pbar:
        for iteration, batch in enumerate(gen_val):
            if iteration >= epoch_step_val:
                break
            images, boxes, labels, masks = batch[0], batch[1], batch[2], batch[3]
            with torch.no_grad():
                if cuda:
                    images = images.cuda()
                    masks = [mask.cuda() for mask in masks]
                #   Forward pass only, no optimizer step
                val_losses = train_util.forward(images, boxes, labels, masks, 1)
                val_loss += val_losses[-1].item()
            pbar.set_postfix(**{'val_loss': val_loss / (iteration + 1)})
            pbar.update(1)
    print('Finish Validation')
    loss_history.append_loss(epoch + 1, total_loss / epoch_step, val_loss / epoch_step_val)
    eval_callback.on_epoch_end(epoch + 1)
    print('Epoch:' + str(epoch + 1) + '/' + str(Epoch))
    print('Total Loss: %.3f || Val Loss: %.3f ' % (total_loss / epoch_step, val_loss / epoch_step_val))
    #-----------------------------------------------#
    #   Save weights
    #-----------------------------------------------#
    if (epoch + 1) % save_period == 0 or epoch + 1 == Epoch:
        torch.save(model.state_dict(), os.path.join(save_dir, 'ep%03d-loss%.3f-val_loss%.3f.pth' % (epoch + 1, total_loss / epoch_step, val_loss / epoch_step_val)))
    if len(loss_history.val_loss) <= 1 or (val_loss / epoch_step_val) <= min(loss_history.val_loss):
        print('Save best model to best_epoch_weights.pth')
        torch.save(model.state_dict(), os.path.join(save_dir, "best_epoch_weights.pth"))
    torch.save(model.state_dict(), os.path.join(save_dir, "last_epoch_weights.pth"))
```
Then call it for Mask R-CNN:

```python
# Maskrcnn
fit_one_epoch_1(model, train_util, loss_history, eval_callback, optimizer, epoch,
                epoch_step, epoch_step_val, gen, gen_val, UnFreeze_Epoch, Cuda, fp16,
                scaler, save_period, save_dir)
```

Change the training call to this and import the function.
In utils/dataloader.py, under class MaskRcnnDataset, change the mask-loading line in get_random_data to:

```python
# First version: assumes the mask PNG sits next to the JPG
masks = [Image.open(line[0][:-3] + "png").convert('1') for i in range(len(box))]
# Refined version: the masks live in a parallel Segmentation directory
masks = [Image.open((line[0][:-3] + "png").replace("JPEG", "Segmentation")).convert('1') for i in range(len(box))]
```
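So for an annotation line that starts with, say, `VOCdevkit/VOC2007/JPEGImages/000001.jpg` (a hypothetical example of the VOC layout), the mask path works out as:

```python
path = "VOCdevkit/VOC2007/JPEGImages/000001.jpg"   # hypothetical example
mask_path = (path[:-3] + "png").replace("JPEG", "Segmentation")
print(mask_path)   # VOCdevkit/VOC2007/SegmentationImages/000001.png
```

Note that this opens the same PNG once per box, so every instance receives the same full-image mask; truly per-instance masks would need one file (or one channel) per object.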
Here is the Mask R-CNN head on top of this base code (the class body was truncated in the original post; a fuller sketch follows below):

```python
class MaskRCNNHead(nn.Module):
    def __init__(self, n_class, roi_size, spatial_scale, num_convs=4, conv_dim=256, mask_out_dim=28):
        super(MaskRCNNHead, self).__init__()
        # ... (rest of the definition truncated in the post)

def roi_pooling(features, roi, roi_index, roi_size, spatial_scale):
    # Assume roi_pooling is an already-defined function that crops the
    # region corresponding to each ROI out of the feature map
    ...
```
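Since the post truncates the head's body, here is one minimal sketch of what it could look like. This is an assumption based on the standard Mask R-CNN design (four 3x3 convs, a stride-2 deconv, and a per-class 1x1 predictor), with `torchvision.ops.roi_align` standing in for the assumed `roi_pooling`; the `in_channels=1024` default and the ROI coordinate convention are also assumptions:

```python
import torch
import torch.nn as nn
from torchvision.ops import roi_align

class MaskRCNNHead(nn.Module):
    """Minimal sketch of a Mask R-CNN mask head; assumed design, not from the original post."""
    def __init__(self, n_class, roi_size, spatial_scale, in_channels=1024, num_convs=4, conv_dim=256):
        super(MaskRCNNHead, self).__init__()
        self.roi_size = roi_size
        self.spatial_scale = spatial_scale
        layers = []
        for i in range(num_convs):
            layers.append(nn.Conv2d(in_channels if i == 0 else conv_dim, conv_dim, 3, padding=1))
            layers.append(nn.ReLU(inplace=True))
        self.convs = nn.Sequential(*layers)
        self.deconv = nn.ConvTranspose2d(conv_dim, conv_dim, 2, stride=2)  # e.g. 14x14 -> 28x28
        self.predictor = nn.Conv2d(conv_dim, n_class, 1)                   # one mask per class

    def forward(self, base_feature, rois, roi_indices, img_size):
        n = base_feature.shape[0]
        rois = rois.view(-1, 4)
        roi_indices = roi_indices.view(-1)
        # roi_align takes boxes as (batch_index, x1, y1, x2, y2)
        boxes = torch.cat([roi_indices.unsqueeze(1).type_as(rois), rois], dim=1)
        pooled = roi_align(base_feature, boxes,
                           output_size=(self.roi_size, self.roi_size),
                           spatial_scale=self.spatial_scale)
        x = self.deconv(self.convs(pooled))
        masks = self.predictor(x)   # (n * n_sample, n_class, 28, 28) logits
        # Reshape to (batch, n_sample, n_class, 28, 28) to match the trainer's indexing
        return masks.view(n, -1, masks.shape[1], masks.shape[2], masks.shape[3])
```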
Also, create a file maskrcnn.py:

```python
import torch.nn as nn

from nets.classifier import Resnet50RoIHead, VGG16RoIHead, MaskRCNNHead
from nets.resnet50 import resnet50
from nets.rpn import RegionProposalNetwork
from nets.vgg16 import decom_vgg16
```

and put the full `MaskRCNN(nn.Module)` class listed above into it (the post repeats only its first lines here). That completes the Mask R-CNN training functionality!