clarena.metrics.cul_dd
The submodule in `callbacks` for `CULDistributionDistance`.
1r""" 2The submodule in `callbacks` for `CULDistributionDistance`. 3""" 4 5__all__ = ["CULDistributionDistance"] 6 7 8import csv 9import logging 10import os 11from typing import Any 12 13import pandas as pd 14import torch 15from lightning import Trainer 16from lightning.pytorch.utilities import rank_zero_only 17from matplotlib import pyplot as plt 18from torchmetrics import MeanMetric 19 20from clarena.metrics import MetricCallback 21from clarena.utils.eval import CULEvaluation 22from clarena.utils.metrics import MeanMetricBatch 23 24# always get logger for built-in logging in each module 25pylogger = logging.getLogger(__name__) 26 27 28class CULDistributionDistance(MetricCallback): 29 r"""Provides all actions that are related to CUL distribution distance (DD) metric, which include: 30 31 - Defining, initializing and recording DD metric. 32 - Saving DD metric to files. 33 - Visualizing DD metric as plots. 34 35 The callback is able to produce the following outputs: 36 37 - CSV files for DD in each task. 38 - Coloured plot for DD in each task. 39 40 Note that this callback is designed to be used with the `CULEvaluation` module, which is a special evaluation module for continual unlearning. It is not a typical test step in the algorithm, but rather a test protocol that evaluates the performance of the model on unlearned tasks. 41 42 """ 43 44 def __init__( 45 self, 46 save_dir: str, 47 distribution_distance_type: str, 48 distribution_distance_csv_name: str = "dd.csv", 49 distribution_distance_plot_name: str | None = None, 50 ) -> None: 51 r""" 52 **Args:** 53 - **save_dir** (`str`): The directory where data and figures of metrics will be saved. Better inside the output folder. 54 - **distribution_distance_type** (`str`): the type of distribution distance to use; one of: 55 - 'euclidean': Eulidean distance. 56 - 'cosine': Cosine distance. 57 - 'manhattan': Manhattan distance. 58 - **distribution_distance_csv_name** (`str`): file name to save test distribution distance metrics as CSV file. 59 - **distribution_distance_plot_name** (`str` | `None`): file name to save test distribution distance metrics as plot. If `None`, no plot will be saved. 60 61 """ 62 super().__init__(save_dir=save_dir) 63 64 self.distribution_distance_type: str = distribution_distance_type 65 r"""The type of distribution distance to use. """ 66 67 # paths 68 self.distribution_distance_csv_path: str = os.path.join( 69 self.save_dir, distribution_distance_csv_name 70 ) 71 r"""The path to save the test distribution distance metrics CSV file.""" 72 if distribution_distance_plot_name: 73 self.distribution_distance_plot_path: str = os.path.join( 74 self.save_dir, distribution_distance_plot_name 75 ) 76 r"""The path to save the test distribution distance metrics plot file.""" 77 78 # test accumulated metrics 79 self.distribution_distance: dict[int, MeanMetricBatch] 80 r"""Distribution distance unlearning metrics for each seen task. Accumulated and calculated from the test batches. Keys are task IDs and values are the corresponding metrics.""" 81 82 # task ID control 83 self.task_id: int 84 r"""Task ID counter indicating which task is being processed. Self updated during the task loop. 
Valid from 1 to `cl_dataset.num_tasks`.""" 85 86 @rank_zero_only 87 def on_test_start( 88 self, 89 trainer: Trainer, 90 pl_module: CULEvaluation, 91 ) -> None: 92 r"""Initialize the metrics for testing each seen task in the beginning of a task's testing.""" 93 94 # get the device to put the metrics on the same device 95 device = pl_module.device 96 97 # initialize test metrics for evaluation tasks 98 self.distribution_distance = { 99 task_id: MeanMetricBatch().to(device) 100 for task_id in pl_module.dd_eval_task_ids 101 } 102 103 @rank_zero_only 104 def on_test_batch_end( 105 self, 106 trainer: Trainer, 107 pl_module: CULEvaluation, 108 outputs: dict[str, Any], 109 batch: Any, 110 batch_idx: int, 111 dataloader_idx: int = 0, 112 ) -> None: 113 r"""Accumulating metrics from test batch. We don't need to log and monitor the metrics of test batches. 114 115 **Args:** 116 - **outputs** (`dict[str, Any]`): the outputs of the test step, which is the returns of the `test_step()` method in the `CULEvaluation`. 117 - **batch** (`Any`): the validation data batch. 118 - **dataloader_idx** (`int`): the task ID of seen tasks to be tested. A default value of 0 is given otherwise the LightningModule will raise a `RuntimeError`. 119 """ 120 121 # get the batch size 122 batch_size = len(batch) 123 124 test_task_id = pl_module.get_test_task_id_from_dataloader_idx(dataloader_idx) 125 126 # get the raw outputs from the outputs dictionary 127 agg_out_main = outputs["agg_out_main"] # aggregated outputs from the main model 128 agg_out_ref = outputs[ 129 "agg_out_ref" 130 ] # aggregated outputs from the reference model 131 132 if agg_out_main.dim() != 2: 133 raise ValueError( 134 f"Expected aggregated outputs to be (batch_size, flattened feature), i.e. 2 dimension, but got {agg_out_main.dim()}." 135 ) 136 137 if agg_out_ref.dim() != 2: 138 raise ValueError( 139 f"Expected aggregated outputs to be (batch_size, flattened feature), i.e. 2 dimension, but got {agg_out_ref.dim()}." 
140 ) 141 142 # calculate the distribution distance between the main and reference model outputs 143 if self.distribution_distance_type == "euclidean": 144 distance = torch.norm( 145 agg_out_main - agg_out_ref, p=2, dim=-1 146 ).mean() # Euclidean distance 147 148 elif self.distribution_distance_type == "cosine": 149 distance = ( 150 torch.nn.functional.cosine_similarity(agg_out_main, agg_out_ref, dim=-1) 151 ).mean() # cosine distance 152 elif self.distribution_distance_type == "manhattan": 153 distance = torch.norm( 154 agg_out_main - agg_out_ref, p=1, dim=-1 155 ).mean() # Manhattan distance 156 elif self.distribution_distance_type == "cka": 157 distance = 1 - linear_CKA(agg_out_main, agg_out_ref) 158 else: 159 distance = None 160 161 # update the accumulated metrics in order to calculate the metrics of the epoch 162 self.distribution_distance[test_task_id].update(distance, batch_size) 163 164 @rank_zero_only 165 def on_test_epoch_end( 166 self, 167 trainer: Trainer, 168 pl_module: CULEvaluation, 169 ) -> None: 170 r"""Save and plot test metrics at the end of test.""" 171 172 self.update_distribution_distance_to_csv( 173 distance_metric=self.distribution_distance, 174 csv_path=self.distribution_distance_csv_path, 175 ) 176 177 if hasattr(self, "distribution_distance_plot_path"): 178 self.plot_distribution_distance_from_csv( 179 csv_path=self.distribution_distance_csv_path, 180 plot_path=self.distribution_distance_plot_path, 181 ) 182 183 def update_distribution_distance_to_csv( 184 self, 185 distance_metric: dict[int, MeanMetricBatch], 186 csv_path: str, 187 ) -> None: 188 r"""Update the unlearning distribution distance metrics of unlearning tasks to CSV file. 189 190 **Args:** 191 - **distance_metric** (`dict[int, MeanMetricBatch]`): the distribution distance metric of unlearned tasks. Accumulated and calculated from the unlearning test batches. 192 - **csv_path** (`str`): save the test metric to path. E.g. './outputs/expr_name/1970-01-01_00-00-00/results/unlearning_test_after_task_X/distance.csv'. 
193 """ 194 195 eval_task_ids = list(distance_metric.keys()) 196 fieldnames = ["average_distribution_distance"] + [ 197 f"unlearning_test_on_task_{task_id}" for task_id in eval_task_ids 198 ] 199 200 new_line = {} 201 202 # write to the columns and calculate the average distribution distance over tasks at the same time 203 average_distribution_distance_over_unlearned_tasks = MeanMetric().to( 204 next(iter(distance_metric.values())).device 205 ) 206 for task_id in eval_task_ids: 207 loss_cls = distance_metric[task_id].compute().item() 208 new_line[f"unlearning_test_on_task_{task_id}"] = loss_cls 209 average_distribution_distance_over_unlearned_tasks(loss_cls) 210 new_line["average_distribution_distance"] = ( 211 average_distribution_distance_over_unlearned_tasks.compute().item() 212 ) 213 214 # write to the csv file 215 is_first = not os.path.exists(csv_path) 216 if not is_first: 217 with open(csv_path, "r", encoding="utf-8") as file: 218 lines = file.readlines() 219 del lines[0] 220 # write header 221 with open(csv_path, "w", encoding="utf-8") as file: 222 writer = csv.DictWriter(file, fieldnames=fieldnames) 223 writer.writeheader() 224 # write metrics 225 with open(csv_path, "a", encoding="utf-8") as file: 226 if not is_first: 227 file.writelines(lines) # write the previous lines 228 writer = csv.DictWriter(file, fieldnames=fieldnames) 229 writer.writerow(new_line) 230 231 def plot_distribution_distance_from_csv( 232 self, csv_path: str, plot_path: str 233 ) -> None: 234 """Plot the unlearning test distance matrix over different unlearned tasks from saved CSV file and save the plot to the designated directory. 235 236 **Args:** 237 - **csv_path** (`str`): the path to the CSV file where the `utils.save_distribution_distance_to_csv()` saved the unlearning test distance metric. 238 - **plot_path** (`str`): the path to save plot. Better same as the output directory of the experiment. E.g. './outputs/expr_name/1970-01-01_00-00-00/results/unlearning_test_after_task_X/distance.png'. 
239 """ 240 data = pd.read_csv(csv_path) 241 242 unlearned_task_ids = [ 243 int(col.replace("unlearning_test_on_task_", "")) 244 for col in data.columns 245 if col.startswith("unlearning_test_on_task_") 246 ] 247 num_tasks = len(unlearned_task_ids) 248 num_tests = len(data) 249 250 # plot the accuracy matrix 251 fig, ax = plt.subplots( 252 figsize=(2 * (num_tasks + 1), 2 * (num_tests + 1)) 253 ) # adaptive figure size 254 cax = ax.imshow( 255 data.drop(["average_distribution_distance"], axis=1), 256 interpolation="nearest", 257 cmap="Greens", 258 vmin=0, 259 vmax=1, 260 ) 261 262 colorbar = fig.colorbar(cax) 263 yticks = colorbar.ax.get_yticks() 264 colorbar.ax.set_yticks(yticks) 265 colorbar.ax.set_yticklabels( 266 [f"{tick:.2f}" for tick in yticks], fontsize=10 + num_tasks 267 ) # adaptive font size 268 269 r = 0 270 for r in range(num_tests): 271 for c in range(num_tasks): 272 j = unlearned_task_ids[c] 273 s = ( 274 f"{data.loc[r, f'unlearning_test_on_task_{j}']:.3f}" 275 if f"unlearning_test_on_task_{j}" in data.columns 276 else "" 277 ) 278 ax.text( 279 c, 280 r, 281 s, 282 ha="center", 283 va="center", 284 color="black", 285 fontsize=10 + num_tasks, # adaptive font size 286 ) 287 288 ax.set_xticks(range(num_tasks)) 289 ax.set_yticks(range(num_tests)) 290 ax.set_xticklabels( 291 unlearned_task_ids, fontsize=10 + num_tasks 292 ) # adaptive font size 293 ax.set_yticklabels( 294 range(1, num_tests + 1), fontsize=10 + num_tests 295 ) # adaptive font size 296 297 # Labeling the axes 298 ax.set_xlabel( 299 "Testing unlearning on task τ", fontsize=10 + num_tasks 300 ) # adaptive font size 301 ax.set_ylabel( 302 "Unlearning test after training task t", fontsize=10 + num_tasks 303 ) # adaptive font size 304 fig.savefig(plot_path) 305 plt.close(fig) 306 plt.close(fig) 307 308 309def linear_CKA(X, Y): 310 """ 311 计算两个表征矩阵的 CKA 相似度 312 Args: 313 X: [n, d1] torch.Tensor 314 Y: [n, d2] torch.Tensor 315 Returns: 316 cka: float (相似度,范围 [0,1]) 317 """ 318 # 居中 (center) 319 X = X - X.mean(0, keepdim=True) 320 Y = Y - Y.mean(0, keepdim=True) 321 322 # Gram 矩阵内积 323 dot_product = torch.norm(X.T @ Y, p="fro") ** 2 324 normalization = torch.norm(X.T @ X, p="fro") * torch.norm(Y.T @ Y, p="fro") 325 326 return dot_product / normalization
class CULDistributionDistance(MetricCallback)
Provides all actions related to the CUL distribution distance (DD) metric, which include:

- Defining, initializing and recording the DD metric.
- Saving the DD metric to files.
- Visualizing the DD metric as plots.

The callback is able to produce the following outputs:

- CSV files for DD in each task.
- Coloured plot for DD in each task.

Note that this callback is designed to be used with the `CULEvaluation` module, which is a special evaluation module for continual unlearning. It is not a typical test step in the algorithm, but rather a test protocol that evaluates the performance of the model on unlearned tasks.
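For orientation, here is a minimal usage sketch. The evaluation module, its dataloaders and the output paths are assumptions for illustration; only the callback's constructor arguments come from this module.

```python
from lightning import Trainer

from clarena.metrics.cul_dd import CULDistributionDistance

dd_callback = CULDistributionDistance(
    save_dir="./outputs/expr_name/results/unlearning_test_after_task_1",  # assumed path
    distribution_distance_type="euclidean",
    distribution_distance_plot_name="dd.png",
)
trainer = Trainer(callbacks=[dd_callback], logger=False)
# cul_eval is assumed to be a configured `CULEvaluation` instance with its test dataloaders:
# trainer.test(cul_eval, dataloaders=eval_dataloaders)
```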
def __init__(self, save_dir: str, distribution_distance_type: str, distribution_distance_csv_name: str = "dd.csv", distribution_distance_plot_name: str | None = None) -> None
**Args:**
- **save_dir** (`str`): the directory where data and figures of metrics will be saved. Better inside the output folder.
- **distribution_distance_type** (`str`): the type of distribution distance to use; one of:
    - 'euclidean': Euclidean distance.
    - 'cosine': cosine distance.
    - 'manhattan': Manhattan distance.
    - 'cka': CKA distance (1 minus linear CKA similarity).
- **distribution_distance_csv_name** (`str`): file name to save test distribution distance metrics as a CSV file.
- **distribution_distance_plot_name** (`str` | `None`): file name to save test distribution distance metrics as a plot. If `None`, no plot will be saved.

**Attributes:**
- **distribution_distance_type** (`str`): the type of distribution distance to use.
- **distribution_distance_csv_path** (`str`): the path to save the test distribution distance metrics CSV file.
- **distribution_distance_plot_path** (`str`): the path to save the test distribution distance metrics plot file (only set when a plot name is given).
- **distribution_distance** (`dict[int, MeanMetricBatch]`): distribution distance unlearning metrics for each seen task. Accumulated and calculated from the test batches. Keys are task IDs and values are the corresponding metrics.
- **task_id** (`int`): task ID counter indicating which task is being processed. Self-updated during the task loop. Valid from 1 to `cl_dataset.num_tasks`.
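A short sketch of how the constructor arguments map to the saved file paths; the directory and file names below are assumptions:

```python
import os

from clarena.metrics.cul_dd import CULDistributionDistance

cb = CULDistributionDistance(
    save_dir="./outputs/expr_name/results/unlearning_test_after_task_2",  # assumed path
    distribution_distance_type="cosine",       # or "euclidean", "manhattan", "cka"
    distribution_distance_csv_name="dd.csv",
    distribution_distance_plot_name="dd.png",  # pass None to skip the plot
)
# Both files are placed inside save_dir:
assert cb.distribution_distance_csv_path == os.path.join(cb.save_dir, "dd.csv")
assert cb.distribution_distance_plot_path == os.path.join(cb.save_dir, "dd.png")
```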
def on_test_start(self, trainer: Trainer, pl_module: CULEvaluation) -> None
Initialize the metrics for testing each seen task at the beginning of a task's testing. A `MeanMetricBatch` is created for each task ID in `pl_module.dd_eval_task_ids` and placed on the module's device.
def on_test_batch_end(self, trainer: Trainer, pl_module: CULEvaluation, outputs: dict[str, Any], batch: Any, batch_idx: int, dataloader_idx: int = 0) -> None
Accumulate metrics from the test batch. The metrics of test batches are not logged or monitored.

**Args:**
- **outputs** (`dict[str, Any]`): the outputs of the test step, which is the return of the `test_step()` method in `CULEvaluation`.
- **batch** (`Any`): the test data batch.
- **dataloader_idx** (`int`): the task ID of the seen task to be tested. A default value of 0 is given, otherwise the LightningModule will raise a `RuntimeError`.
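To make the four distance options concrete, the following standalone sketch mirrors the per-batch computation in `on_test_batch_end` on two `(batch_size, feature)` tensors; the random tensors are placeholders:

```python
import torch
import torch.nn.functional as F

agg_out_main = torch.randn(8, 16)  # placeholder aggregated outputs of the main model
agg_out_ref = torch.randn(8, 16)   # placeholder aggregated outputs of the reference model

euclidean = torch.norm(agg_out_main - agg_out_ref, p=2, dim=-1).mean()
manhattan = torch.norm(agg_out_main - agg_out_ref, p=1, dim=-1).mean()
cosine = (1 - F.cosine_similarity(agg_out_main, agg_out_ref, dim=-1)).mean()

# linear CKA compares centred representations; 1 - CKA is used as a distance
X = agg_out_main - agg_out_main.mean(0, keepdim=True)
Y = agg_out_ref - agg_out_ref.mean(0, keepdim=True)
cka = torch.norm(X.T @ Y, p="fro") ** 2 / (
    torch.norm(X.T @ X, p="fro") * torch.norm(Y.T @ Y, p="fro")
)
cka_distance = 1 - cka
```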
def on_test_epoch_end(self, trainer: Trainer, pl_module: CULEvaluation) -> None
Save the test metrics to the CSV file at the end of the test and, if a plot file name was given, plot them from the CSV.
def update_distribution_distance_to_csv(self, distance_metric: dict[int, MeanMetricBatch], csv_path: str) -> None
Update the distribution distance metrics of unlearned tasks to the CSV file.

**Args:**
- **distance_metric** (`dict[int, MeanMetricBatch]`): the distribution distance metrics of unlearned tasks. Accumulated and calculated from the unlearning test batches.
- **csv_path** (`str`): the path to save the test metric to. E.g. './outputs/expr_name/1970-01-01_00-00-00/results/unlearning_test_after_task_X/distance.csv'.
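The resulting CSV has one row per unlearning test and one `unlearning_test_on_task_{task_id}` column per evaluated task, plus the `average_distribution_distance` column. A small sketch of reading it back (the path is an assumption):

```python
import pandas as pd

df = pd.read_csv("./outputs/expr_name/results/unlearning_test_after_task_2/dd.csv")  # assumed path
task_columns = [c for c in df.columns if c.startswith("unlearning_test_on_task_")]
latest = df.iloc[-1]  # the most recently appended unlearning test
print(latest["average_distribution_distance"], latest[task_columns].to_dict())
```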
def plot_distribution_distance_from_csv(self, csv_path: str, plot_path: str) -> None
Plot the unlearning test distance matrix over different unlearned tasks from the saved CSV file and save the plot to the designated directory.

**Args:**
- **csv_path** (`str`): the path to the CSV file where `update_distribution_distance_to_csv()` saved the unlearning test distance metric.
- **plot_path** (`str`): the path to save the plot. Better same as the output directory of the experiment. E.g. './outputs/expr_name/1970-01-01_00-00-00/results/unlearning_test_after_task_X/distance.png'.
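Because the plot is produced from the CSV alone, it can also be regenerated offline. A minimal sketch, assuming a `dd.csv` already exists in the save directory:

```python
from clarena.metrics.cul_dd import CULDistributionDistance

cb = CULDistributionDistance(
    save_dir="./outputs/expr_name/results/unlearning_test_after_task_3",  # assumed path
    distribution_distance_type="euclidean",
)
cb.plot_distribution_distance_from_csv(
    csv_path=cb.distribution_distance_csv_path,  # an existing dd.csv is assumed here
    plot_path="./outputs/expr_name/results/unlearning_test_after_task_3/dd.png",
)
```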