Source code for ddmtolab.Algorithms.STMO.PEA

"""
Pareto-based Efficient Algorithm (PEA)

This module implements PEA for computationally expensive multi-objective optimization.
It uses Constrained Probabilistic Pareto Dominance (CPPD) sorting that accounts for
prediction uncertainty from Kriging models during evolutionary search, and selects
promising candidates for expensive re-evaluation using diversity-based strategies.

References
----------
    [1] T. Sonoda and M. Nakata. Multiple Objective Optimization Based on Kriging Surrogate Model with Constrained Probabilistic Pareto Dominance. IEEE Congress on Evolutionary Computation (CEC), 2020.

Notes
-----
Author: Jiangtao Shen
Date: 2026.02.17
Version: 1.0
"""
from tqdm import tqdm
import time
import torch
import numpy as np
from scipy.spatial.distance import cdist
from scipy.stats import norm
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.Algo_Methods.bo_utils import mo_gp_build, mo_gp_predict
import warnings

warnings.filterwarnings("ignore")


[docs] class PEA: """ Pareto-based Efficient Algorithm for expensive multi-objective optimization using Constrained Probabilistic Pareto Dominance and Kriging surrogates. """ algorithm_information = { 'n_tasks': '[1, K]', 'dims': 'unequal', 'objs': 'unequal', 'n_objs': '[2, M]', 'cons': 'equal', 'n_cons': '[0, C]', 'expensive': 'True', 'knowledge_transfer': 'False', 'n_initial': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n_initial=None, max_nfes=None, n=100, wmax=20, mu=5, save_data=True, save_path='./Data', name='PEA', disable_tqdm=True): """ Initialize PEA algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n_initial : int or List[int], optional Number of initial samples per task (default: 11*dim-1) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 200) n : int or List[int], optional Population/archive size per task (default: 100) wmax : int, optional Number of inner surrogate evolution generations (default: 20) mu : int, optional Number of candidate solutions per iteration (default: 5) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './Data') name : str, optional Name for the experiment (default: 'PEA') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n_initial = n_initial self.max_nfes = max_nfes if max_nfes is not None else 200 self.n = n self.wmax = wmax self.mu = mu self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the PEA algorithm. Returns ------- Results Optimization results containing decision variables, objectives, and runtime """ data_type = torch.float start_time = time.time() problem = self.problem nt = problem.n_tasks dims = problem.dims n_objs = problem.n_objs if self.n_initial is None: n_initial_per_task = [11 * dims[i] - 1 for i in range(nt)] else: n_initial_per_task = par_list(self.n_initial, nt) max_nfes_per_task = par_list(self.max_nfes, nt) n_per_task = par_list(self.n, nt) # Initialize with LHS decs = initialization(problem, n_initial_per_task, method='lhs') objs, _ = evaluation(problem, decs) nfes_per_task = n_initial_per_task.copy() # Initialize working populations (indices into database) pop_indices = [] for i in range(nt): N = n_per_task[i] if decs[i].shape[0] <= N: pop_indices.append(np.arange(decs[i].shape[0])) else: idx = _pop_update(objs[i], None, N) pop_indices.append(np.where(idx)[0]) pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < sum(max_nfes_per_task): active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for i in active_tasks: M = n_objs[i] N = n_per_task[i] # ===== Build GP models ===== try: obj_models = mo_gp_build(decs[i], objs[i], data_type) except Exception: continue # ===== Evolutionary Search ===== P_decs = decs[i][pop_indices[i]] P_objs = objs[i][pop_indices[i]] pop_decs, pop_objs, pop_mse = _evo_search( P_decs, P_objs, obj_models, M, N, self.wmax, data_type ) # ===== Candidate Selection ===== candidates = _candi_select( pop_decs, pop_objs, pop_mse, decs[i], objs[i], self.mu, N ) if candidates is None or candidates.shape[0] == 0: continue # ===== Evaluate Candidates ===== cand_objs, _ = evaluation_single(problem, candidates, i) # Update database decs[i] = np.vstack([decs[i], candidates]) objs[i] = np.vstack([objs[i], cand_objs]) # Update working population idx = _pop_update(objs[i], None, N) pop_indices[i] = np.where(idx)[0] nfes_per_task[i] += candidates.shape[0] pbar.update(candidates.shape[0]) pbar.close() runtime = time.time() - start_time all_decs, all_objs = build_staircase_history(decs, objs, k=self.mu) results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) return results
# ============================================================================= # Evolutionary Search # ============================================================================= def _evo_search(P_decs, P_objs, obj_models, M, N, wmax, data_type): """ Surrogate-based evolutionary search using CPPD environmental selection. Parameters ---------- P_decs : np.ndarray Current population decisions, shape (N, D) P_objs : np.ndarray Current population objectives, shape (N, M) obj_models : list List of trained GP models M : int Number of objectives N : int Population size wmax : int Number of generations data_type : torch.dtype Data type for GP Returns ------- pop_decs, pop_objs, pop_mse : np.ndarray Final population with predicted objectives and MSE """ pop_decs = P_decs.copy() pop_objs = P_objs.copy() pop_mse = np.zeros_like(pop_objs) for w in range(wmax): # Generate offspring via GA off_decs = ga_generation(pop_decs, muc=20, mum=20) # Predict objectives and MSE for offspring off_objs, off_mse = _predict_with_mse(off_decs, obj_models, M, data_type) # Merge parent + offspring merged_decs = np.vstack([pop_decs, off_decs]) merged_objs = np.vstack([pop_objs, off_objs]) merged_mse = np.vstack([pop_mse, off_mse]) # Environmental selection using CPPD selected = _environmental_selection(merged_objs, merged_mse, N) pop_decs = merged_decs[selected] pop_objs = merged_objs[selected] pop_mse = merged_mse[selected] return pop_decs, pop_objs, pop_mse def _predict_with_mse(pop_decs, obj_models, M, data_type): """Predict objectives and MSE using GP models.""" N = pop_decs.shape[0] pop_objs = np.zeros((N, M)) pop_mse = np.zeros((N, M)) for j in range(M): pred, std = gp_predict_single(obj_models[j], pop_decs, data_type) pop_objs[:, j] = pred.flatten() pop_mse[:, j] = (std.flatten()) ** 2 return pop_objs, pop_mse def gp_predict_single(gp, test_X, data_type=torch.float): """Predict using a single GP model. Wrapper around bo_utils gp_predict.""" from ddmtolab.Methods.Algo_Methods.bo_utils import gp_predict return gp_predict(gp, test_X, data_type) # ============================================================================= # Constrained Probabilistic Pareto Dominance (CPPD) Sorting # ============================================================================= def _ndsort_cppd(pop_objs, obj_mse, n_sort): """ Non-dominated sorting based on Constrained Probabilistic Pareto Dominance. For unconstrained problems, feasibility probability = 1 for all solutions. Parameters ---------- pop_objs : np.ndarray Predicted objectives, shape (N, M) obj_mse : np.ndarray Prediction MSE, shape (N, M) n_sort : int Number of solutions to sort Returns ------- front_no : np.ndarray Front assignment, shape (N,). inf = unassigned. max_fno : int Last assigned front number """ N, M = pop_objs.shape # Build probabilistic dominance matrix # For each pair (i, j), check if i probabilistically dominates j # x_PD[i,j,k] = P(obj_j_k <= obj_i_k) -> probability j is at least as good as i on obj k # y_PD[i,j,k] = P(obj_i_k <= obj_j_k) -> probability i is at least as good as j on obj k # Pairwise differences and combined std # mean_diff[i,j,k] = obj_i_k - obj_j_k mean_diff = pop_objs[:, np.newaxis, :] - pop_objs[np.newaxis, :, :] # (N, N, M) sigma_sum = obj_mse[:, np.newaxis, :] + obj_mse[np.newaxis, :, :] # (N, N, M) sigma = np.sqrt(np.maximum(sigma_sum, 1e-20)) # P(obj_i_k - obj_j_k <= 0) = Phi(-mean_diff / sigma) = Phi((obj_j - obj_i) / sigma) # This is probability i is better than or equal to j on objective k z = -mean_diff / sigma y_PD = norm.cdf(z) # P(i <= j) on each objective x_PD = 1.0 - y_PD # P(j <= i) on each objective # For unconstrained: feasibility = 1, so no modification needed # x_PD and y_PD already represent dominance probabilities # Dominance: i dominates j iff all(x_PD[i,j,:] <= y_PD[i,j,:]) and not all equal # This means: for all k, P(j <= i) <= P(i <= j), with strict somewhere # i.e., i is probabilistically at least as good as j on all objectives dominates = np.all(x_PD <= y_PD, axis=2) & ~np.all( np.abs(x_PD - y_PD) < 1e-10, axis=2) # (N, N) np.fill_diagonal(dominates, False) # Non-dominated sorting front_no = np.full(N, np.inf) max_fno = 0 n_assigned = 0 # Count how many solutions dominate each solution dom_count = np.sum(dominates, axis=0) # dom_count[j] = how many dominate j remaining = np.ones(N, dtype=bool) while n_assigned < min(n_sort, N): max_fno += 1 current = np.where(remaining)[0] if len(current) == 0: break # Compute domination count among remaining sub_dom_count = np.sum(dominates[np.ix_(current, current)], axis=0) # Find minimum domination count min_count = np.min(sub_dom_count) frontal = current[sub_dom_count == min_count] front_no[frontal] = max_fno remaining[frontal] = False n_assigned += len(frontal) return front_no, max_fno def _environmental_selection(pop_objs, pop_mse, N): """ Environmental selection using CPPD sorting + diversity truncation. Parameters ---------- pop_objs : np.ndarray Objectives, shape (n_total, M) pop_mse : np.ndarray MSE, shape (n_total, M) N : int Target size Returns ------- selected : np.ndarray Indices of selected solutions """ n_total = pop_objs.shape[0] front_no, max_fno = _ndsort_cppd(pop_objs, pop_mse, N) # Select all solutions from fronts before last selected = np.where(front_no < max_fno)[0] last_front = np.where(front_no == max_fno)[0] n_remaining = N - len(selected) if n_remaining <= 0: return selected[:N] if len(last_front) <= n_remaining: selected = np.concatenate([selected, last_front]) elif max_fno == 1: # Only one front: truncation by distance chosen = spea2_truncation_fast(pop_objs[last_front], n_remaining) selected = np.concatenate([selected, last_front[chosen]]) else: # Multiple fronts: diversity selection from last front chosen = _dist_selection(pop_objs[selected], pop_objs[last_front], n_remaining) selected = np.concatenate([selected, last_front[chosen]]) return selected # ============================================================================= # Candidate Selection # ============================================================================= def _candi_select(pop_decs, pop_objs, pop_mse, db_decs, db_objs, mu, N): """ Select mu candidate solutions for expensive re-evaluation. Parameters ---------- pop_decs : np.ndarray Evolved population decisions pop_objs : np.ndarray Evolved population predicted objectives pop_mse : np.ndarray Evolved population prediction MSE db_decs : np.ndarray All evaluated decisions (database) db_objs : np.ndarray All evaluated objectives (database) mu : int Number of candidates to select N : int Population size Returns ------- candidates : np.ndarray or None Selected candidate decisions, shape (n_selected, D) """ n_pop = pop_decs.shape[0] # Check for duplicates against database if db_decs.shape[0] > 0: dist_to_db = cdist(pop_decs, db_decs) min_dist = np.min(dist_to_db, axis=1) novel_mask = min_dist > 1e-5 else: novel_mask = np.ones(n_pop, dtype=bool) n_novel = np.sum(novel_mask) if n_novel == 0: return None if n_novel <= mu: return pop_decs[novel_mask] # Keep only novel solutions novel_decs = pop_decs[novel_mask] novel_objs = pop_objs[novel_mask] novel_mse = pop_mse[novel_mask] # Normalize objectives all_objs = np.vstack([db_objs, novel_objs]) zmin = np.min(all_objs, axis=0) zmax = np.max(all_objs, axis=0) obj_range = np.maximum(zmax - zmin, 1e-10) norm_objs = (novel_objs - zmin) / obj_range norm_mse = novel_mse / (obj_range ** 2) norm_db_objs = (db_objs - zmin) / obj_range # Build reference set from database (non-dominated front) n_db = db_objs.shape[0] front_no_db, _ = nd_sort(db_objs, n_db) ref_objs = norm_db_objs[front_no_db == 1] # CPPD sorting on novel solutions front_no, max_fno = _ndsort_cppd(norm_objs, norm_mse, mu) # Select from best fronts selected = [] for fno in range(1, max_fno + 1): front_idx = np.where(front_no == fno)[0] if len(selected) + len(front_idx) <= mu: selected.extend(front_idx.tolist()) else: # Need to select from this front with diversity n_need = mu - len(selected) if len(selected) == 0: # First front only, use truncation chosen = spea2_truncation_fast(norm_objs[front_idx], n_need) selected.extend(front_idx[chosen].tolist()) else: # Use distance-based selection already_objs = norm_objs[np.array(selected)] chosen = _dist_selection(already_objs, norm_objs[front_idx], n_need) selected.extend(front_idx[chosen].tolist()) break if len(selected) == 0: return None # Final duplicate check candidates = [] for idx in selected: c = novel_decs[idx] if db_decs.shape[0] > 0: d = cdist(c.reshape(1, -1), db_decs) if np.min(d) > 1e-5: candidates.append(c) else: candidates.append(c) if len(candidates) == 0: return None return np.array(candidates) # ============================================================================= # Population Update (for real evaluated solutions) # ============================================================================= def _pop_update(pop_objs, pop_cons, N): """ Standard non-dominated sorting based population update. Parameters ---------- pop_objs : np.ndarray All evaluated objectives pop_cons : np.ndarray or None All evaluated constraints N : int Target population size Returns ------- next_mask : np.ndarray Boolean mask for selected solutions """ n_total = pop_objs.shape[0] if n_total <= N: return np.ones(n_total, dtype=bool) front_no, max_fno = nd_sort(pop_objs, N) next_mask = front_no < max_fno last_front = np.where(front_no == max_fno)[0] n_remaining = N - np.sum(next_mask) if n_remaining <= 0: return next_mask if len(last_front) <= n_remaining: next_mask[last_front] = True elif max_fno == 1: chosen = spea2_truncation_fast(pop_objs[last_front], n_remaining) next_mask[last_front[chosen]] = True else: selected_objs = pop_objs[next_mask] chosen = _dist_selection(selected_objs, pop_objs[last_front], n_remaining) next_mask[last_front[chosen]] = True return next_mask # ============================================================================= # Diversity Helper Functions # ============================================================================= def _dist_selection(selected_objs, candidate_objs, n_select): """ Select n_select solutions from candidates maximizing min distance to selected. Parameters ---------- selected_objs : np.ndarray Already selected objectives, shape (N1, M) candidate_objs : np.ndarray Candidate objectives, shape (N2, M) n_select : int Number to select Returns ------- chosen : list Indices into candidate_objs """ N2 = candidate_objs.shape[0] if N2 <= n_select: return list(range(N2)) # Combined distance matrix all_objs = np.vstack([selected_objs, candidate_objs]) dist = cdist(all_objs, all_objs) np.fill_diagonal(dist, np.inf) N1 = selected_objs.shape[0] chosen_set = set(range(N1)) remaining = set(range(N1, N1 + N2)) chosen = [] for _ in range(n_select): if not remaining: break best_idx = None best_min_dist = -1.0 for idx in remaining: min_d = min(dist[idx, c] for c in chosen_set) if min_d > best_min_dist: best_min_dist = min_d best_idx = idx chosen_set.add(best_idx) remaining.remove(best_idx) chosen.append(best_idx - N1) return chosen