10.1. Neuron Coverage#

In this tutorial, we will show how you can use AIJack to debug and improve the trained neural network with Neuron Coverage proposed in Pei, Kexin, et al. "Deepxplore: Automated whitebox testing of deep learning systems." proceedings of the 26th Symposium on Operating Systems Principles. 2017.

!git clone https://github.com/harveyslash/Facial-Similarity-with-Siamese-Networks-in-Pytorch.git
!mkdir data
!mv Facial-Similarity-with-Siamese-Networks-in-Pytorch/data/faces/testing/* data/
!mv Facial-Similarity-with-Siamese-Networks-in-Pytorch/data/faces/training/* data/
Cloning into 'Facial-Similarity-with-Siamese-Networks-in-Pytorch'...
remote: Enumerating objects: 550, done.
remote: Counting objects: 100% (18/18), done.
remote: Compressing objects: 100% (10/10), done.
remote: Total 550 (delta 6), reused 18 (delta 6), pack-reused 532
Receiving objects: 100% (550/550), 6.32 MiB | 6.99 MiB/s, done.
Resolving deltas: 100% (27/27), done.
Updating files: 100% (405/405), done.
import cv2
import numpy as np
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader, SubsetRandomSampler
from sklearn.model_selection import train_test_split
from matplotlib import pyplot as plt
import glob
from sklearn.metrics import accuracy_score

from aijack.utils import NumpyDataset
from aijack.defense.debugging.neuroncoverage import *
/usr/local/lib/python3.10/dist-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
  from .autonotebook import tqdm as notebook_tqdm
/usr/local/lib/python3.10/dist-packages/torchvision/io/image.py:13: UserWarning: Failed to load image Python extension: '/usr/local/lib/python3.10/dist-packages/torchvision/image.so: undefined symbol: _ZN3c104cuda20CUDACachingAllocator9allocatorE'If you don't plan on using image functionality from `torchvision.io`, you can ignore this warning. Otherwise, there might be something wrong with your environment. Did you have `libjpeg` or `libpng` installed before building `torchvision` from source?
  warn(
def fix_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True


fix_seed(42)
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fla = nn.Flatten()
        self.fc = nn.Linear(112 * 92, 40)

    def forward(self, x):
        x = self.fla(x)
        x = self.fc(x)
        x = F.softmax(x, dim=1)
        return x


def split_dataloader(data_loader, k):
    dataset = data_loader.dataset
    dataset_size = len(dataset)
    batch_size = data_loader.batch_size

    # Calculate the size of each subset
    subset_size = dataset_size // k
    remainder = dataset_size % k

    # Create a list to store the k DataLoaders
    dataloaders = []

    # Create subsets and DataLoaders
    start_idx = 0
    for i in range(k):
        end_idx = start_idx + subset_size + (1 if i < remainder else 0)
        indices = list(range(start_idx, end_idx))
        sampler = SubsetRandomSampler(indices)
        dataloader = DataLoader(dataset, batch_size=batch_size, sampler=sampler)
        dataloaders.append(dataloader)
        start_idx = end_idx

    return dataloaders
BASE = "data/"

imgs = []
labels = []
for i in range(1, 41):
    for j in range(1, 11):
        img = cv2.imread(BASE + f"s{i}/{j}.pgm", 0)
        imgs.append(img)
        labels.append(i - 1)

X = np.stack(imgs)
y = np.array(labels)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.33, random_state=42
)

# ToTensor:画像のグレースケール化(RGBの0~255を0~1の範囲に正規化)、Normalize:Z値化(RGBの平均と標準偏差を0.5で決め打ちして正規化)
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]
)
trainset = NumpyDataset(X_train, y_train, transform=transform)
trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=4, shuffle=True, num_workers=2
)
testset = NumpyDataset(X_test, y_test, transform=transform)
testloader = torch.utils.data.DataLoader(
    testset, batch_size=4, shuffle=True, num_workers=2
)

net = Net()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.005, momentum=0.9)
for epoch in range(10):  # loop over the dataset multiple times
    running_loss = 0
    data_size = 0
    for i, data in enumerate(trainloader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels.to(torch.int64))
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        data_size += inputs.shape[0]

    print(f"epoch {epoch}: loss is {running_loss/data_size}")


in_preds = []
in_label = []
with torch.no_grad():
    for data in testloader:
        inputs, labels = data
        outputs = net(inputs)
        in_preds.append(outputs)
        in_label.append(labels)
    in_preds = torch.cat(in_preds)
    in_label = torch.cat(in_label)
print(
    "Test Accuracy is: ",
    accuracy_score(np.array(torch.argmax(in_preds, axis=1)), np.array(in_label)),
)
epoch 0: loss is 0.9179768215364484
epoch 1: loss is 0.8781074712525553
epoch 2: loss is 0.8334654267154523
epoch 3: loss is 0.8095823072675449
epoch 4: loss is 0.7987308706810226
epoch 5: loss is 0.7798728791635428
epoch 6: loss is 0.7634210195114364
epoch 7: loss is 0.7491738876300071
epoch 8: loss is 0.7392014594220403
epoch 9: loss is 0.7327400935229971
Test Accuracy is:  0.6742424242424242

We then generate additional data that increases Neuron Coverage to improve the performance of the model based on the prior study Yang, Zhou, et al. "Revisiting neuron coverage metrics and quality of deep neural networks." 2022 IEEE International Conference on Software Analysis, Evolution and Reengineering (SANER). IEEE, 2022.

k = 3
split_dataloaders = split_dataloader(trainloader, k)

additional_x = []
additional_y = []
t = 0.5

print(
    "Test Accuracy (before) is: ",
    accuracy_score(np.array(torch.argmax(in_preds, axis=1)), np.array(in_label)),
)
NCT = neuroncoverage.NeuronCoverageTracker(
    net, threshold=t, dummy_data=inputs[[0]], device="cpu"
)
nc = NCT.coverage(trainloader)
print("NC (before): ", nc)

for k, sd in enumerate(split_dataloaders):
    for db in sd:
        xb, yb = db
        for x, y in zip(xb, yb):
            x += torch.randn(x.shape) * 0.03
            x = x.reshape(-1, 1, 112, 92)
            yp = net(x)
            if yp.argmax().item() != y.item():
                ncu = NCT.coverage([x], initialize=False, update=False)
                if ncu > nc:
                    additional_x.append(x.numpy()[0][0])
                    additional_y.append(y.numpy().item())

    if len(additional_x) == 0:
        continue

    X_train_augmented = np.concatenate(
        [X_train, np.stack(additional_x).astype(np.uint8)]
    )
    y_train_augmented = np.concatenate([y_train, np.array(additional_y)])

    trainset_augmented = NumpyDataset(
        X_train_augmented, y_train_augmented, transform=transform
    )
    trainloader_augmented = torch.utils.data.DataLoader(
        trainset_augmented, batch_size=4, shuffle=True, num_workers=2
    )

    # net = Net()
    # criterion = nn.CrossEntropyLoss()
    # optimizer = optim.SGD(net.parameters(), lr=0.005, momentum=0.9)

    for epoch in range(10):  # loop over the dataset multiple times
        running_loss = 0
        data_size = 0
        for i, data in enumerate(trainloader_augmented, 0):
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels.to(torch.int64))
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            data_size += inputs.shape[0]

in_preds = []
in_label = []
with torch.no_grad():
    for data in testloader:
        inputs, labels = data
        outputs = net(inputs)
        in_preds.append(outputs)
        in_label.append(labels)
    in_preds = torch.cat(in_preds)
    in_label = torch.cat(in_label)
print("----")
print(f"{len(additional_x)} test cases generated")
print(
    "Test Accuracy (after) is: ",
    accuracy_score(np.array(torch.argmax(in_preds, axis=1)), np.array(in_label)),
)
nc = NCT.coverage(trainloader_augmented)
print("NC (after): ", nc)
Test Accuracy (before) is:  0.6742424242424242
NC (before):  0.7749999761581421
----
15 test cases generated
Test Accuracy (after) is:  0.8333333333333334
NC (after):  0.8999999761581421