Source code for aijack.defense.debugging.neuroncoverage.neuroncoverage
import copy
import torch
import torch.nn as nn
[docs]def scale(out, dim=-1, rmax=1, rmin=0):
output_std = (out - out.min()) / (out.max() - out.min())
output_scaled = output_std * (rmax - rmin) + rmin
return output_scaled
[docs]def step_through_model(model, prefix=""):
for name, module in model.named_children():
path = "{}/{}".format(prefix, name)
if (
isinstance(module, nn.Conv1d)
or isinstance(module, nn.Conv2d)
or isinstance(module, nn.Linear)
): # test for dataset
yield (path, name, module)
else:
yield from step_through_model(module, path)
[docs]def get_model_layers(model, cross_section_size=0):
layer_dict = {}
i = 0
for path, name, module in step_through_model(model):
layer_dict[str(i) + path] = module
i += 1
if cross_section_size > 0:
target_layers = list(layer_dict)[0::cross_section_size]
layer_dict = {
target_layer: layer_dict[target_layer] for target_layer in target_layers
}
return layer_dict
[docs]def get_layer_output_sizes(model, layer_dict, data):
output_sizes = {}
hooks = []
def hook(module, input, output):
module_idx = len(output_sizes)
m_key = list(layer_dict)[module_idx]
output_sizes[m_key] = list(output.size()[1:])
for name, module in layer_dict.items():
hooks.append(module.register_forward_hook(hook))
try:
model(data[:1])
finally:
for h in hooks:
h.remove()
return output_sizes
[docs]class NeuronCoverageTracker:
def __init__(self, model, dummy_data, threshold=0.9, device="cpu"):
self.model = model
self.threshold = threshold
self.device = device
self.layer_dict = get_model_layers(model)
self.output_shape_of_layers = get_layer_output_sizes(
model, self.layer_dict, dummy_data
)
self.cov_tracker = {}
def _init_cov_tracker(self):
for layer_name, layer_shape in self.output_shape_of_layers.items():
self.cov_tracker[layer_name] = (
torch.zeros(layer_shape[0]).type(torch.BoolTensor).to(self.device)
)
def _update_cov_tracker(self, x):
for (
layer_name,
layer_module,
) in self.layer_dict.items():
layer_intermediate_output = get_intermediate_outputs(
self.model, x, layer_module
)
layer_intermediate_output = torch.squeeze(
torch.sum(layer_intermediate_output, dim=1)
)
layer_intermediate_output_scaled = scale(layer_intermediate_output)
threshold = self.threshold
mask_index = layer_intermediate_output_scaled > threshold
self.cov_tracker[layer_name] = mask_index | self.cov_tracker[layer_name]
[docs] def coverage(self, dataloader, pos_of_x=0, initialize=True, update=True):
if initialize:
self._init_cov_tracker()
if not update:
cov_tracker_prev = copy.deepcopy(self.cov_tracker)
for data in dataloader:
if pos_of_x is None:
x = data.to(self.decice)
else:
x = data[pos_of_x].to(self.device)
self._update_cov_tracker(x)
num_covered_neurons = 0
num_total_neurons = 0
for is_covered in self.cov_tracker.values():
num_covered_neurons += is_covered.sum()
num_total_neurons += len(is_covered)
if not update:
self.cov_tracker = cov_tracker_prev
return (num_covered_neurons / num_total_neurons).item()