Source code for ddmtolab.Algorithms.STMO.SAEA_DBLL

"""
Surrogate-Assisted Evolutionary Algorithm with Direction-Based Local Learning (SAEA-DBLL)

This module implements SAEA-DBLL for computationally expensive multi/many-objective optimization.
It uses RBF surrogate models with a direction-based local learning strategy, where sub-reference
vectors define neighborhoods for competitive swarm optimization, combined with adaptive vector
selection and APD-based environmental selection.

References
----------
    [1] J. Shen, P. Wang, H. Dong, W. Wang, and J. Li. Surrogate-assisted evolutionary algorithm with decomposition-based local learning for high-dimensional multi-objective optimization. Expert Systems with Applications, 2024, 240: 122575.

Notes
-----
Author: Jiangtao Shen
Date: 2026.02.18
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
from scipy.spatial.distance import cdist
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
import warnings

warnings.filterwarnings("ignore")


[docs] class SAEA_DBLL: """ Surrogate-Assisted Evolutionary Algorithm with Direction-Based Local Learning for expensive multi/many-objective optimization. Uses RBF surrogates with neighborhood-aware competitive swarm optimization and adaptive sub-reference vector management. Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[1, K]', 'dims': 'unequal', 'objs': 'unequal', 'n_objs': '[2, M]', 'cons': 'equal', 'n_cons': '0', 'expensive': 'True', 'knowledge_transfer': 'False', 'n_initial': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n_initial=None, max_nfes=None, n=50, alpha=2.0, wmax=20, mu=5, T=3, K=2, save_data=True, save_path='./Data', name='SAEA-DBLL', disable_tqdm=True): """ Initialize SAEA-DBLL algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n_initial : int or List[int], optional Number of initial samples per task (default: dim+50) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 200) n : int or List[int], optional Number of reference vectors per task (default: 50) alpha : float, optional Exponent for theta progression (default: 2.0) wmax : int, optional Number of inner surrogate evolution generations (default: 20) mu : int, optional Number of re-evaluated solutions per iteration (default: 5) T : int, optional Neighborhood size for sub-vectors (default: 3) K : int, optional Division factor for sub-vector count (default: 2) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './Data') name : str, optional Name for the experiment (default: 'SAEA-DBLL') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n_initial = n_initial self.max_nfes = max_nfes if max_nfes is not None else 200 self.n = n self.alpha = alpha self.wmax = wmax self.mu = mu self.T = T self.K = K self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the SAEA-DBLL algorithm. Returns ------- Results Optimization results containing decision variables, objectives, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks dims = problem.dims n_objs = problem.n_objs # Set default initial samples: dim + 50 if self.n_initial is None: n_initial_per_task = [dims[i] + 50 for i in range(nt)] else: n_initial_per_task = par_list(self.n_initial, nt) max_nfes_per_task = par_list(self.max_nfes, nt) n_per_task = par_list(self.n, nt) # Generate uniform reference vectors V0_list = [] for i in range(nt): v_i, actual_n = uniform_point(n_per_task[i], n_objs[i]) V0_list.append(v_i) n_per_task[i] = actual_n # Initialize with LHS decs = initialization(problem, n_initial_per_task, method='lhs') objs, _ = evaluation(problem, decs) nfes_per_task = n_initial_per_task.copy() # A1: archive of all evaluated solutions arc_decs = [d.copy() for d in decs] arc_objs = [o.copy() for o in objs] # Initialize adapted reference vectors and sub-vectors V_list = [v.copy() for v in V0_list] Ve_list = [] for i in range(nt): NV = n_per_task[i] n_sub = max(1, int(np.ceil(NV / self.K))) perm = np.random.permutation(NV)[:n_sub] Ve_list.append(V_list[i][perm].copy()) pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < sum(max_nfes_per_task): active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for i in active_tasks: M = n_objs[i] dim = dims[i] NV = n_per_task[i] V0 = V0_list[i] V = V_list[i] Ve = Ve_list[i] # Compute theta (progress parameter) theta = (nfes_per_task[i] / max_nfes_per_task[i]) ** self.alpha # Compute neighborhood matrix B for sub-vectors Ne = min(Ve.shape[0], self.T) if Ve.shape[0] > 1: B_dist = cdist(Ve, Ve, metric='euclidean') B = np.argsort(B_dist, axis=1)[:, :Ne] else: B = np.zeros((1, 1), dtype=int) A1Dec = arc_decs[i].copy() A1Obj = arc_objs[i].copy() # Build RBF models for each objective RModels = [] mS_per_obj = [] for j in range(M): mS_j, mY_j = dsmerge(A1Dec, A1Obj[:, j]) rmodel = rbf_build(mS_j, mY_j) RModels.append(rmodel) mS_per_obj.append(mS_j) # Inner loop: surrogate-based evolution PopDec = A1Dec.copy() PopObj = A1Obj.copy() PopVel = np.zeros_like(PopDec) Ns = [] for w in range(self.wmax): # Reproduction: direction-based local learning OffDec, OffVel = _reproduction_operator( PopObj, PopDec, PopVel, B, Ve, theta, M ) PopDec = np.vstack([PopDec, OffDec]) PopVel = np.vstack([PopVel, OffVel]) # Predict objectives using RBF models N_pop = PopDec.shape[0] PopObj = np.zeros((N_pop, M)) for j in range(M): PopObj[:, j] = rbf_predict(RModels[j], mS_per_obj[j], PopDec) # Environmental selection (RVEA-style APD) index = _environmental_selection(PopObj, V, theta) Ns.append(len(index)) PopDec = PopDec[index] PopObj = PopObj[index] PopVel = PopVel[index] # Adapt reference vectors V = V0 * np.maximum(PopObj.max(axis=0) - PopObj.min(axis=0), 1e-10) V_list[i] = V mean_Ns = np.mean(Ns) if len(Ns) > 0 else NV n_sub_new = max(1, int(np.ceil(mean_Ns / self.K))) Ve = _vector_adaption(V0, PopObj, n_sub_new) Ve_list[i] = Ve # Sample selection (infill) PopNew = _sample_selection(PopDec, PopObj, V, self.mu, theta) # Remove duplicates if PopNew is not None and PopNew.shape[0] > 0: PopNew = remove_duplicates(PopNew, decs[i]) if PopNew is not None and PopNew.shape[0] > 0: # Limit to remaining budget remaining = max_nfes_per_task[i] - nfes_per_task[i] if PopNew.shape[0] > remaining: PopNew = PopNew[:remaining] # Re-evaluate with expensive function new_objs, _ = evaluation_single(problem, PopNew, i) # Update archive arc_decs[i], arc_objs[i] = merge_archive( arc_decs[i], arc_objs[i], PopNew, new_objs ) # Update cumulative dataset decs[i] = np.vstack([decs[i], PopNew]) objs[i] = np.vstack([objs[i], new_objs]) nfes_per_task[i] += PopNew.shape[0] pbar.update(PopNew.shape[0]) pbar.close() runtime = time.time() - start_time all_decs, all_objs = build_staircase_history(decs, objs, k=self.mu) results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) return results
# ============================================================================= # Environmental Selection (RVEA-style APD) # ============================================================================= def _environmental_selection(pop_obj, V, theta): """ Environmental selection using angle-penalized distance (APD). Parameters ---------- pop_obj : np.ndarray Objective values, shape (N, M) V : np.ndarray Reference vectors, shape (NV, M) theta : float Penalty parameter Returns ------- index : np.ndarray Selected solution indices """ N, M = pop_obj.shape NV = V.shape[0] # Translate objectives pop_obj_t = pop_obj - pop_obj.min(axis=0) # Smallest angle between each reference vector and others cosine = 1 - cdist(V, V, metric='cosine') np.fill_diagonal(cosine, 0) gamma = np.min(np.arccos(np.clip(cosine, -1, 1)), axis=1) gamma = np.maximum(gamma, 1e-6) # Associate each solution to nearest reference vector angle = np.arccos(np.clip(1 - cdist(pop_obj_t, V, metric='cosine'), -1, 1)) associate = np.argmin(angle, axis=1) # Select one solution per reference vector Next = np.full(NV, -1, dtype=int) for vi in np.unique(associate): current = np.where(associate == vi)[0] APD = (1 + M * theta * angle[current, vi] / gamma[vi]) * \ np.sqrt(np.sum(pop_obj_t[current] ** 2, axis=1)) best = np.argmin(APD) Next[vi] = current[best] return Next[Next != -1] # ============================================================================= # Reproduction Operator (Direction-Based Local Learning) # ============================================================================= def _reproduction_operator(pop_obj, pop_dec, pop_vel, B, Ve, theta, M): """ Direction-based local learning reproduction operator. Winners are selected via APD environmental selection using sub-vectors Ve. Losers learn from neighborhood winners via CSO-style velocity update, followed by polynomial mutation. Parameters ---------- pop_obj : np.ndarray Population objectives, shape (N, M) pop_dec : np.ndarray Population decisions, shape (N, D) pop_vel : np.ndarray Population velocities, shape (N, D) B : np.ndarray Neighborhood matrix for sub-vectors, shape (n_sub, Ne) Ve : np.ndarray Sub-reference vectors, shape (n_sub, M) theta : float Penalty parameter M : int Number of objectives Returns ------- off_dec : np.ndarray Offspring decisions off_vel : np.ndarray Offspring velocities """ N, D = pop_dec.shape # Environmental selection on sub-vectors to determine winners winner_idx, winner_v_idx = _env_selection_with_vectors(pop_obj, Ve, theta) if len(winner_idx) == 0: # Fallback: return GA offspring off_dec = ga_generation(pop_dec, muc=20.0, mum=20.0) off_vel = np.zeros_like(off_dec) return off_dec, off_vel loser_idx = np.setdiff1d(np.arange(N), winner_idx) NL = len(loser_idx) if NL == 0: # All are winners; return winners with mutation off_dec = pop_dec.copy() off_vel = pop_vel.copy() off_dec = _polynomial_mutation(off_dec) return off_dec, off_vel loser_dec = pop_dec[loser_idx] winner_dec = pop_dec[winner_idx] loser_vel = pop_vel[loser_idx] winner_vel = pop_vel[winner_idx] loser_obj = pop_obj[loser_idx] - pop_obj.min(axis=0) # Associate each loser to nearest sub-vector angle_loser = np.arccos(np.clip(1 - cdist(loser_obj, Ve, metric='cosine'), -1, 1)) loser_associate = np.argmin(angle_loser, axis=1) # For each loser, find a winner from the neighborhood of its associated sub-vector temp_winner_dec = np.zeros((NL, D)) for li in range(NL): ve_idx = loser_associate[li] # Get neighborhood sub-vector indices neighbors = B[ve_idx] # Find which winner_v_idx entries are in the neighborhood match_mask = np.isin(neighbors, winner_v_idx) match_indices = neighbors[match_mask] if len(match_indices) > 0: # Map back to winner indices: find which winner corresponds to each matched Ve index chosen_ve = match_indices[np.random.randint(len(match_indices))] w_pos = np.where(winner_v_idx == chosen_ve)[0] if len(w_pos) > 0: w_idx = w_pos[np.random.randint(len(w_pos))] else: w_idx = np.random.randint(len(winner_idx)) else: w_idx = np.random.randint(len(winner_idx)) temp_winner_dec[li] = winner_dec[w_idx] # CSO velocity and position update r1 = np.random.rand(NL, 1) * np.ones((1, D)) r2 = np.random.rand(NL, 1) * np.ones((1, D)) off_vel_loser = r1 * loser_vel + r2 * (temp_winner_dec - loser_dec) off_dec_loser = loser_dec + off_vel_loser + r1 * (off_vel_loser - loser_vel) # Combine with winners off_dec = np.vstack([off_dec_loser, winner_dec]) off_vel = np.vstack([off_vel_loser, winner_vel]) # Polynomial mutation off_dec = _polynomial_mutation(off_dec) return off_dec, off_vel def _env_selection_with_vectors(pop_obj, V, theta): """ Environmental selection returning both solution indices and associated vector indices. Parameters ---------- pop_obj : np.ndarray Objective values, shape (N, M) V : np.ndarray Reference vectors, shape (NV, M) theta : float Penalty parameter Returns ------- index : np.ndarray Selected solution indices index_v : np.ndarray Associated reference vector indices for each selected solution """ N, M = pop_obj.shape NV = V.shape[0] pop_obj_t = pop_obj - pop_obj.min(axis=0) cosine = 1 - cdist(V, V, metric='cosine') np.fill_diagonal(cosine, 0) gamma = np.min(np.arccos(np.clip(cosine, -1, 1)), axis=1) gamma = np.maximum(gamma, 1e-6) angle = np.arccos(np.clip(1 - cdist(pop_obj_t, V, metric='cosine'), -1, 1)) associate = np.argmin(angle, axis=1) Next = np.full(NV, -1, dtype=int) NextV = np.full(NV, -1, dtype=int) for vi in np.unique(associate): current = np.where(associate == vi)[0] APD = (1 + M * theta * angle[current, vi] / gamma[vi]) * \ np.sqrt(np.sum(pop_obj_t[current] ** 2, axis=1)) best = np.argmin(APD) Next[vi] = current[best] NextV[vi] = vi valid = Next != -1 return Next[valid], NextV[valid] def _polynomial_mutation(off_dec, dis_m=20): """ Polynomial mutation in [0, 1] space. Parameters ---------- off_dec : np.ndarray Decision variables, shape (N, D) dis_m : float Distribution index (default: 20) Returns ------- off_dec : np.ndarray Mutated decision variables """ N, D = off_dec.shape off_dec = np.clip(off_dec, 0, 1) site = np.random.rand(N, D) < 1.0 / D mu = np.random.rand(N, D) temp1 = site & (mu <= 0.5) off_dec[temp1] = off_dec[temp1] + ( (2.0 * mu[temp1] + (1 - 2.0 * mu[temp1]) * (1 - off_dec[temp1]) ** (dis_m + 1)) ** (1.0 / (dis_m + 1)) - 1 ) temp2 = site & (mu > 0.5) off_dec[temp2] = off_dec[temp2] + ( 1 - (2.0 * (1 - mu[temp2]) + 2.0 * (mu[temp2] - 0.5) * (1 - (1 - off_dec[temp2])) ** (dis_m + 1)) ** (1.0 / (dis_m + 1)) ) return np.clip(off_dec, 0, 1) # ============================================================================= # Vector Adaption # ============================================================================= def _vector_adaption(V0, pop_obj, k): """ Adapt sub-reference vectors by clustering active vectors. Parameters ---------- V0 : np.ndarray Original reference vectors, shape (NV, M) pop_obj : np.ndarray Population objectives, shape (N, M) k : int Target number of sub-vectors Returns ------- Ve : np.ndarray Adapted sub-reference vectors """ # Scale V0 by objective range obj_range = pop_obj.max(axis=0) - pop_obj.min(axis=0) obj_range = np.maximum(obj_range, 1e-10) V = V0 * obj_range # Find active vectors _, active = _no_active(pop_obj, V) if len(active) == 0: # Fallback: use random subset perm = np.random.permutation(V.shape[0])[:max(1, k)] return V[perm] Va = V[active] k = max(1, min(k, len(active))) if len(active) <= k: return Va # Cluster active vectors into k groups from sklearn.cluster import KMeans km = KMeans(n_clusters=k, n_init=10, random_state=None) labels = km.fit_predict(Va) centers = km.cluster_centers_ # For each cluster, select the vector closest to the cluster center Vindex = [] for c in range(k): current = np.where(labels == c)[0] if len(current) == 1: Vindex.append(current[0]) else: Vc = Va[current] angle = np.arccos(np.clip(1 - cdist(Vc, centers[c:c + 1], metric='cosine'), -1, 1)) best = np.argmin(angle.flatten()) Vindex.append(current[best]) return Va[Vindex] def _no_active(pop_obj, V): """ Detect inactive reference vectors. Parameters ---------- pop_obj : np.ndarray Objective values, shape (N, M) V : np.ndarray Reference vectors, shape (NV, M) Returns ------- num_inactive : int Number of inactive reference vectors active : np.ndarray Indices of active reference vectors """ pop_obj_t = pop_obj - pop_obj.min(axis=0) angle = np.arccos(np.clip(1 - cdist(pop_obj_t, V, metric='cosine'), -1, 1)) associate = np.argmin(angle, axis=1) active = np.unique(associate) num_inactive = V.shape[0] - len(active) return num_inactive, active # ============================================================================= # Sample Selection (Infill Strategy) # ============================================================================= def _sample_selection(pop_dec, pop_obj, V, mu, theta): """ Select infill points via clustering of active reference vectors and APD-based selection within each cluster. Parameters ---------- pop_dec : np.ndarray Population decisions, shape (N, D) pop_obj : np.ndarray Population objectives, shape (N, M) V : np.ndarray Reference vectors, shape (NV, M) mu : int Number of solutions to select theta : float Penalty parameter Returns ------- pop_new : np.ndarray Selected decision variables for re-evaluation """ M = pop_obj.shape[1] N = pop_obj.shape[0] if N == 0: return np.empty((0, pop_dec.shape[1])) # Find active vectors num_inactive, active_idx = _no_active(pop_obj, V) n_active = len(active_idx) if n_active == 0: # Fallback: return random subset idx = np.random.permutation(N)[:min(mu, N)] return pop_dec[idx] NCluster = min(mu, n_active) Va = V[active_idx] # Translate objectives pop_obj_t = pop_obj - pop_obj.min(axis=0) # Compute gamma for active vectors if Va.shape[0] > 1: cosine = 1 - cdist(Va, Va, metric='cosine') np.fill_diagonal(cosine, 0) gamma = np.min(np.arccos(np.clip(cosine, -1, 1)), axis=1) gamma = np.maximum(gamma, 1e-6) else: gamma = np.array([1.0]) # Associate each solution to nearest active vector angle = np.arccos(np.clip(1 - cdist(pop_obj_t, Va, metric='cosine'), -1, 1)) associate = np.argmin(angle, axis=1) # Compute APD for each solution APD_S = np.ones(N) for vi in np.unique(associate): current = np.where(associate == vi)[0] if len(current) > 0: APD = (1 + M * theta * angle[current, vi] / gamma[vi]) * \ np.sqrt(np.sum(pop_obj_t[current] ** 2, axis=1)) APD_S[current] = APD # Cluster active vectors if NCluster >= n_active: labels = np.arange(n_active) NCluster = n_active else: from sklearn.cluster import KMeans km = KMeans(n_clusters=NCluster, n_init=10, random_state=None) labels = km.fit_predict(Va) # Map solutions to clusters via their associated active vector Cindex = labels[associate] # Select one solution per cluster Next = np.full(NCluster, -1, dtype=int) for c in np.unique(Cindex): current = np.where(Cindex == c)[0] # For each active vector in this cluster, find the best APD solution solution_best = [] t = np.unique(associate[current]) for vi in t: current_s = np.where(associate == vi)[0] best_id = current_s[np.argmin(APD_S[current_s])] solution_best.append(best_id) solution_best = np.array(solution_best) # Among the per-vector bests, select the overall best best = solution_best[np.argmin(APD_S[solution_best])] Next[c] = best selected = Next[Next != -1] if len(selected) == 0: idx = np.random.permutation(N)[:min(mu, N)] return pop_dec[idx] return pop_dec[selected]