Source code for aijack.defense.dp.manager.adadps
import torch
from .dpoptimizer import (
_apply_clip_coef,
_calculate_clip_coef,
_clear_accumulated_grads,
_privatize_lot_grads,
)

def _update_side_info_rmsprop(opt):
    """
    Update the RMSProp-style side information (running average of squared
    gradients) in place.

    Args:
        opt: Optimizer instance.
    """
    for group in opt.param_groups:
        for param, si in zip(group["params"], group["side_information"]):
            if param.requires_grad:
                # Update in place so the stored buffer is actually modified;
                # rebinding the loop variable (si = ...) would be a no-op.
                si.mul_(opt.beta).add_(param.grad.data**2, alpha=1 - opt.beta)

def _apply_side_info_rmsprop(opt):
    """
    Precondition gradients with the RMSProp-style side information.

    Args:
        opt: Optimizer instance.
    """
    for group in opt.param_groups:
        for param, si in zip(group["params"], group["side_information"]):
            if param.requires_grad:
                # Scale each gradient by 1 / (sqrt(E[g^2]) + eps).
                param.grad.data.div_(torch.sqrt(si) + opt.eps_to_avoid_nan)

def _update_side_info_adam(opt):
    """
    Update the Adam-style side information (second-moment estimate) and
    potential momentum (first-moment estimate) in place.

    Args:
        opt: Optimizer instance.
    """
    for group in opt.param_groups:
        for param, si, pm in zip(
            group["params"], group["side_information"], group["potential_momentum"]
        ):
            if param.requires_grad:
                # Update in place; rebinding the loop variables would be a no-op.
                si.mul_(opt.beta).add_(param.grad.data**2, alpha=1 - opt.beta)
                pm.mul_(opt.beta).add_(param.grad.data, alpha=1 - opt.beta)

def _apply_side_info_adam(opt):
    """
    Precondition gradients with the Adam-style side information.

    Args:
        opt: Optimizer instance.
    """
    for group in opt.param_groups:
        for param, si, pm in zip(
            group["params"], group["side_information"], group["potential_momentum"]
        ):
            if param.requires_grad:
                # Scale each gradient by pm / (sqrt(si) + eps).
                param.grad.data.mul_(pm / (torch.sqrt(si) + opt.eps_to_avoid_nan))

def _precondition_grads_with_side_info(opt):
"""
Precondition gradients with side information.
Args:
opt: Optimizer instance.
"""
    if opt.mode == "rmsprop":
        _apply_side_info_rmsprop(opt)
    elif opt.mode == "adam":
        _apply_side_info_adam(opt)

def attach_adadps(
cls,
accountant,
l2_norm_clip,
noise_multiplier,
lot_size,
batch_size,
dataset_size,
mode="rmsprop",
beta=0.9,
eps_to_avoid_nan=1e-8,
):
"""
Attach the AdaDPS optimizer to the given class.
Args:
cls: Class to which AdaDPS optimizer will be attached.
accountant: Privacy accountant.
l2_norm_clip (float): L2 norm clip value.
noise_multiplier (float): Noise multiplier value.
lot_size (int): Lot size.
batch_size (int): Batch size.
dataset_size (int): Size of the dataset.
mode (str, optional): Mode of optimization. Defaults to "rmsprop".
beta (float, optional): Beta value. Defaults to 0.9.
eps_to_avoid_nan (float, optional): Epsilon value to avoid NaN. Defaults to 1e-8.
Returns:
class: Class with AdaDPS optimizer attached.
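
    Example:
        A minimal usage sketch. The model, loss, data loaders, and accountant
        below are assumed to exist and are illustrative only::

            AdaDPSSGD = attach_adadps(
                torch.optim.SGD,
                accountant,
                l2_norm_clip=1.0,
                noise_multiplier=1.0,
                lot_size=100,
                batch_size=10,
                dataset_size=60000,
                mode="rmsprop",
            )
            optimizer = AdaDPSSGD(model.parameters(), lr=0.05)

            # Build side information from gradients on public data.
            for X, y in public_loader:
                optimizer.zero_grad()
                criterion(model(X), y).backward()
                optimizer.step_public()

            # Private training: accumulate per batch, update once per lot.
            optimizer.zero_grad_for_lot()
            for i, (X, y) in enumerate(private_loader):
                optimizer.zero_grad()
                criterion(model(X), y).backward()
                optimizer.step()
                if (i + 1) % 10 == 0:  # every lot_size // batch_size batches
                    optimizer.step_for_lot()
                    optimizer.zero_grad_for_lot()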
"""

    class AdaDPSWrapper(cls):
        """Implementation of AdaDPS proposed in
        `Private Adaptive Optimization with Side Information`
        (https://arxiv.org/pdf/2202.05963.pdf)"""

def __init__(self, *args, **kwargs):
super(AdaDPSWrapper, self).__init__(*args, **kwargs)
if noise_multiplier < 0.0:
raise ValueError(
"Invalid noise_multiplier: {}".format(noise_multiplier)
)
if l2_norm_clip < 0.0:
raise ValueError("Invalid l2_norm_clip: {}".format(l2_norm_clip))
self.accountant = accountant
self.l2_norm_clip = l2_norm_clip
self.noise_multiplier = noise_multiplier
self.batch_size = batch_size
self.lot_size = lot_size
self.mode = mode
self.beta = beta
self.eps_to_avoid_nan = eps_to_avoid_nan
            # Allocate per-parameter buffers: accumulated clipped gradients,
            # side information, and potential momentum.
            for group in self.param_groups:
                group["accum_grads"] = [
                    torch.zeros_like(param.data) if param.requires_grad else None
                    for param in group["params"]
                ]
                group["side_information"] = [
                    torch.zeros_like(param.data) if param.requires_grad else None
                    for param in group["params"]
                ]
                group["potential_momentum"] = [
                    torch.zeros_like(param.data) if param.requires_grad else None
                    for param in group["params"]
                ]

def zero_grad(self):
super(AdaDPSWrapper, self).zero_grad()
def accumulate_grad(self):
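            # Precondition the raw gradients with the public side information,
            # then clip the result and add it to the per-lot accumulators.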
_precondition_grads_with_side_info(self)
clip_coef = _calculate_clip_coef(self)
_apply_clip_coef(self, clip_coef)
def step_public(self):
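            # Update the side information from gradients computed on public
            # data; these statistics are not privatized.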
            if self.mode == "rmsprop":
                _update_side_info_rmsprop(self)
            elif self.mode == "adam":
                _update_side_info_adam(self)
def step(self):
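            # A private step only accumulates the clipped gradient; the actual
            # parameter update happens once per lot in step_for_lot.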
self.accumulate_grad()
def zero_grad_for_lot(self):
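            # Reset the lot-level gradient accumulators.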
_clear_accumulated_grads(self)
def step_for_lot(self, *args, **kwargs):
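            # Average the accumulated gradients over the lot, add calibrated
            # Gaussian noise, and take a standard optimizer step.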
_privatize_lot_grads(self)
super(AdaDPSWrapper, self).step(*args, **kwargs)
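            # Record the step with the privacy accountant: noise scale sigma
            # and sampling rate lot_size / dataset_size.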
accountant.add_step_info(
{"sigma": self.noise_multiplier}, self.lot_size / dataset_size, 1
)
return AdaDPSWrapper