"""
Data-Driven Evolutionary Algorithm with Multi-Evolutionary Sampling Strategy (DDEA-MESS)
This module implements DDEA-MESS for expensive single-objective optimization problems.
References
----------
[1] Yu, F., Gong, W., & Zhen, H. (2022). A data-driven evolutionary algorithm with multi-evolutionary sampling strategy for expensive optimization. Knowledge-Based Systems, 242, 108436.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.02.19
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from scipy.interpolate import RBFInterpolator
from scipy.optimize import minimize
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.mtop import MTOP
import warnings
warnings.filterwarnings("ignore")
class DDEA_MESS:
    """
    Data-Driven Evolutionary Algorithm with Multi-Evolutionary Sampling Strategy.

    Dynamically selects from three search strategies based on evaluation budget usage:

    1. Global search: DE/rand/1 prescreening on RBF model built from first min(N, 300) samples
    2. Local search: DE/best/1 on RBF model built from top tau samples by fitness
    3. Trust region search: Local optimization (L-BFGS-B) on RBF model around best solution
    """

    # Capability metadata describing the problem classes this algorithm supports;
    # consumed by get_algorithm_information() below.
    algorithm_information = {
        'n_tasks': '[1, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'equal',
        'n_cons': '0',
        'expensive': 'True',
        'knowledge_transfer': 'False',
        'n_initial': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        """Return (and optionally print) the algorithm capability metadata."""
        return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n_initial=None, max_nfes=None, save_data=True,
save_path='./Data', name='DDEA-MESS', disable_tqdm=True):
"""
Initialize DDEA-MESS algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n_initial : int or List[int], optional
Number of initial samples per task (default: 100)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 300)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './Data')
name : str, optional
Name for the experiment (default: 'DDEA-MESS')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n_initial = n_initial if n_initial is not None else 100
self.max_nfes = max_nfes if max_nfes is not None else 300
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
def optimize(self):
    """
    Execute the DDEA-MESS algorithm.

    Returns
    -------
    Results
        Optimization results containing decision variables, objectives, and runtime
    """
    start_time = time.time()
    problem = self.problem
    nt = problem.n_tasks
    dims = problem.dims
    n_initial_per_task = par_list(self.n_initial, nt)
    max_nfes_per_task = par_list(self.max_nfes, nt)
    # Generate initial samples using Latin Hypercube Sampling
    decs = initialization(problem, self.n_initial, method='lhs')
    objs, _ = evaluation(problem, decs)
    nfes_per_task = n_initial_per_task.copy()
    # Current working dataset (grows by one point per task per iteration)
    current_decs = [decs[i].copy() for i in range(nt)]
    current_objs = [objs[i].copy() for i in range(nt)]
    pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
                desc=f"{self.name}", disable=self.disable_tqdm)
    while sum(nfes_per_task) < sum(max_nfes_per_task):
        active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
        if not active_tasks:
            break
        for i in active_tasks:
            dim = dims[i]
            X = current_decs[i]
            Y = current_objs[i]
            # Select strategy using MESS. BUG FIX: the budget was hard-coded
            # to 500, so the progress ratio never tracked the task's actual
            # max_nfes (default 300); use the per-task budget instead.
            strategy_id = self._mess(nfes_per_task[i], max_nfes_per_task[i])
            if strategy_id == 1:
                candidate = self._strategy_global(X, Y, dim)
            elif strategy_id == 2:
                candidate = self._strategy_local(X, Y, dim)
            else:
                candidate = self._strategy_trust_region(X, Y, dim)
            # Ensure uniqueness (avoid re-evaluating near-duplicate points)
            candidate = self._ensure_uniqueness(candidate, X, dim)
            # Evaluate on the expensive objective
            obj, _ = evaluation_single(problem, candidate, i)
            # Update dataset
            current_decs[i] = np.vstack([current_decs[i], candidate])
            current_objs[i] = np.vstack([current_objs[i], obj])
            nfes_per_task[i] += 1
            pbar.update(1)
    pbar.close()
    runtime = time.time() - start_time
    # Convert database to staircase history structure for results
    db_decs = [current_decs[i].copy() for i in range(nt)]
    db_objs = [current_objs[i].copy() for i in range(nt)]
    all_decs, all_objs = build_staircase_history(db_decs, db_objs, k=1)
    results = build_save_results(
        all_decs=all_decs, all_objs=all_objs, runtime=runtime,
        max_nfes=nfes_per_task, bounds=problem.bounds,
        save_path=self.save_path, filename=self.name,
        save_data=self.save_data
    )
    return results
