Skip to content

Utilization

get_supported_optimizers(filters=None)

Return list of available optimizer names, sorted alphabetically.

Parameters:

Name Type Description Default
filters Optional[Union[str, List[str]]]

Wildcard filter string (or list of such strings) matched with fnmatch. If None, the whole list is returned.

None
Source code in pytorch_optimizer/optimizer/__init__.py
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
def get_supported_optimizers(filters: Optional[Union[str, List[str]]] = None) -> List[str]:
    r"""List the names of the registered optimizers in alphabetical order.

    Args:
        filters (Optional[Union[str, List[str]]]): a wildcard pattern, or a list/tuple of patterns, matched against
            the registered names with `fnmatch`. A name is kept when it matches at least one pattern.
            If None, every registered name is returned.

    """
    names = OPTIMIZERS.keys()
    if filters is None:
        return sorted(names)

    # a single pattern is treated as a one-element list of patterns
    patterns: Sequence[str] = filters if isinstance(filters, (tuple, list)) else [filters]

    # the set removes names matched by more than one pattern
    matched: Set[str] = {name for pattern in patterns for name in fnmatch.filter(names, pattern)}

    return sorted(matched)

get_supported_lr_schedulers(filters=None)

Return list of available lr scheduler names, sorted alphabetically.

:param filters: Optional[Union[str, List[str]]]. Wildcard filter string (or list of such strings) matched with fnmatch. If None, the whole list is returned.

Source code in pytorch_optimizer/lr_scheduler/__init__.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def get_supported_lr_schedulers(filters: Optional[Union[str, List[str]]] = None) -> List[str]:
    r"""List the names of the registered lr schedulers in alphabetical order.

    :param filters: Optional[Union[str, List[str]]]. a wildcard pattern, or a list/tuple of patterns, matched against
        the registered names with `fnmatch`. a name is kept when it matches at least one pattern. if None, every
        registered name is returned.
    """
    names = LR_SCHEDULERS.keys()
    if filters is None:
        return sorted(names)

    # a single pattern is treated as a one-element list of patterns
    patterns: Sequence[str] = filters if isinstance(filters, (tuple, list)) else [filters]

    # the set removes names matched by more than one pattern
    matched: Set[str] = {name for pattern in patterns for name in fnmatch.filter(names, pattern)}

    return sorted(matched)

get_supported_loss_functions(filters=None)

Return list of available loss function names, sorted alphabetically.

:param filters: Optional[Union[str, List[str]]]. Wildcard filter string (or list of such strings) matched with fnmatch. If None, the whole list is returned.

Source code in pytorch_optimizer/loss/__init__.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def get_supported_loss_functions(filters: Optional[Union[str, List[str]]] = None) -> List[str]:
    r"""List the names of the registered loss functions in alphabetical order.

    :param filters: Optional[Union[str, List[str]]]. a wildcard pattern, or a list/tuple of patterns, matched against
        the registered names with `fnmatch`. a name is kept when it matches at least one pattern. if None, every
        registered name is returned.
    """
    names = LOSS_FUNCTIONS.keys()
    if filters is None:
        return sorted(names)

    # a single pattern is treated as a one-element list of patterns
    patterns: Sequence[str] = filters if isinstance(filters, (tuple, list)) else [filters]

    # the set removes names matched by more than one pattern
    matched: Set[str] = {name for pattern in patterns for name in fnmatch.filter(names, pattern)}

    return sorted(matched)

CPUOffloadOptimizer

Offload optimizer to CPU for single-GPU training. This will reduce GPU memory by the size of optimizer state.

Reference: https://github.com/pytorch/ao/blob/main/torchao/prototype/low_bit_optim/cpu_offload.py

Parameters:

Name Type Description Default
params ParamsT

A list of parameters or parameter groups.

required
optimizer_class Type[Optimizer]

Constructor of the base optimizer. Defaults to :class:torch.optim.AdamW.

