"""
Multigranularity Surrogate-Assisted Evolutionary Algorithm (MGSAEA)
This module implements MGSAEA for computationally expensive constrained multi-objective
optimization. It uses a two-stage framework:
- Stage 1 (convergence stage): Builds surrogates for objectives only, ignoring constraints
- Stage 2 (constraint stage): Adaptively selects constraint handling strategy based on
constraint satisfaction status (all violated, partially violated, all satisfied)
Stage transition is triggered when the ideal point change rate drops below a threshold,
indicating convergence of the unconstrained search.
References
----------
[1] Y. Zhang, H. Jiang, Y. Tian, H. Ma, and X. Zhang. Multigranularity surrogate modeling for evolutionary multiobjective optimization with expensive constraints. IEEE Transactions on Neural Networks and Learning Systems, 2024, 35(3): 2956-2968.
Notes
-----
Author: Jiangtao Shen
Date: 2026.02.17
Version: 1.0
"""
from tqdm import tqdm
import time
import torch
import numpy as np
from scipy.spatial.distance import cdist
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.Algo_Methods.bo_utils import gp_build, gp_predict
import warnings
warnings.filterwarnings("ignore")
def _spea2_select(fitness, pop_obj, N, use_real_obj=None):
"""
SPEA2 selection: prefer fitness < 1, fill/truncate to N.
Parameters
----------
fitness : np.ndarray
pop_obj : np.ndarray, used for truncation distance
N : int, target size
use_real_obj : np.ndarray, optional, use this for truncation distance instead
Returns
-------
next_mask : np.ndarray, bool
"""
n_total = len(fitness)
next_mask = fitness < 1
if np.sum(next_mask) < N:
rank = np.argsort(fitness)
next_mask[:] = False
next_mask[rank[:min(N, n_total)]] = True
elif np.sum(next_mask) > N:
trunc_obj = use_real_obj[next_mask] if use_real_obj is not None else pop_obj[next_mask]
kept_indices = spea2_truncation(trunc_obj, N)
temp = np.where(next_mask)[0]
next_mask[:] = False
next_mask[temp[kept_indices]] = True
return next_mask
def _env_selection(pop_dec, pop_obj, NI, M, status=None):
"""
Environmental selection using SPEA2 fitness and truncation.
Parameters
----------
pop_dec : np.ndarray, shape (N, D)
pop_obj : np.ndarray, shape (N, M_ext)
NI : int, target size
M : int, number of real objectives
status : int or None, constraint handling mode (1/2/3 or None for unconstrained)
Returns
-------
pop_dec, pop_obj, fitness
"""
# Remove duplicates on real objectives
_, unique_idx = np.unique(pop_obj[:, :M], axis=0, return_index=True)
unique_idx = np.sort(unique_idx)
pop_dec = pop_dec[unique_idx]
pop_obj = pop_obj[unique_idx]
N = pop_dec.shape[0]
if N == 0:
return pop_dec, pop_obj, np.array([])
# Calculate fitness based on status
if status is None:
# Stage 1: unconstrained
fitness = spea2_fitness(pop_obj)
next_mask = _spea2_select(fitness, pop_obj, NI)
elif status == 1:
real_obj = pop_obj[:, :M]
cv = np.maximum(0, pop_obj[:, -1:])
fitness = spea2_fitness(real_obj, cv)
next_mask = _spea2_select(fitness, pop_obj, NI, use_real_obj=real_obj)
elif status == 2:
real_obj = pop_obj[:, :M]
pop_con = pop_obj[:, M:]
cv = np.sum(np.maximum(0, pop_con), axis=1, keepdims=True)
fitness = spea2_fitness(np.hstack([real_obj, cv]))
next_mask = _spea2_select(fitness, pop_obj, NI, use_real_obj=real_obj)
else: # status == 3
fitness = spea2_fitness(pop_obj)
next_mask = _spea2_select(fitness, pop_obj, NI)
return pop_dec[next_mask], pop_obj[next_mask], fitness[next_mask]
# ============================================================================
# Archive and Population Update
# ============================================================================
def _update_archive(arc_decs, arc_objs, arc_cons, N):
"""Update archive with SPEA2 fitness-based selection and truncation."""
_, unique_idx = np.unique(arc_objs, axis=0, return_index=True)
unique_idx = np.sort(unique_idx)
arc_decs = arc_decs[unique_idx]
arc_objs = arc_objs[unique_idx]
arc_cons = arc_cons[unique_idx]
if arc_decs.shape[0] > N:
fitness = spea2_fitness(arc_objs, arc_cons)
next_mask = _spea2_select(fitness, arc_objs, N)
arc_decs = arc_decs[next_mask]
arc_objs = arc_objs[next_mask]
arc_cons = arc_cons[next_mask]
return arc_decs, arc_objs, arc_cons
def _update_population(pop_decs, pop_objs, pop_cons, new_decs, new_objs, new_cons,
N, status=None):
"""
Update population: remove duplicates with new, SPEA2 selection to N, append new.
Parameters
----------
status : int or None
None for Stage 1 (unconstrained), 1/2/3 for Stage 2
"""
# Remove solutions duplicating new
if new_objs.shape[0] > 0:
keep = []
for i in range(pop_objs.shape[0]):
is_dup = np.any(np.all(np.abs(pop_objs[i] - new_objs) < 1e-10, axis=1))
if not is_dup:
keep.append(i)
if len(keep) > 0:
keep = np.array(keep)
pop_decs = pop_decs[keep]
pop_objs = pop_objs[keep]
pop_cons = pop_cons[keep]
else:
pop_decs = np.empty((0, pop_decs.shape[1]))
pop_objs = np.empty((0, pop_objs.shape[1]))
pop_cons = np.empty((0, pop_cons.shape[1]))
# Deduplicate within population
if pop_objs.shape[0] > 0:
_, unique_idx = np.unique(pop_objs, axis=0, return_index=True)
unique_idx = np.sort(unique_idx)
pop_decs = pop_decs[unique_idx]
pop_objs = pop_objs[unique_idx]
pop_cons = pop_cons[unique_idx]
# SPEA2 selection to N
if pop_decs.shape[0] > N:
if status is None or status == 3:
# Unconstrained or all constraints satisfied
fitness = spea2_fitness(pop_objs, pop_cons)
elif status == 1:
fitness = spea2_fitness(pop_objs, pop_cons)
elif status == 2:
cv = np.sum(np.maximum(0, pop_cons), axis=1, keepdims=True)
fitness = spea2_fitness(np.hstack([pop_objs, cv]))
next_mask = _spea2_select(fitness, pop_objs, N)
pop_decs = pop_decs[next_mask]
pop_objs = pop_objs[next_mask]
pop_cons = pop_cons[next_mask]
# Append new
if new_decs.shape[0] > 0:
pop_decs = np.vstack([pop_decs, new_decs])
pop_objs = np.vstack([pop_objs, new_objs])
pop_cons = np.vstack([pop_cons, new_cons])
return pop_decs, pop_objs, pop_cons
# ============================================================================
# Constraint Normalization
# ============================================================================
def _normalize_cv(pop_con):
"""Normalize constraint violations and return aggregated scalar CV."""
pop_con = np.maximum(0, pop_con)
cmin = np.min(pop_con, axis=0)
cmax = np.max(pop_con, axis=0)
denom = cmax - cmin
denom[denom == 0] = 1.0
pop_con_norm = (pop_con - cmin) / denom
pop_con_norm = np.nan_to_num(pop_con_norm, nan=0.0)
return np.sum(pop_con_norm, axis=1, keepdims=True)
# ============================================================================
# Ideal Point Change Rate
# ============================================================================
def _calc_max_change(ideal_points, current_iter, gap):
"""
Calculate the maximum change rate of ideal points over the last `gap` iterations.
Parameters
----------
ideal_points : list of np.ndarray
Ideal points at each iteration.
current_iter : int
Current iteration index (0-based).
gap : int
Window size for change rate computation.
Returns
-------
max_change : float
"""
delta = 1e-6
current = ideal_points[current_iter]
previous = ideal_points[current_iter - gap]
rz = np.abs((current - previous) / np.maximum(np.abs(previous), delta))
return np.max(rz)
# ============================================================================
# Main Algorithm
# ============================================================================
[docs]
class MGSAEA:
"""
Multigranularity Surrogate-Assisted Evolutionary Algorithm for expensive
constrained multi-objective optimization.
Two-stage approach:
- Stage 1: Objective-only surrogates until ideal points converge
- Stage 2: Constraint-aware surrogates with adaptive granularity
Attributes
----------
algorithm_information : dict
Dictionary containing algorithm capabilities and requirements
"""
algorithm_information = {
'n_tasks': '[1, K]',
'dims': 'unequal',
'objs': 'unequal',
'n_objs': '[2, M]',
'cons': 'unequal',
'n_cons': '[0, C]',
'expensive': 'True',
'knowledge_transfer': 'False',
'n_initial': 'unequal',
'max_nfes': 'unequal'
}
@classmethod
def get_algorithm_information(cls, print_info=True):
return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n_initial=None, max_nfes=None, n=100,
wmax=20, mu=5, gap=20, lam=1e-3,
save_data=True, save_path='./Data', name='MGSAEA', disable_tqdm=True):
"""
Initialize MGSAEA algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n_initial : int or List[int], optional
Number of initial samples per task (default: 11*dim-1)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 200)
n : int or List[int], optional
Archive/population size per task (default: 100)
wmax : int, optional
Number of inner GA generations on surrogates (default: 20)
mu : int, optional
Number of real evaluated solutions per iteration (default: 5)
gap : int, optional
Window for ideal point change rate computation (default: 20)
lam : float, optional
Threshold for stage transition (default: 1e-3)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './Data')
name : str, optional
Name for the experiment (default: 'MGSAEA')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n_initial = n_initial
self.max_nfes = max_nfes if max_nfes is not None else 200
self.n = n
self.wmax = wmax
self.mu = mu
self.gap = gap
self.lam = lam
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
[docs]
def optimize(self):
"""
Execute the MGSAEA algorithm.
Returns
-------
Results
Optimization results containing decision variables, objectives, and runtime
"""
data_type = torch.float
start_time = time.time()
problem = self.problem
nt = problem.n_tasks
dims = problem.dims
n_objs = problem.n_objs
n_cons = problem.n_cons
# Set default initial samples: 11*dim - 1
if self.n_initial is None:
n_initial_per_task = [11 * dims[i] - 1 for i in range(nt)]
else:
n_initial_per_task = par_list(self.n_initial, nt)
max_nfes_per_task = par_list(self.max_nfes, nt)
n_per_task = par_list(self.n, nt)
# Generate initial samples using LHS
decs = initialization(problem, n_initial_per_task, method='lhs')
objs, cons = evaluation(problem, decs)
nfes_per_task = n_initial_per_task.copy()
# History tracking
has_cons = any(c.shape[1] > 0 for c in cons)
pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
desc=f"{self.name}", disable=self.disable_tqdm)
# Per-task optimization
for task_i in range(nt):
m = n_objs[task_i]
dim = dims[task_i]
nc = n_cons[task_i]
NI = n_initial_per_task[task_i]
N_archive = n_per_task[task_i]
# Population and archive for this task
pop_decs = decs[task_i].copy()
pop_objs = objs[task_i].copy()
pop_cons = cons[task_i].copy() if nc > 0 else np.zeros((NI, 0))
# Initialize archive
arc_decs = pop_decs.copy()
arc_objs = pop_objs.copy()
arc_cons = pop_cons.copy() if nc > 0 else np.zeros((NI, 0))
if nc > 0:
arc_cons_for_update = arc_cons.copy()
else:
arc_cons_for_update = np.zeros((arc_decs.shape[0], 1))
arc_decs, arc_objs, arc_cons_for_update = _update_archive(
arc_decs, arc_objs, arc_cons_for_update, N_archive
)
if nc > 0:
arc_cons = arc_cons_for_update
else:
arc_cons = np.zeros((arc_decs.shape[0], 0))
# Stage flag: 0 = Stage 1 (convergence), 1 = Stage 2 (constraint)
flag = 0
iteration = 0
ideal_points = []
while nfes_per_task[task_i] < max_nfes_per_task[task_i]:
# Track ideal points
ideal_points.append(np.min(pop_objs, axis=0))
# Check stage transition
if iteration > self.gap and flag == 0:
max_change = _calc_max_change(ideal_points, iteration, self.gap)
if max_change <= self.lam:
flag = 1
if flag == 0:
# ===== Stage 1: Objective-only surrogates =====
surr_objs = pop_objs.copy()
M_surr = m
models = []
for j in range(M_surr):
gp = gp_build(pop_decs, surr_objs[:, j:j+1], data_type)
models.append(gp)
fitness = spea2_fitness(pop_objs)
# Inner GA loop on surrogates
inner_decs = pop_decs.copy()
inner_objs = surr_objs.copy()
inner_fitness = fitness.copy()
for w in range(self.wmax):
mating_pool = tournament_selection(2, NI, -inner_fitness)
off_decs = ga_generation(inner_decs[mating_pool], muc=20.0, mum=20.0)
inner_decs = np.vstack([inner_decs, off_decs])
N_inner = inner_decs.shape[0]
pred_objs = np.zeros((N_inner, M_surr))
for j in range(M_surr):
pred_j, _ = gp_predict(models[j], inner_decs, data_type)
pred_objs[:, j] = pred_j.ravel()
inner_objs = pred_objs
inner_decs, inner_objs, inner_fitness = _env_selection(
inner_decs, inner_objs, NI, m, status=None
)
# Select mu solutions for real evaluation
sel_decs, _, _ = _env_selection(inner_decs, inner_objs, self.mu, m, status=None)
status_for_update = None
else:
# ===== Stage 2: Constraint-aware surrogates =====
if nc > 0:
max_con_per_col = np.max(np.maximum(0, pop_cons), axis=0)
n_inf = np.sum(max_con_per_col > 0)
else:
n_inf = 0
if nc > 0 and n_inf == nc:
# Status 1: All constraints violated
status = 1
cv_col = _normalize_cv(pop_cons)
surr_objs = np.hstack([pop_objs, cv_col])
M_surr = m + 1
models = []
for j in range(M_surr):
gp = gp_build(pop_decs, surr_objs[:, j:j+1], data_type)
models.append(gp)
fitness = spea2_fitness(pop_objs, pop_cons)
elif nc > 0 and 0 < n_inf < nc:
# Status 2: Partial constraint violation
status = 2
raw_con = np.maximum(0, pop_cons)
max_con_per_col = np.max(raw_con, axis=0)
active_idx = np.where(max_con_per_col > 0)[0]
active_con = raw_con[:, active_idx]
cmin = np.min(active_con, axis=0)
cmax = np.max(active_con, axis=0)
denom = cmax - cmin
denom[denom == 0] = 1.0
active_con_norm = (active_con - cmin) / denom
surr_objs = np.hstack([pop_objs, active_con_norm])
M_surr = surr_objs.shape[1]
models = []
for j in range(M_surr):
gp = gp_build(pop_decs, surr_objs[:, j:j+1], data_type)
models.append(gp)
cv_agg = np.sum(np.maximum(0, pop_cons), axis=1, keepdims=True)
fitness = spea2_fitness(np.hstack([pop_objs, cv_agg]))
else:
# Status 3: All constraints satisfied (or unconstrained)
status = 3
surr_objs = pop_objs.copy()
M_surr = m
models = []
for j in range(M_surr):
gp = gp_build(pop_decs, surr_objs[:, j:j+1], data_type)
models.append(gp)
fitness = spea2_fitness(pop_objs)
# Inner GA loop on surrogates
inner_decs = pop_decs.copy()
inner_objs = surr_objs.copy()
inner_fitness = fitness.copy()
for w in range(self.wmax):
mating_pool = tournament_selection(2, NI, -inner_fitness)
off_decs = ga_generation(inner_decs[mating_pool], muc=20.0, mum=20.0)
inner_decs = np.vstack([inner_decs, off_decs])
N_inner = inner_decs.shape[0]
pred_objs = np.zeros((N_inner, M_surr))
for j in range(M_surr):
pred_j, _ = gp_predict(models[j], inner_decs, data_type)
pred_objs[:, j] = pred_j.ravel()
inner_objs = pred_objs
inner_decs, inner_objs, inner_fitness = _env_selection(
inner_decs, inner_objs, NI, m, status=status
)
# Select mu solutions for real evaluation
sel_decs, _, _ = _env_selection(
inner_decs, inner_objs, self.mu, m, status=status
)
status_for_update = status
# Remove duplicates
sel_decs = remove_duplicates(sel_decs, decs[task_i])
if sel_decs.shape[0] == 0:
sel_decs = np.random.rand(self.mu, dim)
sel_decs = remove_duplicates(sel_decs, decs[task_i])
if sel_decs.shape[0] == 0:
iteration += 1
continue
# Expensive evaluation
new_objs, new_cons = evaluation_single(problem, sel_decs, task_i)
n_new = sel_decs.shape[0]
# Update cumulative dataset
decs[task_i] = np.vstack([decs[task_i], sel_decs])
objs[task_i] = np.vstack([objs[task_i], new_objs])
if nc > 0:
cons[task_i] = np.vstack([cons[task_i], new_cons])
# Update population
pop_cons_for_update = pop_cons if nc > 0 else np.zeros((pop_decs.shape[0], max(1, nc)))
new_cons_for_update = new_cons if nc > 0 else np.zeros((n_new, max(1, nc)))
pop_decs, pop_objs, pop_cons_updated = _update_population(
pop_decs, pop_objs, pop_cons_for_update,
sel_decs, new_objs, new_cons_for_update,
NI - self.mu, status=status_for_update
)
if nc == 0:
pop_cons = np.zeros((pop_decs.shape[0], 0))
else:
pop_cons = pop_cons_updated
# Update archive
combined_arc_decs = np.vstack([arc_decs, sel_decs])
combined_arc_objs = np.vstack([arc_objs, new_objs])
if nc > 0:
combined_arc_cons = np.vstack([arc_cons, new_cons])
else:
combined_arc_cons = np.zeros((combined_arc_objs.shape[0], 1))
arc_decs, arc_objs, arc_cons_full = _update_archive(
combined_arc_decs, combined_arc_objs, combined_arc_cons, N_archive
)
if nc == 0:
arc_cons = np.zeros((arc_decs.shape[0], 0))
else:
arc_cons = arc_cons_full
nfes_per_task[task_i] += n_new
pbar.update(n_new)
iteration += 1
pbar.close()
runtime = time.time() - start_time
if has_cons:
all_decs, all_objs, all_cons = build_staircase_history(decs, objs, k=self.mu, db_cons=cons)
else:
all_decs, all_objs = build_staircase_history(decs, objs, k=self.mu)
all_cons = None
results = build_save_results(
all_decs, all_objs, runtime, max_nfes_per_task,
all_cons=all_cons if has_cons else None,
bounds=problem.bounds,
save_path=self.save_path,
filename=self.name,
save_data=self.save_data
)
return results