"""
Adaptive Dropout based Surrogate-Assisted Particle Swarm Optimization (ADSAPSO)
This module implements ADSAPSO for computationally expensive multi/many-objective optimization.
It uses adaptive dropout to select important decision variables, builds RBF surrogate models
in the reduced space, and applies PSO on surrogates to find promising solutions.
References
----------
[1] J. Lin, C. He, and R. Cheng. Adaptive dropout for high-dimensional expensive multiobjective optimization. Complex & Intelligent Systems, 2022, 8(1): 271-285.
Notes
-----
Author: Jiangtao Shen
Date: 2026.02.17
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
from scipy.spatial.distance import cdist
from ddmtolab.Methods.Algo_Methods.algo_utils import *
import warnings
warnings.filterwarnings("ignore")
[docs]
class ADSAPSO:
"""
Adaptive Dropout based Surrogate-Assisted Particle Swarm Optimization
for expensive multi/many-objective optimization.
Attributes
----------
algorithm_information : dict
Dictionary containing algorithm capabilities and requirements
"""
algorithm_information = {
'n_tasks': '[1, K]',
'dims': 'unequal',
'objs': 'unequal',
'n_objs': '[2, M]',
'cons': 'equal',
'n_cons': '0',
'expensive': 'True',
'knowledge_transfer': 'False',
'n_initial': 'unequal',
'max_nfes': 'unequal'
}
@classmethod
def get_algorithm_information(cls, print_info=True):
return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n_initial=100, max_nfes=None, n=100, k=5, beta=0.5,
n_a=200, n_s=50, save_data=True, save_path='./Data', name='ADSAPSO',
disable_tqdm=True):
"""
Initialize ADSAPSO algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n_initial : int or List[int], optional
Number of initial samples per task (default: 100)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 200)
n : int or List[int], optional
Population size for PSO per task (default: 100)
k : int, optional
Number of re-evaluated solutions per generation (default: 5)
beta : float, optional
Percentage of dropout (fraction of dimensions to keep) (default: 0.5)
n_a : int, optional
Number of solutions for building surrogate models (default: 200)
n_s : int, optional
Number of well/poorly performing solutions for dimension analysis (default: 50)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './Data')
name : str, optional
Name for the experiment (default: 'ADSAPSO')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n_initial = n_initial
self.max_nfes = max_nfes if max_nfes is not None else 200
self.n = n
self.k = k
self.beta = beta
self.n_a = n_a
self.n_s = n_s
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
[docs]
def optimize(self):
"""
Execute the ADSAPSO algorithm.
Returns
-------
Results
Optimization results containing decision variables, objectives, and runtime
"""
start_time = time.time()
problem = self.problem
nt = problem.n_tasks
dims = problem.dims
n_objs = problem.n_objs
n_initial_per_task = par_list(self.n_initial, nt)
max_nfes_per_task = par_list(self.max_nfes, nt)
n_per_task = par_list(self.n, nt)
# Generate initial samples using Latin Hypercube Sampling
decs = initialization(problem, n_initial_per_task, method='lhs')
objs, _ = evaluation(problem, decs)
nfes_per_task = n_initial_per_task.copy()
pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
desc=f"{self.name}", disable=self.disable_tqdm)
stall_counter = [0] * nt
max_stall = 20 # Break if no new solutions found for this many consecutive iterations
while sum(nfes_per_task) < sum(max_nfes_per_task):
active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
if not active_tasks:
break
for i in active_tasks:
M = n_objs[i]
D = dims[i]
# Main operator: dimension dropout + RBF + PSO
offspring_decs = _operator(
decs[i], objs[i], self.k, self.beta, self.n_a, self.n_s,
n_per_task[i], M, D, nfes_per_task[i], max_nfes_per_task[i]
)
# Remove duplicates against existing evaluated solutions
offspring_decs = remove_duplicates(offspring_decs, decs[i])
if offspring_decs.shape[0] == 0:
stall_counter[i] += 1
if stall_counter[i] >= max_stall:
nfes_per_task[i] = max_nfes_per_task[i] # Force exit
continue
stall_counter[i] = 0
if offspring_decs.shape[0] > 0:
# Limit to remaining budget
remaining = max_nfes_per_task[i] - nfes_per_task[i]
if offspring_decs.shape[0] > remaining:
offspring_decs = offspring_decs[:remaining]
# Evaluate with expensive objective function
offspring_objs, _ = evaluation_single(problem, offspring_decs, i)
# Update archive
decs[i] = np.vstack([decs[i], offspring_decs])
objs[i] = np.vstack([objs[i], offspring_objs])
nfes_per_task[i] += offspring_decs.shape[0]
pbar.update(offspring_decs.shape[0])
pbar.close()
runtime = time.time() - start_time
all_decs, all_objs = build_staircase_history(decs, objs, k=self.k)
results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime,
max_nfes=nfes_per_task, bounds=problem.bounds,
save_path=self.save_path, filename=self.name,
save_data=self.save_data)
return results
# =============================================================================
# Main Operator
# =============================================================================
def _operator(arc_decs, arc_objs, k, beta, n_a, n_s, n, M, D, nfes, max_nfes):
"""
Main operator: environmental selection, dimension dropout, RBF construction, PSO.
Parameters
----------
arc_decs : np.ndarray
Archive decision variables, shape (N_arc, D)
arc_objs : np.ndarray
Archive objective values, shape (N_arc, M)
k : int
Number of re-evaluated solutions
beta : float
Dropout percentage
n_a : int
Archive size for surrogate building
n_s : int
Number of well/poorly performing solutions
n : int
Population size for PSO
M : int
Number of objectives
D : int
Number of decision variables
nfes : int
Current function evaluation count
max_nfes : int
Maximum function evaluations
Returns
-------
candidate_decs : np.ndarray
New candidate solutions, shape (k, D)
"""
N_arc = arc_decs.shape[0]
# Step 1: Environmental Selection to get N_a solutions
n_a_actual = min(n_a, N_arc)
sel_decs, sel_objs, front_no, crowd_dis = _environmental_selection(
arc_decs, arc_objs, n_a_actual
)
N_sel = sel_decs.shape[0]
# Step 2: Sort by [front_no ascending, crowd_dis descending]
sort_idx = np.lexsort((-crowd_dis, front_no))
# Step 3: Select top k as candidate solutions
k_actual = min(k, N_sel)
candidate_decs = sel_decs[sort_idx[:k_actual]].copy()
# Step 4: Identify well and poorly performing solutions
n_s_actual = min(n_s, N_sel)
index_well = sort_idx[:n_s_actual]
index_poor = sort_idx[-n_s_actual:]
# Step 5: Compute dimension importance via mean difference
model_dif = np.mean(sel_decs[index_well], axis=0) - np.mean(sel_decs[index_poor], axis=0)
abs_dif = np.abs(model_dif)
# Select top ceil(beta*D) dimensions
n_selected_dims = max(1, int(np.ceil(beta * D)))
sorted_dif = np.sort(abs_dif)[::-1]
threshold = sorted_dif[min(n_selected_dims - 1, len(sorted_dif) - 1)]
# Match MATLAB: find(abs(Model_Dif) >= threshold) - may select more due to ties
index_dif = np.where(abs_dif >= threshold)[0]
# Step 6: Build RBF models for selected dimensions
decs_surrogate = sel_decs[:, index_dif]
rbf_models = []
for j in range(M):
model = _rbf_create(decs_surrogate, sel_objs[:, j])
rbf_models.append(model)
# Step 7: Environmental selection for PSO population
n_actual = min(n, N_sel)
pop_decs_full, pop_objs, _, _ = _environmental_selection(sel_decs, sel_objs, n_actual)
pop_decs_reduced = pop_decs_full[:, index_dif]
# Step 8: PSO on surrogates in reduced dimension space
offspring_decs_reduced, offspring_objs = _reproduction(
pop_decs_reduced, pop_objs, rbf_models, M, nfes, max_nfes
)
# Step 9: Environmental selection on offspring to select top k
k_out = min(k_actual, offspring_decs_reduced.shape[0])
if k_out < offspring_decs_reduced.shape[0]:
sel_off_decs, _, _, _ = _environmental_selection(
offspring_decs_reduced, offspring_objs, k_out
)
else:
sel_off_decs = offspring_decs_reduced
# Step 10: Replace selected dimensions in candidate solutions
n_replace = min(k_actual, sel_off_decs.shape[0])
candidate_decs[np.ix_(np.arange(n_replace), index_dif)] = sel_off_decs[:n_replace]
# Clip to [0, 1]
candidate_decs = np.clip(candidate_decs, 0, 1)
return candidate_decs
# =============================================================================
# Environmental Selection (NSGA-II style)
# =============================================================================
def _environmental_selection(pop_decs, pop_objs, n):
"""
NSGA-II style environmental selection using non-dominated sorting
and crowding distance.
Parameters
----------
pop_decs : np.ndarray
Decision variables, shape (N, D)
pop_objs : np.ndarray
Objective values, shape (N, M)
n : int
Number of solutions to select
Returns
-------
sel_decs : np.ndarray
Selected decision variables
sel_objs : np.ndarray
Selected objective values
front_no : np.ndarray
Front numbers of selected solutions
crowd_dis : np.ndarray
Crowding distances of selected solutions
"""
N = pop_decs.shape[0]
n = min(n, N)
# Non-dominated sorting
front_no, max_fno = nd_sort(pop_objs, n)
# Crowding distance
crowd_dis = crowding_distance(pop_objs, front_no)
# Select solutions from fronts below max_fno
next_mask = front_no < max_fno
# Fill remaining from last front based on crowding distance
last = np.where(front_no == max_fno)[0]
if last.size > 0 and np.sum(next_mask) < n:
rank = np.argsort(crowd_dis[last])[::-1]
n_remaining = n - np.sum(next_mask)
next_mask[last[rank[:n_remaining]]] = True
selected = np.where(next_mask)[0]
return (pop_decs[selected], pop_objs[selected],
front_no[selected], crowd_dis[selected])
# =============================================================================
# RBF Surrogate Model
# =============================================================================
def _rbf_create(X, y, kernel='gaussian'):
"""
Create RBF surrogate model with augmented linear system.
Parameters
----------
X : np.ndarray, shape (N, D)
Training inputs
y : np.ndarray, shape (N,) or (N, 1)
Training targets
kernel : str
Kernel type: 'gaussian' or 'cubic'
Returns
-------
para : dict
RBF model parameters
"""
if y.ndim == 1:
y = y.reshape(-1, 1)
N, D = X.shape
# Normalize X to [-1, 1]
xmin = X.min(axis=0)
xmax = X.max(axis=0)
x_range = xmax - xmin
x_range[x_range < 1e-10] = 1.0
X_norm = 2.0 / x_range * (X - xmin) - 1.0
# Normalize y to [-1, 1]
ymin = y.min(axis=0)
ymax = y.max(axis=0)
y_range = ymax - ymin
y_range[y_range < 1e-10] = 1.0
y_norm = 2.0 / y_range * (y - ymin) - 1.0
# Compute pairwise distance matrix
r = cdist(X_norm, X_norm, 'euclidean')
# Compute kernel matrix
if kernel == 'gaussian':
# MATLAB: radbas(sqrt(-log(0.5)) * r) = exp(-log(2) * r^2)
Phi = np.exp(-np.log(2) * r ** 2)
elif kernel == 'cubic':
Phi = r ** 3
else:
raise ValueError(f"Unknown kernel: {kernel}")
# Augmented linear system: [Phi, P; P', 0] * [alpha; beta] = [y; 0]
P = np.hstack([np.ones((N, 1)), X_norm])
A = np.block([
[Phi, P],
[P.T, np.zeros((D + 1, D + 1))]
])
b = np.vstack([y_norm, np.zeros((D + 1, y_norm.shape[1]))])
# Add small regularization for numerical stability
A += np.eye(A.shape[0]) * 1e-8
# Solve the linear system
theta = np.linalg.lstsq(A, b, rcond=None)[0]
return {
'alpha': theta[:N],
'beta': theta[N:],
'xmin': xmin,
'xmax': xmax,
'ymin': ymin,
'ymax': ymax,
'nodes': X_norm,
'kernel': kernel
}
def _rbf_predict_single(X, para):
"""
Predict using a single RBF model.
Parameters
----------
X : np.ndarray, shape (nx, D)
Test inputs
para : dict
RBF model parameters
Returns
-------
y : np.ndarray, shape (nx, 1)
Predicted values
"""
nx = X.shape[0]
nodes = para['nodes']
# Normalize X to [-1, 1]
x_range = para['xmax'] - para['xmin']
x_range[x_range < 1e-10] = 1.0
X_norm = 2.0 / x_range * (X - para['xmin']) - 1.0
# Compute distances to training nodes
r = cdist(X_norm, nodes, 'euclidean')
# Compute kernel values
if para['kernel'] == 'gaussian':
Phi = np.exp(-np.log(2) * r ** 2)
elif para['kernel'] == 'cubic':
Phi = r ** 3
# Predict in normalized space
P = np.hstack([np.ones((nx, 1)), X_norm])
y_norm = Phi @ para['alpha'] + P @ para['beta']
# Denormalize
y_range = para['ymax'] - para['ymin']
y = y_range / 2.0 * (y_norm + 1.0) + para['ymin']
return y
def _rbf_predict(X, rbf_models, M):
"""
Predict objectives using multiple RBF models.
Parameters
----------
X : np.ndarray, shape (N, D_reduced)
Test inputs in reduced dimension space
rbf_models : list
List of RBF model dicts, one per objective
M : int
Number of objectives
Returns
-------
pred_objs : np.ndarray, shape (N, M)
Predicted objective values
"""
N = X.shape[0]
pred_objs = np.zeros((N, M))
for j in range(M):
pred_objs[:, j:j + 1] = _rbf_predict_single(X, rbf_models[j])
return pred_objs
# =============================================================================
# PSO-based Reproduction on Surrogates
# =============================================================================
def _reproduction(pop_decs, pop_objs, rbf_models, M, nfes, max_nfes):
"""
PSO-based reproduction on RBF surrogates with BFE-guided Gbest management.
Parameters
----------
pop_decs : np.ndarray, shape (N, D_reduced)
Population in reduced dimension space
pop_objs : np.ndarray, shape (N, M)
Population objectives
rbf_models : list
RBF models for each objective
M : int
Number of objectives
nfes : int
Current function evaluation count
max_nfes : int
Maximum function evaluations
Returns
-------
off_decs : np.ndarray, shape (N, D_reduced)
Offspring decision variables in reduced space
off_objs : np.ndarray, shape (N, M)
Offspring surrogate-predicted objectives
"""
N, D = pop_decs.shape
# Initialize Pbest as current particles
pbest_decs = pop_decs.copy()
pbest_objs = pop_objs.copy()
# Initialize Archive: non-dominated solutions sorted by BFE
front_no_all, _ = nd_sort(pop_objs, N)
nd_mask = front_no_all == 1
archive_decs = pop_decs[nd_mask].copy()
archive_objs = pop_objs[nd_mask].copy()
if archive_decs.shape[0] > 0:
bfe = _compute_bfe(archive_objs)
rank = np.argsort(bfe)[::-1]
archive_decs = archive_decs[rank]
archive_objs = archive_objs[rank]
if archive_decs.shape[0] > N:
archive_decs = archive_decs[:N]
archive_objs = archive_objs[:N]
# Initialize Gbest: random from top 10% of archive
arch_size = archive_decs.shape[0]
if arch_size > 0:
top_count = max(1, int(np.ceil(arch_size / 10)))
gbest_idx = np.random.randint(0, top_count, size=N)
gbest_decs = archive_decs[gbest_idx].copy()
gbest_objs = archive_objs[gbest_idx].copy()
else:
gbest_decs = pop_decs.copy()
gbest_objs = pop_objs.copy()
# Initialize velocity and particles
velocity = np.zeros((N, D))
particle_decs = pop_decs.copy()
off_decs = particle_decs.copy()
off_objs = pop_objs.copy()
# PSO loop (100 generations)
for gen in range(100):
W = 0.5
r1 = np.random.rand(N, 1) * np.ones((1, D))
r2 = np.random.rand(N, 1) * np.ones((1, D))
off_vel = (W * velocity
+ r1 * (pbest_decs - particle_decs)
+ r2 * (gbest_decs - particle_decs))
off_decs = particle_decs + off_vel
# Predict objectives using RBF surrogates
off_objs = _rbf_predict(off_decs, rbf_models, M)
# Update particle state
particle_decs = off_decs.copy()
velocity = off_vel.copy()
# Update Pbest: replace if new solution is better in at least one objective
replace = ~np.all(off_objs >= pbest_objs, axis=1)
pbest_decs[replace] = off_decs[replace]
pbest_objs[replace] = off_objs[replace]
# Update Gbest: keep non-dominated, sort by crowding distance
if gbest_decs.shape[0] > 1:
gbest_front, _ = nd_sort(gbest_objs, gbest_objs.shape[0])
nd_g = gbest_front == 1
gbest_decs = gbest_decs[nd_g]
gbest_objs = gbest_objs[nd_g]
if gbest_decs.shape[0] > 1:
cd_g = _crowding_distance_same_front(gbest_objs)
rank_g = np.argsort(cd_g)[::-1]
keep = min(N, gbest_decs.shape[0])
gbest_decs = gbest_decs[rank_g[:keep]]
gbest_objs = gbest_objs[rank_g[:keep]]
# Ensure N Gbest guides are available (pad by repeating if needed)
if gbest_decs.shape[0] < N:
indices = np.random.randint(0, gbest_decs.shape[0], size=N)
gbest_decs = gbest_decs[indices]
gbest_objs = gbest_objs[indices]
# Apply polynomial mutation if late in optimization (>= 75% budget used)
if nfes >= 0.75 * max_nfes:
proM = 1
disM = 20
site = np.random.rand(N, D) < proM / D
mu_arr = np.random.rand(N, D)
# Clip to [0, 1] before mutation
off_decs = np.clip(off_decs, 0.0, 1.0)
# Polynomial mutation (bounds [0, 1])
temp = site & (mu_arr <= 0.5)
off_decs[temp] = off_decs[temp] + (
(2 * mu_arr[temp] + (1 - 2 * mu_arr[temp])
* (1 - off_decs[temp]) ** (disM + 1)) ** (1.0 / (disM + 1)) - 1
)
temp = site & (mu_arr > 0.5)
off_decs[temp] = off_decs[temp] + (
1 - (2 * (1 - mu_arr[temp]) + 2 * (mu_arr[temp] - 0.5)
* off_decs[temp] ** (disM + 1)) ** (1.0 / (disM + 1))
)
# Re-predict after mutation
off_objs = _rbf_predict(off_decs, rbf_models, M)
return off_decs, off_objs
# =============================================================================
# BFE (Balanceable Fitness Estimation)
# =============================================================================
def _compute_bfe(pop_objs):
"""
Compute BFE values for a population of non-dominated solutions.
Used for archive management in PSO Gbest initialization.
Parameters
----------
pop_objs : np.ndarray, shape (N, M)
Objective values
Returns
-------
bfe : np.ndarray, shape (N,)
BFE values (higher is better)
"""
N, M = pop_objs.shape
if N <= 1:
return np.ones(N)
# Normalize objectives
fmin = pop_objs.min(axis=0)
fmax = pop_objs.max(axis=0)
f_range = fmax - fmin
f_range[f_range < 1e-10] = 1.0
norm_objs = (pop_objs - fmin) / f_range
# Compute shifted distance (SDE)
sde = np.full((N, N), np.inf)
for i in range(N):
shifted = np.maximum(norm_objs, norm_objs[i:i + 1, :])
diffs = norm_objs[i:i + 1, :] - shifted
dists = np.sqrt(np.sum(diffs ** 2, axis=1))
sde[i, :] = dists
sde[i, i] = np.inf
# Cd: diversity metric based on minimum SDE
SDE_min = np.min(sde, axis=1)
sde_range = SDE_min.max() - SDE_min.min()
if sde_range < 1e-10:
Cd = np.zeros(N)
else:
Cd = (SDE_min - SDE_min.min()) / sde_range
# Cv: convergence value
dis = np.sqrt(np.sum(norm_objs ** 2, axis=1))
Cv = 1 - dis
# d1 and d2: projected distances
ones_vec = np.ones((1, M))
cosine = 1 - cdist(norm_objs, ones_vec, metric='cosine').flatten()
cosine = np.clip(cosine, -1, 1)
d1 = dis * cosine
d2 = dis * np.sqrt(np.clip(1 - cosine ** 2, 0, None))
# Determine alpha and beta for each solution (8 cases)
alpha = np.zeros(N)
beta_arr = np.zeros(N)
meanCd = np.mean(Cd)
meanCv = np.mean(Cv)
meand1 = np.mean(d1)
meand2 = np.mean(d2)
case111 = (Cv > meanCv) & (d1 <= meand1) & (Cd <= meanCd)
case112 = (Cv > meanCv) & (d1 <= meand1) & (Cd > meanCd)
case121 = (Cv > meanCv) & (d1 > meand1) & (Cd <= meanCd)
case122 = (Cv > meanCv) & (d1 > meand1) & (Cd > meanCd)
case211 = (Cv <= meanCv) & (d1 <= meand1) & (d2 > meand2) & (Cd <= meanCd)
case212 = (Cv <= meanCv) & (d1 <= meand1) & (d2 > meand2) & (Cd > meanCd)
case221 = (Cv <= meanCv) & ((d1 > meand1) | (d2 <= meand2)) & (Cd <= meanCd)
case222 = (Cv <= meanCv) & ((d1 > meand1) | (d2 <= meand2)) & (Cd > meanCd)
alpha[case111] = np.random.rand(np.sum(case111)) * 0.3 + 0.8
beta_arr[case111] = 1.0
alpha[case112] = 1.0
beta_arr[case112] = 1.0
alpha[case121] = 0.6
beta_arr[case121] = 1.0
alpha[case122] = 0.9
beta_arr[case122] = 1.0
alpha[case211] = np.random.rand(np.sum(case211)) * 0.3 + 0.8
beta_arr[case211] = np.random.rand(np.sum(case211)) * 0.3 + 0.8
alpha[case212] = 1.0
beta_arr[case212] = 1.0
alpha[case221] = 0.2
beta_arr[case221] = 0.2
alpha[case222] = 1.0
beta_arr[case222] = 0.2
bfe = alpha * Cd + beta_arr * Cv
return bfe
# =============================================================================
# Crowding Distance for Same Front
# =============================================================================
def _crowding_distance_same_front(pop_objs):
"""
Calculate crowding distance for solutions assumed to be in the same front.
Parameters
----------
pop_objs : np.ndarray, shape (N, M)
Objective values
Returns
-------
crowd_dis : np.ndarray, shape (N,)
Crowding distance values
"""
N, M = pop_objs.shape
if N <= 2:
return np.full(N, np.inf)
crowd_dis = np.zeros(N)
fmax = pop_objs.max(axis=0)
fmin = pop_objs.min(axis=0)
for m in range(M):
rank = np.argsort(pop_objs[:, m])
crowd_dis[rank[0]] = np.inf
crowd_dis[rank[-1]] = np.inf
denom = fmax[m] - fmin[m]
if denom < 1e-10:
continue
for j in range(1, N - 1):
crowd_dis[rank[j]] += (pop_objs[rank[j + 1], m] - pop_objs[rank[j - 1], m]) / denom
return crowd_dis