C0NGTRI123 opened this issue 10 months ago
mixnet.py:
__all__ = ['MixNet', 'mixnet_s', 'mixnet_m', 'mixnet_l']
import torch.nn.init as init
import torch
import torch.nn as nn
from torch.nn import Parameter
import torch.nn.functional as F
from inspect import isfunction
from torch.autograd import Variable
import numpy as np
class SelectableDense(nn.Module):
"""
Selectable dense layer.
Parameters:
----------
in_features : int
Number of input features.
out_features : int
Number of output features.
bias : bool, default False
Whether the layer uses a bias vector.
num_options : int, default 1
Number of selectable options.
"""
def __init__(self,
in_features,
out_features,
bias=False,
num_options=1):
super(SelectableDense, self).__init__()
self.in_features = in_features
self.out_features = out_features
self.use_bias = bias
self.num_options = num_options
self.weight = Parameter(torch.Tensor(num_options, out_features, in_features))
if bias:
self.bias = Parameter(torch.Tensor(num_options, out_features))
else:
self.register_parameter("bias", None)
def forward(self, x, indices):
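        # Select one (out_features, in_features) weight matrix per sample according to `indices`,
        # then apply it to the corresponding input row as a batched matrix multiplication.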
weight = torch.index_select(self.weight, dim=0, index=indices)
x = x.unsqueeze(-1)
x = weight.bmm(x)
x = x.squeeze(dim=-1)
if self.use_bias:
bias = torch.index_select(self.bias, dim=0, index=indices)
x += bias
return x
def extra_repr(self):
return "in_features={}, out_features={}, bias={}, num_options={}".format(
self.in_features, self.out_features, self.use_bias, self.num_options)
class DenseBlock(nn.Module):
"""
Standard dense block with Batch normalization and activation.
Parameters:
----------
in_features : int
Number of input features.
out_features : int
Number of output features.
bias : bool, default False
Whether the layer uses a bias vector.
use_bn : bool, default True
Whether to use BatchNorm layer.
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
activation : function or str or None, default nn.ReLU(inplace=True)
Activation function or name of activation function.
"""
def __init__(self,
in_features,
out_features,
bias=False,
use_bn=True,
bn_eps=1e-5,
activation=(lambda: nn.ReLU(inplace=True))):
super(DenseBlock, self).__init__()
self.activate = (activation is not None)
self.use_bn = use_bn
self.fc = nn.Linear(
in_features=in_features,
out_features=out_features,
bias=bias)
if self.use_bn:
self.bn = nn.BatchNorm1d(
num_features=out_features,
eps=bn_eps)
if self.activate:
self.activ = get_activation_layer(activation)
def forward(self, x):
x = self.fc(x)
if self.use_bn:
x = self.bn(x)
if self.activate:
x = self.activ(x)
return x
class ConvBlock1d(nn.Module):
"""
Standard 1D convolution block with Batch normalization and activation.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
kernel_size : int
Convolution window size.
stride : int
Strides of the convolution.
padding : int
Padding value for convolution layer.
dilation : int
Dilation value for convolution layer.
groups : int, default 1
Number of groups.
bias : bool, default False
Whether the layer uses a bias vector.
use_bn : bool, default True
Whether to use BatchNorm layer.
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
activation : function or str or None, default nn.ReLU(inplace=True)
Activation function or name of activation function.
"""
def __init__(self,
in_channels,
out_channels,
kernel_size,
stride,
padding,
dilation=1,
groups=1,
bias=False,
use_bn=True,
bn_eps=1e-5,
activation=(lambda: nn.ReLU(inplace=True))):
super(ConvBlock1d, self).__init__()
self.activate = (activation is not None)
self.use_bn = use_bn
self.conv = nn.Conv1d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding,
dilation=dilation,
groups=groups,
bias=bias)
if self.use_bn:
self.bn = nn.BatchNorm1d(
num_features=out_channels,
eps=bn_eps)
if self.activate:
self.activ = get_activation_layer(activation)
def forward(self, x):
x = self.conv(x)
if self.use_bn:
x = self.bn(x)
if self.activate:
x = self.activ(x)
return x
class Hourglass(nn.Module):
"""
    An hourglass module.
Parameters:
----------
down_seq : nn.Sequential
Down modules as sequential.
up_seq : nn.Sequential
Up modules as sequential.
skip_seq : nn.Sequential
Skip connection modules as sequential.
merge_type : str, default 'add'
Type of concatenation of up and skip outputs.
return_first_skip : bool, default False
        Whether to return the first skip connection output. Used in ResAttNet.
"""
def __init__(self,
down_seq,
up_seq,
skip_seq,
merge_type="add",
return_first_skip=False):
super(Hourglass, self).__init__()
self.depth = len(down_seq)
assert (merge_type in ["cat", "add"])
assert (len(up_seq) == self.depth)
assert (len(skip_seq) in (self.depth, self.depth + 1))
self.merge_type = merge_type
self.return_first_skip = return_first_skip
self.extra_skip = (len(skip_seq) == self.depth + 1)
self.down_seq = down_seq
self.up_seq = up_seq
self.skip_seq = skip_seq
def _merge(self, x, y):
if y is not None:
if self.merge_type == "cat":
x = torch.cat((x, y), dim=1)
elif self.merge_type == "add":
x = x + y
return x
def forward(self, x, **kwargs):
y = None
down_outs = [x]
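        # Downsampling pass: keep every intermediate output so it can be merged back in via the skip modules on the way up.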
for down_module in self.down_seq._modules.values():
x = down_module(x)
down_outs.append(x)
for i in range(len(down_outs)):
if i != 0:
y = down_outs[self.depth - i]
skip_module = self.skip_seq[self.depth - i]
y = skip_module(y)
x = self._merge(x, y)
if i != len(down_outs) - 1:
if (i == 0) and self.extra_skip:
skip_module = self.skip_seq[self.depth]
x = skip_module(x)
up_module = self.up_seq[self.depth - 1 - i]
x = up_module(x)
if self.return_first_skip:
return x, y
else:
return x
class SesquialteralHourglass(nn.Module):
"""
A sesquialteral hourglass block.
Parameters:
----------
down1_seq : nn.Sequential
The first down modules as sequential.
skip1_seq : nn.Sequential
The first skip connection modules as sequential.
up_seq : nn.Sequential
Up modules as sequential.
skip2_seq : nn.Sequential
The second skip connection modules as sequential.
down2_seq : nn.Sequential
The second down modules as sequential.
merge_type : str, default 'cat'
Type of concatenation of up and skip outputs.
"""
def __init__(self,
down1_seq,
skip1_seq,
up_seq,
skip2_seq,
down2_seq,
merge_type="cat"):
super(SesquialteralHourglass, self).__init__()
assert (len(down1_seq) == len(up_seq))
assert (len(down1_seq) == len(down2_seq))
assert (len(skip1_seq) == len(skip2_seq))
assert (len(down1_seq) == len(skip1_seq) - 1)
assert (merge_type in ["cat", "add"])
self.merge_type = merge_type
self.depth = len(down1_seq)
self.down1_seq = down1_seq
self.skip1_seq = skip1_seq
self.up_seq = up_seq
self.skip2_seq = skip2_seq
self.down2_seq = down2_seq
def _merge(self, x, y):
if y is not None:
if self.merge_type == "cat":
x = torch.cat((x, y), dim=1)
elif self.merge_type == "add":
x = x + y
return x
def forward(self, x, **kwargs):
y = self.skip1_seq[0](x)
skip1_outs = [y]
for i in range(self.depth):
x = self.down1_seq[i](x)
y = self.skip1_seq[i + 1](x)
skip1_outs.append(y)
x = skip1_outs[self.depth]
y = self.skip2_seq[0](x)
skip2_outs = [y]
for i in range(self.depth):
x = self.up_seq[i](x)
y = skip1_outs[self.depth - 1 - i]
x = self._merge(x, y)
y = self.skip2_seq[i + 1](x)
skip2_outs.append(y)
x = self.skip2_seq[self.depth](x)
for i in range(self.depth):
x = self.down2_seq[i](x)
y = skip2_outs[self.depth - 1 - i]
x = self._merge(x, y)
return x
class MultiOutputSequential(nn.Sequential):
"""
A sequential container with multiple outputs.
Modules will be executed in the order they are added.
Parameters:
----------
multi_output : bool, default True
        Whether to return multiple outputs.
dual_output : bool, default False
Whether to return dual output.
return_last : bool, default True
Whether to forcibly return last value.
"""
def __init__(self,
multi_output=True,
dual_output=False,
return_last=True):
super(MultiOutputSequential, self).__init__()
self.multi_output = multi_output
self.dual_output = dual_output
self.return_last = return_last
def forward(self, x):
outs = []
for module in self._modules.values():
x = module(x)
if hasattr(module, "do_output") and module.do_output:
outs.append(x)
elif hasattr(module, "do_output2") and module.do_output2:
assert (type(x) == tuple)
outs.extend(x[1])
x = x[0]
if self.multi_output:
return [x] + outs if self.return_last else outs
elif self.dual_output:
return x, outs
else:
return x
class HeatmapMaxDetBlock(nn.Module):
"""
Heatmap maximum detector block (for human pose estimation task).
"""
def __init__(self):
super(HeatmapMaxDetBlock, self).__init__()
def forward(self, x):
heatmap = x
vector_dim = 2
batch = heatmap.shape[0]
channels = heatmap.shape[1]
in_size = x.shape[2:]
heatmap_vector = heatmap.view(batch, channels, -1)
scores, indices = heatmap_vector.max(dim=vector_dim, keepdims=True)
scores_mask = (scores > 0.0).float()
pts_x = (indices % in_size[1]) * scores_mask
pts_y = (indices // in_size[1]) * scores_mask
pts = torch.cat((pts_x, pts_y, scores), dim=vector_dim)
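        # Refine each detected peak by a quarter-pixel shift toward the larger neighbouring heatmap value (sub-pixel adjustment).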
for b in range(batch):
for k in range(channels):
hm = heatmap[b, k, :, :]
px = int(pts[b, k, 0])
py = int(pts[b, k, 1])
if (0 < px < in_size[1] - 1) and (0 < py < in_size[0] - 1):
pts[b, k, 0] += (hm[py, px + 1] - hm[py, px - 1]).sign() * 0.25
pts[b, k, 1] += (hm[py + 1, px] - hm[py - 1, px]).sign() * 0.25
return pts
@staticmethod
def calc_flops(x):
assert (x.shape[0] == 1)
num_flops = x.numel() + 26 * x.shape[1]
num_macs = 0
return num_flops, num_macs
class ParallelConcurent(nn.Sequential):
"""
A sequential container with multiple inputs and multiple outputs.
Modules will be executed in the order they are added.
"""
def __init__(self):
super(ParallelConcurent, self).__init__()
def forward(self, x):
out = []
for module, xi in zip(self._modules.values(), x):
out.append(module(xi))
return out
class BreakBlock(nn.Module):
"""
    Break connection block for hourglass.
"""
def __init__(self):
super(BreakBlock, self).__init__()
def forward(self, x):
return None
def __repr__(self):
return '{name}()'.format(name=self.__class__.__name__)
def conv5x5_block(in_channels,
out_channels,
stride=1,
padding=2,
dilation=1,
groups=1,
bias=False,
use_bn=True,
bn_eps=1e-5,
activation=(lambda: nn.ReLU(inplace=True))):
"""
5x5 version of the standard convolution block.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
stride : int or tuple/list of 2 int, default 1
Strides of the convolution.
padding : int, or tuple/list of 2 int, or tuple/list of 4 int, default 2
Padding value for convolution layer.
dilation : int or tuple/list of 2 int, default 1
Dilation value for convolution layer.
groups : int, default 1
Number of groups.
bias : bool, default False
Whether the layer uses a bias vector.
use_bn : bool, default True
Whether to use BatchNorm layer.
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
activation : function or str or None, default nn.ReLU(inplace=True)
Activation function or name of activation function.
"""
return ConvBlock(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=5,
stride=stride,
padding=padding,
dilation=dilation,
groups=groups,
bias=bias,
use_bn=use_bn,
bn_eps=bn_eps,
activation=activation)
def conv7x7_block(in_channels,
out_channels,
stride=1,
padding=3,
dilation=1,
groups=1,
bias=False,
use_bn=True,
bn_eps=1e-5,
activation=(lambda: nn.ReLU(inplace=True))):
"""
7x7 version of the standard convolution block.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
    stride : int or tuple/list of 2 int, default 1
        Strides of the convolution.
padding : int or tuple/list of 2 int, default 3
Padding value for convolution layer.
dilation : int or tuple/list of 2 int, default 1
Dilation value for convolution layer.
groups : int, default 1
Number of groups.
bias : bool, default False
Whether the layer uses a bias vector.
use_bn : bool, default True
Whether to use BatchNorm layer.
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
activation : function or str or None, default nn.ReLU(inplace=True)
Activation function or name of activation function.
"""
return ConvBlock(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=7,
stride=stride,
padding=padding,
dilation=dilation,
groups=groups,
bias=bias,
use_bn=use_bn,
bn_eps=bn_eps,
activation=activation)
def dwconv5x5_block(in_channels,
out_channels,
stride=1,
padding=2,
dilation=1,
bias=False,
bn_eps=1e-5,
activation=(lambda: nn.ReLU(inplace=True))):
"""
5x5 depthwise version of the standard convolution block.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
stride : int or tuple/list of 2 int, default 1
Strides of the convolution.
padding : int, or tuple/list of 2 int, or tuple/list of 4 int, default 2
Padding value for convolution layer.
dilation : int or tuple/list of 2 int, default 1
Dilation value for convolution layer.
bias : bool, default False
Whether the layer uses a bias vector.
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
activation : function or str or None, default nn.ReLU(inplace=True)
Activation function or name of activation function.
"""
return dwconv_block(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=5,
stride=stride,
padding=padding,
dilation=dilation,
bias=bias,
bn_eps=bn_eps,
activation=activation)
class PreConvBlock(nn.Module):
"""
Convolution block with Batch normalization and ReLU pre-activation.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
kernel_size : int or tuple/list of 2 int
Convolution window size.
stride : int or tuple/list of 2 int
Strides of the convolution.
padding : int or tuple/list of 2 int
Padding value for convolution layer.
dilation : int or tuple/list of 2 int, default 1
Dilation value for convolution layer.
bias : bool, default False
Whether the layer uses a bias vector.
use_bn : bool, default True
Whether to use BatchNorm layer.
return_preact : bool, default False
        Whether to return the pre-activation. It's used by PreResNet.
    activate : bool, default True
        Whether to activate the convolution block.
"""
def __init__(self,
in_channels,
out_channels,
kernel_size,
stride,
padding,
dilation=1,
bias=False,
use_bn=True,
return_preact=False,
activate=True):
super(PreConvBlock, self).__init__()
self.return_preact = return_preact
self.activate = activate
self.use_bn = use_bn
if self.use_bn:
self.bn = nn.BatchNorm2d(num_features=in_channels)
if self.activate:
self.activ = nn.ReLU(inplace=True)
self.conv = nn.Conv2d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding,
dilation=dilation,
bias=bias)
def forward(self, x):
if self.use_bn:
x = self.bn(x)
if self.activate:
x = self.activ(x)
if self.return_preact:
x_pre_activ = x
x = self.conv(x)
if self.return_preact:
return x, x_pre_activ
else:
return x
def pre_conv1x1_block(in_channels,
out_channels,
stride=1,
bias=False,
use_bn=True,
return_preact=False,
activate=True):
"""
1x1 version of the pre-activated convolution block.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
stride : int or tuple/list of 2 int, default 1
Strides of the convolution.
bias : bool, default False
Whether the layer uses a bias vector.
use_bn : bool, default True
Whether to use BatchNorm layer.
return_preact : bool, default False
        Whether to return the pre-activation.
    activate : bool, default True
        Whether to activate the convolution block.
"""
return PreConvBlock(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1,
stride=stride,
padding=0,
bias=bias,
use_bn=use_bn,
return_preact=return_preact,
activate=activate)
def pre_conv3x3_block(in_channels,
out_channels,
stride=1,
padding=1,
dilation=1,
bias=False,
use_bn=True,
return_preact=False,
activate=True):
"""
3x3 version of the pre-activated convolution block.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
stride : int or tuple/list of 2 int, default 1
Strides of the convolution.
padding : int or tuple/list of 2 int, default 1
Padding value for convolution layer.
dilation : int or tuple/list of 2 int, default 1
Dilation value for convolution layer.
bias : bool, default False
Whether the layer uses a bias vector.
use_bn : bool, default True
Whether to use BatchNorm layer.
return_preact : bool, default False
        Whether to return the pre-activation.
    activate : bool, default True
        Whether to activate the convolution block.
"""
return PreConvBlock(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=3,
stride=stride,
padding=padding,
dilation=dilation,
bias=bias,
use_bn=use_bn,
return_preact=return_preact,
activate=activate)
class AsymConvBlock(nn.Module):
"""
Asymmetric separable convolution block.
Parameters:
----------
channels : int
Number of input/output channels.
kernel_size : int
Convolution window size.
padding : int
Padding value for convolution layer.
dilation : int, default 1
Dilation value for convolution layer.
groups : int, default 1
Number of groups.
bias : bool, default False
Whether the layer uses a bias vector.
lw_use_bn : bool, default True
Whether to use BatchNorm layer (leftwise convolution block).
rw_use_bn : bool, default True
Whether to use BatchNorm layer (rightwise convolution block).
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
lw_activation : function or str or None, default nn.ReLU(inplace=True)
Activation function after the leftwise convolution block.
rw_activation : function or str or None, default nn.ReLU(inplace=True)
Activation function after the rightwise convolution block.
"""
def __init__(self,
channels,
kernel_size,
padding,
dilation=1,
groups=1,
bias=False,
lw_use_bn=True,
rw_use_bn=True,
bn_eps=1e-5,
lw_activation=(lambda: nn.ReLU(inplace=True)),
rw_activation=(lambda: nn.ReLU(inplace=True))):
super(AsymConvBlock, self).__init__()
self.lw_conv = ConvBlock(
in_channels=channels,
out_channels=channels,
kernel_size=(kernel_size, 1),
stride=1,
padding=(padding, 0),
dilation=(dilation, 1),
groups=groups,
bias=bias,
use_bn=lw_use_bn,
bn_eps=bn_eps,
activation=lw_activation)
self.rw_conv = ConvBlock(
in_channels=channels,
out_channels=channels,
kernel_size=(1, kernel_size),
stride=1,
padding=(0, padding),
dilation=(1, dilation),
groups=groups,
bias=bias,
use_bn=rw_use_bn,
bn_eps=bn_eps,
activation=rw_activation)
def forward(self, x):
x = self.lw_conv(x)
x = self.rw_conv(x)
return x
def asym_conv3x3_block(padding=1,
**kwargs):
"""
3x3 asymmetric separable convolution block.
Parameters:
----------
channels : int
Number of input/output channels.
padding : int, default 1
Padding value for convolution layer.
dilation : int, default 1
Dilation value for convolution layer.
groups : int, default 1
Number of groups.
bias : bool, default False
Whether the layer uses a bias vector.
lw_use_bn : bool, default True
Whether to use BatchNorm layer (leftwise convolution block).
rw_use_bn : bool, default True
Whether to use BatchNorm layer (rightwise convolution block).
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
lw_activation : function or str or None, default nn.ReLU(inplace=True)
Activation function after the leftwise convolution block.
rw_activation : function or str or None, default nn.ReLU(inplace=True)
Activation function after the rightwise convolution block.
"""
return AsymConvBlock(
kernel_size=3,
padding=padding,
**kwargs)
class NormActivation(nn.Module):
"""
Activation block with preliminary batch normalization. It's used by itself as the final block in PreResNet.
Parameters:
----------
in_channels : int
Number of input channels.
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
activation : function or str or None, default nn.ReLU(inplace=True)
Activation function or name of activation function.
"""
def __init__(self,
in_channels,
bn_eps=1e-5,
activation=(lambda: nn.ReLU(inplace=True))):
super(NormActivation, self).__init__()
self.bn = nn.BatchNorm2d(
num_features=in_channels,
eps=bn_eps)
self.activ = get_activation_layer(activation)
def forward(self, x):
x = self.bn(x)
x = self.activ(x)
return x
class InterpolationBlock(nn.Module):
"""
Interpolation upsampling block.
Parameters:
----------
scale_factor : float
Multiplier for spatial size.
out_size : tuple of 2 int, default None
Spatial size of the output tensor for the bilinear interpolation operation.
mode : str, default 'bilinear'
Algorithm used for upsampling.
align_corners : bool, default True
Whether to align the corner pixels of the input and output tensors.
up : bool, default True
Whether to upsample or downsample.
"""
def __init__(self,
scale_factor,
out_size=None,
mode="bilinear",
align_corners=True,
up=True):
super(InterpolationBlock, self).__init__()
self.scale_factor = scale_factor
self.out_size = out_size
self.mode = mode
self.align_corners = align_corners
self.up = up
def forward(self, x, size=None):
if (self.mode == "bilinear") or (size is not None):
out_size = self.calc_out_size(x) if size is None else size
return F.interpolate(
input=x,
size=out_size,
mode=self.mode,
align_corners=self.align_corners)
else:
return F.interpolate(
input=x,
scale_factor=self.scale_factor,
mode=self.mode,
align_corners=self.align_corners)
def calc_out_size(self, x):
if self.out_size is not None:
return self.out_size
if self.up:
return tuple(s * self.scale_factor for s in x.shape[2:])
else:
return tuple(s // self.scale_factor for s in x.shape[2:])
def __repr__(self):
s = '{name}(scale_factor={scale_factor}, out_size={out_size}, mode={mode}, align_corners={align_corners}, up={up})' # noqa
return s.format(
name=self.__class__.__name__,
scale_factor=self.scale_factor,
out_size=self.out_size,
mode=self.mode,
align_corners=self.align_corners,
up=self.up)
def dwconv3x3_block(in_channels,
out_channels,
stride=1,
padding=1,
dilation=1,
bias=False,
bn_eps=1e-5,
activation=(lambda: nn.ReLU(inplace=True))):
"""
3x3 depthwise version of the standard convolution block.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
stride : int or tuple/list of 2 int, default 1
Strides of the convolution.
padding : int, or tuple/list of 2 int, or tuple/list of 4 int, default 1
Padding value for convolution layer.
dilation : int or tuple/list of 2 int, default 1
Dilation value for convolution layer.
bias : bool, default False
Whether the layer uses a bias vector.
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
activation : function or str or None, default nn.ReLU(inplace=True)
Activation function or name of activation function.
"""
return dwconv_block(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=3,
stride=stride,
padding=padding,
dilation=dilation,
bias=bias,
bn_eps=bn_eps,
activation=activation)
def dwsconv3x3_block(in_channels,
out_channels,
stride=1,
padding=1,
dilation=1,
bias=False,
bn_eps=1e-5,
dw_activation=(lambda: nn.ReLU(inplace=True)),
pw_activation=(lambda: nn.ReLU(inplace=True)),
**kwargs):
"""
3x3 depthwise separable version of the standard convolution block.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
stride : int or tuple/list of 2 int, default 1
Strides of the convolution.
padding : int, or tuple/list of 2 int, or tuple/list of 4 int, default 1
Padding value for convolution layer.
dilation : int or tuple/list of 2 int, default 1
Dilation value for convolution layer.
bias : bool, default False
Whether the layer uses a bias vector.
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
dw_activation : function or str or None, default nn.ReLU(inplace=True)
Activation function after the depthwise convolution block.
pw_activation : function or str or None, default nn.ReLU(inplace=True)
Activation function after the pointwise convolution block.
"""
return DwsConvBlock(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=3,
stride=stride,
padding=padding,
dilation=dilation,
bias=bias,
bn_eps=bn_eps,
dw_activation=dw_activation,
pw_activation=pw_activation,
**kwargs)
class DeconvBlock(nn.Module):
"""
Deconvolution block with batch normalization and activation.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
kernel_size : int or tuple/list of 2 int
Convolution window size.
stride : int or tuple/list of 2 int
Strides of the deconvolution.
padding : int or tuple/list of 2 int
Padding value for deconvolution layer.
ext_padding : tuple/list of 4 int, default None
Extra padding value for deconvolution layer.
out_padding : int or tuple/list of 2 int
Output padding value for deconvolution layer.
dilation : int or tuple/list of 2 int, default 1
Dilation value for deconvolution layer.
groups : int, default 1
Number of groups.
bias : bool, default False
Whether the layer uses a bias vector.
use_bn : bool, default True
Whether to use BatchNorm layer.
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
activation : function or str or None, default nn.ReLU(inplace=True)
Activation function or name of activation function.
"""
def __init__(self,
in_channels,
out_channels,
kernel_size,
stride,
padding,
ext_padding=None,
out_padding=0,
dilation=1,
groups=1,
bias=False,
use_bn=True,
bn_eps=1e-5,
activation=(lambda: nn.ReLU(inplace=True))):
super(DeconvBlock, self).__init__()
self.activate = (activation is not None)
self.use_bn = use_bn
self.use_pad = (ext_padding is not None)
if self.use_pad:
self.pad = nn.ZeroPad2d(padding=ext_padding)
self.conv = nn.ConvTranspose2d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding,
output_padding=out_padding,
dilation=dilation,
groups=groups,
bias=bias)
if self.use_bn:
self.bn = nn.BatchNorm2d(
num_features=out_channels,
eps=bn_eps)
if self.activate:
self.activ = get_activation_layer(activation)
def forward(self, x):
if self.use_pad:
x = self.pad(x)
x = self.conv(x)
if self.use_bn:
x = self.bn(x)
if self.activate:
x = self.activ(x)
return x
def deconv3x3_block(padding=1,
out_padding=1,
**kwargs):
"""
3x3 version of the deconvolution block with batch normalization and activation.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
stride : int or tuple/list of 2 int
Strides of the deconvolution.
padding : int or tuple/list of 2 int, default 1
Padding value for deconvolution layer.
ext_padding : tuple/list of 4 int, default None
Extra padding value for deconvolution layer.
out_padding : int or tuple/list of 2 int, default 1
Output padding value for deconvolution layer.
dilation : int or tuple/list of 2 int, default 1
Dilation value for deconvolution layer.
groups : int, default 1
Number of groups.
bias : bool, default False
Whether the layer uses a bias vector.
use_bn : bool, default True
Whether to use BatchNorm layer.
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
activation : function or str or None, default nn.ReLU(inplace=True)
Activation function or name of activation function.
"""
return DeconvBlock(
kernel_size=3,
padding=padding,
out_padding=out_padding,
**kwargs)
def channel_shuffle(x,
groups):
"""
Channel shuffle operation from 'ShuffleNet: An Extremely Efficient Convolutional Neural Network for Mobile Devices,'
https://arxiv.org/abs/1707.01083.
Parameters:
----------
x : Tensor
Input tensor.
groups : int
Number of groups.
Returns:
-------
Tensor
Resulted tensor.
"""
batch, channels, height, width = x.size()
# assert (channels % groups == 0)
channels_per_group = channels // groups
x = x.view(batch, groups, channels_per_group, height, width)
x = torch.transpose(x, 1, 2).contiguous()
x = x.view(batch, channels, height, width)
return x
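# Example: with x of shape (1, 6, 1, 1) whose channel values equal their indices,
# channel_shuffle(x, groups=3) reorders the channels as [0, 2, 4, 1, 3, 5]:
# >>> x = torch.arange(6.).view(1, 6, 1, 1)
# >>> channel_shuffle(x, 3).view(-1)
# tensor([0., 2., 4., 1., 3., 5.])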
class SEBlock(nn.Module):
"""
Squeeze-and-Excitation block from 'Squeeze-and-Excitation Networks,' https://arxiv.org/abs/1709.01507.
Parameters:
----------
channels : int
Number of channels.
reduction : int, default 16
Squeeze reduction value.
mid_channels : int or None, default None
Number of middle channels.
round_mid : bool, default False
Whether to round middle channel number (make divisible by 8).
use_conv : bool, default True
        Whether to use convolutional layers instead of fully-connected ones.
    mid_activation : function, or str, or nn.Module, default nn.ReLU(inplace=True)
        Activation function after the first convolution.
    out_activation : function, or str, or nn.Module, default nn.Sigmoid()
        Activation function after the last convolution.
"""
def __init__(self,
channels,
reduction=16,
mid_channels=None,
round_mid=False,
use_conv=True,
mid_activation=(lambda: nn.ReLU(inplace=True)),
out_activation=(lambda: nn.Sigmoid())):
super(SEBlock, self).__init__()
self.use_conv = use_conv
if mid_channels is None:
mid_channels = channels // reduction if not round_mid else round_channels(float(channels) / reduction)
self.pool = nn.AdaptiveAvgPool2d(output_size=1)
if use_conv:
self.conv1 = conv1x1(
in_channels=channels,
out_channels=mid_channels,
bias=True)
else:
self.fc1 = nn.Linear(
in_features=channels,
out_features=mid_channels)
self.activ = get_activation_layer(mid_activation, mid_channels)
if use_conv:
self.conv2 = conv1x1(
in_channels=mid_channels,
out_channels=channels,
bias=True)
else:
self.fc2 = nn.Linear(
in_features=mid_channels,
out_features=channels)
self.sigmoid = get_activation_layer(out_activation, channels)
def forward(self, x):
w = self.pool(x)
if not self.use_conv:
w = w.view(x.size(0), -1)
w = self.conv1(w) if self.use_conv else self.fc1(w)
w = self.activ(w)
w = self.conv2(w) if self.use_conv else self.fc2(w)
w = self.sigmoid(w)
if not self.use_conv:
w = w.unsqueeze(2).unsqueeze(3)
x = x * w
return x
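    # Note: the calc_flops below references self.mode, which SEBlock does not define; it appears to have been
    # copied from InterpolationBlock and is never called in this module.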
def calc_flops(self, x):
assert (x.shape[0] == 1)
if self.mode == "bilinear":
num_flops = 9 * x.numel()
else:
num_flops = 4 * x.numel()
num_macs = 0
return num_flops, num_macs
class ChannelShuffle(nn.Module):
"""
    Channel shuffle layer. This is a wrapper over the channel_shuffle operation. It is designed to store the number of groups.
Parameters:
----------
channels : int
Number of channels.
groups : int
Number of groups.
"""
def __init__(self,
channels,
groups):
super(ChannelShuffle, self).__init__()
# assert (channels % groups == 0)
if channels % groups != 0:
raise ValueError('channels must be divisible by groups')
self.groups = groups
def forward(self, x):
return channel_shuffle(x, self.groups)
def __repr__(self):
s = "{name}(groups={groups})"
return s.format(
name=self.__class__.__name__,
groups=self.groups)
class Identity(nn.Module):
"""
Identity block.
"""
def __init__(self):
super(Identity, self).__init__()
def forward(self, x):
return x
def __repr__(self):
return '{name}()'.format(name=self.__class__.__name__)
class HSigmoid(nn.Module):
"""
Approximated sigmoid function, so-called hard-version of sigmoid from 'Searching for MobileNetV3,'
https://arxiv.org/abs/1905.02244.
"""
def forward(self, x):
return F.relu6(x + 3.0, inplace=True) / 6.0
class Swish(nn.Module):
"""
Swish activation function from 'Searching for Activation Functions,' https://arxiv.org/abs/1710.05941.
"""
def forward(self, x):
return x * torch.sigmoid(x)
class HSwish(nn.Module):
"""
H-Swish activation function from 'Searching for MobileNetV3,' https://arxiv.org/abs/1905.02244.
Parameters:
----------
inplace : bool
Whether to use inplace version of the module.
"""
def __init__(self, inplace=False):
super(HSwish, self).__init__()
self.inplace = inplace
def forward(self, x):
return x * F.relu6(x + 3.0, inplace=self.inplace) / 6.0
def get_activation_layer(activation, param=None):
    """
    Create activation layer from string/function.
    Parameters:
    ----------
    activation : function, or str, or nn.Module
        Activation function or name of activation function.
    param : int or None, default None
        Parameter for the created layer (e.g. the number of channels for nn.PReLU).
Returns:
-------
nn.Module
Activation layer.
"""
assert (activation is not None)
if isfunction(activation):
return activation()
elif isinstance(activation, str):
if activation == "relu":
return nn.ReLU(inplace=True)
elif activation == "prelu":
return nn.PReLU(param)
elif activation == "relu6":
return nn.ReLU6(inplace=True)
elif activation == "swish":
return Swish()
elif activation == "hswish":
return HSwish(inplace=True)
elif activation == "sigmoid":
return nn.Sigmoid()
elif activation == "hsigmoid":
return HSigmoid()
elif activation == "identity":
return Identity()
else:
raise NotImplementedError()
else:
assert (isinstance(activation, nn.Module))
return activation
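# Examples:
# get_activation_layer("prelu", 64) -> nn.PReLU(64)
# get_activation_layer(lambda: nn.ReLU(inplace=True), 64) -> nn.ReLU(inplace=True)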
def round_channels(channels, divisor=8):
"""
Round weighted channel number (make divisible operation).
Parameters:
----------
channels : int or float
Original number of channels.
divisor : int, default 8
Alignment value.
Returns:
-------
int
Weighted number of channels.
"""
rounded_channels = max(int(channels + divisor / 2.0) // divisor * divisor, divisor)
if float(rounded_channels) < 0.9 * channels:
rounded_channels += divisor
return rounded_channels
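# Examples: round_channels(16) -> 16, round_channels(31.2) -> 32, round_channels(10) -> 16
# (values are snapped to the nearest multiple of 8, then bumped up by 8 if that rounding lost more than 10%).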
def conv1x1(in_channels,
out_channels,
stride=1,
groups=1, dilation=1,
bias=False):
"""
Convolution 1x1 layer.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
stride : int or tuple/list of 2 int, default 1
Strides of the convolution.
groups : int, default 1
Number of groups.
bias : bool, default False
Whether the layer uses a bias vector.
"""
return nn.Conv2d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1,
stride=stride,
groups=groups, dilation=dilation,
bias=bias)
def conv3x3(in_channels,
out_channels,
stride=1,
padding=1,
dilation=1,
groups=1,
bias=False):
"""
Convolution 3x3 layer.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
stride : int or tuple/list of 2 int, default 1
Strides of the convolution.
padding : int or tuple/list of 2 int, default 1
Padding value for convolution layer.
dilation : int or tuple/list of 2 int, default 1
Dilation value for convolution layer.
groups : int, default 1
Number of groups.
bias : bool, default False
Whether the layer uses a bias vector.
"""
return nn.Conv2d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=3,
stride=stride,
padding=padding,
dilation=dilation,
groups=groups,
bias=bias)
class Flatten(nn.Module):
"""
Simple flatten module.
"""
def forward(self, x):
return x.view(x.size(0), -1)
def depthwise_conv3x3(channels,
stride=1,
padding=1,
dilation=1,
bias=False):
"""
Depthwise convolution 3x3 layer.
Parameters:
----------
channels : int
Number of input/output channels.
    stride : int or tuple/list of 2 int, default 1
Strides of the convolution.
padding : int or tuple/list of 2 int, default 1
Padding value for convolution layer.
dilation : int or tuple/list of 2 int, default 1
Dilation value for convolution layer.
bias : bool, default False
Whether the layer uses a bias vector.
"""
return nn.Conv2d(
in_channels=channels,
out_channels=channels,
kernel_size=3,
stride=stride,
padding=padding,
dilation=dilation,
groups=channels,
bias=bias)
class ConvBlock(nn.Module):
"""
Standard convolution block with Batch normalization and activation.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
kernel_size : int or tuple/list of 2 int
Convolution window size.
stride : int or tuple/list of 2 int
Strides of the convolution.
padding : int, or tuple/list of 2 int, or tuple/list of 4 int
Padding value for convolution layer.
dilation : int or tuple/list of 2 int, default 1
Dilation value for convolution layer.
groups : int, default 1
Number of groups.
bias : bool, default False
Whether the layer uses a bias vector.
use_bn : bool, default True
Whether to use BatchNorm layer.
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
activation : function or str or None, default nn.ReLU(inplace=True)
Activation function or name of activation function.
"""
def __init__(self,
in_channels,
out_channels,
kernel_size,
stride,
padding,
dilation=1,
groups=1,
bias=False,
use_bn=True,
bn_eps=1e-5,
activation=(lambda: nn.ReLU(inplace=True))):
super(ConvBlock, self).__init__()
self.activate = (activation is not None)
self.use_bn = use_bn
self.use_pad = (isinstance(padding, (list, tuple)) and (len(padding) == 4))
if self.use_pad:
self.pad = nn.ZeroPad2d(padding=padding)
padding = 0
self.conv = nn.Conv2d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding,
dilation=dilation,
groups=groups,
bias=bias)
if self.use_bn:
self.bn = nn.BatchNorm2d(
num_features=out_channels,
eps=bn_eps)
if self.activate:
self.activ = get_activation_layer(activation, out_channels)
def forward(self, x):
if self.use_pad:
x = self.pad(x)
x = self.conv(x)
if self.use_bn:
x = self.bn(x)
if self.activate:
x = self.activ(x)
return x
def conv1x1_block(in_channels,
out_channels,
stride=1,
padding=0,
groups=1,
bias=False,
use_bn=True,
bn_eps=1e-5,
activation=(lambda: nn.ReLU(inplace=True))):
"""
1x1 version of the standard convolution block.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
stride : int or tuple/list of 2 int, default 1
Strides of the convolution.
padding : int, or tuple/list of 2 int, or tuple/list of 4 int, default 0
Padding value for convolution layer.
groups : int, default 1
Number of groups.
bias : bool, default False
Whether the layer uses a bias vector.
use_bn : bool, default True
Whether to use BatchNorm layer.
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
activation : function or str or None, default nn.ReLU(inplace=True)
Activation function or name of activation function.
"""
return ConvBlock(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1,
stride=stride,
padding=padding,
groups=groups,
bias=bias,
use_bn=use_bn,
bn_eps=bn_eps,
activation=activation)
def conv3x3_block(in_channels,
out_channels,
stride=1,
padding=1,
dilation=1,
groups=1,
bias=False,
use_bn=True,
bn_eps=1e-5,
activation=(lambda: nn.ReLU(inplace=True))):
"""
3x3 version of the standard convolution block.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
stride : int or tuple/list of 2 int, default 1
Strides of the convolution.
padding : int, or tuple/list of 2 int, or tuple/list of 4 int, default 1
Padding value for convolution layer.
dilation : int or tuple/list of 2 int, default 1
Dilation value for convolution layer.
groups : int, default 1
Number of groups.
bias : bool, default False
Whether the layer uses a bias vector.
use_bn : bool, default True
Whether to use BatchNorm layer.
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
activation : function or str or None, default nn.ReLU(inplace=True)
Activation function or name of activation function.
"""
return ConvBlock(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=3,
stride=stride,
padding=padding,
dilation=dilation,
groups=groups,
bias=bias,
use_bn=use_bn,
bn_eps=bn_eps,
activation=activation)
class DwsConvBlock(nn.Module):
"""
    Depthwise separable convolution block with BatchNorm and activation after each convolution layer.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
kernel_size : int or tuple/list of 2 int
Convolution window size.
stride : int or tuple/list of 2 int
Strides of the convolution.
padding : int, or tuple/list of 2 int, or tuple/list of 4 int
Padding value for convolution layer.
dilation : int or tuple/list of 2 int, default 1
Dilation value for convolution layer.
bias : bool, default False
Whether the layer uses a bias vector.
dw_use_bn : bool, default True
Whether to use BatchNorm layer (depthwise convolution block).
pw_use_bn : bool, default True
Whether to use BatchNorm layer (pointwise convolution block).
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
dw_activation : function or str or None, default nn.ReLU(inplace=True)
Activation function after the depthwise convolution block.
pw_activation : function or str or None, default nn.ReLU(inplace=True)
Activation function after the pointwise convolution block.
"""
def __init__(self,
in_channels,
out_channels,
kernel_size,
stride,
padding,
dilation=1,
bias=False,
dw_use_bn=True,
pw_use_bn=True,
bn_eps=1e-5,
dw_activation=(lambda: nn.ReLU(inplace=True)),
pw_activation=(lambda: nn.ReLU(inplace=True))):
super(DwsConvBlock, self).__init__()
self.dw_conv = dwconv_block(
in_channels=in_channels,
out_channels=in_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding,
dilation=dilation,
bias=bias,
use_bn=dw_use_bn,
bn_eps=bn_eps,
activation=dw_activation)
self.pw_conv = conv1x1_block(
in_channels=in_channels,
out_channels=out_channels,
bias=bias,
use_bn=pw_use_bn,
bn_eps=bn_eps,
activation=pw_activation)
def forward(self, x):
x = self.dw_conv(x)
x = self.pw_conv(x)
return x
def dwconv_block(in_channels,
out_channels,
kernel_size,
stride=1,
padding=1,
dilation=1,
bias=False,
use_bn=True,
bn_eps=1e-5,
activation=(lambda: nn.ReLU(inplace=True))):
"""
Depthwise convolution block.
"""
return ConvBlock(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding,
dilation=dilation,
groups=out_channels,
bias=bias,
use_bn=use_bn,
bn_eps=bn_eps,
activation=activation)
def channel_shuffle2(x,
groups):
"""
Channel shuffle operation from 'ShuffleNet: An Extremely Efficient Convolutional Neural Network for Mobile Devices,'
https://arxiv.org/abs/1707.01083. The alternative version.
Parameters:
----------
x : Tensor
Input tensor.
groups : int
Number of groups.
Returns:
-------
Tensor
Resulted tensor.
"""
batch, channels, height, width = x.size()
assert (channels % groups == 0)
channels_per_group = channels // groups
x = x.view(batch, channels_per_group, groups, height, width)
x = torch.transpose(x, 1, 2).contiguous()
x = x.view(batch, channels, height, width)
return x
def _calc_width(net):
import numpy as np
net_params = filter(lambda p: p.requires_grad, net.parameters())
weight_count = 0
for param in net_params:
weight_count += np.prod(param.size())
return weight_count
def flops_to_string(flops, units='GFLOPS', precision=4):
if units == 'GFLOPS':
return str(round(flops / 10. ** 9, precision)) + ' ' + units
elif units == 'MFLOPS':
return str(round(flops / 10. ** 6, precision)) + ' ' + units
elif units == 'KFLOPS':
return str(round(flops / 10. ** 3, precision)) + ' ' + units
else:
return str(flops) + ' FLOPS'
def count_model_flops(model, input_res=[112, 112], multiply_adds=True):
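    # Count FLOPs by registering forward hooks on every leaf module, running a single dummy forward pass,
    # and summing the per-layer estimates collected below.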
list_conv = []
def conv_hook(self, input, output):
batch_size, input_channels, input_height, input_width = input[0].size()
output_channels, output_height, output_width = output[0].size()
kernel_ops = self.kernel_size[0] * self.kernel_size[1] * (self.in_channels / self.groups)
bias_ops = 1 if self.bias is not None else 0
params = output_channels * (kernel_ops + bias_ops)
flops = (kernel_ops * (
2 if multiply_adds else 1) + bias_ops) * output_channels * output_height * output_width * batch_size
list_conv.append(flops)
list_linear = []
def linear_hook(self, input, output):
batch_size = input[0].size(0) if input[0].dim() == 2 else 1
weight_ops = self.weight.nelement() * (2 if multiply_adds else 1)
if self.bias is not None:
bias_ops = self.bias.nelement() if self.bias.nelement() else 0
flops = batch_size * (weight_ops + bias_ops)
else:
flops = batch_size * weight_ops
list_linear.append(flops)
list_bn = []
def bn_hook(self, input, output):
list_bn.append(input[0].nelement() * 2)
list_relu = []
def relu_hook(self, input, output):
list_relu.append(input[0].nelement())
list_pooling = []
def pooling_hook(self, input, output):
batch_size, input_channels, input_height, input_width = input[0].size()
output_channels, output_height, output_width = output[0].size()
kernel_ops = self.kernel_size * self.kernel_size
bias_ops = 0
params = 0
flops = (kernel_ops + bias_ops) * output_channels * output_height * output_width * batch_size
list_pooling.append(flops)
def pooling_hook_ad(self, input, output):
batch_size, input_channels, input_height, input_width = input[0].size()
input = input[0]
flops = int(np.prod(input.shape))
list_pooling.append(flops)
handles = []
def foo(net):
childrens = list(net.children())
if not childrens:
if isinstance(net, torch.nn.Conv2d) or isinstance(net, torch.nn.ConvTranspose2d):
handles.append(net.register_forward_hook(conv_hook))
elif isinstance(net, torch.nn.Linear):
handles.append(net.register_forward_hook(linear_hook))
elif isinstance(net, torch.nn.BatchNorm2d) or isinstance(net, torch.nn.BatchNorm1d):
handles.append(net.register_forward_hook(bn_hook))
elif isinstance(net, torch.nn.ReLU) or isinstance(net, torch.nn.PReLU) or isinstance(net,
torch.nn.Sigmoid) or isinstance(
net, HSwish) or isinstance(net, Swish):
handles.append(net.register_forward_hook(relu_hook))
elif isinstance(net, torch.nn.MaxPool2d) or isinstance(net, torch.nn.AvgPool2d):
handles.append(net.register_forward_hook(pooling_hook))
elif isinstance(net, torch.nn.AdaptiveAvgPool2d):
handles.append(net.register_forward_hook(pooling_hook_ad))
else:
print("warning" + str(net))
return
for c in childrens:
foo(c)
model.eval()
foo(model)
input = Variable(torch.rand(3, input_res[1], input_res[0]).unsqueeze(0), requires_grad=True)
out = model(input)
total_flops = (sum(list_conv) + sum(list_linear) + sum(list_bn) + sum(list_relu) + sum(list_pooling))
for h in handles:
h.remove()
model.train()
return flops_to_string(total_flops)
class MixConv(nn.Module):
"""
Mixed convolution layer from 'MixConv: Mixed Depthwise Convolutional Kernels,' https://arxiv.org/abs/1907.09595.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
kernel_size : int or tuple/list of int, or tuple/list of tuple/list of 2 int
Convolution window size.
stride : int or tuple/list of 2 int
Strides of the convolution.
padding : int or tuple/list of int, or tuple/list of tuple/list of 2 int
Padding value for convolution layer.
dilation : int or tuple/list of 2 int, default 1
Dilation value for convolution layer.
groups : int, default 1
Number of groups.
bias : bool, default False
Whether the layer uses a bias vector.
axis : int, default 1
The axis on which to concatenate the outputs.
"""
def __init__(self,
in_channels,
out_channels,
kernel_size,
stride,
padding,
dilation=1,
groups=1,
bias=False,
axis=1):
super(MixConv, self).__init__()
kernel_size = kernel_size if isinstance(kernel_size, list) else [kernel_size]
padding = padding if isinstance(padding, list) else [padding]
kernel_count = len(kernel_size)
self.splitted_in_channels = self.split_channels(in_channels, kernel_count)
splitted_out_channels = self.split_channels(out_channels, kernel_count)
for i, kernel_size_i in enumerate(kernel_size):
in_channels_i = self.splitted_in_channels[i]
out_channels_i = splitted_out_channels[i]
padding_i = padding[i]
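            # When groups equals out_channels the whole MixConv is depthwise, so each branch uses its own
            # channel count as its group count; otherwise the given group count is shared across branches.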
self.add_module(
name=str(i),
module=nn.Conv2d(
in_channels=in_channels_i,
out_channels=out_channels_i,
kernel_size=kernel_size_i,
stride=stride,
padding=padding_i,
dilation=dilation,
groups=(out_channels_i if out_channels == groups else groups),
bias=bias))
self.axis = axis
def forward(self, x):
xx = torch.split(x, self.splitted_in_channels, dim=self.axis)
out = [conv_i(x_i) for x_i, conv_i in zip(xx, self._modules.values())]
x = torch.cat(tuple(out), dim=self.axis)
return x
@staticmethod
def split_channels(channels, kernel_count):
splitted_channels = [channels // kernel_count] * kernel_count
splitted_channels[0] += channels - sum(splitted_channels)
return splitted_channels
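        # Example: split_channels(24, 3) -> [8, 8, 8]; split_channels(26, 3) -> [10, 8, 8]
        # (any remainder is assigned to the first split).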
class MixConvBlock(nn.Module):
"""
Mixed convolution block with Batch normalization and activation.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
kernel_size : int or tuple/list of int, or tuple/list of tuple/list of 2 int
Convolution window size.
stride : int or tuple/list of 2 int
Strides of the convolution.
padding : int or tuple/list of int, or tuple/list of tuple/list of 2 int
Padding value for convolution layer.
dilation : int or tuple/list of 2 int, default 1
Dilation value for convolution layer.
groups : int, default 1
Number of groups.
bias : bool, default False
Whether the layer uses a bias vector.
use_bn : bool, default True
Whether to use BatchNorm layer.
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
activation : function or str or None, default nn.ReLU(inplace=True)
Activation function or name of activation function.
"""
def __init__(self,
in_channels,
out_channels,
kernel_size,
stride,
padding,
dilation=1,
groups=1,
bias=False,
use_bn=True,
bn_eps=1e-5,
activation=(lambda: nn.ReLU(inplace=True))):
super(MixConvBlock, self).__init__()
self.activate = (activation is not None)
self.use_bn = use_bn
self.conv = MixConv(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding,
dilation=dilation,
groups=groups,
bias=bias)
if self.use_bn:
self.bn = nn.BatchNorm2d(
num_features=out_channels,
eps=bn_eps)
if self.activate:
self.activ = get_activation_layer(activation, out_channels)
def forward(self, x):
x = self.conv(x)
if self.use_bn:
x = self.bn(x)
if self.activate:
x = self.activ(x)
return x
def mixconv1x1_block(in_channels,
out_channels,
kernel_count,
stride=1,
groups=1,
bias=False,
use_bn=True,
bn_eps=1e-5,
activation=(lambda: nn.ReLU(inplace=True))):
"""
1x1 version of the mixed convolution block.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
kernel_count : int
Kernel count.
stride : int or tuple/list of 2 int, default 1
Strides of the convolution.
groups : int, default 1
Number of groups.
bias : bool, default False
Whether the layer uses a bias vector.
use_bn : bool, default True
Whether to use BatchNorm layer.
bn_eps : float, default 1e-5
Small float added to variance in Batch norm.
activation : function or str, or None, default nn.ReLU(inplace=True)
Activation function or name of activation function.
"""
return MixConvBlock(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=([1] * kernel_count),
stride=stride,
padding=([0] * kernel_count),
groups=groups,
bias=bias,
use_bn=use_bn,
bn_eps=bn_eps,
activation=activation)
class MixUnit(nn.Module):
"""
MixNet unit.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
exp_channels : int
Number of middle (expanded) channels.
stride : int or tuple/list of 2 int
Strides of the second convolution layer.
exp_kernel_count : int
Expansion convolution kernel count for each unit.
conv1_kernel_count : int
Conv1 kernel count for each unit.
conv2_kernel_count : int
Conv2 kernel count for each unit.
exp_factor : int
Expansion factor for each unit.
se_factor : int
SE reduction factor for each unit.
activation : str
Activation function or name of activation function.
"""
def __init__(self,
in_channels,
out_channels,
stride,
exp_kernel_count,
conv1_kernel_count,
conv2_kernel_count,
exp_factor,
se_factor,
activation, shuffle=True):
super(MixUnit, self).__init__()
assert (exp_factor >= 1)
assert (se_factor >= 0)
self.shuffle = shuffle
self.residual = (in_channels == out_channels) and (stride == 1)
self.use_se = se_factor > 0
mid_channels = exp_factor * in_channels
self.use_exp_conv = exp_factor > 1
self.conv1_kernel_count = conv1_kernel_count
if self.use_exp_conv:
if exp_kernel_count == 1:
self.exp_conv = conv1x1_block(
in_channels=in_channels,
out_channels=mid_channels,
activation=activation)
else:
self.exp_conv = mixconv1x1_block(
in_channels=in_channels,
out_channels=mid_channels,
kernel_count=exp_kernel_count,
activation=activation)
if conv1_kernel_count == 1:
self.conv1 = dwconv3x3_block(
in_channels=mid_channels,
out_channels=mid_channels,
stride=stride,
activation=activation)
else:
self.conv1 = MixConvBlock(
in_channels=mid_channels,
out_channels=mid_channels,
kernel_size=[3 + 2 * i for i in range(conv1_kernel_count)],
stride=stride,
padding=[1 + i for i in range(conv1_kernel_count)],
groups=mid_channels,
activation=activation)
if self.use_se:
self.se = SEBlock(
channels=mid_channels,
reduction=(exp_factor * se_factor),
round_mid=False,
mid_activation=activation)
if conv2_kernel_count == 1:
self.conv2 = conv1x1_block(
in_channels=mid_channels,
out_channels=out_channels,
activation=None)
else:
self.conv2 = mixconv1x1_block(
in_channels=mid_channels,
out_channels=out_channels,
kernel_count=conv2_kernel_count,
activation=None)
def forward(self, x):
if self.residual:
identity = x
if self.use_exp_conv:
x = self.exp_conv(x)
x = self.conv1(x)
if self.use_se:
x = self.se(x)
x = self.conv2(x)
if self.residual:
x = x + identity
if self.shuffle:
x = channel_shuffle2(x, 2)
return x
class MixInitBlock(nn.Module):
"""
MixNet specific initial block.
Parameters:
----------
in_channels : int
Number of input channels.
out_channels : int
Number of output channels.
"""
def __init__(self,
in_channels,
out_channels, activation, stride=1, shuffle=True):
super(MixInitBlock, self).__init__()
self.conv1 = conv3x3_block(
in_channels=in_channels,
out_channels=out_channels,
stride=stride, activation=activation)
self.conv2 = MixUnit(
in_channels=out_channels,
out_channels=out_channels,
stride=1,
exp_kernel_count=1,
conv1_kernel_count=1,
conv2_kernel_count=1,
exp_factor=1,
se_factor=0,
activation=activation, shuffle=shuffle)
def forward(self, x):
x = self.conv1(x)
x = self.conv2(x)
return x
class MixNet(nn.Module):
"""
MixNet model from 'MixConv: Mixed Depthwise Convolutional Kernels,' https://arxiv.org/abs/1907.09595.
Parameters:
----------
channels : list of int
Number of output channels for each unit.
init_block_channels : int
Number of output channels for the initial unit.
final_block_channels : int
Number of output channels for the final block of the feature extractor.
exp_kernel_counts : list of int
Expansion convolution kernel count for each unit.
conv1_kernel_counts : list of int
Conv1 kernel count for each unit.
conv2_kernel_counts : list of int
Conv2 kernel count for each unit.
exp_factors : list of int
Expansion factor for each unit.
se_factors : list of int
SE reduction factor for each unit.
in_channels : int, default 3
Number of input channels.
    in_size : tuple of two ints, default (112, 112)
Spatial size of the expected input image.
num_classes : int, default 1000
Number of classification classes.
"""
def __init__(self,
channels,
init_block_channels,
final_block_channels,
exp_kernel_counts,
conv1_kernel_counts,
conv2_kernel_counts,
exp_factors,
se_factors,
in_channels=3,
in_size=(112, 112),
num_classes=1000, gdw_size=512, shuffle=True):
super(MixNet, self).__init__()
self.in_size = in_size
self.num_classes = num_classes
self.shuffle = shuffle
self.features = nn.Sequential()
self.features.add_module("init_block", MixInitBlock(
in_channels=in_channels,
out_channels=init_block_channels, activation="prelu", stride=2))
in_channels = init_block_channels
for i, channels_per_stage in enumerate(channels):
stage = nn.Sequential()
for j, out_channels in enumerate(channels_per_stage):
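                # Downsample at the first unit of the second and third stages and at the middle unit of the
                # fourth stage; the first stage keeps the resolution produced by the initial block.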
stride = 2 if ((j == 0) and (i != 3) and (i != 0)) or (
(j == len(channels_per_stage) // 2) and (i == 3)) else 1
exp_kernel_count = exp_kernel_counts[i][j]
conv1_kernel_count = conv1_kernel_counts[i][j]
conv2_kernel_count = conv2_kernel_counts[i][j]
exp_factor = exp_factors[i][j]
se_factor = se_factors[i][j]
activation = "prelu" if i == 0 else "swish"
stage.add_module("unit{}".format(j + 1), MixUnit(
in_channels=in_channels,
out_channels=out_channels,
stride=stride,
exp_kernel_count=exp_kernel_count,
conv1_kernel_count=conv1_kernel_count,
conv2_kernel_count=conv2_kernel_count,
exp_factor=exp_factor,
se_factor=se_factor,
activation=activation, shuffle=self.shuffle))
in_channels = out_channels
self.features.add_module("stage{}".format(i + 1), stage)
self.tail = conv1x1_block(in_channels=in_channels, out_channels=gdw_size, activation="prelu")
        self.feature_layer = DwsConvBlock(in_channels=gdw_size, out_channels=final_block_channels, kernel_size=7,
                                          padding=0, stride=1, pw_activation=None, dw_activation=None, pw_use_bn=False)
self.features_norm = nn.BatchNorm1d(final_block_channels, eps=1e-05)
nn.init.constant_(self.features_norm.weight, 1.0)
self.features_norm.weight.requires_grad = False
self._init_params()
def _init_params(self):
for name, module in self.named_modules():
if isinstance(module, nn.Conv2d):
init.kaiming_uniform_(module.weight)
if module.bias is not None:
init.constant_(module.bias, 0)
elif isinstance(module, (nn.BatchNorm2d, nn.GroupNorm)):
nn.init.constant_(module.weight, 1)
nn.init.constant_(module.bias, 0)
def forward(self, x):
x = self.features(x)
x = self.tail(x)
        x = self.feature_layer(x)
x = x.view(x.size(0), -1)
x = self.features_norm(x)
return x
def get_mixnet(version, width_scale, embedding_size=512, model_name="mixnet_s", gdw_size=512, weight=None, shuffle=True,
**kwargs):
"""
Create MixNet model with specific parameters.
Parameters:
----------
version : str
        Version of MixNet ('s' or 'm').
width_scale : float
Scale factor for width of layers.
    embedding_size : int, default 512
        Size of the output embedding (number of output channels of the final block).
    gdw_size : int, default 512
        Number of output channels of the tail 1x1 convolution block.
    model_name : str or None, default 'mixnet_s'
        Model name (not used for loading weights here).
    weight : str or None, default None
        Path to a weights file to load via torch.load.
    shuffle : bool, default True
        Whether to apply channel shuffle in each MixUnit.
"""
if version == "s":
init_block_channels = 16
channels = [[24, 24], [40, 40, 40, 40], [80, 80, 80], [120, 120, 120, 200, 200, 200]]
exp_kernel_counts = [[2, 2], [1, 2, 2, 2], [1, 1, 1], [2, 2, 2, 1, 1, 1]]
conv1_kernel_counts = [[1, 1], [3, 2, 2, 2], [3, 2, 2], [3, 4, 4, 5, 4, 4]]
conv2_kernel_counts = [[2, 2], [1, 2, 2, 2], [2, 2, 2], [2, 2, 2, 1, 2, 2]]
exp_factors = [[6, 3], [6, 6, 6, 6], [6, 6, 6], [6, 3, 3, 6, 6, 6]]
se_factors = [[0, 0], [2, 2, 2, 2], [4, 4, 4], [2, 2, 2, 2, 2, 2]]
elif version == "m":
init_block_channels = 24
channels = [[32, 32], [40, 40, 40, 40], [80, 80, 80, 80], [120, 120, 120, 120, 200, 200, 200, 200]]
exp_kernel_counts = [[2, 2], [1, 2, 2, 2], [1, 2, 2, 2], [1, 2, 2, 2, 1, 1, 1, 1]]
conv1_kernel_counts = [[3, 1], [4, 2, 2, 2], [3, 4, 4, 4], [1, 4, 4, 4, 4, 4, 4, 4]]
conv2_kernel_counts = [[2, 2], [1, 2, 2, 2], [1, 2, 2, 2], [1, 2, 2, 2, 1, 2, 2, 2]]
exp_factors = [[6, 3], [6, 6, 6, 6], [6, 6, 6, 6], [6, 3, 3, 3, 6, 6, 6, 6]]
se_factors = [[0, 0], [2, 2, 2, 2], [4, 4, 4, 4], [2, 2, 2, 2, 2, 2, 2, 2]]
else:
raise ValueError("Unsupported MixNet version {}".format(version))
final_block_channels = embedding_size
if width_scale != 1.0:
channels = [[round_channels(cij * width_scale) for cij in ci] for ci in channels]
init_block_channels = round_channels(init_block_channels * width_scale)
net = MixNet(
channels=channels,
init_block_channels=init_block_channels,
final_block_channels=final_block_channels,
exp_kernel_counts=exp_kernel_counts,
conv1_kernel_counts=conv1_kernel_counts,
conv2_kernel_counts=conv2_kernel_counts,
exp_factors=exp_factors,
se_factors=se_factors, gdw_size=gdw_size, shuffle=shuffle,
**kwargs)
if weight is not None:
weight = torch.load(weight)
net.load_state_dict(weight)
return net
def mixnet_s(embedding_size=512, width_scale=0.5, gdw_size=512, weight=None, shuffle=True, **kwargs):
"""
MixNet-S model from 'MixConv: Mixed Depthwise Convolutional Kernels,' https://arxiv.org/abs/1907.09595.
Parameters:
----------
pretrained : bool, default False
Whether to load the pretrained weights for model.
root : str, default '~/.torch/models'
Location for keeping the model parameters.
"""
return get_mixnet(version="s", width_scale=width_scale, embedding_size=embedding_size, gdw_size=gdw_size,
model_name="mixnet_s", shuffle=shuffle, **kwargs)
def mixnet_m(embedding_size=512, width_scale=0.5, gdw_size=512, shuffle=True, **kwargs):
"""
MixNet-M model from 'MixConv: Mixed Depthwise Convolutional Kernels,' https://arxiv.org/abs/1907.09595.
Parameters:
----------
pretrained : bool, default False
Whether to load the pretrained weights for model.
root : str, default '~/.torch/models'
Location for keeping the model parameters.
"""
return get_mixnet(version="m", width_scale=width_scale, embedding_size=embedding_size, gdw_size=gdw_size,
model_name="mixnet_m", shuffle=shuffle, **kwargs)
def mixnet_l(embedding_size=512, width_scale=1.3, shuffle=True, **kwargs):
"""
MixNet-L model from 'MixConv: Mixed Depthwise Convolutional Kernels,' https://arxiv.org/abs/1907.09595.
Parameters:
----------
pretrained : bool, default False
Whether to load the pretrained weights for model.
root : str, default '~/.torch/models'
Location for keeping the model parameters.
"""
return get_mixnet(version="m", width_scale=width_scale, embedding_size=embedding_size, model_name="mixnet_l",
shuffle=shuffle, **kwargs)
if __name__ == "__main__":
net = mixnet_l()
x = torch.randn(1, 3, 112, 112)
net.eval()
y = net(x)
print(y.shape)
Sure, thanks for sharing this DNN. I will check it next week.
@C0NGTRI123
I checked the mixnet today. The mixture of split, group conv, and concat definitely causes some trouble. Supporting it requires a sophisticated effort to handle their combination, which may not be an easy fix. But if we could exclude group conv, i.e., set groups=1, the fix should be relatively lightweight. Below is my pseudo fix in pruning_compression.py.
@tianyic I printed node_in.op_name and it does not return 'split'. Can you check that again? Here is the sample network I tested, taken from Torch-Pruning:
class DemoSplitNet(nn.Module):
def __init__(self, in_dim):
super().__init__()
self.block1 = nn.Sequential(
nn.Conv2d(in_dim, in_dim, 1),
nn.BatchNorm2d(in_dim),
nn.GELU(),
nn.Conv2d(in_dim, in_dim * 4, 1),
nn.BatchNorm2d(in_dim * 4)
)
self.block2_1 = nn.Sequential(
nn.Conv2d(in_dim, in_dim, 1),
nn.BatchNorm2d(in_dim)
)
self.block2_2 = nn.Sequential(
nn.Conv2d(2 * in_dim, in_dim, 1),
nn.BatchNorm2d(in_dim)
)
def forward(self, x):
x = self.block1(x)
num_ch = x.shape[1]
c1, c2 = self.block2_1[0].in_channels, self.block2_2[0].in_channels
x1, x2, x3 = torch.split(x, [c1, c1, c2], dim=1)
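        # block1 outputs 4 * in_dim channels, which are split here as [in_dim, in_dim, 2 * in_dim].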
x1 = self.block2_1(x1)
x2 = self.block2_1(x2)
x3 = self.block2_2(x3)
return x1, x2, x3
if __name__ == "__main__":
    model = DemoSplitNet(10)
input = torch.randn(1, 10, 7, 7)
output = model(input)
@C0NGTRI123
Thanks for the additional information. I am actively updating the repo based on some recent news, so I promptly gave the sample network a try. However, I was not able to reproduce the mentioned issue, i.e., no split being returned. For debugging purposes, could you take a look at the rendered pruning dependency graph and check whether there is a split node?
If I add the following print statements, I get:
Node id: node-42-43-44, op_name: split, param_names: []
Node id: node-45, op_name: conv, param_names: ['block2_1.0.weight', 'block2_1.0.bias']
Node id: node-42-43-44, op_name: split, param_names: []
Node id: node-49, op_name: conv, param_names: ['block2_2.0.weight', 'block2_2.0.bias']
Therefore, as long as the in_dim_pruned_idxes are set up properly for the split node, such a DNN is expected to be supported smoothly.
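For illustration only, a minimal sketch of the bookkeeping involved, assuming the pruned channel indices of the tensor feeding torch.split are known (this is not the pruning_compression.py implementation):
def split_pruned_idxes(pruned_idxes, split_sizes):
    # Re-base pruned channel indices of the pre-split tensor onto each split branch.
    branch_idxes = []
    offset = 0
    for size in split_sizes:
        branch_idxes.append([i - offset for i in pruned_idxes if offset <= i < offset + size])
        offset += size
    return branch_idxes
# e.g. with DemoSplitNet split sizes [10, 10, 20] and (hypothetical) pruned channels [3, 12, 25, 33]:
# split_pruned_idxes([3, 12, 25, 33], [10, 10, 20]) -> [[3], [2], [5, 13]]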
I tried to compress mixnet.py. Can you check the concat-split case again? I tried to leave the conv before the concat and after the split unpruned, but they return false at the split. Thank you. MixNet_zig.gv.pdf