def _mess(self, fes_used, fes_max):
"""
Multi-Evolutionary Sampling Strategy selector.
Dynamically computes probabilities for three strategies based on
the ratio of used evaluations to total budget.
Parameters
----------
fes_used : int
Number of function evaluations used (including initial samples)
fes_max : int
Maximum number of function evaluations (total budget)
Returns
-------
int
Strategy ID: 1 (global), 2 (local), or 3 (trust region)
"""
ratio = fes_used / fes_max
beta = (1 - ratio ** 3) ** 2
alpha = abs(beta * np.sin(3 * np.pi / 2 + np.sin(beta * 3 * np.pi / 2)))
P3 = (1 - alpha) if alpha > 2 / 3 else 1 / 3
P1 = (1 - P3) / 2
r = np.random.rand()
if r <= P1:
return 1 # global search
elif r <= 2 * P1:
return 2 # local search
else:
return 3 # trust region search
def _build_rbf_model(self, X, Y):
"""Build Gaussian RBF surrogate model."""
Y_flat = Y.flatten()
n_samples, dim = X.shape
if n_samples > 1:
dist_matrix = cdist(X, X, metric='euclidean')
max_dist = dist_matrix.max()
spread = max_dist / (dim * n_samples) ** (1.0 / dim)
else:
spread = 1.0
try:
rbf = RBFInterpolator(X, Y_flat, kernel='gaussian', epsilon=1.0 / spread)
except Exception:
rbf = RBFInterpolator(X, Y_flat, kernel='thin_plate_spline')
def model(x):
if x.ndim == 1:
x = x.reshape(1, -1)
return rbf(x).reshape(-1, 1)
return model
def _strategy_global(self, X, Y, dim):
    """
    Strategy 1: global search via one generation of DE/rand/1 prescreening.

    The surrogate is trained on the first min(N, 300) archive points (the
    initial LHS design sits at the front of the archive, giving good space
    coverage); the DE population is elite-initialized from the full database
    and searches the whole unit box.
    """
    n_train = min(len(X), 300)
    surrogate = self._build_rbf_model(X[:n_train], Y[:n_train])
    return self._de_search(
        surrogate, X, Y,
        lb=np.zeros(dim), ub=np.ones(dim),
        dim=dim, popsize=50, max_gen=1, mode='rand'
    )
