"""
Global-Local Surrogate-Assisted Differential Evolution (GL-SADE)
This module implements GL-SADE for expensive single-objective optimization problems.
References
----------
[1] Wang, Weizhong, Hai-Lin Liu, and Kay Chen Tan. "A surrogate-assisted differential evolution algorithm for high-dimensional expensive optimization problems." IEEE Transactions on Cybernetics 53.4 (2022): 2685-2697.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.01.13
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from scipy.interpolate import RBFInterpolator
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C, WhiteKernel
from ddmtolab.Methods.Algo_Methods.algo_utils import *
import warnings
warnings.filterwarnings("ignore")
class GL_SADE:
    """
    Global-Local Surrogate-Assisted Differential Evolution for expensive optimization problems.

    This algorithm adaptively switches between:

    1. Global search: RBF model with plain acquisition
    2. Local search: GPR model with LCB-decay acquisition
    """

    # Declarative capability table describing which problem configurations
    # this algorithm supports (consumed by get_algorithm_information below).
    algorithm_information = {
        'n_tasks': '[1, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'equal',
        'n_cons': '0',
        'expensive': 'True',
        'knowledge_transfer': 'False',
        'n_initial': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        """Return (and optionally print) the capability table for this algorithm.

        Delegates to the module-level ``get_algorithm_information`` helper
        (imported via the ``algo_utils`` star import).
        """
        return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n_initial=None, max_nfes=None, save_data=True,
save_path='./Data', name='GL-SADE', disable_tqdm=True):
"""
Initialize GL-SADE algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n_initial : int or List[int], optional
Number of initial samples per task (default: 100)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 300)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './TestData')
name : str, optional
Name for the experiment (default: 'GLSADE_test')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n_initial = n_initial if n_initial is not None else 100
self.max_nfes = max_nfes if max_nfes is not None else 300
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
[docs]
def optimize(self):
"""
Execute the GL-SADE algorithm.
Returns
-------
Results
Optimization results containing decision variables, objectives, and runtime
"""
start_time = time.time()
problem = self.problem
nt = problem.n_tasks
dims = problem.dims
n_initial_per_task = par_list(self.n_initial, nt)
max_nfes_per_task = par_list(self.max_nfes, nt)
# Generate initial samples using Latin Hypercube Sampling
decs = initialization(problem, self.n_initial, method='lhs')
objs, _ = evaluation(problem, decs)
nfes_per_task = n_initial_per_task.copy()
# Current working dataset (accumulated samples)
current_decs = [decs[i].copy() for i in range(nt)]
current_objs = [objs[i].copy() for i in range(nt)]
pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
desc=f"{self.name}", disable=self.disable_tqdm)
while sum(nfes_per_task) < sum(max_nfes_per_task):
# Skip tasks that have exhausted their evaluation budget
active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
if not active_tasks:
break
for i in active_tasks:
dim = dims[i]
X = current_decs[i]
Y = current_objs[i]
# Determine search state based on improvement
if len(Y) > 1:
if Y[-1, 0] < np.min(Y[:-1, 0]):
state = 1 # Local search (last point improved)
else:
state = 0 # Global search (no improvement)
else:
state = 0 # Default to global if only initial samples
# Execute search based on state
if state == 0:
candidate_np = self._global_search(X, Y, dim, nfes_per_task[i])
else:
candidate_np = self._local_search(X, Y, dim, nfes_per_task[i])
# Ensure uniqueness: avoid duplicate sampling
candidate_np = self._ensure_uniqueness(candidate_np, X, dim)
# Evaluate the candidate solution
obj, _ = evaluation_single(problem, candidate_np, i)
# Update current working dataset
current_decs[i] = np.vstack([current_decs[i], candidate_np])
current_objs[i] = np.vstack([current_objs[i], obj])
nfes_per_task[i] += 1
pbar.update(1)
pbar.close()
runtime = time.time() - start_time
# Convert database to staircase history structure for results
db_decs = [current_decs[i].copy() for i in range(nt)]
db_objs = [current_objs[i].copy() for i in range(nt)]
all_decs, all_objs = build_staircase_history(db_decs, db_objs, k=1)
# Build and save results
results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task,
bounds=problem.bounds, save_path=self.save_path, filename=self.name,
save_data=self.save_data)
return results
# Build RBF surrogate model
def _build_rbf_model(self, X, Y):
Y = Y.flatten()
n_samples, dim = X.shape
if n_samples > 1:
# Compute pairwise distances
dist_matrix = cdist(X, X, metric='euclidean')
max_dist = dist_matrix.max()
# Adaptive spread estimation
spread = max_dist / (dim * n_samples) ** (1.0 / dim)
else:
spread = 1.0
# Use Gaussian RBF
try:
rbf_interpolator = RBFInterpolator(X, Y, kernel='gaussian', epsilon=1.0 / spread)
except Exception:
# Fallback
rbf_interpolator = RBFInterpolator(X, Y, kernel='thin_plate_spline')
# Return a function that only returns predictions
def rbf_model(x):
if x.ndim == 1:
x = x.reshape(1, -1)
pred = rbf_interpolator(x)
return pred.reshape(-1, 1)
return rbf_model
# Build GPR surrogate model
def _build_gpr_model(self, X, Y):
Y = Y.flatten()
# Define kernel: Constant * RBF + WhiteKernel (noise)
kernel = C(1.0, (1e-3, 1e3)) * RBF(1.0, (1e-2, 1e2)) + WhiteKernel(1e-5, (1e-10, 1e-1))
# Fit GPR with normalization
gpr = GaussianProcessRegressor(
kernel=kernel,
alpha=1e-6,
normalize_y=True,
n_restarts_optimizer=5,
random_state=42
)
gpr.fit(X, Y)
# Return a function that returns (mean, std)
def gpr_model(x):
if x.ndim == 1:
x = x.reshape(1, -1)
mean, std = gpr.predict(x, return_std=True)
return mean.reshape(-1, 1), std.reshape(-1, 1)
return gpr_model
# Global search using RBF model with plain acquisition
def _global_search(self, X, Y, dim, nfes_used):
# Build RBF model
rbf_model = self._build_rbf_model(X, Y)
# Define acquisition function
def acquisition_func(x):
return rbf_model(x)
# Optimize acquisition using DE/best/1 with elite init from database
candidate = self._de_best1_search(acquisition_func, X, Y, dim,
popsize=50, max_gen=50)
return candidate
# Local search using GPR model with LCB-decay acquisition
def _local_search(self, X, Y, dim, nfes_used):
# Use only recent samples (min(n, 2*dim))
n_local = min(len(X), 2 * dim)
X_local = X[-n_local:]
Y_local = Y[-n_local:]
# Build GPR model
gpr_model = self._build_gpr_model(X_local, Y_local)
# Calculate LCB weight with decay
w = 2.0 - 2.0 / (1.0 + np.exp(5.0 - 20.0 * nfes_used / 500.0))
# Define acquisition function (LCB: mean - w * std)
def acquisition_func(x):
mean, std = gpr_model(x)
return mean - w * std
# Optimize using DE/best/1 with elite init (1 generation = prescreening)
candidate = self._de_best1_search(acquisition_func, X, Y, dim,
popsize=50, max_gen=1)
return candidate
def _de_best1_search(self, acquisition_func, X_db, Y_db, dim,
popsize=50, max_gen=50, F=0.5, CR=0.8):
"""DE/best/1/bin with elite initialization and 1-to-1 comparison selection."""
N = len(X_db)
Y_flat = Y_db.flatten()
# Elite initialization: top-popsize solutions from database
if N >= popsize:
idx = np.argsort(Y_flat)[:popsize]
pop = X_db[idx].copy()
pop_acq = acquisition_func(pop).flatten()
else:
extra = np.random.rand(popsize - N, dim)
pop = np.vstack([X_db.copy(), extra])
pop_acq = acquisition_func(pop).flatten()
for gen in range(max_gen):
# Find current best
best_idx = np.argmin(pop_acq)
x_best = pop[best_idx]
offspring = np.empty_like(pop)
for j in range(popsize):
# Mutation: DE/best/1
candidates = [k for k in range(popsize) if k != j]
r1, r2 = np.random.choice(candidates, 2, replace=False)
mutant = x_best + F * (pop[r1] - pop[r2])
mutant = np.clip(mutant, 0.0, 1.0)
# Binomial crossover
trial = pop[j].copy()
j_rand = np.random.randint(dim)
for d in range(dim):
if np.random.rand() < CR or d == j_rand:
trial[d] = mutant[d]
offspring[j] = trial
# 1-to-1 greedy comparison selection
offspring_acq = acquisition_func(offspring).flatten()
for j in range(popsize):
if offspring_acq[j] <= pop_acq[j]:
pop[j] = offspring[j]
pop_acq[j] = offspring_acq[j]
# Return best solution
best_idx = np.argmin(pop_acq)
return pop[best_idx:best_idx + 1].copy()
# Ensure candidate is not too close to existing samples
def _ensure_uniqueness(self, candidate, X, dim, epsilon=5e-3, max_trials=50):
scales = np.linspace(0.1, 1.0, max_trials)
trial_count = 0
while trial_count < max_trials:
# Compute minimum distance to existing samples
dist = cdist(candidate, X, metric='chebyshev')
min_dist = np.min(dist)
if min_dist >= epsilon:
break # Candidate is sufficiently unique
# Apply perturbation
scale = scales[trial_count % max_trials]
perturbation = scale * (np.random.rand(1, dim) - 0.5)
candidate = candidate + perturbation
# Clip to [0, 1]
candidate = np.clip(candidate, 0.0, 1.0)
trial_count += 1
return candidate