"""
Surrogate-assisted Hierarchical Particle Swarm Optimization (SHPSO)
This module implements SHPSO for expensive single-objective optimization problems.
References
----------
[1] Yu, Haibo, et al. "Surrogate-assisted hierarchical particle swarm optimization." Information Sciences 454 (2018): 59-72.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.01.13
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from scipy.interpolate import RBFInterpolator
from ddmtolab.Methods.Algo_Methods.algo_utils import *
import warnings
warnings.filterwarnings("ignore")
[docs]
class SHPSO:
"""
Surrogate-assisted Hierarchical Particle Swarm Optimization for expensive optimization problems.
This algorithm uses a two-level hierarchical structure:
1. Upper level: RBF surrogate model optimized by SL-PSO to find promising regions
2. Lower level: PSO swarm guided by surrogate model optimum with prescreening strategy
"""
algorithm_information = {
'n_tasks': '[1, K]',
'dims': 'unequal',
'objs': 'equal',
'n_objs': '1',
'cons': 'equal',
'n_cons': '0',
'expensive': 'True',
'knowledge_transfer': 'False',
'n_initial': 'unequal',
'max_nfes': 'unequal'
}
@classmethod
def get_algorithm_information(cls, print_info=True):
return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n_initial=None, max_nfes=None, ps=None, mu=5,
save_data=True, save_path='./Data', name='SHPSO', disable_tqdm=True):
"""
Initialize SHPSO algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n_initial : int or List[int], optional
Number of initial samples per task (default: 100)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 300)
ps : int or List[int], optional
Particle swarm size per task (default: 20)
mu : int, optional
Number of new samples per iteration (default: 1)
- 1: Only evaluate surrogate optimum (most conservative)
- k: Evaluate surrogate optimum + top (k-1) prescreened particles
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './TestData')
name : str, optional
Name for the experiment (default: 'SHPSO_test')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n_initial = n_initial if n_initial is not None else 100
self.max_nfes = max_nfes if max_nfes is not None else 300
self.ps = ps if ps is not None else 50
self.mu = mu
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
self.c1 = 2.05 # Cognitive coefficient
self.c2 = 2.05 # Social coefficient
self.w = 0.7298 # Inertia weight (constriction factor)
[docs]
def optimize(self):
"""
Execute the SHPSO algorithm.
Returns
-------
Results
Optimization results containing decision variables, objectives, and runtime
"""
start_time = time.time()
problem = self.problem
nt = problem.n_tasks
dims = problem.dims
n_initial_per_task = par_list(self.n_initial, nt)
max_nfes_per_task = par_list(self.max_nfes, nt)
ps_per_task = par_list(self.ps, nt)
# Generate initial samples using Latin Hypercube Sampling
decs = initialization(problem, self.n_initial, method='lhs')
objs, _ = evaluation(problem, decs)
nfes_per_task = n_initial_per_task.copy()
# Historical database for each task
hx = [decs[i].copy() for i in range(nt)] # Historical positions
hf = [objs[i].flatten().copy() for i in range(nt)] # Historical fitness
# Initialize PSO swarm for each task
pos = [] # Current positions
vel = [] # Current velocities
pbest = [] # Personal best positions
pbestval = [] # Personal best values
gbest = [] # Global best position
gbestval = [] # Global best value
for i in range(nt):
dim = dims[i]
ps = ps_per_task[i]
# Initialize particle positions from initial samples
if n_initial_per_task[i] >= ps:
# Select ps best particles from initial samples
idx_sorted = np.argsort(hf[i])[:ps]
pos_i = hx[i][idx_sorted].copy()
obj_i = hf[i][idx_sorted].copy()
else:
# Use all initial samples and add random particles
pos_i = hx[i].copy()
obj_i = hf[i].copy()
extra = ps - len(pos_i)
if extra > 0:
extra_pos = np.random.rand(extra, dim)
pos_i = np.vstack([pos_i, extra_pos])
# Evaluate extra particles
extra_objs, _ = evaluation_single(problem, extra_pos, i)
obj_i = np.concatenate([obj_i, extra_objs.flatten()])
# Update history
hx[i] = np.vstack([hx[i], extra_pos])
hf[i] = np.concatenate([hf[i], extra_objs.flatten()])
nfes_per_task[i] += extra
pos.append(pos_i)
# Initialize velocities
mv = 0.5
vel_i = -mv + 2 * mv * np.random.rand(ps, dim)
vel.append(vel_i)
# Initialize personal best
pbest.append(pos_i.copy())
pbestval.append(obj_i.copy())
# Initialize global best
gbestidx = np.argmin(obj_i)
gbest.append(pos_i[gbestidx].copy())
gbestval.append(obj_i[gbestidx])
# Best surrogate model optimum for each task
bestp = [gbest[i].copy() for i in range(nt)]
besty = [gbestval[i] for i in range(nt)]
pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(nfes_per_task),
desc=f"{self.name}", disable=self.disable_tqdm)
while sum(nfes_per_task) < sum(max_nfes_per_task):
# Skip tasks that have exhausted their evaluation budget
active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
if not active_tasks:
break
for i in active_tasks:
dim = dims[i]
ps = ps_per_task[i]
# Determine training sample size
if dim < 100:
gs = 100
else:
gs = 150
# Build global RBF surrogate model using best historical samples
rbf_model = self._build_rbf_model(hx[i], hf[i], gs, dim)
# Record old best
besty_old = besty[i]
bestp_old = bestp[i].copy()
# ===== Upper Level: Optimize surrogate model using SL-PSO =====
# MATLAB passes ghx (top-gs samples) to SLPSO for search bounds
idx_sorted = np.argsort(hf[i])
n_select = min(gs, len(hf[i]))
ghx = hx[i][idx_sorted[:n_select]]
bestp_new = self._optimize_surrogate_slpso(rbf_model, ghx, dim)
# Exact evaluate the surrogate optimum
besty_new_arr, _ = evaluation_single(problem, bestp_new.reshape(1, -1), i)
besty_new = besty_new_arr[0, 0]
nfes_per_task[i] += 1
pbar.update(1)
# Update surrogate optimum (MATLAB: compare before adding to history)
if besty_new < besty_old:
besty[i] = besty_new
bestp[i] = bestp_new.copy()
else:
besty[i] = besty_old
bestp[i] = bestp_old.copy()
# Update history database with the WINNER (matching MATLAB)
if not is_duplicate(bestp[i], hx[i]):
hx[i] = np.vstack([hx[i], bestp[i]])
hf[i] = np.concatenate([hf[i], [besty[i]]])
# Rebuild RBF model with updated database (matching MATLAB: ghf(end) > besty)
rbf_model = self._build_rbf_model(hx[i], hf[i], gs, dim)
# ===== Lower Level: PSO guided by surrogate optimum =====
# Determine guidance strategy
if besty[i] < gbestval[i]:
# Use surrogate optimum to guide particles
guide_pos = bestp[i]
# Update gbest's corresponding pbest
gbestidx = np.argmin(pbestval[i])
pbest[i][gbestidx] = bestp[i].copy()
pbestval[i][gbestidx] = besty[i]
else:
# Use current gbest
guide_pos = gbest[i]
# PSO velocity and position update
r1 = np.random.rand(ps, dim)
r2 = np.random.rand(ps, dim)
# Velocity update with constriction factor
vel[i] = self.w * (vel[i] + self.c1 * r1 * (pbest[i] - pos[i]) + self.c2 * r2 * (guide_pos - pos[i]))
# Velocity clamping
mv = 0.5
vel[i] = np.clip(vel[i], -mv, mv)
# Position update
pos[i] = pos[i] + vel[i]
# Boundary handling with random reinitialization
out_lower = pos[i] < 0
out_upper = pos[i] > 1
pos[i] = np.where(out_lower, 0.25 * np.random.rand(ps, dim), pos[i])
pos[i] = np.where(out_upper, 1 - 0.25 * np.random.rand(ps, dim), pos[i])
# ===== Prescreening: e-pbest strategy (matching MATLAB) =====
# Predict fitness using surrogate model
e_pred = rbf_model(pos[i]).flatten()
# Select candidates where surrogate predicts improvement over personal best
candidates_to_eval = []
for idx in range(ps):
if e_pred[idx] < pbestval[i][idx]:
if not is_duplicate(pos[i][idx], hx[i]):
candidates_to_eval.append(idx)
# Exact evaluate prescreened candidates
for idx in candidates_to_eval:
if nfes_per_task[i] >= max_nfes_per_task[i]:
break
candidate_pos = pos[i][idx:idx + 1]
candidate_obj, _ = evaluation_single(problem, candidate_pos, i)
e_true = candidate_obj[0, 0]
nfes_per_task[i] += 1
pbar.update(1)
# Update history database
hx[i] = np.vstack([hx[i], candidate_pos])
hf[i] = np.concatenate([hf[i], [e_true]])
# Update personal best
if e_true < pbestval[i][idx]:
pbest[i][idx] = candidate_pos.flatten()
pbestval[i][idx] = e_true
# Update global best
gbestidx = np.argmin(pbestval[i])
gbest[i] = pbest[i][gbestidx].copy()
gbestval[i] = pbestval[i][gbestidx]
pbar.close()
runtime = time.time() - start_time
# Convert database to staircase history structure for results
db_decs = [hx[i].copy() for i in range(nt)]
db_objs = [hf[i].reshape(-1, 1).copy() for i in range(nt)]
all_decs, all_objs = build_staircase_history(db_decs, db_objs, k=1)
# Build and save results
results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task,
bounds=problem.bounds, save_path=self.save_path, filename=self.name,
save_data=self.save_data)
return results
# Build RBF surrogate model using best historical samples
def _build_rbf_model(self, hx, hf, gs, dim):
# Sort by fitness and select best gs samples
idx_sorted = np.argsort(hf)
n_select = min(gs, len(hf))
idx_select = idx_sorted[:n_select]
ghx = hx[idx_select]
ghf = hf[idx_select]
# Calculate spread parameter
n_samples = len(ghf)
if n_samples > 1:
dist_matrix = cdist(ghx, ghx, metric='euclidean')
max_dist = dist_matrix.max()
spread = max_dist / (dim * n_samples) ** (1.0 / dim)
else:
spread = 1.0
# Build RBF interpolator
try:
rbf_interpolator = RBFInterpolator(ghx, ghf, kernel='gaussian', epsilon=np.sqrt(np.log(2)) / spread)
except:
rbf_interpolator = RBFInterpolator(ghx, ghf, kernel='thin_plate_spline')
def rbf_model(x):
if x.ndim == 1:
x = x.reshape(1, -1)
pred = rbf_interpolator(x)
return pred.reshape(-1, 1)
return rbf_model
# Optimize surrogate model using inline SLPSO (matching MATLAB exactly)
def _optimize_surrogate_slpso(self, rbf_model, ghx, dim):
"""
Find near-optimum of RBF surrogate using Social Learning PSO.
Key difference from general SL_PSO: search bounds are restricted to
[min(ghx), max(ghx)] per dimension, matching MATLAB's implementation.
This prevents SLPSO from exploring regions where the Gaussian RBF
extrapolates to spurious low values.
"""
# MATLAB: lu = [min(ghx); max(ghx)]
lb = ghx.min(axis=0)
ub = ghx.max(axis=0)
# SLPSO parameters (matching MATLAB)
M = 100
beta = 0.01
m = M + int(np.floor(dim / 10))
c3 = dim / M * beta
maxgen = 50 * dim
minerror = 1e-6
# Learning probability: PL(i) = (1 - (i-1)/m)^log(sqrt(ceil(d/M)))
PL = np.array([(1 - i / m) ** np.log(np.sqrt(np.ceil(dim / M))) for i in range(m)])
# Initialize population with LHS in [lb, ub]
p = lb + (ub - lb) * np.random.rand(m, dim)
v = np.zeros((m, dim))
bestever = 1e200
best_pos = np.zeros(dim)
flag_er = 0
for gen in range(maxgen):
best_old = bestever
# Evaluate fitness using surrogate model
fitness = rbf_model(p).flatten()
# Sort descending (worst first, best last — matching MATLAB)
rank = np.argsort(-fitness)
fitness = fitness[rank]
p = p[rank]
v = v[rank]
# Best in current population (last element after descending sort)
besty = fitness[-1]
bestp = p[-1].copy()
if besty < bestever:
bestever = besty
best_pos = bestp.copy()
# Early stopping: 20 consecutive gens with improvement <= minerror
error = abs(best_old - bestever)
if error <= minerror:
flag_er += 1
else:
flag_er = 0
if flag_er >= 20:
break
# Center position
center = np.mean(p, axis=0)
# Random coefficients
randco1 = np.random.rand(m, dim)
randco2 = np.random.rand(m, dim)
randco3 = np.random.rand(m, dim)
# Social learning: select demonstrators from better particles
# MATLAB: winidx = i + ceil(rand*(m-i)), selecting from [i+1, m] (1-indexed)
idx = np.arange(m).reshape(-1, 1)
max_range = m - 1 - idx # max offset for each particle
offsets = np.ceil(np.random.rand(m, dim) * max_range).astype(int)
winidx = idx + offsets
winidx = np.clip(winidx, 0, m - 1)
# Get winner positions (per-dimension indexing)
pwin = np.zeros((m, dim))
for j in range(dim):
pwin[:, j] = p[winidx[:, j], j]
# Learning mask
lpmask_1d = np.random.rand(m) < PL
lpmask_1d[-1] = False # Best particle doesn't learn
lpmask = np.tile(lpmask_1d.reshape(-1, 1), (1, dim))
# Velocity and position update
v1 = randco1 * v + randco2 * (pwin - p) + c3 * randco3 * (center - p)
p1 = p + v1
v = np.where(lpmask, v1, v)
p = np.where(lpmask, p1, p)
# Boundary handling: clip to [lb, ub]
p = np.clip(p, lb, ub)
best_pos = np.clip(best_pos, 0.0, 1.0)
return best_pos