clarena.metrics.cul_dd

The submodule in metrics for CULDistributionDistance.

r"""
The submodule in `metrics` for `CULDistributionDistance`.
"""

__all__ = ["CULDistributionDistance"]


import csv
import logging
import os
from typing import Any

import pandas as pd
import torch
from lightning import Trainer
from lightning.pytorch.utilities import rank_zero_only
from matplotlib import pyplot as plt
from torchmetrics import MeanMetric

from clarena.metrics import MetricCallback
from clarena.utils.eval import CULEvaluation
from clarena.utils.metrics import MeanMetricBatch

# always get logger for built-in logging in each module
pylogger = logging.getLogger(__name__)

class CULDistributionDistance(MetricCallback):
    r"""Provides all actions related to the CUL distribution distance (DD) metric, which include:

    - Defining, initializing and recording the DD metric.
    - Saving the DD metric to files.
    - Visualizing the DD metric as plots.

    The callback is able to produce the following outputs:

    - CSV files for DD in each task.
    - Coloured plot for DD in each task.

    Note that this callback is designed to be used with the `CULEvaluation` module, which is a special evaluation module for continual unlearning. It is not a typical test step in the algorithm, but rather a test protocol that evaluates the performance of the model on unlearned tasks.
    """
    def __init__(
        self,
        save_dir: str,
        distribution_distance_type: str,
        distribution_distance_csv_name: str = "dd.csv",
        distribution_distance_plot_name: str | None = None,
    ) -> None:
        r"""
        **Args:**
        - **save_dir** (`str`): The directory where data and figures of metrics will be saved. Better inside the output folder.
        - **distribution_distance_type** (`str`): the type of distribution distance to use; one of:
            - 'euclidean': Euclidean distance.
            - 'cosine': Cosine distance.
            - 'manhattan': Manhattan distance.
            - 'cka': one minus the linear CKA similarity.
        - **distribution_distance_csv_name** (`str`): file name to save test distribution distance metrics as CSV file.
        - **distribution_distance_plot_name** (`str` | `None`): file name to save test distribution distance metrics as plot. If `None`, no plot will be saved.
        """
        super().__init__(save_dir=save_dir)

        self.distribution_distance_type: str = distribution_distance_type
        r"""The type of distribution distance to use."""

        # paths
        self.distribution_distance_csv_path: str = os.path.join(
            self.save_dir, distribution_distance_csv_name
        )
        r"""The path to save the test distribution distance metrics CSV file."""
        if distribution_distance_plot_name:
            self.distribution_distance_plot_path: str = os.path.join(
                self.save_dir, distribution_distance_plot_name
            )
            r"""The path to save the test distribution distance metrics plot file."""

        # test accumulated metrics
        self.distribution_distance: dict[int, MeanMetricBatch]
        r"""Distribution distance unlearning metrics for each seen task. Accumulated and calculated from the test batches. Keys are task IDs and values are the corresponding metrics."""

        # task ID control
        self.task_id: int
        r"""Task ID counter indicating which task is being processed. Self-updated during the task loop. Valid from 1 to `cl_dataset.num_tasks`."""

    @rank_zero_only
    def on_test_start(
        self,
        trainer: Trainer,
        pl_module: CULEvaluation,
    ) -> None:
        r"""Initialize the metrics for each evaluation task at the beginning of testing."""

        # get the device to put the metrics on the same device
        device = pl_module.device

        # initialize test metrics for evaluation tasks
        self.distribution_distance = {
            task_id: MeanMetricBatch().to(device)
            for task_id in pl_module.dd_eval_task_ids
        }

    @rank_zero_only
    def on_test_batch_end(
        self,
        trainer: Trainer,
        pl_module: CULEvaluation,
        outputs: dict[str, Any],
        batch: Any,
        batch_idx: int,
        dataloader_idx: int = 0,
    ) -> None:
        r"""Accumulate metrics from the test batch. There is no need to log or monitor the metrics of individual test batches.

        **Args:**
        - **outputs** (`dict[str, Any]`): the outputs of the test step, which is the return of the `test_step()` method in the `CULEvaluation`.
        - **batch** (`Any`): the test data batch.
        - **dataloader_idx** (`int`): the task ID of seen tasks to be tested. A default value of 0 is given otherwise the LightningModule will raise a `RuntimeError`.
        """

        # get the batch size
        batch_size = len(batch)

        test_task_id = pl_module.get_test_task_id_from_dataloader_idx(dataloader_idx)

        # get the raw outputs from the outputs dictionary
        agg_out_main = outputs["agg_out_main"]  # aggregated outputs from the main model
        agg_out_ref = outputs[
            "agg_out_ref"
        ]  # aggregated outputs from the reference model

        if agg_out_main.dim() != 2:
            raise ValueError(
                f"Expected aggregated outputs to be (batch_size, flattened feature), i.e. 2 dimensions, but got {agg_out_main.dim()}."
            )

        if agg_out_ref.dim() != 2:
            raise ValueError(
                f"Expected aggregated outputs to be (batch_size, flattened feature), i.e. 2 dimensions, but got {agg_out_ref.dim()}."
            )

        # calculate the distribution distance between the main and reference model outputs
        if self.distribution_distance_type == "euclidean":
            distance = torch.norm(
                agg_out_main - agg_out_ref, p=2, dim=-1
            ).mean()  # Euclidean distance
        elif self.distribution_distance_type == "cosine":
            distance = (
                1
                - torch.nn.functional.cosine_similarity(
                    agg_out_main, agg_out_ref, dim=-1
                )
            ).mean()  # cosine distance is 1 minus cosine similarity
        elif self.distribution_distance_type == "manhattan":
            distance = torch.norm(
                agg_out_main - agg_out_ref, p=1, dim=-1
            ).mean()  # Manhattan distance
        elif self.distribution_distance_type == "cka":
            distance = 1 - linear_CKA(agg_out_main, agg_out_ref)  # linear CKA distance
        else:
            raise ValueError(
                f"Unknown distribution distance type: {self.distribution_distance_type}."
            )

        # update the accumulated metrics in order to calculate the metrics of the epoch
        self.distribution_distance[test_task_id].update(distance, batch_size)

    @rank_zero_only
    def on_test_epoch_end(
        self,
        trainer: Trainer,
        pl_module: CULEvaluation,
    ) -> None:
        r"""Save and plot test metrics at the end of test."""

        self.update_distribution_distance_to_csv(
            distance_metric=self.distribution_distance,
            csv_path=self.distribution_distance_csv_path,
        )

        if hasattr(self, "distribution_distance_plot_path"):
            self.plot_distribution_distance_from_csv(
                csv_path=self.distribution_distance_csv_path,
                plot_path=self.distribution_distance_plot_path,
            )

    def update_distribution_distance_to_csv(
        self,
        distance_metric: dict[int, MeanMetricBatch],
        csv_path: str,
    ) -> None:
        r"""Update the distribution distance metrics of unlearned tasks in the CSV file.

        **Args:**
        - **distance_metric** (`dict[int, MeanMetricBatch]`): the distribution distance metric of unlearned tasks. Accumulated and calculated from the unlearning test batches.
        - **csv_path** (`str`): save the test metric to path. E.g. './outputs/expr_name/1970-01-01_00-00-00/results/unlearning_test_after_task_X/distance.csv'.
        """

        eval_task_ids = list(distance_metric.keys())
        fieldnames = ["average_distribution_distance"] + [
            f"unlearning_test_on_task_{task_id}" for task_id in eval_task_ids
        ]

        new_line = {}

        # write to the columns and calculate the average distribution distance over tasks at the same time
        average_distribution_distance_over_unlearned_tasks = MeanMetric().to(
            next(iter(distance_metric.values())).device
        )
        for task_id in eval_task_ids:
            distance = distance_metric[task_id].compute().item()
            new_line[f"unlearning_test_on_task_{task_id}"] = distance
            average_distribution_distance_over_unlearned_tasks(distance)
        new_line["average_distribution_distance"] = (
            average_distribution_distance_over_unlearned_tasks.compute().item()
        )

        # write to the csv file
        is_first = not os.path.exists(csv_path)
        if not is_first:
            with open(csv_path, "r", encoding="utf-8") as file:
                lines = file.readlines()
                del lines[0]  # drop the old header; it is rewritten below
        # write header
        with open(csv_path, "w", encoding="utf-8", newline="") as file:
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()
        # write metrics
        with open(csv_path, "a", encoding="utf-8", newline="") as file:
            if not is_first:
                file.writelines(lines)  # write the previous lines back first
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writerow(new_line)

    def plot_distribution_distance_from_csv(
        self, csv_path: str, plot_path: str
    ) -> None:
        """Plot the unlearning test distance matrix over different unlearned tasks from the saved CSV file and save the plot to the designated directory.

        **Args:**
        - **csv_path** (`str`): the path to the CSV file where `update_distribution_distance_to_csv()` saved the unlearning test distance metric.
        - **plot_path** (`str`): the path to save plot. Better same as the output directory of the experiment. E.g. './outputs/expr_name/1970-01-01_00-00-00/results/unlearning_test_after_task_X/distance.png'.
        """
        data = pd.read_csv(csv_path)

        unlearned_task_ids = [
            int(col.replace("unlearning_test_on_task_", ""))
            for col in data.columns
            if col.startswith("unlearning_test_on_task_")
        ]
        num_tasks = len(unlearned_task_ids)
        num_tests = len(data)

        # plot the distance matrix
        fig, ax = plt.subplots(
            figsize=(2 * (num_tasks + 1), 2 * (num_tests + 1))
        )  # adaptive figure size
        cax = ax.imshow(
            data.drop(["average_distribution_distance"], axis=1),
            interpolation="nearest",
            cmap="Greens",
            vmin=0,
            vmax=1,
        )

        colorbar = fig.colorbar(cax)
        yticks = colorbar.ax.get_yticks()
        colorbar.ax.set_yticks(yticks)
        colorbar.ax.set_yticklabels(
            [f"{tick:.2f}" for tick in yticks], fontsize=10 + num_tasks
        )  # adaptive font size

        # annotate each cell with its distance value
        for r in range(num_tests):
            for c in range(num_tasks):
                j = unlearned_task_ids[c]
                s = (
                    f"{data.loc[r, f'unlearning_test_on_task_{j}']:.3f}"
                    if f"unlearning_test_on_task_{j}" in data.columns
                    else ""
                )
                ax.text(
                    c,
                    r,
                    s,
                    ha="center",
                    va="center",
                    color="black",
                    fontsize=10 + num_tasks,  # adaptive font size
                )

        ax.set_xticks(range(num_tasks))
        ax.set_yticks(range(num_tests))
        ax.set_xticklabels(
            unlearned_task_ids, fontsize=10 + num_tasks
        )  # adaptive font size
        ax.set_yticklabels(
            range(1, num_tests + 1), fontsize=10 + num_tests
        )  # adaptive font size

        # Labeling the axes
        ax.set_xlabel(
            "Testing unlearning on task τ", fontsize=10 + num_tasks
        )  # adaptive font size
        ax.set_ylabel(
            "Unlearning test after training task t", fontsize=10 + num_tasks
        )  # adaptive font size
        fig.savefig(plot_path)
        plt.close(fig)


def linear_CKA(X, Y):
    """Compute the linear CKA (Centered Kernel Alignment) similarity between two representation matrices.

    Args:
        X: [n, d1] torch.Tensor
        Y: [n, d2] torch.Tensor
    Returns:
        cka: similarity in the range [0, 1], as a 0-dim tensor
    """
    # center the representations
    X = X - X.mean(0, keepdim=True)
    Y = Y - Y.mean(0, keepdim=True)

    # inner product between the Gram matrices, computed via the cross-covariance
    dot_product = torch.norm(X.T @ Y, p="fro") ** 2
    normalization = torch.norm(X.T @ X, p="fro") * torch.norm(Y.T @ Y, p="fro")

    return dot_product / normalization
class CULDistributionDistance(clarena.metrics.base.MetricCallback):

Provides all actions related to the CUL distribution distance (DD) metric, which include:

  • Defining, initializing and recording the DD metric.
  • Saving the DD metric to files.
  • Visualizing the DD metric as plots.

The callback is able to produce the following outputs:

  • CSV files for DD in each task.
  • Coloured plot for DD in each task.

Note that this callback is designed to be used with the CULEvaluation module, which is a special evaluation module for continual unlearning. It is not a typical test step in the algorithm, but rather a test protocol that evaluates the performance of the model on unlearned tasks.
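A minimal usage sketch, not a prescribed recipe: it assumes a CULEvaluation instance and its test dataloaders have already been built elsewhere (eval_module and eval_dataloaders below are placeholders), and simply attaches the callback to a Lightning Trainer test run.

    from lightning import Trainer

    from clarena.metrics.cul_dd import CULDistributionDistance
    from clarena.utils.eval import CULEvaluation

    # placeholders: a configured CULEvaluation module and its test dataloaders
    eval_module: CULEvaluation = ...
    eval_dataloaders = ...

    dd_callback = CULDistributionDistance(
        save_dir="./outputs/expr_name/results/unlearning_test_after_task_1",  # illustrative path
        distribution_distance_type="cosine",
        distribution_distance_csv_name="dd.csv",
        distribution_distance_plot_name="dd.png",
    )

    trainer = Trainer(callbacks=[dd_callback], logger=False)
    trainer.test(eval_module, dataloaders=eval_dataloaders)  # drives the on_test_* hooks documented below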

CULDistributionDistance( save_dir: str, distribution_distance_type: str, distribution_distance_csv_name: str = 'dd.csv', distribution_distance_plot_name: str | None = None)

Args:

  • save_dir (str): The directory where data and figures of metrics will be saved. Better inside the output folder.
  • distribution_distance_type (str): the type of distribution distance to use (see the sketch after this list); one of:
    • 'euclidean': Euclidean distance.
    • 'cosine': Cosine distance.
    • 'manhattan': Manhattan distance.
    • 'cka': one minus the linear CKA similarity.
  • distribution_distance_csv_name (str): file name to save test distribution distance metrics as CSV file.
  • distribution_distance_plot_name (str | None): file name to save test distribution distance metrics as plot. If None, no plot will be saved.
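The per-batch computation behind each option mirrors on_test_batch_end in the source above; for 'cka', the linear CKA similarity of column-centred X and Y is ||XᵀY||_F² / (||XᵀX||_F · ||YᵀY||_F). A minimal sketch on dummy tensors (shapes and values are illustrative only):

    import torch
    import torch.nn.functional as F

    from clarena.metrics.cul_dd import linear_CKA

    # dummy aggregated outputs of shape (batch_size, flattened feature)
    agg_out_main = torch.randn(8, 16)
    agg_out_ref = torch.randn(8, 16)

    euclidean = torch.norm(agg_out_main - agg_out_ref, p=2, dim=-1).mean()
    manhattan = torch.norm(agg_out_main - agg_out_ref, p=1, dim=-1).mean()
    cosine = (1 - F.cosine_similarity(agg_out_main, agg_out_ref, dim=-1)).mean()
    cka = 1 - linear_CKA(agg_out_main, agg_out_ref)  # 0 when the two representations are perfectly aligned
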
distribution_distance_type: str

The type of distribution distance to use.

distribution_distance_csv_path: str

The path to save the test distribution distance metrics CSV file.

distribution_distance: dict[int, clarena.utils.metrics.MeanMetricBatch]

Distribution distance unlearning metrics for each seen task. Accumulated and calculated from the test batches. Keys are task IDs and values are the corresponding metrics.

task_id: int

Task ID counter indicating which task is being processed. Self-updated during the task loop. Valid from 1 to cl_dataset.num_tasks.

@rank_zero_only
def on_test_start( self, trainer: lightning.pytorch.trainer.trainer.Trainer, pl_module: clarena.utils.eval.CULEvaluation) -> None:

Initialize the metrics for each evaluation task at the beginning of testing.

@rank_zero_only
def on_test_batch_end( self, trainer: lightning.pytorch.trainer.trainer.Trainer, pl_module: clarena.utils.eval.CULEvaluation, outputs: dict[str, typing.Any], batch: Any, batch_idx: int, dataloader_idx: int = 0) -> None:

Accumulate metrics from the test batch. There is no need to log or monitor the metrics of individual test batches.

Args:

  • outputs (dict[str, Any]): the outputs of the test step, i.e. the return of the test_step() method in the CULEvaluation (see the sketch after this list).
  • batch (Any): the test data batch.
  • dataloader_idx (int): the task ID of seen tasks to be tested. A default value of 0 is given otherwise the LightningModule will raise a RuntimeError.
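The callback reads only the two keys checked above. A sketch of a conforming return value (the function and argument names are illustrative; the actual contract is defined by CULEvaluation.test_step()):

    import torch

    def test_step_sketch(
        agg_out_main: torch.Tensor, agg_out_ref: torch.Tensor
    ) -> dict[str, torch.Tensor]:
        # both tensors must be 2-D, (batch_size, flattened feature);
        # otherwise on_test_batch_end raises a ValueError
        return {
            "agg_out_main": agg_out_main,  # aggregated outputs from the main model
            "agg_out_ref": agg_out_ref,  # aggregated outputs from the reference model
        }
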
@rank_zero_only
def on_test_epoch_end( self, trainer: lightning.pytorch.trainer.trainer.Trainer, pl_module: clarena.utils.eval.CULEvaluation) -> None:

Save and plot test metrics at the end of test.

def update_distribution_distance_to_csv( self, distance_metric: dict[int, clarena.utils.metrics.MeanMetricBatch], csv_path: str) -> None:

Update the distribution distance metrics of unlearned tasks in the CSV file.

Args:

  • distance_metric (dict[int, MeanMetricBatch]): the distribution distance metric of unlearned tasks. Accumulated and calculated from the unlearning test batches.
  • csv_path (str): save the test metric to path. E.g. './outputs/expr_name/1970-01-01_00-00-00/results/unlearning_test_after_task_X/distance.csv'.
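For reference, the CSV holds one average column followed by one column per unlearned task, and one row per unlearning test. A small sketch of reading it back with pandas (the path is illustrative):

    import pandas as pd

    # header written by this method, e.g. when tasks 1 and 2 are evaluated:
    #   average_distribution_distance,unlearning_test_on_task_1,unlearning_test_on_task_2
    data = pd.read_csv("./outputs/expr_name/results/unlearning_test_after_task_1/dd.csv")

    per_task = data.filter(like="unlearning_test_on_task_")
    # the average column equals the row-wise mean of the per-task columns
    row_means = per_task.mean(axis=1)
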
def plot_distribution_distance_from_csv(self, csv_path: str, plot_path: str) -> None:

Plot the unlearning test distance matrix over different unlearned tasks from the saved CSV file and save the plot to the designated directory.

Args:

  • csv_path (str): the path to the CSV file where update_distribution_distance_to_csv() saved the unlearning test distance metric.
  • plot_path (str): the path to save plot. Better same as the output directory of the experiment. E.g. './outputs/expr_name/1970-01-01_00-00-00/results/unlearning_test_after_task_X/distance.png'.
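Because this method only needs the CSV on disk, the plot can be regenerated offline from a finished run. A minimal sketch (paths are placeholders, and it is assumed the save directory already exists):

    from clarena.metrics.cul_dd import CULDistributionDistance

    results_dir = "./outputs/expr_name/results/unlearning_test_after_task_1"  # placeholder

    callback = CULDistributionDistance(
        save_dir=results_dir,
        distribution_distance_type="cosine",
    )
    callback.plot_distribution_distance_from_csv(
        csv_path=f"{results_dir}/dd.csv",
        plot_path=f"{results_dir}/dd.png",
    )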