"""
Exploration-exploitation Switching Assisted Optimization (ESAO)
This module implements ESAO for expensive single-objective optimization problems.
References
----------
[1] Wang, Xinjing, et al. "A novel evolutionary sampling assisted optimization method for high-dimensional expensive problems." IEEE Transactions on Evolutionary Computation 23.5 (2019): 815-827.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.01.13
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from scipy.interpolate import RBFInterpolator
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Algorithms.STSO.DE import DE
from ddmtolab.Methods.mtop import MTOP
import warnings
warnings.filterwarnings("ignore")
[docs]
class ESAO:
"""
Exploration-exploitation Switching Assisted Optimization for expensive optimization problems.
This algorithm adaptively switches between global and local search based on improvement:
1. Global search: RBF model built on all data points
2. Local search: RBF model built on top 2*dim points nearest to the best
"""
algorithm_information = {
'n_tasks': '[1, K]',
'dims': 'unequal',
'objs': 'equal',
'n_objs': '1',
'cons': 'equal',
'n_cons': '0',
'expensive': 'True',
'knowledge_transfer': 'False',
'n_initial': 'unequal',
'max_nfes': 'unequal'
}
@classmethod
def get_algorithm_information(cls, print_info=True):
return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n_initial=None, max_nfes=None, save_data=True,
save_path='./Data', name='ESAO', disable_tqdm=True):
"""
Initialize ESAO algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n_initial : int or List[int], optional
Number of initial samples per task (default: 100)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 300)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './TestData')
name : str, optional
Name for the experiment (default: 'ESAO_test')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n_initial = n_initial if n_initial is not None else 100
self.max_nfes = max_nfes if max_nfes is not None else 300
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
# DE parameters for surrogate optimization
self.global_popsize = 100
self.global_max_gen = 5
self.local_popsize = 100
self.local_max_gen = 50
[docs]
def optimize(self):
"""
Execute the ESAO algorithm.
Returns
-------
Results
Optimization results containing decision variables, objectives, and runtime
"""
start_time = time.time()
problem = self.problem
nt = problem.n_tasks
dims = problem.dims
n_initial_per_task = par_list(self.n_initial, nt)
max_nfes_per_task = par_list(self.max_nfes, nt)
# Generate initial samples using Latin Hypercube Sampling
decs = initialization(problem, self.n_initial, method='lhs')
objs, _ = evaluation(problem, decs)
nfes_per_task = n_initial_per_task.copy()
# Current working dataset (accumulated samples)
current_decs = [decs[i].copy() for i in range(nt)]
current_objs = [objs[i].copy() for i in range(nt)]
# Track search state for each task: 0 = global, 1 = local
search_state = [0] * nt
# Track best objective value before each iteration for improvement check
best_before = [np.min(current_objs[i]) for i in range(nt)]
pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
desc=f"{self.name}", disable=self.disable_tqdm)
while sum(nfes_per_task) < sum(max_nfes_per_task):
# Skip tasks that have exhausted their evaluation budget
active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
if not active_tasks:
break
for i in active_tasks:
dim = dims[i]
X = current_decs[i]
Y = current_objs[i]
# Build surrogate models
global_model = self._build_rbf_model(X, Y)
local_model = self._build_local_rbf_model(X, Y, dim)
# Execute search based on current state
if search_state[i] == 0:
# Global search
candidate_np = self._global_search(global_model, dim)
else:
# Local search
candidate_np = self._local_search(local_model, X, Y, dim)
# Ensure uniqueness: avoid duplicate sampling
candidate_np = self._ensure_uniqueness(candidate_np, X, dim)
# Evaluate the candidate solution
obj, _ = evaluation_single(problem, candidate_np, i)
# Update current working dataset
current_decs[i] = np.vstack([current_decs[i], candidate_np])
current_objs[i] = np.vstack([current_objs[i], obj])
nfes_per_task[i] += 1
pbar.update(1)
# Check improvement and update search state
best_after = np.min(current_objs[i])
if best_after < best_before[i]:
# Improvement found, continue current search
pass
else:
# No improvement, switch search state
search_state[i] = 1 - search_state[i]
# Update best_before for next iteration
best_before[i] = best_after
pbar.close()
runtime = time.time() - start_time
# Convert database to staircase history structure for results
db_decs = [current_decs[i].copy() for i in range(nt)]
db_objs = [current_objs[i].copy() for i in range(nt)]
all_decs, all_objs = build_staircase_history(db_decs, db_objs, k=1)
# Build and save results
results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task,
bounds=problem.bounds, save_path=self.save_path, filename=self.name,
save_data=self.save_data)
return results
# Build RBF surrogate model using all data points
def _build_rbf_model(self, X, Y):
Y = Y.flatten()
n_samples, dim = X.shape
if n_samples > 1:
# Compute pairwise distances
dist_matrix = cdist(X, X, metric='euclidean')
max_dist = dist_matrix.max()
# Adaptive spread estimation (MATLAB newrbe formula)
spread = max_dist / (dim * n_samples) ** (1.0 / dim)
else:
spread = 1.0
# Use Gaussian RBF
try:
rbf_interpolator = RBFInterpolator(X, Y, kernel='gaussian', epsilon=1.0 / spread)
except:
# Fallback to thin_plate_spline if gaussian fails
rbf_interpolator = RBFInterpolator(X, Y, kernel='thin_plate_spline')
def rbf_model(x):
if x.ndim == 1:
x = x.reshape(1, -1)
pred = rbf_interpolator(x)
return pred.reshape(-1, 1)
return rbf_model
# Build local RBF surrogate model using top 2*dim points nearest to the best
def _build_local_rbf_model(self, X, Y, dim):
# Number of local samples: min(2*dim, available samples)
n_local = min(2 * dim, len(X))
n_local = max(n_local, 2)
# Find the best point
idx_best = np.argmin(Y)
best_point = X[idx_best:idx_best + 1]
# Compute distances to the best point
distances = cdist(X, best_point, metric='euclidean').flatten()
# Select the n_local nearest points
idx_sorted = np.argsort(distances)[:n_local]
X_local = X[idx_sorted]
Y_local = Y[idx_sorted]
# Build RBF model on local data
return self._build_rbf_model(X_local, Y_local)
# Global search using DE to optimize the global surrogate model
def _global_search(self, global_model, dim):
# Create surrogate problem wrapper
def surrogate_objective(x):
return global_model(x)
surrogate_problem = MTOP()
surrogate_problem.add_task(objective_func=surrogate_objective, dim=dim)
# Run DE
max_nfes = self.global_popsize * self.global_max_gen
de = DE(surrogate_problem, n=self.global_popsize, max_nfes=max_nfes, disable_tqdm=True)
results = de.optimize()
# Get best solution
best_solution = results.best_decs[0].reshape(1, -1)
# Ensure solution is within [0, 1]
best_solution = np.clip(best_solution, 0.0, 1.0)
return best_solution
# Local search using DE to optimize the local surrogate model
def _local_search(self, local_model, X, Y, dim):
# Define local search bounds based on local data region
n_local = min(2 * dim, len(X))
idx_best = np.argmin(Y)
best_point = X[idx_best:idx_best + 1]
distances = cdist(X, best_point, metric='euclidean').flatten()
idx_sorted = np.argsort(distances)[:n_local]
X_local = X[idx_sorted]
# Get bounds of local region
lb_local = np.min(X_local, axis=0)
ub_local = np.max(X_local, axis=0)
# Handle case where lb == ub (add small margin)
range_mask = (ub_local - lb_local) < 1e-10
if np.any(range_mask):
lb_local[range_mask] = np.maximum(0.0, lb_local[range_mask] - 0.05)
ub_local[range_mask] = np.minimum(1.0, ub_local[range_mask] + 0.05)
# Create surrogate problem wrapper with local bounds
def surrogate_objective(x):
# x is in [0,1], denormalize to local bounds
x_denorm = lb_local + x * (ub_local - lb_local)
return local_model(x_denorm)
surrogate_problem = MTOP()
surrogate_problem.add_task(objective_func=surrogate_objective, dim=dim)
# Run DE
max_nfes = self.local_popsize * self.local_max_gen
de = DE(surrogate_problem, n=self.local_popsize, max_nfes=max_nfes, disable_tqdm=True)
results = de.optimize()
# Get best solution in [0,1] space and denormalize
best_x_normalized = results.best_decs[0].reshape(1, -1)
best_solution = lb_local + best_x_normalized * (ub_local - lb_local)
# Ensure solution is within [0, 1]
best_solution = np.clip(best_solution, 0.0, 1.0)
return best_solution
# Ensure candidate is not too close to existing samples
def _ensure_uniqueness(self, candidate, X, dim, epsilon=5e-3, max_trials=50):
scales = np.linspace(0.1, 1.0, max_trials)
trial_count = 0
while trial_count < max_trials:
# Compute minimum distance to existing samples
dist = cdist(candidate, X, metric='euclidean')
min_dist = np.min(dist)
if min_dist >= epsilon:
break # Candidate is sufficiently unique
# Apply perturbation
scale = scales[trial_count % max_trials]
perturbation = scale * (np.random.rand(1, dim) - 0.5)
candidate = candidate + perturbation
# Clip to [0, 1]
candidate = np.clip(candidate, 0.0, 1.0)
trial_count += 1
return candidate