clarena.cl_algorithms.independent
The submodule in cl_algorithms for the Independent learning algorithm.
r"""
The submodule in `cl_algorithms` for the Independent learning algorithm.
"""

__all__ = ["Independent"]

import logging
from copy import deepcopy
from typing import Any

from torch import Tensor

from clarena.backbones import CLBackbone
from clarena.cl_algorithms import Finetuning, UnlearnableCLAlgorithm
from clarena.heads import HeadsCIL, HeadsTIL

# always get logger for built-in logging in each module
pylogger = logging.getLogger(__name__)


class Independent(Finetuning):
    r"""Independent learning algorithm.

    Independent is another naive approach to task-incremental learning besides Finetuning. It assigns a new, independent model to each task. This is a simple way to avoid catastrophic forgetting, at the extreme cost of memory, and it achieves the theoretical upper bound of performance in continual learning.

    We implement Independent as a subclass of the Finetuning algorithm, as Independent shares the same `forward()`, `training_step()` and `validation_step()` methods with the `Finetuning` class; `test_step()` is overridden to route each test task to its own backbone.
    """

    def __init__(
        self,
        backbone: CLBackbone,
        heads: HeadsTIL | HeadsCIL,
        non_algorithmic_hparams: dict[str, Any] = {},
    ) -> None:
        r"""Initialize the Independent algorithm with the network. It has no additional hyperparameters.

        **Args:**
        - **backbone** (`CLBackbone`): backbone network.
        - **heads** (`HeadsTIL` | `HeadsCIL`): output heads.
        - **non_algorithmic_hparams** (`dict[str, Any]`): hyperparameters not related to the algorithm itself, such as optimizer and learning rate scheduler configurations, passed to this `LightningModule` object from the config. They are saved for Lightning APIs via the `save_hyperparameters()` method, which is useful for experiment configuration and reproducibility.
        """
        super().__init__(
            backbone=backbone,
            heads=heads,
            non_algorithmic_hparams=non_algorithmic_hparams,
        )

        self.original_backbone: CLBackbone = deepcopy(backbone)
        r"""A copy of the original backbone network, stored as the template for creating a new independent backbone for each task."""

        self.backbones: dict[int, CLBackbone] = {}
        r"""Independent backbones for each task. Keys are task IDs and values are the corresponding backbones."""

        self.backbone_valid_task_ids: set[int] = set()
        r"""The set of task IDs that have valid backbones."""

    def on_train_start(self) -> None:
        r"""Initialize an independent backbone for `self.task_id`, duplicated from the original backbone."""
        # must deepcopy the backbone completely to get a fresh module object;
        # do not use `load_state_dict()` on the existing one
        self.backbone = deepcopy(self.original_backbone)

    def on_train_end(self) -> None:
        r"""Store the trained independent backbone for `self.task_id`."""
        self.backbones[self.task_id] = deepcopy(self.backbone)
        self.backbone_valid_task_ids.add(self.task_id)

    def test_step(
        self, batch: Any, batch_idx: int, dataloader_idx: int = 0
    ) -> dict[str, Tensor]:
        r"""Test step for current task `self.task_id`, which tests for all seen tasks indexed by `dataloader_idx`.

        **Args:**
        - **batch** (`Any`): a batch of test data.
        - **dataloader_idx** (`int`): the task ID of the seen task to be tested. A default value of 0 is given, otherwise the `LightningModule` will raise a `RuntimeError`.

        **Returns:**
        - **outputs** (`dict[str, Tensor]`): a dictionary containing the loss and other metrics from this test step. Keys (`str`) are the metric names and values (`Tensor`) are the metrics.
        """
        test_task_id = self.get_test_task_id_from_dataloader_idx(dataloader_idx)

        x, y = batch
        backbone = self.backbones[
            test_task_id
        ]  # use the corresponding independent backbone for the test task
        feature, _ = backbone(x, stage="test", task_id=test_task_id)
        logits = self.heads(feature, test_task_id)
        # use the corresponding head for the test task (instead of the current task `self.task_id`)
        loss_cls = self.criterion(logits, y)
        acc = (logits.argmax(dim=1) == y).float().mean()

        # return metrics for Lightning loggers callback to handle at `on_test_batch_end()`
        return {
            "loss_cls": loss_cls,
            "acc": acc,
        }


class UnlearnableIndependent(UnlearnableCLAlgorithm, Independent):
    r"""Unlearnable Independent learning algorithm.

    This is a variant of Independent with the same functionality that additionally supports unlearning requests and permanent tasks.
    """

    def __init__(
        self,
        backbone: CLBackbone,
        heads: HeadsTIL | HeadsCIL,
        non_algorithmic_hparams: dict[str, Any] = {},
    ) -> None:
        r"""Initialize the UnlearnableIndependent algorithm with the network. It has no additional hyperparameters.

        **Args:**
        - **backbone** (`CLBackbone`): backbone network.
        - **heads** (`HeadsTIL` | `HeadsCIL`): output heads.
        - **non_algorithmic_hparams** (`dict[str, Any]`): hyperparameters not related to the algorithm itself, passed to this `LightningModule` object from the config.
        """
        super().__init__(
            backbone=backbone,
            heads=heads,
            non_algorithmic_hparams=non_algorithmic_hparams,
        )

    def aggregated_backbone_output(self, input: Tensor) -> Tensor:
        r"""Get the aggregated backbone output for the input data. All independent backbones contribute to the aggregation.

        This output feature is used for measuring unlearning metrics, such as Distribution Distance (DD). An aggregated output involving every part of the backbone is needed to ensure the fairness of the metric.

        **Args:**
        - **input** (`Tensor`): the input tensor from data.

        **Returns:**
        - **output** (`Tensor`): the aggregated backbone output tensor.
        """
        feature = 0
        for t in self.backbone_valid_task_ids:
            feature_t = self.backbones[t](input, stage="test", task_id=t)[0]
            feature = feature + feature_t  # out-of-place add keeps autograd safe
        feature = feature / len(self.backbone_valid_task_ids)

        return feature
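The aggregation in aggregated_backbone_output is an element-wise mean of the per-task backbone features over all valid task IDs. A minimal standalone sketch of that idea, using plain nn.Linear modules as stand-ins for CLBackbone (the names backbones and valid_ids are placeholders, not the clarena API):

import torch
from torch import nn

torch.manual_seed(0)
backbones = {1: nn.Linear(4, 8), 2: nn.Linear(4, 8)}  # one feature extractor per task
valid_ids = {1, 2}                                    # tasks whose backbones are still valid

x = torch.randn(3, 4)                                 # a batch of 3 inputs
features = [backbones[t](x) for t in valid_ids]       # per-task features, each (3, 8)
aggregated = torch.stack(features).mean(dim=0)        # element-wise mean over backbones
print(aggregated.shape)                               # torch.Size([3, 8])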
class Independent(Finetuning):
    r"""Independent learning algorithm.

    Independent is another naive approach to task-incremental learning besides Finetuning. It assigns a new, independent model to each task. This is a simple way to avoid catastrophic forgetting, at the extreme cost of memory, and it achieves the theoretical upper bound of performance in continual learning.

    We implement Independent as a subclass of the Finetuning algorithm, as Independent shares the same `forward()`, `training_step()` and `validation_step()` methods with the `Finetuning` class; `test_step()` is overridden to route each test task to its own backbone.
    """

    def __init__(
        self,
        backbone: CLBackbone,
        heads: HeadsTIL | HeadsCIL,
        non_algorithmic_hparams: dict[str, Any] = {},
    ) -> None:
        r"""Initialize the Independent algorithm with the network. It has no additional hyperparameters.

        **Args:**
        - **backbone** (`CLBackbone`): backbone network.
        - **heads** (`HeadsTIL` | `HeadsCIL`): output heads.
        - **non_algorithmic_hparams** (`dict[str, Any]`): hyperparameters not related to the algorithm itself, such as optimizer and learning rate scheduler configurations, passed to this `LightningModule` object from the config. They are saved for Lightning APIs via the `save_hyperparameters()` method, which is useful for experiment configuration and reproducibility.
        """
        super().__init__(
            backbone=backbone,
            heads=heads,
            non_algorithmic_hparams=non_algorithmic_hparams,
        )

        self.original_backbone: CLBackbone = deepcopy(backbone)
        r"""A copy of the original backbone network, stored as the template for creating a new independent backbone for each task."""

        self.backbones: dict[int, CLBackbone] = {}
        r"""Independent backbones for each task. Keys are task IDs and values are the corresponding backbones."""

        self.backbone_valid_task_ids: set[int] = set()
        r"""The set of task IDs that have valid backbones."""

    def on_train_start(self) -> None:
        r"""Initialize an independent backbone for `self.task_id`, duplicated from the original backbone."""
        # must deepcopy the backbone completely to get a fresh module object;
        # do not use `load_state_dict()` on the existing one
        self.backbone = deepcopy(self.original_backbone)

    def on_train_end(self) -> None:
        r"""Store the trained independent backbone for `self.task_id`."""
        self.backbones[self.task_id] = deepcopy(self.backbone)
        self.backbone_valid_task_ids.add(self.task_id)

    def test_step(
        self, batch: Any, batch_idx: int, dataloader_idx: int = 0
    ) -> dict[str, Tensor]:
        r"""Test step for current task `self.task_id`, which tests for all seen tasks indexed by `dataloader_idx`.

        **Args:**
        - **batch** (`Any`): a batch of test data.
        - **dataloader_idx** (`int`): the task ID of the seen task to be tested. A default value of 0 is given, otherwise the `LightningModule` will raise a `RuntimeError`.

        **Returns:**
        - **outputs** (`dict[str, Tensor]`): a dictionary containing the loss and other metrics from this test step. Keys (`str`) are the metric names and values (`Tensor`) are the metrics.
        """
        test_task_id = self.get_test_task_id_from_dataloader_idx(dataloader_idx)

        x, y = batch
        backbone = self.backbones[
            test_task_id
        ]  # use the corresponding independent backbone for the test task
        feature, _ = backbone(x, stage="test", task_id=test_task_id)
        logits = self.heads(feature, test_task_id)
        # use the corresponding head for the test task (instead of the current task `self.task_id`)
        loss_cls = self.criterion(logits, y)
        acc = (logits.argmax(dim=1) == y).float().mean()

        # return metrics for Lightning loggers callback to handle at `on_test_batch_end()`
        return {
            "loss_cls": loss_cls,
            "acc": acc,
        }
Independent learning algorithm.
Independent is another naive approach to task-incremental learning besides Finetuning. It assigns a new, independent model to each task. This is a simple way to avoid catastrophic forgetting, at the extreme cost of memory, and it achieves the theoretical upper bound of performance in continual learning.
We implement Independent as a subclass of the Finetuning algorithm, as Independent shares the same forward(), training_step() and validation_step() methods with the Finetuning class; test_step() is overridden to route each test task to its own backbone.
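Stripped of the Lightning machinery, the core pattern amounts to cloning a template network for each task, as in the following sketch (the template and training loop are illustrative, not the clarena API):

import copy
from torch import nn

template = nn.Sequential(nn.Linear(4, 16), nn.ReLU())  # stand-in for CLBackbone

task_models: dict[int, nn.Module] = {}
for task_id in (1, 2, 3):
    model = copy.deepcopy(template)  # fresh, independent parameters per task
    # ... train `model` on task `task_id` here ...
    task_models[task_id] = model

# parameters are truly independent: training one task's model cannot touch another's
assert task_models[1][0].weight.data_ptr() != task_models[2][0].weight.data_ptr()

Memory therefore grows linearly with the number of tasks, which is the extreme cost the docstring refers to.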
    def __init__(
        self,
        backbone: CLBackbone,
        heads: HeadsTIL | HeadsCIL,
        non_algorithmic_hparams: dict[str, Any] = {},
    ) -> None:
        r"""Initialize the Independent algorithm with the network. It has no additional hyperparameters.

        **Args:**
        - **backbone** (`CLBackbone`): backbone network.
        - **heads** (`HeadsTIL` | `HeadsCIL`): output heads.
        - **non_algorithmic_hparams** (`dict[str, Any]`): hyperparameters not related to the algorithm itself, such as optimizer and learning rate scheduler configurations, passed to this `LightningModule` object from the config. They are saved for Lightning APIs via the `save_hyperparameters()` method, which is useful for experiment configuration and reproducibility.
        """
        super().__init__(
            backbone=backbone,
            heads=heads,
            non_algorithmic_hparams=non_algorithmic_hparams,
        )

        self.original_backbone: CLBackbone = deepcopy(backbone)
        r"""A copy of the original backbone network, stored as the template for creating a new independent backbone for each task."""

        self.backbones: dict[int, CLBackbone] = {}
        r"""Independent backbones for each task. Keys are task IDs and values are the corresponding backbones."""

        self.backbone_valid_task_ids: set[int] = set()
        r"""The set of task IDs that have valid backbones."""
Initialize the Independent algorithm with the network. It has no additional hyperparameters.
Args:
- backbone (CLBackbone): backbone network.
- heads (HeadsTIL | HeadsCIL): output heads.
- non_algorithmic_hparams (dict[str, Any]): hyperparameters not related to the algorithm itself, such as optimizer and learning rate scheduler configurations, passed to this LightningModule object from the config. They are saved for Lightning APIs via the save_hyperparameters() method, which is useful for experiment configuration and reproducibility.
A copy of the original backbone network, stored as the template for creating a new independent backbone for each task.
Independent backbones for each task. Keys are task IDs and values are the corresponding backbones.
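For context on where non_algorithmic_hparams ends up: save_hyperparameters() is standard Lightning behavior that records the passed values under self.hparams and in checkpoints. A hedged sketch of that pattern (plain Lightning usage, not clarena's exact internals; older versions import pytorch_lightning instead):

import lightning as L

class Example(L.LightningModule):
    def __init__(self, non_algorithmic_hparams: dict = {}):
        super().__init__()
        # stores the dict under self.hparams and in checkpoints, making the
        # optimizer/scheduler configuration reproducible from saved runs
        self.save_hyperparameters(non_algorithmic_hparams)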
    def on_train_start(self) -> None:
        r"""Initialize an independent backbone for `self.task_id`, duplicated from the original backbone."""
        # must deepcopy the backbone completely to get a fresh module object;
        # do not use `load_state_dict()` on the existing one
        self.backbone = deepcopy(self.original_backbone)
Initialize an independent backbone for self.task_id, duplicated from the original backbone.
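The comment in the source insists on deepcopy rather than load_state_dict. The distinction it guards against is aliasing: load_state_dict writes values into the same module object, while deepcopy yields a brand-new module with its own storage. A small demonstration with a plain nn.Linear stand-in:

import copy
import torch
from torch import nn

backbone = nn.Linear(2, 2)

# storing by reference and then resetting with load_state_dict mutates
# the "stored" model as well, since both names point at one module
stored_by_ref = backbone
backbone.load_state_dict(nn.Linear(2, 2).state_dict())
assert torch.equal(stored_by_ref.weight, backbone.weight)

# deepcopy instead yields a brand-new module with detached storage
stored_by_copy = copy.deepcopy(backbone)
assert stored_by_copy.weight.data_ptr() != backbone.weight.data_ptr()
with torch.no_grad():
    backbone.weight.zero_()                    # simulate further training
assert not (stored_by_copy.weight == 0).all()  # the copy is unaffected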
    def on_train_end(self) -> None:
        r"""Store the trained independent backbone for `self.task_id`."""
        self.backbones[self.task_id] = deepcopy(self.backbone)
        self.backbone_valid_task_ids.add(self.task_id)
Store the trained independent backbone for self.task_id.
    def test_step(
        self, batch: Any, batch_idx: int, dataloader_idx: int = 0
    ) -> dict[str, Tensor]:
        r"""Test step for current task `self.task_id`, which tests for all seen tasks indexed by `dataloader_idx`.

        **Args:**
        - **batch** (`Any`): a batch of test data.
        - **dataloader_idx** (`int`): the task ID of the seen task to be tested. A default value of 0 is given, otherwise the `LightningModule` will raise a `RuntimeError`.

        **Returns:**
        - **outputs** (`dict[str, Tensor]`): a dictionary containing the loss and other metrics from this test step. Keys (`str`) are the metric names and values (`Tensor`) are the metrics.
        """
        test_task_id = self.get_test_task_id_from_dataloader_idx(dataloader_idx)

        x, y = batch
        backbone = self.backbones[
            test_task_id
        ]  # use the corresponding independent backbone for the test task
        feature, _ = backbone(x, stage="test", task_id=test_task_id)
        logits = self.heads(feature, test_task_id)
        # use the corresponding head for the test task (instead of the current task `self.task_id`)
        loss_cls = self.criterion(logits, y)
        acc = (logits.argmax(dim=1) == y).float().mean()

        # return metrics for Lightning loggers callback to handle at `on_test_batch_end()`
        return {
            "loss_cls": loss_cls,
            "acc": acc,
        }
Test step for current task self.task_id, which tests for all seen tasks indexed by dataloader_idx.
Args:
- batch (Any): a batch of test data.
- dataloader_idx (int): the task ID of the seen task to be tested. A default value of 0 is given, otherwise the LightningModule will raise a RuntimeError.

Returns:
- outputs (dict[str, Tensor]): a dictionary containing the loss and other metrics from this test step. Keys (str) are the metric names and values (Tensor) are the metrics.
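Outside of the Lightning test loop, the routing that test_step performs can be sketched as below; algo stands for a trained Independent instance, and the backbone and head call signatures follow the source above:

import torch

@torch.no_grad()
def eval_batch(algo, x, y, dataloader_idx: int = 0):
    # map the dataloader index to the seen task being tested
    t = algo.get_test_task_id_from_dataloader_idx(dataloader_idx)
    feature, _ = algo.backbones[t](x, stage="test", task_id=t)  # task-specific backbone
    logits = algo.heads(feature, t)                             # task-specific head
    return (logits.argmax(dim=1) == y).float().mean()           # accuracy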