AdamW
offload_gradients bool

Free GPU gradients once they are moved to CPU. Not compatible with gradient accumulation. Defaults to False.

False
kwargs Dict

Other keyword arguments to be passed to the base optimizer, e.g. lr, weight_decay.

{}
Source code in pytorch_optimizer/optimizer/utils.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
class CPUOffloadOptimizer:  # pragma: no cover
    r"""Offload optimizer to CPU for single-GPU training. This will reduce GPU memory by the size of optimizer state.

    Every trainable parameter gets a pinned CPU copy (with a pinned CPU gradient buffer) and its own instance of the
    base optimizer built on that CPU copy. A post-accumulate-grad hook copies each gradient to its CPU buffer on a
    side CUDA stream, and `step()` runs the per-parameter CPU optimizers and copies the updated values back to the
    CUDA parameters.

    Reference: https://github.com/pytorch/ao/blob/main/torchao/prototype/low_bit_optim/cpu_offload.py

    Args:
        params (ParamsT): A list of parameters or parameter groups.
        optimizer_class (Type[torch.optim.Optimizer]): Constructor of the base optimizer.
            Defaults to :class:`torch.optim.AdamW`.
        offload_gradients (bool, optional): Free GPU gradients once they are moved to CPU.
            Not compatible with gradient accumulation. Defaults to False.
        kwargs (Dict): Other keyword arguments to be passed to the base optimizer, e.g. `lr`, `weight_decay`.

    Raises:
        ValueError: If `params` is empty.

    """

    def __init__(
        self,
        params: ParamsT,
        optimizer_class: Type[Optimizer] = torch.optim.AdamW,
        *,
        offload_gradients: bool = False,
        **kwargs,
    ) -> None:
        # default to the fused AdamW implementation unless the caller made an explicit choice
        if optimizer_class is torch.optim.AdamW and TORCH_VERSION_AT_LEAST_2_4 and 'fused' not in kwargs:
            kwargs.update(fused=True)

        param_groups = list(params)
        if len(param_groups) == 0:
            raise ValueError('optimizer got an empty parameter list')

        # a flat list of tensors is wrapped into a single parameter group
        if not isinstance(param_groups[0], dict):
            param_groups = [{'params': param_groups}]

        self.param_cuda2cpu_map: Dict[torch.Tensor, torch.Tensor] = {}
        self.optim_dict: Dict[torch.Tensor, Optimizer] = {}
        self.stream = torch.cuda.Stream()

        # CUDA parameter -> event recorded after its gradient copy was enqueued on `self.stream`
        self.queue = {}

        def backward_hook(p_cuda: torch.Tensor) -> None:
            if p_cuda.grad is None:
                return

            p_cpu = self.param_cuda2cpu_map[p_cuda]

            # the side stream must not read the gradient before the current stream has produced it
            self.stream.wait_stream(torch.cuda.current_stream())
            with torch.cuda.stream(self.stream):
                p_cpu.grad.copy_(p_cuda.grad, non_blocking=True)

            # drop any older entry first so that the parameter is re-inserted at the end of the queue
            self.queue.pop(p_cuda, None)
            self.queue[p_cuda] = self.stream.record_event()

            if offload_gradients:
                # the gradient is still being read on the side stream; tell the allocator before releasing it
                p_cuda.grad.record_stream(self.stream)
                p_cuda.grad = None

        for param_group in param_groups:
            group_params = param_group.get('params', None)  # type: ignore
            if group_params is None:
                continue

            # Hyper-parameters of the group WITHOUT its 'params' entry. Spreading the whole group after
            # `'params': p_cpu` would let the group's own list of CUDA tensors override the CPU copy.
            group_options = {key: value for key, value in param_group.items() if key != 'params'}

            for p_cuda in group_params:
                # frozen parameters never receive a gradient, and a post-accumulate-grad hook cannot be
                # registered on a tensor that does not require grad
                if not p_cuda.requires_grad:
                    continue

                p_cpu = torch.empty_like(p_cuda, device='cpu', pin_memory=True)
                p_cpu.grad = torch.empty_like(p_cpu, pin_memory=True)

                p_cpu.copy_(p_cuda.detach(), non_blocking=True)
                self.param_cuda2cpu_map[p_cuda] = p_cpu

                p_cuda.register_post_accumulate_grad_hook(backward_hook)
                self.optim_dict[p_cuda] = optimizer_class([{**group_options, 'params': p_cpu}], **kwargs)

    @torch.no_grad()
    def step(self, closure: Closure = None) -> Loss:
        r"""Run the CPU optimizers for every parameter with a pending gradient and copy the results back.

        Args:
            closure (Closure): optional callable that re-evaluates the model and returns the loss.

        Returns:
            Loss: the value returned by `closure`, or None when no closure is given.

        """
        loss = None
        if closure is not None:
            # `step` runs under `no_grad`; the closure needs autograd enabled to be able to call `backward()`
            with torch.enable_grad():
                loss = closure()

        for p_cuda, grad_d2h_event in self.queue.items():
            # block the host until this parameter's gradient has arrived on the CPU
            grad_d2h_event.synchronize()
            self.optim_dict[p_cuda].step()

            p_cpu = self.param_cuda2cpu_map[p_cuda]
            # NOTE(review): this copy is asynchronous on the side stream and nothing here makes the current
            # stream wait for it - confirm the next forward pass cannot read `p_cuda` too early.
            with torch.cuda.stream(self.stream):
                p_cuda.copy_(p_cpu, non_blocking=True)

        self.queue.clear()

        return loss

    def zero_grad(self, _: bool = True) -> None:
        r"""Set the gradient of every tracked CUDA parameter to None. The positional argument is ignored."""
        for p_cuda in self.param_cuda2cpu_map:
            p_cuda.grad = None

    @property
    def param_groups(self):
        r"""Return the parameter groups of all per-parameter optimizers as one flat, newly built list."""
        return [group for optim in self.optim_dict.values() for group in optim.param_groups]

    def state_dict(self):
        r"""Return the state dicts of the per-parameter optimizers as a list, in `optim_dict` order."""
        return [optim.state_dict() for optim in self.optim_dict.values()]

    def load_state_dict(self, state_dict):
        r"""Load a list produced by `state_dict()` into the per-parameter optimizers, pairing entries by position.

        Entries are paired with `zip`, so surplus entries on either side are silently ignored.
        """
        for optim, optim_state_dict in zip(self.optim_dict.values(), state_dict):
            optim.load_state_dict(optim_state_dict)

