FederatedAI / research


VFL Backdoor >> reproduced code with torch in Mnist failed #2

Closed avdvg closed 1 year ago

avdvg commented 2 years ago

Hi, dear author! I used torch to reproduce the backdoor attack from the paper "Backdoor attacks and defenses in feature-partitioned collaborative learning", but my backdoor attack success rate on the MNIST dataset is only about 10%. My code seems similar to what the paper describes, but I don't know whether I have overlooked some important detail. Can you give me some suggestions on my code? Thanks!
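
For reference, the gradient-replacement rule I am trying to implement (my reading of the paper, stated here so the replacement step in the training loop below is easier to follow): for every poisoned sample i in a batch, the malicious passive party overwrites the gradient returned for its output with an amplified copy of the gradient of one known clean sample of the target class, i.e. g_i ← amplify_rate * g_target, where g_target is the gradient for the sample indexed by sample_id_need_copy and amplify_rate = 10 below.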

Below is my code:

""" 内容:复现论文《Backdoor attacks and defenses in feature-partitioned collaborative learning》的梯度替换后门攻击方法 人员:Rongchang """

import os
os.environ["CUDA_VISIBLE_DEVICES"] = "2"

Import packages

import torch
import torch.nn.functional as F
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
import torchvision
from torchvision import transforms
import matplotlib.pyplot as plt
import copy
import numpy as np
from tqdm import tqdm

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)
import warnings
warnings.filterwarnings("ignore")
np.random.seed(123)

Load the raw data

train_transform = transforms.Compose([transforms.ToTensor()])
test_transform = transforms.Compose([transforms.ToTensor()])
trainset = torchvision.datasets.MNIST(root='Mnist/data', train=True, download=True, transform=train_transform)
train_loader = torch.utils.data.DataLoader(dataset=trainset, batch_size=128, shuffle=True)
testset = torchvision.datasets.MNIST(root='Mnist/data', train=False, download=True, transform=test_transform)
test_loader = torch.utils.data.DataLoader(dataset=testset, batch_size=128, shuffle=False)

train_labels = trainset.targets
test_labels = testset.targets
trainset = trainset.data
testset = testset.data

import random
import copy

# %%

import torch.nn as nn

Define the models

class VFLPassiveModel(nn.Module):
    def __init__(self):
        super(VFLPassiveModel, self).__init__()
        self.conv = nn.Conv2d(1, 6, 5, 1, 2)
        self.pooling = nn.MaxPool2d(2, 2)
        self.d1 = nn.Linear(588, 10)

    def forward(self, x):
        x = self.conv(x)
        x = F.relu(x)
        x = self.pooling(x)
        x = torch.flatten(x, 1)
        x = self.d1(x)
        return x

For the MNIST dataset, the server-side (active party) model is not trained

class VFLActiveModel(nn.Module):
    def __init__(self):
        super(VFLActiveModel, self).__init__()
        self.d1 = nn.Linear(64, 32)
        self.d2 = nn.Linear(32, 10)

    def forward(self, x):
        # x = torch.cat(tuple(x), dim=1)
        # x = self.d1(x)
        # x = self.d2(x)
        x = F.softmax(torch.add(x[0], x[1]), dim=1)
        return x
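
One detail I am unsure about: the active model applies F.softmax to the summed outputs, and nn.CrossEntropyLoss is then applied to that result in the training loop, so log-softmax is effectively applied a second time inside the loss. A minimal sketch of a logits-only variant (my own assumption, not taken from the paper's released code):

import torch
import torch.nn as nn

class VFLActiveModelLogits(nn.Module):
    # hypothetical variant: return the raw sum of the two passive outputs as logits
    # and let nn.CrossEntropyLoss apply log-softmax exactly once
    def forward(self, x):
        return torch.add(x[0], x[1])

# quick check that the loss and the gradients w.r.t. the passive outputs still work
up = torch.randn(4, 10, requires_grad=True)
down = torch.randn(4, 10, requires_grad=True)
labels = torch.randint(0, 10, (4,))
logits = VFLActiveModelLogits()([up, down])
loss = nn.CrossEntropyLoss()(logits, labels)
grads = torch.autograd.grad(loss, [up, down])
print(loss.item(), grads[0].shape)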

import copy

training_mode_list = ['backdoor', 'backdoor_with_amplify_rate_10']
result_list = []
for indx in range(len(training_mode_list)):
    result_list.append([])

The passive party checks whether the current batch contains attack (trigger) samples

def need_poison_down_check(images):
    need_poison_list = [True if images[indx, 25, 25] > 240 and
                        images[indx, 26, 26] > 240 and
                        images[indx, 25, 27] > 240 and
                        images[indx, 27, 25] > 240 else False
                        for indx in range(len(images))]
    return np.array(need_poison_list)
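
Note on the trigger: need_poison_down_check tests pixel (25, 25) (together with (26, 26), (25, 27), (27, 25)) against raw 0-255 values, while the poisoning code in the training loop and the ASR test set (27, 27), (26, 26), (25, 27), (27, 25) on ToTensor() images in [0, 1], so one corner pixel and the value range differ (the helper is also never called in the loop as posted). A small sketch of keeping the trigger pattern in a single place, assuming normalized (N, 1, 28, 28) tensors; the helper names are made up here:

import numpy as np
import torch

# the four trigger pixels actually set during poisoning below
TRIGGER_PIXELS = [(27, 27), (26, 26), (25, 27), (27, 25)]

def apply_trigger(images, indices):
    # images: (N, 1, 28, 28) float tensor in [0, 1] from the DataLoader
    for r, c in TRIGGER_PIXELS:
        images[indices, :, r, c] = 1.0
    return images

def check_trigger(images):
    # flags samples whose trigger pixels are (almost) white,
    # checking exactly the pixels that apply_trigger sets
    mask = torch.ones(images.shape[0], dtype=torch.bool)
    for r, c in TRIGGER_PIXELS:
        mask &= images[:, 0, r, c] > 240 / 255.0
    return mask.numpy()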

sample_id_need_copy = 1

EPOCHS = 50
number_of_times = 1

Training modes

mode_need_train_list = ['backdoor']
for t in range(number_of_times):
    for indx in range(len(training_mode_list)):

Set the training mode

    training_mode = training_mode_list[indx]
    if training_mode not in mode_need_train_list:
        continue

    # set up the passive-party models
    passive_model_up = VFLPassiveModel().cuda()
    passive_model_down = VFLPassiveModel().cuda()

    # set up the active-party model
    active_model = VFLActiveModel().cuda()
    print('training_mode', training_mode)

    # set up the optimizers
    opt_p1 = optim.Adam(passive_model_up.parameters(), lr=0.001)
    opt_p2 = optim.Adam(passive_model_down.parameters(), lr=0.001)
    opt_a = optim.Adam(active_model.parameters(), lr=0.001)

    acc_train = []
    acc_test = []
    acc_test_label = []
    acc_backdoor = []
    loss_train = []
    loss_test = []
    loss_backdoor = []

    active_up_gradient_res = None
    has_poison_up_grad = False
    active_down_gradient_res = None
    has_poison_down_grad = True
    poison_N = 600

    for epoch in range(EPOCHS):

        loss_init = 0
        right_init = 0
        opt_p1.zero_grad()
        opt_p2.zero_grad()
        opt_a.zero_grad()
        number_of_poison = 0
        poison_number = 0
        iter_num = 10  # number of samples poisoned in one batch

        has_found_target_grad = False

        for i, (images, labels) in enumerate(train_loader):

            # choose which samples in this batch are poisoned
            poison_indx = random.sample(range(images.shape[0]), iter_num)

            images = images.cuda()
            labels = labels.cuda()

            if poison_number<poison_N and i!=0:

                images[poison_indx, :, 27, 27] = 1.0
                images[poison_indx, :, 26, 26] = 1.0
                images[poison_indx, :, 25, 27] = 1.0
                images[poison_indx, :, 27, 25] = 1.0

            images_up = images[:, :, :, :14]
            images_down = images[:, :, :, 14:]

            passive_up_output = passive_model_up(images_up)
            passive_down_output = passive_model_down(images_down)

            if poison_number<poison_N and i!=0:
                passive_down_output[poison_indx]=torch.zeros(passive_down_output[poison_indx].shape).cuda()

            poisoned_up_output = torch.autograd.Variable(passive_up_output.detach().clone(), requires_grad=True)
            poisoned_down_output = torch.autograd.Variable(passive_down_output.detach().clone(), requires_grad=True)

            concat_tensor = [poisoned_up_output, poisoned_down_output]

            # the active party computes the loss from the passive parties' outputs
            active_output = active_model(concat_tensor)
            loss = nn.CrossEntropyLoss()(active_output, labels)

            # compute the gradients of the loss w.r.t. the passive parties' outputs
            activate_gradient_list = [torch.autograd.grad(loss, a, retain_graph=True) for a in concat_tensor]

            # record the gradient of the known auxiliary (target-class) sample
            if not has_found_target_grad:
                # this is the gradient that will be injected for the poisoned samples
                inject_grad = activate_gradient_list[1][0][sample_id_need_copy]
                has_found_target_grad = True

            # replace the gradients of the poisoned samples with the amplified target gradient
            amplify_rate = 10
            if poison_number < poison_N:
                activate_gradient_list[1][0][poison_indx] = inject_grad * amplify_rate
                poison_number += iter_num

            passive_model_up_gradient = torch.autograd.grad(passive_up_output, passive_model_up.parameters(), grad_outputs=activate_gradient_list[0], retain_graph=True)
            passive_model_down_gradient = torch.autograd.grad(passive_down_output, passive_model_down.parameters(), grad_outputs=activate_gradient_list[1], retain_graph=True)

            for w, g in zip(passive_model_up.parameters(), passive_model_up_gradient):
                w.grad = g.detach()
            opt_p1.step()

            for w, g in zip(passive_model_down.parameters(), passive_model_down_gradient):
                w.grad = g.detach()
            opt_p2.step()

            # update the active-party model
            loss.backward()
            opt_a.step()

            # accumulate training loss and accuracy
            loss_init += loss.item()
            pred = np.argmax(active_output.detach().cpu(), axis=1)
            # print('pred',pred)
            # print('label',labels)
            right_init += np.sum(np.array(pred == labels.cpu()) + 0)
        print('poison number', poison_number)
        print('epoch: {}, loss is {}'.format(epoch, loss_init/trainset.shape[0]))

# %%

        # test phase
        loss_init = 0
        right_init = 0
        for images, labels in test_loader:
            images = images.cuda()
            labels = labels.cuda()
            images_up = images[:, :, :, :14]
            images_down = images[:, :, :, 14:]
            passive_up_output = passive_model_up(images_up)
            passive_down_output = passive_model_down(images_down)
            concat_tensor = [passive_up_output, passive_down_output]

            # the active party computes its output from the passive parties' outputs
            active_output = active_model(concat_tensor)

            pred = np.argmax(active_output.detach().cpu(), axis=1)
            right_init += np.sum(np.array(pred == labels.cpu()) + 0)
        print('epoch: {}, testing acc is {}'.format(epoch, right_init / testset.shape[0]))

        # test the backdoor attack success rate
        loss_init = 0
        right_init = 0
        # for images, labels in test_loader:
        for i, (images, labels) in enumerate(test_loader):

            # print(torch.max(images))
            # images[:, :, 25:, 25:] = 1.0
            images[:, :, 27, 27] = 1.0
            images[:, :, 26, 26] = 1.0
            images[:, :, 25, 27] = 1.0
            images[:, :, 27, 25] = 1.0

            labels[:] = train_labels[sample_id_need_copy]
            images_up = images[:, :, :, :14].cuda()
            images_down = images[:, :, :, 14:].cuda()
            labels = labels.cuda()

            passive_up_output = passive_model_up(images_up)
            passive_down_output = passive_model_down(images_down)
            concat_tensor = [passive_up_output, passive_down_output]

            # the active party computes its output from the passive parties' outputs
            active_output = active_model(concat_tensor)

            if i == 0 and epoch==0:
                output_distribution = np.average(active_output.cpu().detach().numpy(), axis=0)
                X = np.arange(10)
                plt.bar(X, output_distribution)
                plt.show()

            pred = np.argmax(active_output.detach().cpu(), axis=1)
            right_init += np.sum(np.array(pred == labels.cpu()) + 0)

        print('epoch: {}, testing asr is {}'.format(epoch, right_init / testset.shape[0]))
        print(' ')

Experiment results:

poison number 600
epoch: 9, loss is 0.011629524425665538
epoch: 9, testing acc is 0.9759
epoch: 9, testing asr is 0.1001

Best, Rongchang

yankang18 commented 1 year ago

You can find the code for this paper at https://github.com/FLAIR-THU/TrustworthyFederatedLearning and ask the first author about these questions.