"""
Evolutionary Expected Improvement based Bayesian Optimization for MTOP (EEI-BO+)
This module implements Bayesian Optimization for expensive single-objective optimization problems
using an evolutionary approach to optimize the Expected Improvement acquisition function.
References
----------
[1] Liu, Jiao, et al. "Solving highly expensive optimization problems via evolutionary expected improvement." IEEE Transactions on Systems, Man, and Cybernetics: Systems 53.8 (2023): 4843-4855.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2025.12.17
Version: 1.0
"""
from tqdm import tqdm
from scipy.interpolate import RBFInterpolator
from ddmtolab.Algorithms.STSO.CMA_ES import CMA_ES
import torch
from ddmtolab.Methods.mtop import MTOP
from botorch.models import SingleTaskGP
from botorch.fit import fit_gpytorch_mll
from gpytorch.mlls import ExactMarginalLogLikelihood
from botorch.models.transforms import Standardize
from ddmtolab.Algorithms.STSO.DE import DE
from botorch.acquisition import LogExpectedImprovement
from ddmtolab.Methods.Algo_Methods.algo_utils import *
import warnings
import time
warnings.filterwarnings("ignore")
class EEI_BO_plus:
    """
    Evolutionary Expected Improvement based Bayesian Optimization for MTOP.

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[2, K]',
        'dims': 'unequal',
        'objs': 'equal',
        'n_objs': '1',
        'cons': 'equal',
        'n_cons': '0',
        'expensive': 'True',
        'knowledge_transfer': 'True',
        'n_initial': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegates to the shared helper star-imported from algo_utils.
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n_initial=None, max_nfes=None, switch_interval=6, n1=50, max_nfes1=500, n2=30,
                 max_nfes2=6000, save_data=True, save_path='./Data', name='EEI-BO+', disable_tqdm=True):
        """
        Initialize EEI-BO+ algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n_initial : int or List[int], optional
            Number of initial samples per task (default: 50)
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task (default: 100)
        switch_interval : int, optional
            Number of consecutive no-transfer iterations before one transfer
            iteration is performed per task (default: 6)
        n1 : int, optional
            Population size of CMA-ES (default: 50)
        max_nfes1 : int, optional
            Maximum number of function evaluations of CMA-ES (default: 500)
        n2 : int, optional
            Population size of DE (default: 30)
        max_nfes2 : int, optional
            Maximum number of function evaluations of DE (default: 6000)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'EEI-BO+')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.n_initial = n_initial if n_initial is not None else 50
        self.max_nfes = max_nfes if max_nfes is not None else 100
        self.switch_interval = switch_interval
        self.n1 = n1
        self.max_nfes1 = max_nfes1
        self.n2 = n2
        self.max_nfes2 = max_nfes2
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the Evolutionary Expected Improvement based Bayesian Optimization algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives, and runtime
        """
        data_type = torch.float
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n_initial_per_task = par_list(self.n_initial, nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)

        # Generate initial samples using Latin Hypercube Sampling
        decs = initialization(problem, self.n_initial, method='lhs')
        objs, _ = evaluation(problem, decs)
        nfes_per_task = n_initial_per_task.copy()

        pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task), desc=f"{self.name}",
                    disable=self.disable_tqdm)

        # Per-task CMA-ES distribution parameters and transfer-mode bookkeeping.
        params_per_task = [None] * nt
        modeflag_per_task = [0] * nt  # 0=no transfer, 1=transfer
        mode_counter_per_task = [1] * nt  # counter starts at 1 (matching MATLAB)

        while sum(nfes_per_task) < sum(max_nfes_per_task):
            # Identify tasks that have not exhausted their evaluation budget
            active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
            if not active_tasks:
                break
            for i in active_tasks:
                # Build RBF surrogate model from current samples.
                # RBFInterpolator instances are callable, so the surrogate can be
                # registered with the task problem directly.
                rbf_i = RBFInterpolator(decs[i], objs[i].flatten())
                surrogate_problem = MTOP()
                surrogate_problem.add_task(rbf_i, dim=dims[i])

                # Use CMA-ES on the surrogate to extract distribution parameters
                cmaes = CMA_ES(surrogate_problem, n=self.n1, max_nfes=self.max_nfes1, save_data=False)
                cmaes.optimize()
                params_i = cmaes.cmaes_params[0]
                params_per_task[i] = {
                    'mu': params_i['m_dec'],
                    'sigma': params_i['sigma'],
                    'C': params_i['C']
                }

                if modeflag_per_task[i] == 0:
                    # No transfer: use own CMA-ES distribution
                    mu = params_per_task[i]['mu']
                    sigma = params_per_task[i]['sigma']
                    C = params_per_task[i]['C']
                else:
                    # Transfer mode: use adapted params from a randomly chosen source task
                    other_tasks = [t for t in range(nt) if t != i and params_per_task[t] is not None]
                    if other_tasks:
                        source = np.random.choice(other_tasks)
                        mu, sigma, C = _adapt_distribution_params(
                            params_per_task[source]['mu'],
                            params_per_task[source]['sigma'],
                            params_per_task[source]['C'],
                            dims[source], dims[i]
                        )
                    else:
                        # No eligible source task yet: fall back to own distribution
                        mu = params_per_task[i]['mu']
                        sigma = params_per_task[i]['sigma']
                        C = params_per_task[i]['C']

                # Select next candidate using Evolutionary Expected Improvement
                candidate = eei_bo_next_point(decs[i], objs[i], dims[i], mu, sigma, C, self.n2, self.max_nfes2,
                                              data_type=data_type)

                # Evaluate the candidate solution on true objective
                new_objs, _ = evaluation_single(problem, candidate, i)

                # Update dataset with new sample
                decs[i], objs[i] = vstack_groups((decs[i], candidate), (objs[i], new_objs))
                nfes_per_task[i] += 1

                # Mode switching: switch_interval no-transfer rounds, then one
                # transfer round (matching MATLAB 6:1 pattern). The counter is
                # advanced every round so the interval is actually reached.
                if modeflag_per_task[i] == 0 and mode_counter_per_task[i] == self.switch_interval:
                    modeflag_per_task[i] = 1
                    mode_counter_per_task[i] = 0
                elif modeflag_per_task[i] == 1:
                    modeflag_per_task[i] = 0
                mode_counter_per_task[i] += 1
                pbar.update(1)

        pbar.close()
        runtime = time.time() - start_time
        # Flatten per-task histories into a single staircase history for saving.
        all_decs, all_objs = build_staircase_history(decs, objs, k=1)

        # Build and save optimization results
        results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task,
                                     bounds=problem.bounds, save_path=self.save_path,
                                     filename=self.name, save_data=self.save_data)
        return results
def eei_acquisition_function(mu, sigma, C, dim, logEI, data_type=torch.float):
    """
    Create Evolutionary Expected Improvement acquisition function.

    Parameters
    ----------
    mu : ndarray
        Mean vector from CMA-ES distribution
    sigma : float
        Step size from CMA-ES
    C : ndarray
        Covariance matrix from CMA-ES
    dim : int
        Dimensionality of the problem
    logEI : LogExpectedImprovement
        Log Expected Improvement acquisition function (any callable mapping a
        torch tensor of candidates to their log-EI values works)
    data_type : torch.dtype, optional
        Data type for PyTorch tensors (default: torch.float)

    Returns
    -------
    callable
        EEI acquisition function (returns negated EEI for minimization)
    """
    # Full covariance of the CMA-ES search distribution; precompute the
    # determinant and inverse once, outside the per-candidate closure.
    Sigma_Real = sigma ** 2 * C
    det_C = np.linalg.det(Sigma_Real)
    C_inv = np.linalg.inv(Sigma_Real)
    d = dim

    def EEI_func(x):
        """
        Evolutionary Expected Improvement acquisition function.

        EEI(x) = EI(x) * P(x), where:
        - EI(x) is the Expected Improvement
        - P(x) is the probability density from CMA-ES distribution

        Parameters
        ----------
        x : ndarray
            Candidate point(s) to evaluate

        Returns
        -------
        float or ndarray
            Negative EEI value(s) for minimization
        """
        x = x.reshape(1, -1) if x.ndim == 1 else x
        # Compute Expected Improvement
        x_torch = torch.tensor(x, dtype=data_type)
        with torch.no_grad():
            log_ei = logEI(x_torch).detach().cpu().numpy().flatten()
        ei = np.exp(log_ei)
        # Multivariate normal density: ((2*pi)^d * |Sigma|)^(-1/2) * exp(-0.5 * m(x)).
        # BUG FIX: the normalization previously divided by det(Sigma) instead of
        # sqrt(det(Sigma)), scaling the density wrongly whenever det != 1.
        normalization = 1.0 / np.sqrt(det_C * (2 * np.pi) ** d)
        diff = x - mu
        mahalanobis = np.sum(diff @ C_inv * diff, axis=1)
        P = normalization * np.exp(-0.5 * mahalanobis)
        # Combine EI with probability density
        eei = ei * P
        return -float(eei) if x.shape[0] == 1 else -eei

    return EEI_func
def eei_bo_next_point(decs, objs, dim, mu, sigma, C, n2, max_nfes2, data_type=torch.float):
    """
    Select next candidate point using Evolutionary Expected Improvement criterion.

    Combines the classical Expected Improvement acquisition with the probability
    density of the CMA-ES search distribution, yielding an Evolutionary Expected
    Improvement (EEI) criterion that balances exploration and exploitation while
    incorporating evolutionary search information.

    Parameters
    ----------
    decs : ndarray
        Current decision variables (samples)
    objs : ndarray
        Current objective values
    dim : int
        Dimensionality of the problem
    mu : ndarray
        Mean vector from CMA-ES distribution
    sigma : float
        Step size from CMA-ES
    C : ndarray
        Covariance matrix from CMA-ES
    n2 : int
        Population size of DE
    max_nfes2 : int
        Maximum number of function evaluations of DE
    data_type : torch.dtype, optional
        Data type for PyTorch tensors (default: torch.float)

    Returns
    -------
    ndarray
        Next candidate point to evaluate
    """
    # Fit a Gaussian Process surrogate on negated objectives (BoTorch maximizes).
    x_train = torch.tensor(decs, dtype=data_type)
    y_train = torch.tensor(-objs, dtype=data_type)
    model = SingleTaskGP(train_X=x_train, train_Y=y_train, outcome_transform=Standardize(m=1))
    marginal_ll = ExactMarginalLogLikelihood(model.likelihood, model)
    fit_gpytorch_mll(marginal_ll)

    # Log-EI relative to the incumbent best of the (negated) training targets.
    incumbent = y_train.max()
    log_ei = LogExpectedImprovement(model=model, best_f=incumbent)

    # Wrap EI with the CMA-ES density to obtain the EEI criterion.
    acquisition = eei_acquisition_function(mu, sigma, C, dim, log_ei, data_type)

    # Maximize EEI (the wrapper returns its negation) with Differential Evolution.
    acq_problem = MTOP()
    acq_problem.add_task(acquisition, dim=dim)
    de_result = DE(acq_problem, n=n2, max_nfes=max_nfes2, F=0.5, CR=0.9, save_data=False,
                   disable_tqdm=True).optimize()
    return de_result.best_decs
def _adapt_distribution_params(mu_source, sigma_source, C_source, dim_source, dim_target):
"""
Adapt CMA-ES distribution parameters for cross-task transfer with different dimensions.
Matches MATLAB MT_EEI_BO.m dimension handling logic. In [0,1] space (DDMTOLab),
no bound rescaling is needed — only dimension adjustment via truncation or padding.
Parameters
----------
mu_source : ndarray
Mean vector from source task
sigma_source : float
Step size from source task
C_source : ndarray
Covariance matrix from source task
dim_source : int
Dimension of source task
dim_target : int
Dimension of target task
Returns
-------
mu_new, sigma_new, C_new : tuple
Adapted distribution parameters
"""
if dim_source == dim_target:
return mu_source.copy(), sigma_source, C_source.copy()
# Compute full covariance in [0,1] space
Sigma_source = sigma_source ** 2 * C_source
if dim_source > dim_target:
# Source is higher-dim: truncate to first dim_target dimensions
mu_new = mu_source[:dim_target].copy()
C_new = Sigma_source[:dim_target, :dim_target]
else:
# Target is higher-dim: pad mean with random [0,1], pad covariance with identity
mu_new = np.concatenate([mu_source, np.random.rand(dim_target - dim_source)])
C_new = np.eye(dim_target)
C_new[:dim_source, :dim_source] = Sigma_source
# Return sigma=1.0 since covariance already includes sigma^2
return mu_new, 1.0, C_new