"""
SaEF-AKT: Surrogate-Assisted Evolutionary Framework with Adaptive Knowledge Transfer
This module implements SaEF-AKT for expensive multi-task single-objective optimization.
References
----------
[1] Z. Huang, J. Zhong, and W. N. N. Yu, "Surrogate-Assisted Evolutionary Framework with Adaptive Knowledge Transfer for Multi-Task Optimization," IEEE Trans. Emerg. Topics Comput., vol. 9, no. 4, pp. 1930-1944, 2021.
Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.02.22
Version: 1.0
"""
import time
import warnings
import numpy as np
import torch
from tqdm import tqdm
from botorch.models import SingleTaskGP
from botorch.fit import fit_gpytorch_mll
from gpytorch.mlls import ExactMarginalLogLikelihood
from scipy.spatial.distance import cdist
from ddmtolab.Methods.Algo_Methods.algo_utils import *
warnings.filterwarnings("ignore")
[docs]
class SaEF_AKT:
"""
Surrogate-Assisted Evolutionary Framework with Adaptive Knowledge Transfer.
This algorithm features:
- Local Gaussian Process modeling (NC nearest + NR recent points)
- Multiple merit functions with different exploration-exploitation balance (g = 0, 1, 2, 4)
- DE-based optimization of merit functions on GP surrogate
- KL divergence-based task similarity measurement
- Pheromone-based adaptive auxiliary task selection for knowledge transfer
Attributes
----------
algorithm_information : dict
Dictionary containing algorithm capabilities and requirements
"""
algorithm_information = {
'n_tasks': '[2, K]',
'dims': 'unequal',
'objs': 'equal',
'n_objs': '1',
'cons': 'equal',
'n_cons': '0',
'expensive': 'True',
'knowledge_transfer': 'True',
'n_initial': 'unequal',
'max_nfes': 'unequal'
}
@classmethod
def get_algorithm_information(cls, print_info=True):
return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n_initial=None, max_nfes=None,
NC=60, NR=60, Phe_a=0.1, Phe_max=1.0, Phe_min=0.01,
P_max=0.9, g_values=None,
de_pop=15, de_merit_evals=2000,
save_data=True, save_path='./Data',
name='SaEF-AKT', disable_tqdm=True):
"""
Initialize SaEF-AKT algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n_initial : int or List[int], optional
Number of initial samples per task (default: 50)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 100)
NC : int, optional
Number of nearest neighbor points for local GP (default: 60)
NR : int, optional
Number of most recent evaluation points for local GP (default: 60)
Phe_a : float, optional
Pheromone evaporation rate (default: 0.1)
Phe_max : float, optional
Maximum pheromone concentration (default: 1.0)
Phe_min : float, optional
Minimum pheromone concentration (default: 0.01)
P_max : float, optional
Probability of selecting the task with maximum transfer probability (default: 0.9)
g_values : list of float, optional
Exploration weights for merit functions (default: [0, 1, 2, 4])
de_pop : int, optional
DE population size for merit function optimization (default: 15)
de_merit_evals : int, optional
Max DE evaluations for merit function optimization (default: 2000)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './Data')
name : str, optional
Name for the experiment (default: 'SaEF-AKT')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n_initial = n_initial if n_initial is not None else 50
self.max_nfes = max_nfes if max_nfes is not None else 100
self.NC = NC
self.NR = NR
self.Phe_a = Phe_a
self.Phe_max = Phe_max
self.Phe_min = Phe_min
self.P_max = P_max
self.g_values = g_values if g_values is not None else [0, 1, 2, 4]
self.de_pop = de_pop
self.de_merit_evals = de_merit_evals
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
[docs]
def optimize(self):
"""
Execute the SaEF-AKT algorithm.
The main loop follows Algorithm 3 in the paper:
Phase 1: GPOP search + real evaluation for each task (1 FE per task)
Phase 2: Similarity measurement via KL divergence on updated databases
Phase 3: Knowledge transfer + pheromone update for each task (1 FE per task)
Returns
-------
Results
Optimization results containing decision variables, objectives, and runtime
"""
start_time = time.time()
problem = self.problem
nt = problem.n_tasks
dims = problem.dims
n_initial_per_task = par_list(self.n_initial, nt)
max_nfes_per_task = par_list(self.max_nfes, nt)
# Initialize samples using Latin Hypercube Sampling
decs = initialization(problem, self.n_initial, method='lhs')
objs, _ = evaluation(problem, decs)
nfes_per_task = [n_initial_per_task[i] for i in range(nt)]
# Initialize pheromone matrix: tau[r, s] = pheromone of s assisting r
tau = np.full((nt, nt), (self.Phe_max + self.Phe_min) / 2)
np.fill_diagonal(tau, 0.0)
total_nfes = sum(max_nfes_per_task)
pbar = tqdm(total=total_nfes, initial=sum(nfes_per_task),
desc=f"{self.name}", disable=self.disable_tqdm)
while sum(nfes_per_task) < total_nfes:
active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
if not active_tasks:
break
# ===== Phase 1: Surrogate-Assisted EA (GPOP) =====
# For each task: GP search + evaluate best candidate with real function
for i in active_tasks:
if nfes_per_task[i] >= max_nfes_per_task[i]:
continue
candidate = self._gpop_search(decs[i], objs[i], dims[i])
if candidate is not None:
candidate = candidate.reshape(1, -1)
obj_new, _ = evaluation_single(problem, candidate, i)
decs[i] = np.vstack([decs[i], candidate])
objs[i] = np.vstack([objs[i], obj_new])
nfes_per_task[i] += 1
pbar.update(1)
# ===== Phase 2: Similarity Measurement via KL Divergence =====
# Uses updated databases (including Phase 1 evaluations)
train_data = {}
for i in active_tasks:
train_data[i] = self._select_training_data(decs[i], objs[i])
eta = self._compute_similarity(train_data, dims, active_tasks, nt)
# ===== Phase 3: Knowledge Transfer + Pheromone Update =====
for r in active_tasks:
if nfes_per_task[r] >= max_nfes_per_task[r]:
continue
# Select auxiliary task
at = self._select_auxiliary_task(r, tau, eta, nt, active_tasks)
if at is not None:
# Transfer the optimal solution (x_best) of auxiliary task AT(r)
best_idx_at = np.argmin(objs[at].flatten())
transfer_dec = decs[at][best_idx_at].copy()
# Dimension alignment
if len(transfer_dec) < dims[r]:
transfer_dec = np.pad(transfer_dec, (0, dims[r] - len(transfer_dec)),
mode='constant', constant_values=0.5)
elif len(transfer_dec) > dims[r]:
transfer_dec = transfer_dec[:dims[r]]
transfer_dec = np.clip(transfer_dec, 0, 1).reshape(1, -1)
# Evaluate transferred solution with real fitness of task r
obj_transfer, _ = evaluation_single(problem, transfer_dec, r)
nfes_per_task[r] += 1
pbar.update(1)
# Pheromone evaporation
tau[r, at] = (1 - self.Phe_a) * tau[r, at]
# Success reinforcement: if transfer improves best-so-far
if obj_transfer.flatten()[0] < np.min(objs[r]):
tau[r, at] = self.Phe_max
# Clamp pheromone
tau[r, at] = np.clip(tau[r, at], self.Phe_min, self.Phe_max)
# Add evaluated solution to task r's dataset
decs[r] = np.vstack([decs[r], transfer_dec])
objs[r] = np.vstack([objs[r], obj_transfer])
pbar.close()
runtime = time.time() - start_time
# Build staircase history for analysis
all_decs, all_objs = build_staircase_history(decs, objs, k=1)
results = build_save_results(
all_decs=all_decs, all_objs=all_objs, runtime=runtime,
max_nfes=nfes_per_task,
bounds=problem.bounds, save_path=self.save_path,
filename=self.name, save_data=self.save_data)
return results
def _select_training_data(self, decs_t, objs_t):
"""
Select local training data: NC nearest to x_best + NR most recent.
Parameters
----------
decs_t : np.ndarray
All decision variables for this task, shape (n, dim)
objs_t : np.ndarray
All objective values for this task, shape (n, 1)
Returns
-------
np.ndarray
Selected training decisions, shape (n_train, dim)
"""
n = len(decs_t)
best_idx = np.argmin(objs_t.flatten())
x_best = decs_t[best_idx]
# NC nearest neighbors
nc = min(self.NC, n)
distances = cdist(x_best.reshape(1, -1), decs_t)[0]
nearest_idx = np.argsort(distances)[:nc]
# NR most recent
nr = min(self.NR, n)
recent_idx = np.arange(max(0, n - nr), n)
# Union of both sets
selected_idx = np.unique(np.concatenate([nearest_idx, recent_idx]))
return decs_t[selected_idx]
def _gpop_search(self, decs_t, objs_t, dim_t):
"""
GPOP: GP-based local search with multiple merit functions.
Parameters
----------
decs_t : np.ndarray
All decision variables for this task, shape (n, dim)
objs_t : np.ndarray
All objective values for this task, shape (n, 1)
dim_t : int
Dimension of this task
Returns
-------
np.ndarray or None
Best candidate found, shape (dim_t,), or None if GP fails
"""
n = len(decs_t)
best_idx = np.argmin(objs_t.flatten())
x_best = decs_t[best_idx]
# Select local training data
nc = min(self.NC, n)
distances = cdist(x_best.reshape(1, -1), decs_t)[0]
nearest_idx = np.argsort(distances)[:nc]
nr = min(self.NR, n)
recent_idx = np.arange(max(0, n - nr), n)
selected_idx = np.unique(np.concatenate([nearest_idx, recent_idx]))
train_x = decs_t[selected_idx]
train_y = objs_t[selected_idx].flatten()
# Define local search bounds based on NC nearest points range
nc_decs = decs_t[nearest_idx]
d = np.max(nc_decs, axis=0) - np.min(nc_decs, axis=0)
lb = np.clip(x_best - d / 2, 0, 1)
ub = np.clip(x_best + d / 2, 0, 1)
# Ensure minimum range
too_small = (ub - lb) < 1e-6
lb[too_small] = np.clip(x_best[too_small] - 0.05, 0, 1)
ub[too_small] = np.clip(x_best[too_small] + 0.05, 0, 1)
# Build GP model
try:
gp, y_mean, y_std = self._build_gp(train_x, train_y)
except Exception:
return None
# Search with multiple merit functions (g = 0, 1, 2, 4)
# Each g explores different exploration-exploitation trade-off
# Select best candidate using predicted mean (g=0) for fair comparison
best_candidate = None
best_pred_mean = np.inf
for g in self.g_values:
try:
candidate = self._optimize_merit(gp, y_mean, y_std, lb, ub, dim_t, g)
# Compare using predicted mean (g=0) to avoid bias toward high-g
pred_mean = self._evaluate_merit(gp, y_mean, y_std, candidate, 0)
if pred_mean < best_pred_mean:
best_pred_mean = pred_mean
best_candidate = candidate
except Exception:
continue
return best_candidate
def _build_gp(self, train_x, train_y):
"""
Build and fit a local GP model.
Parameters
----------
train_x : np.ndarray
Training decisions, shape (n, dim)
train_y : np.ndarray
Training objectives, shape (n,)
Returns
-------
tuple
(gp_model, y_mean, y_std) for denormalization
"""
train_x_t = torch.tensor(train_x, dtype=torch.double)
train_y_t = torch.tensor(train_y, dtype=torch.double).unsqueeze(-1)
# Standardize Y
y_mean = train_y_t.mean()
y_std = train_y_t.std()
if y_std < 1e-6:
y_std = torch.tensor(1.0, dtype=torch.double)
train_y_norm = (train_y_t - y_mean) / y_std
gp = SingleTaskGP(train_x_t, train_y_norm)
mll = ExactMarginalLogLikelihood(gp.likelihood, gp)
fit_gpytorch_mll(mll)
return gp, y_mean, y_std
def _evaluate_merit(self, gp, y_mean, y_std, x, g):
"""
Evaluate merit function: f_M(x) = mean(x) - g * std(x).
Parameters
----------
gp : SingleTaskGP
Fitted GP model
y_mean, y_std : torch.Tensor
Standardization parameters
x : np.ndarray
Decision vector, shape (dim,)
g : float
Exploration weight
Returns
-------
float
Merit function value
"""
x_t = torch.tensor(x, dtype=torch.double).unsqueeze(0)
with torch.no_grad():
posterior = gp.posterior(x_t)
mean = posterior.mean.item() * y_std.item() + y_mean.item()
std = posterior.variance.sqrt().item() * y_std.item()
return mean - g * std
def _batch_merit(self, gp, y_mean, y_std, X, g):
"""
Batch evaluate merit function for a population.
Parameters
----------
gp : SingleTaskGP
Fitted GP model
y_mean, y_std : torch.Tensor
Standardization parameters
X : np.ndarray
Population, shape (n, dim)
g : float
Exploration weight
Returns
-------
np.ndarray
Merit values, shape (n,)
"""
X_t = torch.tensor(X, dtype=torch.double)
with torch.no_grad():
posterior = gp.posterior(X_t)
means = posterior.mean.squeeze(-1).numpy() * y_std.item() + y_mean.item()
stds = posterior.variance.squeeze(-1).sqrt().numpy() * y_std.item()
return means - g * stds
def _optimize_merit(self, gp, y_mean, y_std, lb, ub, dim, g):
"""
Optimize merit function using DE within local bounds (batch evaluation).
Parameters
----------
gp : SingleTaskGP
Fitted GP model
y_mean, y_std : torch.Tensor
Standardization parameters
lb, ub : np.ndarray
Lower and upper bounds, shape (dim,)
dim : int
Decision space dimension
g : float
Exploration weight
Returns
-------
np.ndarray
Best candidate found, shape (dim,)
"""
pop_size = self.de_pop
max_gen = self.de_merit_evals // pop_size
F, CR = 0.5, 0.9
# Initialize population within local bounds
pop = lb + (ub - lb) * np.random.rand(pop_size, dim)
# Batch evaluate initial population
fit = self._batch_merit(gp, y_mean, y_std, pop, g)
for _ in range(max_gen):
# Generate all trials at once
trials = np.empty_like(pop)
for i in range(pop_size):
idxs = list(range(pop_size))
idxs.remove(i)
a, b, c = np.random.choice(idxs, 3, replace=False)
mutant = pop[a] + F * (pop[b] - pop[c])
mutant = np.clip(mutant, lb, ub)
j_rand = np.random.randint(dim)
mask = np.random.rand(dim) < CR
mask[j_rand] = True
trials[i] = np.where(mask, mutant, pop[i])
# Batch evaluate all trials
trial_fit = self._batch_merit(gp, y_mean, y_std, trials, g)
# Selection
improved = trial_fit < fit
pop[improved] = trials[improved]
fit[improved] = trial_fit[improved]
return pop[np.argmin(fit)]
def _compute_similarity(self, train_data, dims, active_tasks, nt):
"""
Compute task similarity matrix based on KL divergence.
Parameters
----------
train_data : dict
Selected training data per task {task_idx: np.ndarray}
dims : list of int
Dimensions per task
active_tasks : list of int
Active task indices
nt : int
Total number of tasks
Returns
-------
np.ndarray
Normalized similarity matrix, shape (nt, nt)
"""
eta = np.zeros((nt, nt))
for r in active_tasks:
for s in active_tasks:
if r == s:
continue
# Align dimensions to min(dim_r, dim_s)
d_min = min(dims[r], dims[s])
data_r = train_data[r][:, :d_min]
data_s = train_data[s][:, :d_min]
kld = self._kl_divergence(data_r, data_s)
# Similarity = 1 / KLD (avoid division by zero)
if kld > 1e-10:
eta[r, s] = 1.0 / kld
else:
eta[r, s] = 1e10
# Softmax normalization per row
for r in active_tasks:
row = eta[r, :]
nonzero = row > 0
if np.any(nonzero):
row_vals = row[nonzero]
# Clip for numerical stability
row_vals = np.clip(row_vals, -500, 500)
exp_vals = np.exp(row_vals - np.max(row_vals))
row[nonzero] = exp_vals / np.sum(exp_vals)
eta[r, :] = row
return eta
@staticmethod
def _kl_divergence(data_0, data_1):
"""
Compute KL divergence between two multivariate normal distributions
fitted to the data.
Parameters
----------
data_0 : np.ndarray
Data from distribution 0, shape (n0, d)
data_1 : np.ndarray
Data from distribution 1, shape (n1, d)
Returns
-------
float
KL(N_0 || N_1)
"""
k = data_0.shape[1]
m0 = np.mean(data_0, axis=0)
m1 = np.mean(data_1, axis=0)
# Empirical covariance with regularization
C0 = np.cov(data_0, rowvar=False) + 1e-6 * np.eye(k)
C1 = np.cov(data_1, rowvar=False) + 1e-6 * np.eye(k)
try:
C1_inv = np.linalg.inv(C1)
sign0, logdet0 = np.linalg.slogdet(C0)
sign1, logdet1 = np.linalg.slogdet(C1)
if sign0 <= 0 or sign1 <= 0:
return 1e10
diff = m1 - m0
kld = 0.5 * (np.trace(C1_inv @ C0) + diff @ C1_inv @ diff - k + logdet1 - logdet0)
return max(kld, 0.0)
except np.linalg.LinAlgError:
return 1e10
def _select_auxiliary_task(self, r, tau, eta, nt, active_tasks):
"""
Select auxiliary task for knowledge transfer using pheromone and similarity.
Parameters
----------
r : int
Target task index
tau : np.ndarray
Pheromone matrix, shape (nt, nt)
eta : np.ndarray
Similarity matrix, shape (nt, nt)
nt : int
Number of tasks
active_tasks : list of int
Active task indices
Returns
-------
int or None
Selected auxiliary task index, or None if no valid task
"""
candidates = [s for s in active_tasks if s != r]
if not candidates:
return None
# Compute transfer probabilities: P_kt(r, s) = tau(r,s) * eta(r,s) / sum
probs = np.array([tau[r, s] * eta[r, s] for s in candidates])
total = np.sum(probs)
if total <= 0:
return candidates[np.random.randint(len(candidates))]
probs = probs / total
# Selection rule
if np.random.rand() < self.P_max:
# Greedy: select task with maximum probability
return candidates[np.argmax(probs)]
else:
# Roulette wheel selection
cumsum = np.cumsum(probs)
r_val = np.random.rand()
for j, c in enumerate(cumsum):
if r_val <= c:
return candidates[j]
return candidates[-1]