is_valid_parameters(parameters)

Check whether the parameters are valid.

Source code in pytorch_optimizer/optimizer/utils.py
216
217
218
def is_valid_parameters(parameters: ParamsT) -> bool:
    """Check whether the parameters are a non-empty list or tuple whose first element is a dict."""
    if not isinstance(parameters, (list, tuple)):
        return False

    if len(parameters) == 0:
        return False

    # only the first element is inspected
    return isinstance(parameters[0], dict)

has_overflow(grad_norm)

Detect inf and NaN in grad_norm.

Source code in pytorch_optimizer/optimizer/utils.py
221
222
223
def has_overflow(grad_norm: torch.Tensor) -> bool:
    """Return True when at least one element of `grad_norm` is NaN or infinite."""
    nan_mask = torch.isnan(grad_norm)
    inf_mask = torch.isinf(grad_norm)

    return bool((nan_mask | inf_mask).any())

to_real(x)

Return real value of tensor.

Source code in pytorch_optimizer/optimizer/utils.py
226
227
228
def to_real(x: torch.Tensor) -> torch.Tensor:
    """Return the real part of a complex tensor; a non-complex tensor is returned as is."""
    if torch.is_complex(x):
        return x.real

    return x

normalize_gradient(x, use_channels=False, epsilon=1e-08)

