Source code for aijack.defense.dp.manager.dpoptimizer
import torch


def _clear_accumulated_grads(opt):
    # Zero the per-lot gradient accumulators stored in each parameter group.
    for group in opt.param_groups:
        for accum_grad in group["accum_grads"]:
            if accum_grad is not None:
                accum_grad.zero_()


def _calculate_clip_coef(opt):
    # Compute the DP-SGD clipping coefficient min(l2_norm_clip / ||g||_2, 1) for the
    # current batch gradient, where the norm is taken over all trainable parameters.
    total_norm = 0.0
    for group in opt.param_groups:
        for param in group["params"]:
            if param.requires_grad:
                total_norm += param.grad.data.norm(2).item() ** 2.0
    total_norm = total_norm**0.5
    clip_coef = min(opt.l2_norm_clip / (total_norm + 1e-6), 1.0)
    return clip_coef
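

# Worked example (comment only, not part of the module source): with l2_norm_clip = 1.0
# and a batch gradient whose total l2 norm is 4.0, the coefficient is
# min(1.0 / (4.0 + 1e-6), 1.0) ≈ 0.25, so the clipped gradient has norm ≈ 1.0.
# A gradient with norm 0.5 gets coefficient 1.0 and passes through unchanged.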


def _apply_clip_coef(opt, clip_coef):
    # Add the clipped batch gradient to the per-parameter lot accumulators.
    for group in opt.param_groups:
        for param, accum_grad in zip(group["params"], group["accum_grads"]):
            if param.requires_grad:
                accum_grad.add_(param.grad.data.mul(clip_coef))


def _privatize_lot_grads(opt):
    # Replace each parameter's gradient with the accumulated clipped gradients,
    # add Gaussian noise with standard deviation l2_norm_clip * noise_multiplier,
    # and average over the lot (batch_size / lot_size = 1 / number of batches per lot).
    for group in opt.param_groups:
        for param, accum_grad in zip(group["params"], group["accum_grads"]):
            if param.requires_grad:
                param.grad.data = accum_grad.clone()
                param.grad.data.add_(
                    opt.l2_norm_clip
                    * opt.noise_multiplier
                    * torch.randn_like(param.grad.data)
                )
                param.grad.data.mul_(opt.batch_size / opt.lot_size)
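

# Hedged sketch (illustrative only, not part of the library source): the lot update that
# the three helpers above implement, written for a single parameter tensor. Each batch
# gradient is clipped to l2 norm at most l2_norm_clip, the clipped gradients are summed,
# Gaussian noise with standard deviation l2_norm_clip * noise_multiplier is added once
# per lot, and the sum is averaged over the lot.
def _dp_lot_update_sketch(batch_grads, l2_norm_clip, noise_multiplier, batch_size, lot_size):
    accum = torch.zeros_like(batch_grads[0])
    for g in batch_grads:
        clip_coef = min(l2_norm_clip / (g.norm(2).item() + 1e-6), 1.0)
        accum.add_(g * clip_coef)
    accum.add_(l2_norm_clip * noise_multiplier * torch.randn_like(accum))
    return accum * (batch_size / lot_size)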


def attach_dpoptimizer(
    cls,
    accountant,
    l2_norm_clip,
    noise_multiplier,
    lot_size,
    batch_size,
    dataset_size,
    smoothing=False,
    smoothing_radius=10.0,
):
    """Wraps the given optimizer class in DPOptimizerWrapper.

    Args:
        accountant (BaseMomentAccountant): moment accountant used for privacy bookkeeping
        l2_norm_clip (float): upper bound (clipping threshold) of the gradient l2 norm
        noise_multiplier (float): scale of the added Gaussian noise
        lot_size (int): sampled lot size
        batch_size (int): batch size
        dataset_size (int): total number of samples in the dataset
        smoothing (bool): if True, apply the smoothing proposed in
            Wang, Wenxiao, et al. "DPlis: Boosting utility of differentially
            private deep learning via randomized smoothing."
            arXiv preprint arXiv:2103.01496 (2021). (default=False)
        smoothing_radius (float): radius of smoothing (default=10.0)

    Raises:
        ValueError: if noise_multiplier < 0.0
        ValueError: if l2_norm_clip < 0.0

    Returns:
        cls: the wrapped DPOptimizerWrapper class
    """

    class DPOptimizerWrapper(cls):
        def __init__(self, *args, **kwargs):
            super(DPOptimizerWrapper, self).__init__(*args, **kwargs)
            if noise_multiplier < 0.0:
                raise ValueError(
                    "Invalid noise_multiplier: {}".format(noise_multiplier)
                )
            if l2_norm_clip < 0.0:
                raise ValueError("Invalid l2_norm_clip: {}".format(l2_norm_clip))

            self.accountant = accountant
            self.l2_norm_clip = l2_norm_clip
            self.noise_multiplier = noise_multiplier
            self.batch_size = batch_size
            self.lot_size = lot_size
            self.prev_params = None

            # Per-parameter buffers that accumulate clipped batch gradients over a lot.
            for group in self.param_groups:
                group["accum_grads"] = [
                    torch.zeros_like(param.data) if param.requires_grad else None
                    for param in group["params"]
                ]

        def zero_grad_keep_accum_grads(self):
            # Clear param.grad for the next batch while keeping the lot accumulators.
            # With smoothing enabled, parameters are perturbed around the values saved
            # at the start of the lot (DPlis-style randomized smoothing).
            super(DPOptimizerWrapper, self).zero_grad()
            if smoothing:
                if self.prev_params is not None:
                    for group, prev_ps in zip(self.param_groups, self.prev_params):
                        for param, prev_p in zip(group["params"], prev_ps):
                            param.data = prev_p.data + (
                                smoothing_radius * group["lr"] / self.lot_size
                            ) * self.l2_norm_clip * noise_multiplier * torch.randn_like(
                                prev_p.data
                            )
                else:
                    self.prev_params = [
                        [p.clone() for p in group["params"]]
                        for group in self.param_groups
                    ]

        def zero_grad(self):
            self.zero_grad_keep_accum_grads()

        def accumulate_grad(self):
            # Clip the current batch gradient and add it to the lot accumulators.
            clip_coef = _calculate_clip_coef(self)
            _apply_clip_coef(self, clip_coef)

        def step(self):
            # The per-batch "step" only accumulates; the real update happens in step_for_lot.
            self.accumulate_grad()

        def zero_grad_for_lot(self):
            # Reset the lot accumulators (and the smoothing anchor) before a new lot.
            _clear_accumulated_grads(self)
            self.prev_params = None

        def step_for_lot(self, *args, **kwargs):
            # Add noise to the accumulated gradients, take the underlying optimizer's
            # step, and report this lot to the moment accountant.
            _privatize_lot_grads(self)
            super(DPOptimizerWrapper, self).step(*args, **kwargs)
            accountant.add_step_info(
                {"sigma": self.noise_multiplier}, self.lot_size / dataset_size, 1
            )

    return DPOptimizerWrapper
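

# --- Usage sketch (illustrative only; not part of the library module) ---
# A minimal DP-SGD loop with an SGD optimizer wrapped by attach_dpoptimizer. The
# stand-in accountant below only provides the add_step_info method that step_for_lot
# calls; in practice you would pass the library's moment accountant instead. The data,
# model, and hyperparameters here are assumptions made for the example.
if __name__ == "__main__":
    import torch.nn as nn
    import torch.optim as optim

    class _RecordingAccountant:
        # Stand-in accountant: records (noise info, sampling rate, num steps) per lot.
        def __init__(self):
            self.history = []

        def add_step_info(self, noise_info, sampling_rate, num_steps):
            self.history.append((noise_info, sampling_rate, num_steps))

    lot_size, batch_size, dataset_size = 20, 5, 100
    model = nn.Linear(4, 1)
    criterion = nn.MSELoss()

    DPSGD = attach_dpoptimizer(
        optim.SGD,
        accountant=_RecordingAccountant(),
        l2_norm_clip=1.0,
        noise_multiplier=1.0,
        lot_size=lot_size,
        batch_size=batch_size,
        dataset_size=dataset_size,
    )
    optimizer = DPSGD(model.parameters(), lr=0.05)

    x, y = torch.randn(lot_size, 4), torch.randn(lot_size, 1)

    optimizer.zero_grad_for_lot()  # reset the per-lot accumulators
    for i in range(0, lot_size, batch_size):
        optimizer.zero_grad()  # clears param.grad, keeps accum_grads
        loss = criterion(model(x[i : i + batch_size]), y[i : i + batch_size])
        loss.backward()
        optimizer.step()  # clip this batch's gradient and accumulate it
    optimizer.step_for_lot()  # add noise, average over the lot, apply the real SGD step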