"""
Lipschitz Surrogate-Assisted Differential Evolution (LSADE)
This module implements LSADE for expensive single-objective optimization problems.
References
----------
[1] Kudela, J., & Matousek, R. (2023). Combining Lipschitz and RBF surrogate models for high-dimensional computationally expensive problems. Information Sciences, 619, 457-477.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.02.19
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from scipy.interpolate import RBFInterpolator
from scipy.optimize import minimize
from ddmtolab.Methods.Algo_Methods.algo_utils import *
import warnings
warnings.filterwarnings("ignore")
class LSADE:
    """
    Lipschitz Surrogate-Assisted Differential Evolution for expensive optimization problems.

    Uses three surrogate strategies in deterministic rotation:

    1. RBF prescreening: DE/best/1 on a Gaussian RBF model
    2. Lipschitz prescreening: DE/best/1 on a Lipschitz lower-bound surrogate
    3. Local optimization: SQP on a local RBF model around the best solutions
    """

    algorithm_information = {
        'n_tasks': '[1, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'equal',
        'n_cons': '0',
        'expensive': 'True',
        'knowledge_transfer': 'False',
        'n_initial': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegates to the module-level helper (imported from algo_utils).
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n_initial=None, max_nfes=None, save_data=True,
                 save_path='./Data', name='LSADE', disable_tqdm=True):
        """
        Initialize LSADE algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance.
        n_initial : int or List[int], optional
            Number of initial samples per task (default: 50).
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task (default: 300).
        save_data : bool, optional
            Whether to save optimization data (default: True).
        save_path : str, optional
            Path to save results (default: './Data').
        name : str, optional
            Name for the experiment (default: 'LSADE').
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True).
        """
        self.problem = problem
        self.n_initial = n_initial if n_initial is not None else 50
        self.max_nfes = max_nfes if max_nfes is not None else 300
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the LSADE algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives,
            and runtime.
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n_initial_per_task = par_list(self.n_initial, nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)

        # Generate initial samples using Latin Hypercube Sampling
        decs = initialization(problem, self.n_initial, method='lhs')
        objs, _ = evaluation(problem, decs)
        nfes_per_task = n_initial_per_task.copy()

        # Current working dataset; grows by one point per active task per pass
        current_decs = [decs[i].copy() for i in range(nt)]
        current_objs = [objs[i].copy() for i in range(nt)]

        pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
                    desc=f"{self.name}", disable=self.disable_tqdm)

        while sum(nfes_per_task) < sum(max_nfes_per_task):
            active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
            if not active_tasks:
                break
            for i in active_tasks:
                dim = dims[i]
                X = current_decs[i]
                Y = current_objs[i]
                # Deterministic state rotation: 0=RBF, 1=Lipschitz, 2=Local
                state = (nfes_per_task[i] - n_initial_per_task[i]) % 3
                if state == 0:
                    candidate = self._state_rbf(X, Y, dim)
                elif state == 1:
                    candidate = self._state_lipschitz(X, Y, dim)
                else:
                    candidate = self._state_local(X, Y, dim)
                # Perturb the candidate away from already-sampled points if needed
                candidate = self._ensure_uniqueness(candidate, X, dim)
                # Expensive true-function evaluation of the single candidate
                obj, _ = evaluation_single(problem, candidate, i)
                # Append the new point to the task's dataset
                current_decs[i] = np.vstack([current_decs[i], candidate])
                current_objs[i] = np.vstack([current_objs[i], obj])
                nfes_per_task[i] += 1
                pbar.update(1)
        pbar.close()
        runtime = time.time() - start_time

        # Convert database to staircase history structure for results
        db_decs = [current_decs[i].copy() for i in range(nt)]
        db_objs = [current_objs[i].copy() for i in range(nt)]
        all_decs, all_objs = build_staircase_history(db_decs, db_objs, k=1)
        results = build_save_results(
            all_decs=all_decs, all_objs=all_objs, runtime=runtime,
            max_nfes=nfes_per_task, bounds=problem.bounds,
            save_path=self.save_path, filename=self.name,
            save_data=self.save_data
        )
        return results

    # ==================== State Methods ====================

    def _state_rbf(self, X, Y, dim):
        """
        State 0: RBF prescreening with DE/best/1.

        Builds a Gaussian RBF on all data, generates one set of DE/best/1
        offspring, and returns the best offspring by surrogate prediction.
        """
        model = self._build_rbf_model(X, Y)
        return self._prescreening(model, X, Y, dim)

    def _state_lipschitz(self, X, Y, dim):
        """
        State 1: Lipschitz prescreening with DE/best/1.

        Estimates the Lipschitz constant, builds a Lipschitz lower-bound
        surrogate, and performs DE/best/1 prescreening.
        """
        Y_flat = Y.flatten()
        k = self._k_est(Y_flat, X)

        def lip_model(x):
            if x.ndim == 1:
                x = x.reshape(1, -1)
            return self._f_lip(Y_flat, X, k, x)

        return self._prescreening(lip_model, X, Y, dim)

    def _state_local(self, X, Y, dim):
        """
        State 2: Local optimization (SQP) on RBF.

        Selects top min(N, 3*dim) solutions by fitness, builds a local RBF,
        and runs SQP optimization from a random starting point within the
        local bounds.
        """
        N = len(X)
        n_local = min(N, 3 * dim)
        Y_flat = Y.flatten()
        idx = np.argsort(Y_flat)[:n_local]
        X_local = X[idx]
        Y_local = Y[idx]
        lb_local = np.min(X_local, axis=0)
        ub_local = np.max(X_local, axis=0)
        # Handle degenerate bounds: widen collapsed dimensions inside [0, 1]
        mask = (ub_local - lb_local) < 1e-10
        lb_local[mask] = np.maximum(0.0, lb_local[mask] - 0.05)
        ub_local[mask] = np.minimum(1.0, ub_local[mask] + 0.05)
        model = self._build_rbf_model(X_local, Y_local)
        # Random starting point within local bounds (matching MATLAB behavior)
        x0 = lb_local + (ub_local - lb_local) * np.random.rand(dim)
        bounds = list(zip(lb_local, ub_local))

        def obj_func(x):
            return model(x.reshape(1, -1)).flatten()[0]

        try:
            result = minimize(obj_func, x0, method='SLSQP', bounds=bounds,
                              options={'maxiter': 20, 'disp': False})
            candidate = result.x.reshape(1, -1)
        except Exception:
            # SQP failure is non-fatal: fall back to the random start point
            candidate = x0.reshape(1, -1)
        candidate = np.clip(candidate, 0.0, 1.0)
        return candidate

    # ==================== Surrogate Models ====================

    def _build_rbf_model(self, X, Y):
        """Build Gaussian RBF surrogate model (matching MATLAB newrbe)."""
        Y_flat = Y.flatten()
        n_samples, dim = X.shape
        if n_samples > 1:
            dist_matrix = cdist(X, X, metric='euclidean')
            max_dist = dist_matrix.max()
            # Spread heuristic analogous to MATLAB's newrbe default
            spread = max_dist / (dim * n_samples) ** (1.0 / dim)
        else:
            spread = 1.0
        try:
            rbf = RBFInterpolator(X, Y_flat, kernel='gaussian', epsilon=1.0 / spread)
        except Exception:
            # Gaussian system can be singular/ill-conditioned; fall back to TPS
            rbf = RBFInterpolator(X, Y_flat, kernel='thin_plate_spline')

        def model(x):
            if x.ndim == 1:
                x = x.reshape(1, -1)
            return rbf(x).reshape(-1, 1)

        return model

    def _k_est(self, Y, X):
        """
        Estimate the Lipschitz constant from sampled data.

        Computes pairwise Lipschitz estimates k = |f(i)-f(j)| / ||x(i)-x(j)||,
        takes the maximum, and applies exponential scaling.

        Parameters
        ----------
        Y : np.ndarray, shape (n,)
            Function values at sampled points.
        X : np.ndarray, shape (n, d)
            Decision variables at sampled points.

        Returns
        -------
        float
            Estimated Lipschitz constant.
        """
        alpha = 0.01
        # Pairwise distances and objective differences
        dist_matrix = cdist(X, X)
        np.fill_diagonal(dist_matrix, np.inf)
        Y_diff = np.abs(Y.reshape(-1, 1) - Y.reshape(1, -1))
        with np.errstate(divide='ignore', invalid='ignore'):
            k_matrix = Y_diff / dist_matrix
        k_matrix[~np.isfinite(k_matrix)] = 0.0
        k_max = np.max(k_matrix)
        if k_max > 0:
            # Round k_max up to the nearest power of (1 + alpha)
            i_t = int(np.ceil(np.log(k_max) / np.log(1 + alpha)))
            est = (1 + alpha) ** i_t
        else:
            est = 1.0
        return est

    def _f_lip(self, Y, X, k, X_query):
        """
        Evaluate the Lipschitz lower-bound surrogate at query points.

        For each query point, computes min_i { Y(i) + k * ||X(i) - x_query|| }
        which provides a lower bound on the true function value.

        Parameters
        ----------
        Y : np.ndarray, shape (n,)
            Observed function values.
        X : np.ndarray, shape (n, d)
            Observed decision variables.
        k : float
            Lipschitz constant.
        X_query : np.ndarray, shape (nq, d)
            Query points.

        Returns
        -------
        np.ndarray, shape (nq, 1)
            Predicted surrogate values.
        """
        # dist: (nq, n) pairwise distances
        dist = cdist(X_query, X)
        # vals[i, j] = Y[j] + k * ||X_query[i] - X[j]||
        vals = Y.flatten() + k * dist
        # Take minimum over all observed points for each query
        result = np.min(vals, axis=1)
        return result.reshape(-1, 1)

    # ==================== DE Prescreening ====================

    def _prescreening(self, surrogate_func, X_full, Y_full, dim, popsize=50):
        """
        Single-generation DE/best/1 prescreening with best+randperm initialization.

        1. Initialize population: best individual + (popsize-1) random permutation
           from the database
        2. Generate offspring via DE/best/1 (one generation)
        3. Evaluate offspring on the surrogate
        4. Return the best offspring

        Parameters
        ----------
        surrogate_func : callable
            Surrogate model function, accepts (n, d) array, returns (n, 1)
            predictions.
        X_full : np.ndarray
            Full database of decision variables.
        Y_full : np.ndarray
            Full database of objectives.
        dim : int
            Problem dimension.
        popsize : int
            Population size for DE (default: 50).
        """
        Y_flat = Y_full.flatten()
        N = len(Y_flat)
        # best+randperm initialization: best individual first, then random permutation
        best_idx = np.argmin(Y_flat)
        seq = np.random.permutation(N)
        if N >= popsize:
            init_idx = np.concatenate([[best_idx], seq[:popsize - 1]])
        else:
            init_idx = np.concatenate([[best_idx], seq])
        pop = X_full[init_idx].copy()
        pop_objs = Y_flat[init_idx].copy()
        # Fill remaining slots with random solutions if database is small;
        # their fitness is taken from the surrogate, not the true function
        if len(pop) < popsize:
            n_extra = popsize - len(pop)
            extra = np.random.rand(n_extra, dim)
            pop = np.vstack([pop, extra])
            pop_objs = np.concatenate([pop_objs, surrogate_func(extra).flatten()])
        # Generate offspring via DE/best/1 (single generation)
        offspring = self._de_best1(pop, pop_objs, F=0.5, CR=0.8)
        # Evaluate offspring on the surrogate
        off_objs = surrogate_func(offspring).flatten()
        # Return the best offspring (keep 2-D shape via slicing)
        best_off_idx = np.argmin(off_objs)
        return offspring[best_off_idx:best_off_idx + 1]

    def _de_best1(self, parents, objs, F=0.5, CR=0.8):
        """
        DE/best/1/bin offspring generation in [0,1] space.

        Parameters
        ----------
        parents : np.ndarray
            Parent population, shape (n, d), in [0,1].
        objs : np.ndarray
            Objective values, shape (n,).
        F : float
            Differential weight.
        CR : float
            Crossover rate.

        Returns
        -------
        offspring : np.ndarray
            Offspring population, shape (n, d), clipped to [0,1].
        """
        popsize, dim = parents.shape
        best_idx = np.argmin(objs)
        best = parents[best_idx]
        offspring = parents.copy()
        for i in range(popsize):
            # Pick two distinct donors different from the current individual
            idxs = np.delete(np.arange(popsize), i)
            r1, r2 = np.random.choice(idxs, 2, replace=False)
            mutant = best + F * (parents[r1] - parents[r2])
            # Binomial crossover: j_rand guarantees at least one mutant gene
            j_rand = np.random.randint(dim)
            mask = (np.random.rand(dim) <= CR) | (np.arange(dim) == j_rand)
            offspring[i, mask] = mutant[mask]
        return np.clip(offspring, 0, 1)

    # ==================== Utilities ====================

    def _ensure_uniqueness(self, candidate, X, dim, epsilon=5e-3, max_trials=50):
        """Ensure candidate is not too close to existing samples."""
        # Perturbation magnitude grows with each failed trial
        scales = np.linspace(0.1, 1.0, max_trials)
        for t in range(max_trials):
            dist = cdist(candidate, X, metric='chebyshev').min()
            if dist >= epsilon:
                break
            perturbation = scales[t] * (np.random.rand(1, dim) - 0.5)
            candidate = np.clip(candidate + perturbation, 0.0, 1.0)
        return candidate