# Source code for ddmtolab.Algorithms.STMO.SSDE

"""
Self-Organizing Surrogate-Assisted Non-Dominated Sorting Differential Evolution (SSDE)

This module implements SSDE for computationally expensive multi/many-objective optimization.
It uses a Self-Organizing Map (SOM) as a surrogate model to predict offspring quality,
combined with NSGA-II environmental selection. Only offspring that survive selection are
evaluated with the expensive objective function.

References
----------
    [1] A. F. R. Araujo, L. R. C. Farias, and A. R. C. Goncalves. Self-organizing surrogate-assisted non-dominated sorting differential evolution. Swarm and Evolutionary Computation, 2024, 91: 101703.

Notes
-----
Author: Jiangtao Shen (DDMTOLab implementation)
Date: 2026.02.16
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
from scipy.spatial.distance import cdist
from ddmtolab.Methods.Algo_Methods.algo_utils import *
import warnings

warnings.filterwarnings("ignore")


class SSDE:
    """
    Self-Organizing Surrogate-Assisted Non-Dominated Sorting Differential
    Evolution for expensive multi/many-objective optimization.

    A Self-Organizing Map (SOM) is trained on evaluated solutions and used to
    predict offspring objective values; only offspring that survive NSGA-II
    environmental selection (on the predicted values) are evaluated with the
    expensive objective function.

    Attributes
    ----------
    algorithm_information : dict
        Dictionary containing algorithm capabilities and requirements
    """

    algorithm_information = {
        'n_tasks': '[1, K]',
        'dims': 'unequal',
        'objs': 'unequal',
        'n_objs': '[2, M]',
        'cons': 'unequal',
        'n_cons': '[0, C]',
        'expensive': 'True',
        'knowledge_transfer': 'False',
        'n': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegates to the shared project-level helper of the same name.
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n=None, max_nfes=None, num_nodes=None,
                 eta0=0.2, sigma0=None, save_data=True, save_path='./Data',
                 name='SSDE', disable_tqdm=True):
        """
        Initialize SSDE algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n : int or List[int], optional
            Population size per task (default: 100). Also used as the initial
            sample count (matching MATLAB: Problem.N).
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task (default: 200)
        num_nodes : int, optional
            Number of neurons in the SOM (default: same as n)
        eta0 : float, optional
            Initial learning rate for SOM training (default: 0.2)
        sigma0 : float, optional
            Initial neighborhood size for SOM (default: same as n)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'SSDE')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.max_nfes = max_nfes if max_nfes is not None else 200
        self.n = n if n is not None else 100
        self.num_nodes = num_nodes
        self.eta0 = eta0
        self.sigma0 = sigma0
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the SSDE algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives,
            and runtime
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n_objs = problem.n_objs
        n_cons = problem.n_cons
        n_per_task = par_list(self.n, nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)

        # Generate initial samples using random initialization
        decs = initialization(problem, n_per_task, method='random')
        objs, cons = evaluation(problem, decs)
        nfes_per_task = n_per_task.copy()

        # Whether any task is constrained (controls history building below)
        has_cons = any(nc > 0 for nc in n_cons)

        pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_per_task),
                    desc=f"{self.name}", disable=self.disable_tqdm)

        # Per-task state: SOM weights, latent topology, population, samples
        task_states = []
        for i in range(nt):
            N = n_per_task[i]
            D = dims[i]
            M = n_objs[i]
            num_nodes = self.num_nodes if self.num_nodes is not None else N
            sigma0 = self.sigma0 if self.sigma0 is not None else float(N)

            # Initialize SOM weight vectors: W ~ N(0.5, 0.001)
            a, b = 0.001, 0.5
            W = a * np.random.randn(num_nodes, D + M) + b

            # 1D latent space: neurons at positions 1, 2, ..., num_nodes
            V = np.arange(1, num_nodes + 1, dtype=float).reshape(-1, 1)
            LDis = cdist(V, V)  # pairwise distances in latent space

            # Tracks which neurons won at least once since last training
            winning_weights = np.zeros(num_nodes, dtype=bool)

            # Current population (copies of the initial evaluated samples)
            pop_decs = decs[i].copy()
            pop_objs = objs[i].copy()
            pop_cons = cons[i].copy() if n_cons[i] > 0 else None

            # Sample set accumulated for the next SOM training phase
            sample_decs = pop_decs.copy()
            sample_objs = pop_objs.copy()

            task_states.append({
                'W': W,
                'LDis': LDis,
                'V': V,
                'winning_weights': winning_weights,
                'pop_decs': pop_decs,
                'pop_objs': pop_objs,
                'pop_cons': pop_cons,
                'sample_decs': sample_decs,
                'sample_objs': sample_objs,
                'num_nodes': num_nodes,
                'sigma0': sigma0,
                'D': D,
                'M': M,
                'N': N,
            })

        while sum(nfes_per_task) < sum(max_nfes_per_task):
            active_tasks = [i for i in range(nt)
                            if nfes_per_task[i] < max_nfes_per_task[i]]
            if not active_tasks:
                break

            for i in active_tasks:
                st = task_states[i]
                N = st['N']
                D = st['D']
                M = st['M']

                # Training: when enough samples have accumulated, (re)train
                # the SOM, then reset the sample buffer and win flags
                if st['sample_decs'].shape[0] >= N:
                    st['W'] = _som_training(
                        st['W'], st['LDis'], st['sample_decs'],
                        st['sample_objs'], st['num_nodes'], self.eta0,
                        st['sigma0'], st['winning_weights'], D, M
                    )
                    st['winning_weights'] = np.zeros(st['num_nodes'], dtype=bool)
                    st['sample_decs'] = np.empty((0, D))
                    st['sample_objs'] = np.empty((0, M))

                # Operator: generate offspring, predict with SOM, select
                pop_decs = st['pop_decs']
                pop_objs = st['pop_objs']
                pop_cons = st['pop_cons']
                n_pop = pop_decs.shape[0]

                # Tournament selection based on constraint violations
                if pop_cons is not None and pop_cons.shape[1] > 0:
                    cv = np.sum(np.maximum(0, pop_cons), axis=1)
                    mating_pool = tournament_selection(2, N, -cv)
                else:
                    mating_pool = np.random.randint(0, n_pop, size=N)

                # DE offspring generation (DE/rand/1/bin)
                donor1_idx = np.random.randint(0, n_pop, size=N)
                donor2_idx = np.random.randint(0, n_pop, size=N)
                offspring_dec = _de_operator(
                    pop_decs[mating_pool],
                    pop_decs[donor1_idx],
                    pop_decs[donor2_idx]
                )

                # Map each offspring to its nearest SOM neuron (in decision
                # space) and use that neuron's objective part as prediction
                Distance = cdist(offspring_dec, st['W'][:, :D])
                rank = np.argsort(Distance, axis=1)
                offspring_labels = st['W'][rank[:, 0], D:D + M]

                # Track winning neurons for memory-based reinitialization
                st['winning_weights'][rank[:, 0]] = True

                # NSGA-II selection on combined (real + predicted) population
                combined_objs = np.vstack([pop_objs, offspring_labels])
                if pop_cons is not None and pop_cons.shape[1] > 0:
                    # Offspring get zero constraints (optimistic assumption)
                    combined_cons = np.vstack([
                        pop_cons,
                        np.zeros((offspring_labels.shape[0], pop_cons.shape[1]))
                    ])
                    front_no, max_fno = nd_sort(combined_objs, combined_cons, N)
                else:
                    combined_cons = None
                    front_no, max_fno = nd_sort(combined_objs, N)

                # Select N survivors: full fronts plus the most-crowded
                # members of the last (critical) front
                Next = front_no < max_fno
                crowd_dis = crowding_distance(combined_objs, front_no)
                last_front = np.where(front_no == max_fno)[0]
                last_cd = crowd_dis[last_front]
                last_sorted = np.argsort(last_cd)[::-1]  # descending
                n_needed = N - np.sum(Next)
                for idx in last_sorted[:n_needed]:
                    Next[last_front[idx]] = True

                # Split selection into parent survivors and offspring survivors
                out = Next[:n_pop]       # which parents survive
                in_mask = Next[n_pop:]   # which offspring survive

                if np.sum(in_mask) >= 1:
                    # Evaluate surviving offspring with expensive function
                    selected_off_dec = offspring_dec[in_mask]

                    # Limit to remaining budget
                    remaining = max_nfes_per_task[i] - nfes_per_task[i]
                    if selected_off_dec.shape[0] > remaining:
                        selected_off_dec = selected_off_dec[:remaining]
                    in_count = selected_off_dec.shape[0]

                    new_objs, new_cons = evaluation_single(
                        problem, selected_off_dec, i)

                    # Update population: replace non-surviving parents with
                    # evaluated offspring (matching MATLAB:
                    # Population(~out) = Offspring)
                    non_surviving = np.where(~out)[0]
                    n_replace = min(in_count, len(non_surviving))
                    for j in range(n_replace):
                        st['pop_decs'][non_surviving[j]] = selected_off_dec[j]
                        st['pop_objs'][non_surviving[j]] = new_objs[j]
                        if pop_cons is not None:
                            st['pop_cons'][non_surviving[j]] = new_cons[j]

                    # Accumulate samples for SOM training
                    st['sample_decs'] = np.vstack(
                        [st['sample_decs'], selected_off_dec])
                    st['sample_objs'] = np.vstack(
                        [st['sample_objs'], new_objs[:in_count]])

                    # Update global tracking.
                    # BUG FIX: the original guarded this with
                    # `all_cons is not None`, but `all_cons` is only assigned
                    # after the main loop, raising NameError here for
                    # constrained tasks. Gating on n_cons[i] is sufficient.
                    decs[i] = np.vstack([decs[i], selected_off_dec])
                    objs[i] = np.vstack([objs[i], new_objs[:in_count]])
                    if n_cons[i] > 0:
                        cons[i] = np.vstack([cons[i], new_cons[:in_count]])

                    nfes_per_task[i] += in_count
                    pbar.update(in_count)
                else:
                    # No offspring survived: feed the current population back
                    # as samples so the SOM can retrain (no budget consumed)
                    st['sample_decs'] = np.vstack([st['sample_decs'], pop_decs])
                    st['sample_objs'] = np.vstack([st['sample_objs'], pop_objs])

        pbar.close()
        runtime = time.time() - start_time

        if has_cons:
            all_decs, all_objs, all_cons = build_staircase_history(
                decs, objs, k=1, db_cons=cons)
        else:
            all_decs, all_objs = build_staircase_history(decs, objs, k=1)
            all_cons = None

        if all_cons is not None:
            results = build_save_results(
                all_decs=all_decs, all_objs=all_objs, runtime=runtime,
                max_nfes=nfes_per_task, all_cons=all_cons,
                bounds=problem.bounds, save_path=self.save_path,
                filename=self.name, save_data=self.save_data)
        else:
            results = build_save_results(
                all_decs=all_decs, all_objs=all_objs, runtime=runtime,
                max_nfes=nfes_per_task, bounds=problem.bounds,
                save_path=self.save_path, filename=self.name,
                save_data=self.save_data)
        return results