Normalize gradient with stddev.

Parameters:

Name Type Description Default
x Tensor

Gradient tensor to normalize.

required
use_channels bool

If True, perform channel-wise normalization.

False
epsilon float

Small constant added for numerical stability.

1e-08
Source code in pytorch_optimizer/optimizer/utils.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def normalize_gradient(x: torch.Tensor, use_channels: bool = False, epsilon: float = 1e-8) -> None:
    """Divide the gradient in-place by its standard deviation.

    Args:
        x (torch.Tensor): Gradient tensor, modified in-place.
        use_channels (bool): If True and `x` has more than one dimension, the standard deviation is taken
            separately for every index of the first dimension (over all remaining dimensions).
        epsilon (float): Small constant added to the standard deviation for numerical stability.

    Otherwise a single standard deviation over the whole tensor is used, and tensors with two or fewer elements
    are left untouched.

    """
    ndim: int = x.dim()

    if use_channels and ndim > 1:
        std = x.std(dim=tuple(range(1, ndim)), keepdim=True)
    elif x.numel() > 2:
        std = x.std()
    else:
        return

    x.div_(std.add_(epsilon))

clip_grad_norm(parameters, max_norm=0.0, sync=False)

Clip gradient norms.

During combination with FSDP, will also ensure that grad norms are aggregated across all workers, since each worker only stores their shard of the gradients.

Parameters:

Name Type Description Default
parameters ParamsT

ParamsT whose gradients we wish to clip.

required
max_norm float

Maximum norm we wish the gradients to have. If non-positive, then we will not perform clipping.

0.0
sync bool

Boolean indicating whether we should aggregate across the distributed group. Used only in combination with FSDP.

False

Returns:

Name Type Description
float Union[Tensor, float]

The gradient norm across all parameters, before clipping.

Source code in pytorch_optimizer/optimizer/utils.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
def clip_grad_norm(
    parameters: Union[ParamsT, torch.Tensor],
    max_norm: float = 0.0,
    sync: bool = False,
) -> Union[torch.Tensor, float]:
    """Clip gradient norms.

    Without `sync`, clipping is delegated to `clip_grad_norm_`. With `sync`, the squared norms are summed locally,
    reduced with `all_reduce`, and the gradients are scaled manually, so that workers holding only a shard of
    the gradients (FSDP) agree on the global norm.

    Args:
        parameters (ParamsT): A single tensor or an iterable of tensors whose gradients we wish to clip.
        max_norm (float): Maximum norm we wish the gradients to have. If non-positive,
            no clipping is performed and only the norm is computed.
        sync (bool): Whether the squared norm should be aggregated across the distributed group.
            Used only in combination with FSDP.

    Returns:
        Union[torch.Tensor, float]: The gradient norm across all parameters, before clipping. It is whatever
            `clip_grad_norm_` returns on the non-synced clipping path and a python float otherwise.

    Raises:
        ValueError: If `parameters` is None.

    """
    if parameters is None:
        raise ValueError('ParamsT cannot be None.')

    # a lone tensor becomes a one-element list; any other iterable (generators included) is materialised
    tensors = cast(List, [parameters] if isinstance(parameters, torch.Tensor) else list(parameters))

    should_clip: bool = max_norm > 0

    # when syncing, the clipping has to be done manually below so that the norm is aggregated properly
    if should_clip and not sync:
        return clip_grad_norm_(tensors, max_norm)

    grads = [p.grad for p in tensors if p.grad is not None]

    norm_sq = sum(grad.norm() ** 2 for grad in grads)
    if sync:  # pragma: no cover
        # also need the squared norms from all the other sharded workers in FSDP
        all_reduce(norm_sq)

    grad_norm: float = math.sqrt(norm_sq)
    if should_clip:  # pragma: no cover
        clip_coefficient = max_norm / (grad_norm + 1e-6)
        for grad in grads:
            grad.detach().mul_(clip_coefficient)

    return grad_norm

