Source code for ddmtolab.Algorithms.MTMO.ParEGO_KT

"""
ParEGO with Knowledge Transfer (ParEGO-KT)

This module implements ParEGO-KT for expensive multitask multiobjective optimization.
It extends ParEGO with cross-task knowledge transfer using Spearman rank correlation
to identify and leverage beneficial task relationships.

References
----------
    [1] J. Knowles. "ParEGO: A hybrid algorithm with on-line landscape approximation for expensive multiobjective optimization problems." IEEE Transactions on Evolutionary Computation, 2006, 10(1): 50-66.

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.12.18
Version: 1.0
"""
import numpy as np
from scipy.stats import spearmanr
from tqdm import tqdm
import torch
from ddmtolab.Methods.Algo_Methods.bo_utils import bo_next_point
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
from ddmtolab.Methods.Algo_Methods.algo_utils import *
import warnings
import time

warnings.filterwarnings("ignore")


[docs] class ParEGO_KT: algorithm_information = { 'n_tasks': '[2, K]', 'dims': 'unequal', 'objs': 'unequal', 'n_objs': '[2, M]', 'cons': 'equal', 'n_cons': '0', 'expensive': 'True', 'knowledge_transfer': 'True', 'n_initial': 'unequal', 'n_weights': 'unequal', 'max_nfes': 'unequal' }
[docs] def __init__(self, problem, n_initial=None, n_weights=None, max_nfes=None, rho=0.05, save_data=True, save_path='./Data', name='ParEGO-KT', disable_tqdm=True): """ Initialize ParEGO-KT algorithm with knowledge transfer. Parameters ---------- problem : MTOP Multi-task optimization problem instance n_initial : int or List[int], optional Number of initial samples per task (default: 11*dim - 1) n_weights : int or List[int], optional Number of reference weight vectors per task (default: 100) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 200) rho : float, optional Augmentation coefficient for augmented Tchebycheff scalarization (default: 0.05) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './TestData') name : str, optional Name for the experiment (default: 'ParEGO_KT_test') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n_initial = n_initial self.n_weights = n_weights if n_weights is not None else 100 self.max_nfes = max_nfes if max_nfes is not None else 200 self.rho = rho self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the ParEGO-KT algorithm with knowledge transfer. Returns ------- Results Optimization results containing decision variables, objectives, and runtime """ data_type = torch.float start_time = time.time() problem = self.problem nt = problem.n_tasks dims = problem.dims no = problem.n_objs # Set default initial samples: 11*dim - 1 if self.n_initial is None: n_initial_per_task = [11 * dims[i] - 1 for i in range(nt)] else: n_initial_per_task = par_list(self.n_initial, nt) max_nfes_per_task = par_list(self.max_nfes, nt) n_weights_per_task = par_list(self.n_weights, nt) # Generate uniformly distributed weight vectors for each task W = [] for i in range(nt): w_i, actual_n = uniform_point(n_weights_per_task[i], no[i]) W.append(w_i) n_weights_per_task[i] = actual_n # ============ Knowledge Transfer: Initialize with shared samples ============ # Generate initial samples in [0,1] space using LHS (the_same=True) decs = initialization(problem, n_initial_per_task, method='lhs', the_same=True) objs, _ = evaluation(problem, decs) nfes_per_task = n_initial_per_task.copy() # ============ Compute task similarity based on initial samples ============ # Compute ranking for each task using NSGA-II sorting ranks = [] for i in range(nt): rank_i, _, _ = nsga2_sort(objs[i]) ranks.append(rank_i) # Compute similarity matrix using Spearman's rank correlation similarity_matrix = self._compute_similarity_matrix(ranks, n_initial_per_task) pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < sum(max_nfes_per_task): # ============ BO Sampling Phase ============ active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for i in active_tasks: # Randomly select a weight vector weight_idx = np.random.randint(0, n_weights_per_task[i]) weight = W[i][weight_idx] # Scalarize multi-objective values using augmented Tchebycheff scalar_objs = self._scalarize(objs[i], weight) # Fit GP surrogate and select next candidate via BO with EI candidate_np = bo_next_point(dims[i], decs[i], scalar_objs, data_type=data_type) # Evaluate the candidate solution obj, _ = evaluation_single(problem, candidate_np, i) # Update dataset with new sample decs[i], objs[i] = vstack_groups((decs[i], candidate_np), (objs[i], obj)) nfes_per_task[i] += 1 pbar.update(1) # ============ Knowledge Transfer Phase ============ # After all tasks complete one BO iteration, perform knowledge transfer for i in active_tasks: if nfes_per_task[i] >= max_nfes_per_task[i]: continue # Find the most similar task to task i most_similar_task = self._find_most_similar_task(i, similarity_matrix) # Generate random number for transfer decision transfer_prob = similarity_matrix[i, most_similar_task] if np.random.rand() < transfer_prob: # Find the best solution in the most similar task weight_idx = np.random.randint(0, n_weights_per_task[most_similar_task]) weight = W[most_similar_task][weight_idx] scalar_objs_source = self._scalarize(objs[most_similar_task], weight) best_idx = np.argmin(scalar_objs_source) best_solution = decs[most_similar_task][best_idx] # Transfer solution: truncate or pad to match target task dimension transferred_solution = self._adapt_solution(best_solution, dims[most_similar_task], dims[i]) # Evaluate transferred solution on target task obj_transfer, _ = evaluation_single(problem, transferred_solution, i) # Update dataset decs[i], objs[i] = vstack_groups((decs[i], transferred_solution), (objs[i], obj_transfer)) nfes_per_task[i] += 1 pbar.update(1) pbar.close() runtime = time.time() - start_time all_decs, all_objs = build_staircase_history(decs, objs, k=1) results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) return results
def _compute_similarity_matrix(self, ranks, n_initial_per_task): """ Compute task similarity matrix based on Spearman's rank correlation. Parameters ---------- ranks : List[np.ndarray] List of rank arrays for each task n_initial_per_task : List[int] Number of initial samples per task Returns ------- similarity_matrix : np.ndarray Similarity matrix of shape (n_tasks, n_tasks) with values in [0, 1] """ nt = len(ranks) similarity_matrix = np.zeros((nt, nt)) # Use minimum number of samples for correlation computation min_samples = min(n_initial_per_task) for i in range(nt): for j in range(nt): if i == j: similarity_matrix[i, j] = 1.0 else: # Compute Spearman correlation on the common samples rank_i = ranks[i][:min_samples] rank_j = ranks[j][:min_samples] correlation, _ = spearmanr(rank_i, rank_j) # Convert correlation from [-1, 1] to [0, 1] # correlation = 1 means identical ranking (high similarity) # correlation = -1 means opposite ranking (low similarity) similarity = (correlation + 1) / 2 similarity_matrix[i, j] = max(0.0, min(1.0, similarity)) return similarity_matrix def _find_most_similar_task(self, task_idx, similarity_matrix): """ Find the most similar task to the given task. Parameters ---------- task_idx : int Index of the target task similarity_matrix : np.ndarray Task similarity matrix Returns ------- most_similar_idx : int Index of the most similar task """ similarities = similarity_matrix[task_idx].copy() similarities[task_idx] = -1 # Exclude self most_similar_idx = np.argmax(similarities) return most_similar_idx def _adapt_solution(self, solution, source_dim, target_dim): """ Adapt solution from source task to target task by truncating or padding. Parameters ---------- solution : np.ndarray Solution from source task of shape (source_dim,) source_dim : int Dimension of source task target_dim : int Dimension of target task Returns ------- adapted_solution : np.ndarray Adapted solution of shape (target_dim,) """ if source_dim >= target_dim: # Truncate to target dimension return solution[:target_dim].reshape(1, -1) else: # Pad with random values in [0, 1] padding = np.random.rand(target_dim - source_dim) return np.concatenate([solution, padding]).reshape(1, -1) def _scalarize(self, objs, weight): """ Scalarize multi-objective values using augmented Tchebycheff approach. Parameters ---------- objs : np.ndarray Multi-objective values of shape (N, M) weight : np.ndarray Weight vector of shape (M,) Returns ------- scalar_objs : np.ndarray Scalarized objective values of shape (N, 1) """ # Normalize objectives to [0, 1] range obj_min = np.min(objs, axis=0) obj_max = np.max(objs, axis=0) obj_range = obj_max - obj_min obj_range = np.maximum(obj_range, 1e-10) normalized_objs = (objs - obj_min) / obj_range # Augmented Tchebycheff scalarization weighted_objs = normalized_objs * weight max_term = np.max(weighted_objs, axis=1) aug_term = self.rho * np.sum(weighted_objs, axis=1) scalar_objs = max_term + aug_term return scalar_objs.reshape(-1, 1)
def nsga2_sort(objs, cons=None): """ Sort solutions based on NSGA-II criteria using non-dominated sorting and crowding distance. Parameters ---------- objs : np.ndarray Objective value matrix of shape (pop_size, n_obj) cons : np.ndarray, optional Constraint matrix of shape (pop_size, n_con). If None, no constraints are considered (default: None) Returns ------- rank : np.ndarray Ranking of each solution (0-based index after sorting) of shape (pop_size,). rank[i] indicates the position of solution i in the sorted order front_no : np.ndarray Non-dominated front number of each solution of shape (pop_size,) crowd_dis : np.ndarray Crowding distance of each solution of shape (pop_size,) Notes ----- Solutions are sorted first by front number (ascending), then by crowding distance (descending). Larger crowding distance values indicate better diversity preservation. """ pop_size = objs.shape[0] # Perform non-dominated sorting if cons is not None: front_no, _ = nd_sort(objs, cons, pop_size) else: front_no, _ = nd_sort(objs, pop_size) # Calculate crowding distance for diversity preservation crowd_dis = crowding_distance(objs, front_no) # Sort by front number (ascending), then by crowding distance (descending) sorted_indices = np.lexsort((-crowd_dis, front_no)) # Create rank array: rank[i] gives the sorted position of solution i rank = np.empty(pop_size, dtype=int) rank[sorted_indices] = np.arange(pop_size) return rank, front_no, crowd_dis