# =============================================================================
# SOM Training
# =============================================================================
def _som_training(W, LDis, sample_decs, sample_objs, num_nodes, eta0, sigma0,
                  winning_weights, D, M):
    """
    Train the SOM surrogate model with memory-based reinitialization.

    Neurons that never won during the previous phase ("dead" neurons) are
    re-seeded from pairs of winning neurons, favoring winners with larger
    crowding distance; the SOM is then trained for 50 epochs on the
    concatenated [decision, objective] samples.

    Parameters
    ----------
    W : np.ndarray
        SOM weight matrix, shape (num_nodes, D+M)
    LDis : np.ndarray
        Pairwise distances in latent space, shape (num_nodes, num_nodes)
    sample_decs : np.ndarray
        Sample decision variables in [0,1], shape (n_samples, D)
    sample_objs : np.ndarray
        Sample objective values, shape (n_samples, M)
    num_nodes : int
        Number of SOM neurons
    eta0 : float
        Initial learning rate
    sigma0 : float
        Initial neighborhood size
    winning_weights : np.ndarray
        Boolean mask of neurons that won during previous phase, shape (num_nodes,)
    D : int
        Number of decision variables
    M : int
        Number of objectives

    Returns
    -------
    W : np.ndarray
        Updated SOM weight matrix
    """
    W = W.copy()

    # --- Memory-based reinitialization of non-winning ("dead") neurons ---
    n_alive = int(np.sum(winning_weights))
    if 0 < n_alive < num_nodes:
        alive_idx = np.where(winning_weights)[0]
        dead_idx = np.where(~winning_weights)[0]
        n_dead = dead_idx.size

        # Rank the winning neurons' objective parts by non-dominated sorting
        alive_objs = W[alive_idx, D:]
        fronts, _ = nd_sort(alive_objs, n_alive)
        cd = crowding_distance(alive_objs, fronts)

        # Selection weight = crowding-distance rank (higher CD -> larger)
        cd_order = np.argsort(cd)
        weight = np.empty(n_alive)
        weight[cd_order] = np.arange(1, n_alive + 1, dtype=float)

        # Weighted sampling (with replacement) of parent pairs per dead neuron
        picks = np.random.choice(n_alive, size=2 * n_dead, replace=True,
                                 p=weight / np.sum(weight))
        pairs = alive_idx[picks.reshape(n_dead, 2)]

        # Re-seed dead neurons as the pair midpoint plus small noise
        W[dead_idx] = ((W[pairs[:, 0]] + W[pairs[:, 1]]) / 2.0
                       + 0.001 * np.random.randn(n_dead, D + M))

    # Repair decision-space bounds
    W[:, :D] = np.clip(W[:, :D], 0.0, 1.0)

    # Samples are [decs | objs] rows; decision variables already in [0, 1]
    samples = np.hstack([sample_decs, sample_objs])
    n_samples = samples.shape[0]

    # Per-neuron win counter (reset for this training phase)
    win_count = np.zeros(num_nodes)

    # --- SOM training: 50 epochs ---
    for epoch in range(1, 51):
        shuffled = np.random.permutation(n_samples)

        # Per-neuron neighborhood radius and learning rate decay with wins
        sigma = sigma0 * np.exp(-win_count / n_samples)
        eta = eta0 * np.exp(-win_count / n_samples)

        for s in shuffled:
            x = samples[s]

            # Winning neuron: nearest in decision space (squared distance)
            sq_dists = np.sum((x[:D] - W[:, :D]) ** 2, axis=1)
            winner = np.argmin(sq_dists)

            # First win: copy the sample into the neuron directly
            if win_count[winner] == 0:
                W[winner] = x

            # Win counter grows at most once per epoch
            if win_count[winner] < epoch:
                win_count[winner] += 1

            # Pull the winner's latent neighborhood toward the sample;
            # each neuron uses its own sigma and learning rate
            neigh = np.where(LDis[winner] < sigma)[0]
            if neigh.size > 0:
                rate = eta[neigh].reshape(-1, 1)
                decay = np.exp(-LDis[winner, neigh]).reshape(-1, 1)
                W[neigh] += rate * decay * (x - W[neigh])

    return W


# =============================================================================
# DE Operator
# =============================================================================
def _de_operator(parents, donors1, donors2, CR=0.9, F=0.5):
    """
    Generate offspring using the DE/rand/1/bin operator.

    Parameters
    ----------
    parents : np.ndarray
        Target vectors, shape (N, D)
    donors1 : np.ndarray
        First donor population, shape (N, D)
    donors2 : np.ndarray
        Second donor population, shape (N, D)
    CR : float
        Crossover rate
    F : float
        Differential weight

    Returns
    -------
    offspring : np.ndarray
        Offspring decision variables, shape (N, D), clipped to [0, 1]
    """
    n, dim = parents.shape

    # Mutation: DE/rand/1
    trial = parents + F * (donors1 - donors2)

    # Binomial crossover with one guaranteed (forced) gene per individual
    cross = np.random.rand(n, dim) < CR
    forced = np.random.randint(0, dim, size=n)
    cross[np.arange(n), forced] = True

    children = np.where(cross, trial, parents)
    return np.clip(children, 0.0, 1.0)