Source code for ddmtolab.Algorithms.STSO.AutoSAEA

"""
Surrogate-Assisted EA with Model and Infill Criterion Auto-Configuration (AutoSAEA)

This module implements AutoSAEA for expensive single-objective optimization problems.

References
----------
    [1] Xie, L., Li, G., Wang, Z., Cui, L., & Gong, M. (2023). Surrogate-assisted evolutionary algorithm with model and infill criterion auto-configuration. IEEE Transactions on Evolutionary Computation.

Notes
-----
Author: Jiangtao Shen
Email: j.shen5@exeter.ac.uk
Date: 2026.02.19
Version: 1.0
"""
import time
import numpy as np
from tqdm import tqdm
from scipy.interpolate import RBFInterpolator
from scipy.stats import norm
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF as RBF_kernel, ConstantKernel as C, WhiteKernel
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LinearRegression
from ddmtolab.Methods.Algo_Methods.algo_utils import *
import warnings

warnings.filterwarnings("ignore")


[docs] class AutoSAEA: """ Surrogate-Assisted EA with Model and Infill Criterion Auto-Configuration. Uses a Two-Level UCB multi-armed bandit to adaptively select from 8 model-criterion combinations: - {RBF, prescreening}, {RBF, local search} - {GP, LCB}, {GP, EI} - {PRS, prescreening}, {PRS, local search} - {KNN, exploitation}, {KNN, exploration} """ algorithm_information = { 'n_tasks': '[1, K]', 'dims': 'unequal', 'objs': 'equal', 'n_objs': '1', 'cons': 'equal', 'n_cons': '0', 'expensive': 'True', 'knowledge_transfer': 'False', 'n_initial': 'unequal', 'max_nfes': 'unequal' } # Arm indices ARM_RBF_PRE = 0 ARM_GP_LCB = 1 ARM_RBF_LS = 2 ARM_GP_EI = 3 ARM_PRS_PRE = 4 ARM_PRS_LS = 5 ARM_KNN_EOI = 6 ARM_KNN_EOR = 7 N_ARMS = 8 # High-level model mapping ARM_TO_MODEL = {0: 0, 1: 1, 2: 0, 3: 1, 4: 2, 5: 2, 6: 3, 7: 3} MODEL_TO_ARMS = {0: [0, 2], 1: [1, 3], 2: [4, 5], 3: [6, 7]} @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n_initial=None, max_nfes=None, save_data=True, save_path='./Data', name='AutoSAEA', disable_tqdm=True): self.problem = problem self.n_initial = n_initial if n_initial is not None else 50 self.max_nfes = max_nfes if max_nfes is not None else 300 self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): start_time = time.time() problem = self.problem nt = problem.n_tasks dims = problem.dims n_initial_per_task = par_list(self.n_initial, nt) max_nfes_per_task = par_list(self.max_nfes, nt) # Generate initial samples decs = initialization(problem, self.n_initial, method='lhs') objs, _ = evaluation(problem, decs) nfes_per_task = n_initial_per_task.copy() current_decs = [decs[i].copy() for i in range(nt)] current_objs = [objs[i].copy() for i in range(nt)] # Initialize bandit state per task bandit_states = [] for _ in range(nt): bandit_states.append({ 'model_rewards': {m: [] for m in range(4)}, 'arm_rewards': {a: [] for a in range(self.N_ARMS)} }) popsize = 50 pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < sum(max_nfes_per_task): active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for i in active_tasks: dim = dims[i] X = current_decs[i] Y = current_objs[i] Y_flat = Y.flatten() # Iteration id (1-indexed) id_val = nfes_per_task[i] - n_initial_per_task[i] + 1 # Elite initialization (top popsize by fitness) n_elite = min(len(Y_flat), popsize) elite_idx = np.argsort(Y_flat)[:n_elite] pop = X[elite_idx].copy() pop_objs = Y_flat[elite_idx].copy() # Generate shared DE/best/1 offspring offspring = self._de_best1(pop, pop_objs) # Select arm bs = bandit_states[i] if id_val <= self.N_ARMS: arm_id = id_val - 1 # Round-robin else: arm_id = self._select_arm(id_val, bs) # Execute arm candidate = self._execute_arm(arm_id, pop, pop_objs, offspring, Y_flat, dim) # Handle failure (GP arms may fail) arm_failed = candidate is None if arm_failed: candidate = np.random.rand(1, dim) # Check duplicate BEFORE perturbation (MATLAB: reward=0 for near-duplicates) is_duplicate = False if not arm_failed: dup_dist = cdist(candidate, X, metric='chebyshev').min() if dup_dist < 5e-3: is_duplicate = True # Ensure uniqueness (perturb for evaluation) candidate = self._ensure_uniqueness(candidate, X, dim) # Evaluate on real function obj, _ = evaluation_single(problem, candidate, i) # Compute reward (duplicate or failed arms get reward=0) if arm_failed or is_duplicate: reward = 0.0 else: reward = self._low_level_r(pop_objs, obj.flatten()[0]) # Update bandit model_id = self.ARM_TO_MODEL[arm_id] bs['model_rewards'][model_id].append(reward) bs['arm_rewards'][arm_id].append(reward) # Update dataset current_decs[i] = np.vstack([current_decs[i], candidate]) current_objs[i] = np.vstack([current_objs[i], obj]) nfes_per_task[i] += 1 pbar.update(1) pbar.close() runtime = time.time() - start_time # Convert database to staircase history structure for results db_decs = [current_decs[i].copy() for i in range(nt)] db_objs = [current_objs[i].copy() for i in range(nt)] all_decs, all_objs = build_staircase_history(db_decs, db_objs, k=1) results = build_save_results( all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data ) return results
# ==================== Bandit ==================== def _tl_ucb(self, rewards, id_val, q_value, alpha=2.5): """Two-Level UCB value computation.""" return q_value + np.sqrt(alpha * np.log(id_val) / len(rewards)) def _select_arm(self, id_val, bandit_state): """Select arm via Two-Level UCB at both high and low levels.""" mr = bandit_state['model_rewards'] ar = bandit_state['arm_rewards'] # High-level: select model u_model = np.zeros(4) for m in range(4): u_model[m] = self._tl_ucb(mr[m], id_val, np.mean(mr[m])) max_val = np.max(u_model) best_models = np.where(np.abs(u_model - max_val) < 1e-12)[0] selected_model = np.random.choice(best_models) # Low-level: select arm within model arms = self.MODEL_TO_ARMS[selected_model] u_arms = np.zeros(2) for j, arm_id in enumerate(arms): u_arms[j] = self._tl_ucb(ar[arm_id], id_val, np.mean(ar[arm_id])) max_val = np.max(u_arms) best_idx = np.where(np.abs(u_arms - max_val) < 1e-12)[0] selected_local = np.random.choice(best_idx) return arms[selected_local] def _low_level_r(self, Y_elite, y_new): """Rank-based reward: [0, 1], higher is better.""" N = len(Y_elite) Y_all = np.concatenate([Y_elite.flatten(), [y_new]]) sorted_idx = np.argsort(Y_all) position = np.where(sorted_idx == N)[0][0] + 1 return -position / N + (N + 1) / N # ==================== Arm Dispatch ==================== def _execute_arm(self, arm_id, pop, pop_objs, offspring, Y_full, dim): """Execute the selected arm and return candidate solution (or None on failure).""" if arm_id == self.ARM_RBF_PRE: return self._rbf_pre_arm(pop, pop_objs, offspring) elif arm_id == self.ARM_GP_LCB: return self._gp_lcb_arm(pop, pop_objs, offspring) elif arm_id == self.ARM_RBF_LS: return self._rbf_ls_arm(pop, pop_objs, dim) elif arm_id == self.ARM_GP_EI: return self._gp_ei_arm(pop, pop_objs, offspring, Y_full) elif arm_id == self.ARM_PRS_PRE: return self._prs_pre_arm(pop, pop_objs, offspring) elif arm_id == self.ARM_PRS_LS: return self._prs_ls_arm(pop, pop_objs, dim) elif arm_id == self.ARM_KNN_EOI: return self._knn_eoi_arm(pop, pop_objs, offspring) elif arm_id == self.ARM_KNN_EOR: return self._knn_eor_arm(pop, pop_objs, offspring) return None # ==================== Arm Implementations ==================== def _rbf_pre_arm(self, pop, pop_objs, offspring): """Arm 0: {RBF, prescreening} - evaluate offspring on RBF, return best.""" try: model = self._build_rbf_model(pop, pop_objs) pred = model(offspring).flatten() best_idx = np.argmin(pred) return offspring[best_idx:best_idx + 1] except Exception: return None def _gp_lcb_arm(self, pop, pop_objs, offspring): """Arm 1: {GP, LCB} - evaluate offspring with LCB (w=2), return best.""" try: mean, std = self._build_gpr_predict(pop, pop_objs, offspring) lcb = mean - 2.0 * std best_idx = np.argmin(lcb) return offspring[best_idx:best_idx + 1] except Exception: return None def _rbf_ls_arm(self, pop, pop_objs, dim): """Arm 2: {RBF, local search} - EA optimization on RBF within local bounds.""" try: model = self._build_rbf_model(pop, pop_objs) lb_local, ub_local = self._get_local_bounds(pop) return self._local_search_ea(model, dim, lb_local, ub_local) except Exception: return None def _gp_ei_arm(self, pop, pop_objs, offspring, Y_full): """Arm 3: {GP, EI} - Expected Improvement on GP, return best.""" try: mean, std = self._build_gpr_predict(pop, pop_objs, offspring) y_best = np.min(Y_full) with np.errstate(divide='ignore', invalid='ignore'): z = (y_best - mean) / (std + 1e-10) ei = (y_best - mean) * norm.cdf(z) + std * norm.pdf(z) ei[std < 1e-10] = 0.0 best_idx = np.argmax(ei) return offspring[best_idx:best_idx + 1] except Exception: return None def _prs_pre_arm(self, pop, pop_objs, offspring): """Arm 4: {PRS, prescreening} - evaluate offspring on PRS, return best.""" try: model = self._build_prs_model(pop, pop_objs) pred = model(offspring).flatten() best_idx = np.argmin(pred) return offspring[best_idx:best_idx + 1] except Exception: return None def _prs_ls_arm(self, pop, pop_objs, dim): """Arm 5: {PRS, local search} - EA optimization on PRS within local bounds.""" try: model = self._build_prs_model(pop, pop_objs) lb_local, ub_local = self._get_local_bounds(pop) return self._local_search_ea(model, dim, lb_local, ub_local) except Exception: return None def _knn_eoi_arm(self, pop, pop_objs, offspring): """Arm 6: {KNN, L1-exploitation} - minimize max-distance to level-1 parents.""" try: return self._knn_arm_core(pop, pop_objs, offspring, exploitation=True) except Exception: return None def _knn_eor_arm(self, pop, pop_objs, offspring): """Arm 7: {KNN, L1-exploration} - maximize min-distance to level-1 parents.""" try: return self._knn_arm_core(pop, pop_objs, offspring, exploitation=False) except Exception: return None def _knn_arm_core(self, pop, pop_objs, offspring, exploitation=True): """Core KNN arm with 5-level stratification.""" n = len(pop_objs) level = 5 # Sort by objective (pop is already sorted from elite init, but be explicit) sorted_idx = np.argsort(pop_objs) X_sorted = pop[sorted_idx] # Assign labels: 1=best, 5=worst labels = np.ceil(np.arange(1, n + 1) * level / n).astype(int) parents_level1 = X_sorted[labels == 1] # Train 1-NN classifier knn = KNeighborsClassifier(n_neighbors=1, metric='minkowski', p=2) knn.fit(X_sorted, labels) # Predict labels for offspring pred_labels = knn.predict(offspring) # Select offspring with best predicted label min_label = np.min(pred_labels) selected = np.where(pred_labels == min_label)[0] # Compute distances to level-1 parents dist = cdist(offspring[selected], parents_level1) if exploitation: dist_metric = np.max(dist, axis=1) best_local = np.argmin(dist_metric) else: dist_metric = np.min(dist, axis=1) best_local = np.argmax(dist_metric) idx = selected[best_local] return offspring[idx:idx + 1] # ==================== Surrogate Models ==================== def _build_rbf_model(self, X, Y): """Build Gaussian RBF surrogate model (matching MATLAB newrbe).""" Y_flat = Y.flatten() n_samples, dim = X.shape if n_samples > 1: dist_matrix = cdist(X, X, metric='euclidean') max_dist = dist_matrix.max() spread = max_dist / (dim * n_samples) ** (1.0 / dim) else: spread = 1.0 try: rbf = RBFInterpolator(X, Y_flat, kernel='gaussian', epsilon=1.0 / spread) except Exception: rbf = RBFInterpolator(X, Y_flat, kernel='thin_plate_spline') def model(x): if x.ndim == 1: x = x.reshape(1, -1) return rbf(x).reshape(-1, 1) return model def _build_gpr_predict(self, X, Y, X_pred): """Build GP model and return (mean, std) predictions at X_pred.""" Y_flat = Y.flatten() kernel = C(1.0, (1e-3, 1e3)) * RBF_kernel(1.0, (1e-2, 1e2)) + WhiteKernel(1e-5, (1e-10, 1e-1)) gpr = GaussianProcessRegressor( kernel=kernel, alpha=1e-6, normalize_y=True, n_restarts_optimizer=5, random_state=42 ) gpr.fit(X, Y_flat) mean, std = gpr.predict(X_pred, return_std=True) return mean.flatten(), std.flatten() def _build_prs_model(self, X, Y): """Build Pure Quadratic Regression Surrogate (no interaction terms).""" Y_flat = Y.flatten() X_aug = np.hstack([X, X ** 2]) reg = LinearRegression() reg.fit(X_aug, Y_flat) def model(x): if x.ndim == 1: x = x.reshape(1, -1) x_aug = np.hstack([x, x ** 2]) return reg.predict(x_aug).reshape(-1, 1) return model # ==================== EA Local Search ==================== def _local_search_ea(self, surrogate_func, dim, lb, ub, popsize=50, n_gen=100): """EA-based local search on surrogate within local bounds.""" popsize = popsize if popsize % 2 == 0 else popsize + 1 # Random initialization within local bounds pop = lb + (ub - lb) * np.random.rand(popsize, dim) pop_objs = surrogate_func(pop).flatten() for _ in range(n_gen): offspring = self._ea_offspring(pop, lb, ub) off_objs = surrogate_func(offspring).flatten() pop, pop_objs = self._roulette_wheel_selection( pop, pop_objs, offspring, off_objs, popsize) best_idx = np.argmin(pop_objs) return pop[best_idx:best_idx + 1] def _ea_offspring(self, parents, lb, ub, muc=15, mum=15, probswap=0.5): """EA offspring: SBX crossover + polynomial mutation + variable swap.""" popsize, dim = parents.shape range_vec = ub - lb range_vec = np.where(range_vec < 1e-10, 1e-10, range_vec) # Normalize to [0,1] within local bounds pop_norm = (parents - lb) / range_vec offspring = np.zeros((popsize, dim)) ind_order = np.random.permutation(popsize) for i in range(popsize // 2): p1 = ind_order[i] p2 = ind_order[i + popsize // 2] # SBX crossover u = np.random.rand(dim) cf = np.where(u <= 0.5, (2 * u) ** (1 / (muc + 1)), (2 * (1 - u)) ** (-1 / (muc + 1))) child1 = np.clip(0.5 * ((1 + cf) * pop_norm[p1] + (1 - cf) * pop_norm[p2]), 0, 1) child2 = np.clip(0.5 * ((1 + cf) * pop_norm[p2] + (1 - cf) * pop_norm[p1]), 0, 1) # Polynomial mutation for child in [child1, child2]: for j in range(dim): if np.random.rand() < 1 / dim: u_val = np.random.rand() if u_val <= 0.5: delta = (2 * u_val) ** (1 / (1 + mum)) - 1 child[j] += delta * child[j] else: delta = 1 - (2 * (1 - u_val)) ** (1 / (1 + mum)) child[j] += delta * (1 - child[j]) child[:] = np.clip(child, 0, 1) # Variable swap swap = np.random.rand(dim) >= probswap temp = child2[swap].copy() child2[swap] = child1[swap] child1[swap] = temp # Denormalize back to local bounds offspring[i] = lb + child1 * range_vec offspring[i + popsize // 2] = lb + child2 * range_vec return offspring def _roulette_wheel_selection(self, pop, pop_objs, offspring, off_objs, popsize): """Roulette wheel selection (inverse fitness, matching MATLAB).""" total_pop = np.vstack([pop, offspring]) total_objs = np.concatenate([pop_objs, off_objs]) shift = min(np.min(total_objs), 0) fit = 1.0 / (total_objs - shift + 1e-6) cum_fit = np.cumsum(fit) cum_fit /= cum_fit[-1] idx = np.searchsorted(cum_fit, np.random.rand(popsize)) idx = np.clip(idx, 0, len(total_objs) - 1) return total_pop[idx].copy(), total_objs[idx].copy() # ==================== DE Offspring ==================== def _de_best1(self, parents, objs, F=0.5, CR=0.8): """DE/best/1/bin offspring generation in [0,1] space.""" popsize, dim = parents.shape best_idx = np.argmin(objs) best = parents[best_idx] offspring = parents.copy() for i in range(popsize): idxs = np.delete(np.arange(popsize), i) r1, r2 = np.random.choice(idxs, 2, replace=False) mutant = best + F * (parents[r1] - parents[r2]) j_rand = np.random.randint(dim) mask = (np.random.rand(dim) <= CR) | (np.arange(dim) == j_rand) offspring[i, mask] = mutant[mask] return np.clip(offspring, 0, 1) # ==================== Utilities ==================== def _get_local_bounds(self, pop): """Compute local bounds from population with degenerate dimension handling.""" lb_local = np.min(pop, axis=0) ub_local = np.max(pop, axis=0) mask = (ub_local - lb_local) < 1e-10 lb_local[mask] = np.maximum(0.0, lb_local[mask] - 0.05) ub_local[mask] = np.minimum(1.0, ub_local[mask] + 0.05) return lb_local, ub_local def _ensure_uniqueness(self, candidate, X, dim, epsilon=5e-3, max_trials=50): """Ensure candidate is not too close to existing samples.""" scales = np.linspace(0.1, 1.0, max_trials) for t in range(max_trials): dist = cdist(candidate, X, metric='chebyshev').min() if dist >= epsilon: break perturbation = scales[t] * (np.random.rand(1, dim) - 0.5) candidate = np.clip(candidate + perturbation, 0.0, 1.0) return candidate