def _strategy_local(self, X, Y, dim):
"""
Strategy 2: Local search with DE/best/1.
Selects top tau = min(dim+25, N) solutions by fitness.
Builds RBF on selected data within local bounds.
Runs 10 generations of DE/best/1 with elite initialization from full database.
"""
N = len(X)
tau = min(dim + 25, N)
# Select top tau solutions by fitness
Y_flat = Y.flatten()
idx = np.argsort(Y_flat)[:tau]
X_local = X[idx]
Y_local = Y[idx]
lb_local = np.min(X_local, axis=0)
ub_local = np.max(X_local, axis=0)
# Handle degenerate bounds
mask = (ub_local - lb_local) < 1e-10
lb_local[mask] = np.maximum(0.0, lb_local[mask] - 0.05)
ub_local[mask] = np.minimum(1.0, ub_local[mask] + 0.05)
# Build RBF on local data
model = self._build_rbf_model(X_local, Y_local)
# DE/best/1 search (10 generations)
candidate = self._de_search(
model, X, Y,
lb=lb_local, ub=ub_local,
dim=dim, popsize=50, max_gen=10, mode='best'
)
return candidate
def _strategy_trust_region(self, X, Y, dim):
"""
Strategy 3: Trust region search with local optimization.
Selects min(N, 5*dim) nearest neighbors to the best solution.
Builds RBF on selected data and runs L-BFGS-B optimization.
"""
N = len(X)
m = min(N, 5 * dim)
Y_flat = Y.flatten()
idx_min = np.argmin(Y_flat)
# Select m nearest neighbors to best
dist = cdist(X, X[idx_min:idx_min + 1]).flatten()
idx = np.argsort(dist)[:m]
X_trs = X[idx]
Y_trs = Y[idx]
# Build RBF on trust region data
model = self._build_rbf_model(X_trs, Y_trs)
lb_trs = np.min(X_trs, axis=0)
ub_trs = np.max(X_trs, axis=0)
# Handle degenerate bounds
mask = (ub_trs - lb_trs) < 1e-10
lb_trs[mask] = np.maximum(0.0, lb_trs[mask] - 0.05)
ub_trs[mask] = np.minimum(1.0, ub_trs[mask] + 0.05)
# Local optimization starting from best solution
x0 = X[idx_min]
bounds = list(zip(lb_trs, ub_trs))
def obj_func(x):
return model(x.reshape(1, -1)).flatten()[0]
try:
result = minimize(obj_func, x0, method='trust-constr', bounds=bounds,
options={'maxiter': 20, 'disp': False})
candidate = result.x.reshape(1, -1)
except Exception:
candidate = x0.reshape(1, -1)
candidate = np.clip(candidate, 0.0, 1.0)
return candidate
def _de_search(self, surrogate_func, X_full, Y_full, lb, ub, dim,
popsize=50, max_gen=10, mode='rand'):
"""
Run DE on surrogate model with elite initialization.
Parameters
----------
surrogate_func : callable
Surrogate model, accepts (n, d) array, returns (n, 1) predictions
X_full : np.ndarray
Full database of decision variables (for elite initialization)
Y_full : np.ndarray
Full database of objectives (for elite initialization)
lb, ub : np.ndarray
Search bounds, shape (d,)
dim : int
Problem dimension
popsize : int
Population size
max_gen : int
Maximum DE generations
mode : str
'rand' for DE/rand/1/bin, 'best' for DE/best/1/bin
"""
CR, F = 0.8, 0.5
Y_flat = Y_full.flatten()
N = len(Y_flat)
# Elite initialization: top popsize from full database
if N >= popsize:
idx = np.argsort(Y_flat)[:popsize]
pop = X_full[idx].copy()
pop_objs = Y_flat[idx].copy()
else:
extra = lb + (ub - lb) * np.random.rand(popsize - N, dim)
pop = np.vstack([X_full.copy(), extra])
pop_objs = np.concatenate([Y_flat, surrogate_func(extra).flatten()])
# Compute range for normalization
range_vec = ub - lb
range_vec = np.maximum(range_vec, 1e-30)
for gen in range(max_gen):
# Normalize to [0,1] within [lb, ub]
pop_norm = np.clip((pop - lb) / range_vec, 0, 1)
# Generate offspring in normalized space
if mode == 'rand':
off_norm = de_generation(pop_norm, F, CR)
else:
off_norm = self._de_best1(pop_norm, pop_objs, F, CR)
# Denormalize to original space
offspring = lb + np.clip(off_norm, 0, 1) * range_vec
# Evaluate on surrogate
off_objs = surrogate_func(offspring).flatten()
# Comparison selection (standard DE)
improved = off_objs < pop_objs
pop[improved] = offspring[improved]
pop_objs[improved] = off_objs[improved]
best_idx = np.argmin(pop_objs)
return pop[best_idx:best_idx + 1]
def _de_best1(self, parents, objs, F=0.5, CR=0.8):
"""
DE/best/1/bin offspring generation in [0,1] normalized space.
Parameters
----------
parents : np.ndarray
Parent population, shape (n, d), in [0,1]
objs : np.ndarray
Objective values, shape (n,)
F : float
Differential weight
CR : float
Crossover rate
Returns
-------
offspring : np.ndarray
Offspring population, shape (n, d), clipped to [0,1]
"""
popsize, dim = parents.shape
best_idx = np.argmin(objs)
best = parents[best_idx]
offspring = parents.copy()
for i in range(popsize):
idxs = np.delete(np.arange(popsize), i)
r1, r2 = np.random.choice(idxs, 2, replace=False)
mutant = best + F * (parents[r1] - parents[r2])
j_rand = np.random.randint(dim)
mask = (np.random.rand(dim) <= CR) | (np.arange(dim) == j_rand)
offspring[i, mask] = mutant[mask]
return np.clip(offspring, 0, 1)
def _ensure_uniqueness(self, candidate, X, dim, epsilon=5e-3, max_trials=50):
"""Ensure candidate is not too close to existing samples."""
scales = np.linspace(0.1, 1.0, max_trials)
for t in range(max_trials):
dist = cdist(candidate, X, metric='chebyshev').min()
if dist >= epsilon:
break
perturbation = scales[t] * (np.random.rand(1, dim) - 0.5)
candidate = np.clip(candidate + perturbation, 0.0, 1.0)
return candidate