unit_norm(x, norm=2.0)

Get norm of unit.

Source code in pytorch_optimizer/optimizer/utils.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
def unit_norm(x: torch.Tensor, norm: float = 2.0) -> torch.Tensor:
    """Compute the `norm`-norm of `x`, reduced according to its number of dimensions.

    Tensors with at most one dimension are reduced to a scalar. Tensors with 2 or 3 dimensions are reduced
    over dimension 1 only, and tensors with 4 or more dimensions over every dimension except the first.
    In the latter two cases the reduced dimensions are kept with size 1.

    Args:
        x (torch.Tensor): Input tensor.
        norm (float): Order of the norm.

    """
    ndim: int = x.dim()

    if ndim <= 1:
        return x.norm(p=norm, dim=None, keepdim=False)

    reduce_dim: Union[int, Tuple[int, ...]] = 1 if ndim <= 3 else tuple(range(1, ndim))

    return x.norm(p=norm, dim=reduce_dim, keepdim=True)

disable_running_stats(model)

Disable running stats (momentum) of BatchNorm.

Source code in pytorch_optimizer/optimizer/utils.py
316
317
318
319
320
321
322
323
324
def disable_running_stats(model: nn.Module):
    """Disable running stats (momentum) of BatchNorm.

    For every `_BatchNorm` module in `model`, the current `momentum` is saved on the module as `backup_momentum`
    and `momentum` is set to 0. `enable_running_stats` restores the saved value.

    A module that is already disabled (it has a `backup_momentum` and its `momentum` is 0) is left untouched.
    This keeps the saved momentum intact when the function is called twice in a row, or when the same module
    is visited more than once because it is shared between several parents.

    Args:
        model (nn.Module): Model whose BatchNorm modules are modified in-place.

    """

    def _disable(module):
        if not isinstance(module, _BatchNorm):
            return

        # backing up again here would overwrite the saved momentum with 0 and make it unrecoverable
        if hasattr(module, 'backup_momentum') and module.momentum == 0:
            return

        module.backup_momentum = module.momentum
        module.momentum = 0

    model.apply(_disable)

enable_running_stats(model)

Enable running stats (momentum) of BatchNorm.

Source code in pytorch_optimizer/optimizer/utils.py
327
328
329
330
331
332
333
334
def enable_running_stats(model: nn.Module):
    """Enable running stats (momentum) of BatchNorm.

    Every `_BatchNorm` module in `model` that carries a `backup_momentum` attribute gets its `momentum` set back
    to that value. Modules without the attribute are left untouched.

    Args:
        model (nn.Module): Model whose BatchNorm modules are modified in-place.

    """

    def _restore_momentum(module):
        if not isinstance(module, _BatchNorm):
            return

        if not hasattr(module, 'backup_momentum'):
            return

        module.momentum = module.backup_momentum

    model.apply(_restore_momentum)

get_global_gradient_norm(param_groups)

Get global gradient norm.

Source code in pytorch_optimizer/optimizer/utils.py
337
338
339
340
341
342
343
344
345
346
347
@torch.no_grad()
def get_global_gradient_norm(param_groups: List[Dict]) -> torch.Tensor:
    """Accumulate the squared gradient norm over all parameters of all groups.

    Note that no square root is taken: the result is the sum of the squared norms.

    Args:
        param_groups (List[Dict]): Parameter groups, each holding its tensors under the 'params' key.
            The first parameter of the first group determines the device of the result.

    Returns:
        torch.Tensor: A float32 tensor of shape (1,). Parameters without a gradient are skipped.

    """
    first_param = param_groups[0]['params'][0]
    squared_norm_sum = torch.zeros(1, dtype=torch.float32, device=first_param.device)

    grads = (p.grad for group in param_groups for p in group['params'] if p.grad is not None)
    for grad in grads:
        squared_norm_sum.add_(grad.norm() ** 2)

    return squared_norm_sum

