# Source code for ddmtolab.Algorithms.STMO.ParEGO

"""
Pareto Efficient Global Optimization (ParEGO)

This module implements ParEGO for expensive multi-objective optimization problems.

References
----------
    [1] Knowles, Joshua. "ParEGO: A hybrid algorithm with on-line landscape approximation for expensive multiobjective optimization problems." IEEE Transactions on Evolutionary Computation 10.1 (2006): 50-66.

Notes
-----
Author: Jiangtao Shen
Date: 2025.01.10
Version: 1.0
"""
from tqdm import tqdm
import torch
import numpy as np
from ddmtolab.Methods.Algo_Methods.bo_utils import bo_next_point
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
from ddmtolab.Methods.Algo_Methods.algo_utils import *
import warnings
import time

# NOTE: this silences every warning category for the whole process as soon as
# this module is imported, not only warnings raised by the code in this file.
warnings.filterwarnings("ignore")


class ParEGO:
    """
    Pareto Efficient Global Optimization algorithm for expensive multi-objective optimization.

    ParEGO uses scalarization with randomly selected weight vectors to convert
    multi-objective problems into single-objective problems, which are then
    solved using Bayesian Optimization with Expected Improvement.

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[1, K]',
        'dims': 'unequal',
        'objs': 'unequal',
        'n_objs': '[2, M]',
        'cons': 'equal',
        'n_cons': '0',
        'expensive': 'True',
        'knowledge_transfer': 'False',
        'n_initial': 'unequal',
        'n_weights': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        """
        Return the capability description of this algorithm.

        Delegates to the module-level ``get_algorithm_information`` helper
        (brought into scope by the file's imports), passing this class and
        the ``print_info`` flag through unchanged.

        Parameters
        ----------
        print_info : bool, optional
            Forwarded to the helper (default: True)
        """
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n_initial=None, n_weights=None, max_nfes=None, rho=0.05,
                 save_data=True, save_path='./Data', name='ParEGO', disable_tqdm=True):
        """
        Initialize ParEGO algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n_initial : int or List[int], optional
            Number of initial samples per task
            (default: 11*dim - 1, following Knowles 2006)
        n_weights : int or List[int], optional
            Number of reference weight vectors per task (default: 100)
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task (default: 200)
        rho : float, optional
            Augmentation coefficient for augmented Tchebycheff scalarization
            (default: 0.05)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'ParEGO')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        # Kept as None here; the 11*dim - 1 default is resolved in optimize(),
        # where the per-task dimensions are read from the problem.
        self.n_initial = n_initial
        self.n_weights = n_weights if n_weights is not None else 100
        self.max_nfes = max_nfes if max_nfes is not None else 200
        self.rho = rho
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    @staticmethod
    def _active_tasks(nfes_per_task, max_nfes_per_task):
        """Return the indices of tasks whose evaluation count is still below their own budget."""
        return [i for i, (used, budget) in enumerate(zip(nfes_per_task, max_nfes_per_task))
                if used < budget]

    def optimize(self):
        """
        Execute the ParEGO algorithm.

        Every task that still has evaluation budget receives one new sample
        per sweep, until no task has budget left.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives, and runtime
        """
        data_type = torch.float
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        no = problem.n_objs

        # Set default initial samples: 11*dim - 1
        if self.n_initial is None:
            n_initial_per_task = [11 * dims[i] - 1 for i in range(nt)]
        else:
            n_initial_per_task = par_list(self.n_initial, nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)
        n_weights_per_task = par_list(self.n_weights, nt)

        # Generate weight vectors for each task; uniform_point also returns the
        # number of vectors it actually produced, which replaces the request.
        W = []
        for i in range(nt):
            w_i, actual_n = uniform_point(n_weights_per_task[i], no[i])
            W.append(w_i)
            n_weights_per_task[i] = actual_n

        # Generate initial samples (method='lhs') and evaluate them
        decs = initialization(problem, n_initial_per_task, method='lhs')
        objs, _ = evaluation(problem, decs)
        nfes_per_task = n_initial_per_task.copy()

        pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
                    desc=f"{self.name}", disable=self.disable_tqdm)

        # Termination is decided per task, not on the summed counts: a task
        # whose initial sample count already exceeds its own budget would
        # otherwise eat into the budget of the remaining tasks and stop them
        # before they reach their max_nfes.
        while True:
            active_tasks = self._active_tasks(nfes_per_task, max_nfes_per_task)
            if not active_tasks:
                break

            for i in active_tasks:
                # Randomly select a weight vector
                weight_idx = np.random.randint(0, n_weights_per_task[i])
                weight = W[i][weight_idx]

                # Scalarize multi-objective values to single objective using augmented Tchebycheff
                scalar_objs = self._scalarize(objs[i], weight)

                # Fit GP surrogate and select next candidate via BO with EI
                candidate_np = bo_next_point(dims[i], decs[i], scalar_objs, data_type=data_type)

                # Evaluate the candidate solution (get true multi-objective values)
                obj, _ = evaluation_single(problem, candidate_np, i)

                # Update dataset with new sample
                decs[i], objs[i] = vstack_groups((decs[i], candidate_np), (objs[i], obj))

                nfes_per_task[i] += 1
                pbar.update(1)

        pbar.close()
        runtime = time.time() - start_time

        # Convert database to staircase history structure for results
        db_decs = [decs[i].copy() for i in range(nt)]
        db_objs = [objs[i].copy() for i in range(nt)]
        all_decs, all_objs = build_staircase_history(db_decs, db_objs, k=1)

        results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime,
                                     max_nfes=nfes_per_task, bounds=problem.bounds,
                                     save_path=self.save_path, filename=self.name,
                                     save_data=self.save_data)

        return results

    def _scalarize(self, objs, weight):
        """
        Scalarize multi-objective values using augmented Tchebycheff approach.

        Each objective column is first min-max normalized over the supplied
        samples, then combined as:

            g(f|w) = max_j(w_j * f_j) + rho * sum_j(w_j * f_j)

        Because the normalization uses the min/max of ``objs`` itself, the
        scalar value of a given sample depends on the other rows passed in.

        Parameters
        ----------
        objs : np.ndarray
            Multi-objective values of shape (N, M), where N is the number of
            samples and M is the number of objectives
        weight : np.ndarray
            Weight vector of shape (M,)

        Returns
        -------
        scalar_objs : np.ndarray
            Scalarized objective values of shape (N, 1)
        """
        objs = np.asarray(objs)
        weight = np.asarray(weight)

        # Normalize objectives to [0, 1] range for each objective dimension
        obj_min = np.min(objs, axis=0)
        obj_max = np.max(objs, axis=0)

        # Floor the range so a constant objective column does not divide by zero
        obj_range = np.maximum(obj_max - obj_min, 1e-10)
        normalized_objs = (objs - obj_min) / obj_range

        weighted_objs = normalized_objs * weight

        # Max term (Tchebycheff)
        max_term = np.max(weighted_objs, axis=1)

        # Augmentation term
        aug_term = self.rho * np.sum(weighted_objs, axis=1)

        scalar_objs = max_term + aug_term

        return scalar_objs.reshape(-1, 1)