reg_noise(network1, network2, num_data, lr, eta=0.008, temperature=0.0001)

Entropy-MCMC: Sampling from flat basins with ease.

Usage example and detailed implementation can be found at: https://github.com/lblaoke/EMCMC/blob/master/exp/cifar10_emcmc.py

Parameters:

Name Type Description Default
network1 Module

First neural network.

required
network2 Module

Second neural network.

required
num_data int

Number of training data points.

required
lr float

Learning rate.

required
eta float

Eta parameter controlling auxiliary guiding variable.

0.008
temperature float

Temperature parameter for sampling.

0.0001
Source code in pytorch_optimizer/optimizer/utils.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
@torch.no_grad()
def reg_noise(
    network1: nn.Module, network2: nn.Module, num_data: int, lr: float, eta: float = 8e-3, temperature: float = 1e-4
) -> Union[torch.Tensor, float]:
    """Entropy-MCMC: Sampling from flat basins with ease.

    For every pair of parameters the squared difference, scaled by `0.5 / (eta * num_data)`, is added to the
    result, and a Gaussian-weighted sum of both parameters, scaled by `sqrt(2 / lr / num_data * temperature)`,
    is subtracted from it.

    Usage example and detailed implementation can be found at:
    https://github.com/lblaoke/EMCMC/blob/master/exp/cifar10_emcmc.py

    Args:
        network1 (nn.Module): First neural network.
        network2 (nn.Module): Second neural network.
        num_data (int): Number of training data points.
        lr (float): Learning rate.
        eta (float): Eta parameter controlling auxiliary guiding variable.
        temperature (float): Temperature parameter for sampling.

    Returns:
        torch.Tensor: Scalar tensor on the device of the first parameter of `network1`.

    """
    reg_scale: float = 0.5 / (eta * num_data)
    noise_scale: float = math.sqrt(2.0 / lr / num_data * temperature)

    device = next(network1.parameters()).device
    total = torch.tensor(0.0, device=device)

    for p1, p2 in zip(network1.parameters(), network2.parameters()):
        reg_term = ((p1 - p2) ** 2 * reg_scale).sum()

        # the random draw for `p1` must come before the one for `p2` to keep the RNG sequence unchanged
        noise = p1 * torch.randn_like(p1) + p2 * torch.randn_like(p2)
        noise_term = (noise * noise_scale).sum()

        total += reg_term - noise_term

    return total

copy_stochastic(target, source)

Copy stochastic.

reference: https://github.com/pytorch/pytorch/issues/120376#issuecomment-1974828905

Parameters:

Name Type Description Default
target Tensor

A tensor in bfloat16 format to copy to.

required
source Tensor

A tensor in float32 format to copy from.

required
Source code in pytorch_optimizer/optimizer/utils.py
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
@torch.no_grad()
def copy_stochastic(target: torch.Tensor, source: torch.Tensor) -> None:
    r"""Copy `source` into `target` with stochastic rounding.

    The bits of `source` are reinterpreted as int32, a random integer in [0, 2^16) is added, and the lower
    16 bits are cleared. The result is reinterpreted as float32 and copied into `target`.

    reference: https://github.com/pytorch/pytorch/issues/120376#issuecomment-1974828905

    Args:
        target (torch.Tensor): A tensor in bfloat16 format to copy to.
        source (torch.Tensor): A tensor in float32 format to copy from.

    """
    low_bits_range: int = 1 << 16
    high_bits_mask: int = -65536  # all bits set except the lower 16

    # random value for the lower 16 bits; a carry into the upper bits rounds the value up
    bits = torch.randint_like(source, low=0, high=low_bits_range, dtype=torch.int32)
    bits += source.view(torch.int32)

    # drop the lower 16 bits
    bits &= high_bits_mask

    target.copy_(bits.view(torch.float32))