# Source code for ddmtolab.Algorithms.STMO.EDN_ARMOEA

"""
Efficient Dropout Neural Network based AR-MOEA (EDN-ARMOEA)

This module implements EDN-ARMOEA for computationally expensive multi/many-objective optimization.
It uses a dropout neural network as surrogate model with MC dropout for uncertainty estimation,
combined with an adaptive reference point based evolutionary algorithm (AR-MOEA).

References
----------
    [1] D. Guo, X. Wang, K. Gao, Y. Jin, J. Ding, and T. Chai. Evolutionary optimization of high-dimensional multiobjective and many-objective expensive problems assisted by a dropout neural network. IEEE Transactions on Systems, Man, and Cybernetics: Systems, 2022, 52(4): 2084-2097.

Notes
-----
Author: Jiangtao Shen
Date: 2026.02.17
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
import torch
import torch.nn as nn
from scipy.spatial.distance import pdist, squareform, cdist
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
import warnings

warnings.filterwarnings("ignore")


class EDN_ARMOEA:
    """
    Efficient Dropout Neural Network based AR-MOEA for expensive multi/many-objective optimization.
    """

    # Declarative capability table consumed by the framework's algorithm registry.
    algorithm_information = {
        'n_tasks': '[1, K]',
        'dims': 'unequal',
        'objs': 'unequal',
        'n_objs': '[2, M]',
        'cons': 'equal',
        'n_cons': '0',
        'expensive': 'True',
        'knowledge_transfer': 'False',
        'n_initial': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegates to the shared helper from algo_utils (imported via *).
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n_initial=None, max_nfes=None, n=100, delta=0.05,
                 wmax=20, ke=3, save_data=True, save_path='./Data',
                 name='EDN-ARMOEA', disable_tqdm=True):
        """
        Initialize EDN-ARMOEA algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n_initial : int or List[int], optional
            Number of initial samples per task (default: 11*dim-1)
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task (default: 200)
        n : int or List[int], optional
            Population size per task (default: 100)
        delta : float, optional
            Threshold for judging diversity improvement (default: 0.05)
        wmax : int, optional
            Number of generations before updating models (default: 20)
        ke : int, optional
            Number of solutions to be re-evaluated in each iteration (default: 3)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'EDN-ARMOEA')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.n_initial = n_initial
        self.max_nfes = max_nfes if max_nfes is not None else 200
        self.n = n
        self.delta = delta
        self.wmax = wmax
        self.ke = ke
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the EDN-ARMOEA algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives, and runtime
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n_objs = problem.n_objs
        n_initial_per_task = par_list(
            self.n_initial if self.n_initial is not None else [11 * d - 1 for d in dims], nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)
        n_per_task = par_list(self.n, nt)

        # Generate initial samples using Latin Hypercube Sampling
        decs = initialization(problem, n_initial_per_task, method='lhs')
        objs, _ = evaluation(problem, decs)
        nfes_per_task = n_initial_per_task.copy()

        pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
                    desc=f"{self.name}", disable=self.disable_tqdm)

        # Per-task state: surrogate networks and previous diversity ratio
        nets = [None] * nt
        ratio_olds = [None] * nt

        while sum(nfes_per_task) < sum(max_nfes_per_task):
            active_tasks = [i for i in range(nt)
                            if nfes_per_task[i] < max_nfes_per_task[i]]
            if not active_tasks:
                break
            for i in active_tasks:
                M = n_objs[i]
                D = dims[i]
                popsize = n_per_task[i]

                # Generate reference vectors
                W, _ = uniform_point(popsize, M)

                # Prepare training data: select a diverse subset from the archive
                # once it exceeds the 11*D-1 budget, otherwise use it all.
                if decs[i].shape[0] > 11 * D - 1:
                    tr_x, tr_y = _select_train_data(decs[i], objs[i], 11 * D - 1,
                                                    min(self.ke, decs[i].shape[0]))
                else:
                    tr_x, tr_y = decs[i].copy(), objs[i].copy()

                # Normalize training data to [-1, 1]
                tr_xx, ps = _mapminmax(tr_x)
                tr_yy, qs = _mapminmax(tr_y)

                # Train the dropout network from scratch on first use,
                # fine-tune it on subsequent iterations.
                if nets[i] is None:
                    nets[i] = _train_model(tr_xx, tr_yy, D, M, n_iter=80000)
                else:
                    _update_model(nets[i], tr_xx, tr_yy, D, n_iter=8000)

                # Generate random population and estimate via MC dropout
                pop_dec = np.random.rand(popsize, D)
                pop_obj, pop_mse = _estimate(pop_dec, nets[i], ps, qs, M)

                # UpdateRefPoint (adaptive reference-point management)
                archive_obj, ref_point, range_val, ratio = _update_ref_point(
                    pop_obj, W, None)
                if ratio_olds[i] is None:
                    ratio_olds[i] = ratio

                # Inner evolution loop on the surrogate (no expensive evaluations)
                for w in range(self.wmax - 1):
                    mating_pool = _mating_selection(pop_obj, ref_point, range_val)
                    offspring_dec = ga_generation(pop_dec[mating_pool], muc=20, mum=20)
                    offspring_obj, offspring_mse = _estimate(
                        offspring_dec, nets[i], ps, qs, M)
                    archive_obj, ref_point, range_val, ratio = _update_ref_point(
                        np.vstack([archive_obj, offspring_obj]), W, range_val)
                    mediate_dec = np.vstack([pop_dec, offspring_dec])
                    mediate_obj = np.vstack([pop_obj, offspring_obj])
                    mediate_mse = np.vstack([pop_mse, offspring_mse])
                    indices, range_val = _environmental_selection(
                        mediate_obj, ref_point, range_val, popsize)
                    pop_dec = mediate_dec[indices]
                    pop_obj = mediate_obj[indices]
                    pop_mse = mediate_mse[indices]

                # Select individuals for expensive evaluation:
                # flag=1 (exploitation) when diversity no longer improves by delta.
                flag = int(ratio_olds[i] - ratio < self.delta)
                pop_new = _individual_select(pop_dec, pop_obj, pop_mse, self.ke, flag)
                ratio_olds[i] = ratio

                # Remove duplicates against the archive before spending evaluations
                pop_new = remove_duplicates(pop_new, decs[i])
                if pop_new.shape[0] > 0:
                    remaining = max_nfes_per_task[i] - nfes_per_task[i]
                    if pop_new.shape[0] > remaining:
                        pop_new = pop_new[:remaining]
                    new_objs, _ = evaluation_single(problem, pop_new, i)
                    decs[i] = np.vstack([decs[i], pop_new])
                    objs[i] = np.vstack([objs[i], new_objs])
                    nfes_per_task[i] += pop_new.shape[0]
                    pbar.update(pop_new.shape[0])

        pbar.close()
        runtime = time.time() - start_time
        all_decs, all_objs = build_staircase_history(decs, objs, k=self.ke)
        results = build_save_results(all_decs=all_decs, all_objs=all_objs,
                                     runtime=runtime, max_nfes=nfes_per_task,
                                     bounds=problem.bounds, save_path=self.save_path,
                                     filename=self.name, save_data=self.save_data)
        return results
# ============================================================================= # Dropout Neural Network # ============================================================================= class _DropoutNet: """Manual dropout neural network matching MATLAB implementation. Architecture: Input -> Dropout(0.2) -> FC(D,40) -> ReLU -> Dropout(0.5) -> FC(40,40) -> Tanh -> FC(40,M) Uses manual SGD with weight decay on weights only (not biases). """ def __init__(self, D, M, neuron_n=40): self.drop_p = [0.2, 0.5] self.decay = 1e-5 self.learn_r = 0.01 # Initialize weights (LeCun init: N(0, sqrt(1/fan_in))) self.W1 = np.random.randn(D, neuron_n) * np.sqrt(1.0 / D) self.W2 = np.random.randn(neuron_n, neuron_n) * np.sqrt(1.0 / neuron_n) self.W3 = np.random.randn(neuron_n, M) * np.sqrt(1.0 / neuron_n) # Initialize biases self.B1 = np.ones((1, neuron_n)) * 0.1 * np.sqrt(1.0 / neuron_n) self.B2 = np.random.randn(1, neuron_n) * np.sqrt(1.0 / neuron_n) self.B3 = np.random.randn(1, M) * np.sqrt(1.0 / M) def train_step(self, x, y): """One training step: forward + backward + SGD update.""" N = x.shape[0] dp = self.drop_p # Forward x1, mask1 = _dropout(x, dp[0]) x2 = x1 @ self.W1 + self.B1 x3 = np.maximum(0, x2) # ReLU x4, mask4 = _dropout(x3, dp[1]) x5 = x4 @ self.W2 + self.B2 x6 = np.tanh(x5) x7 = x6 @ self.W3 + self.B3 # Backward e = x7 - y dW3 = x6.T @ e dB3 = np.sum(e, axis=0, keepdims=True) dx6 = e @ self.W3.T dx5 = dx6 * (1 - x6 ** 2) # tanh derivative dW2 = x4.T @ dx5 dB2 = np.sum(dx5, axis=0, keepdims=True) dx4 = dx5 @ self.W2.T # Dropout backward: zero out dropped, scale kept dx3 = dx4 * mask4 / (1 - dp[1]) dx2 = dx3 * (x3 > 0).astype(float) # ReLU derivative dW1 = x1.T @ dx2 dB1 = np.sum(dx2, axis=0, keepdims=True) # SGD update with weight decay on weights only lr = self.learn_r self.W1 -= (self.decay * self.W1 + dW1) / N * lr self.W2 -= (self.decay * self.W2 + dW2) / N * lr self.W3 -= (self.decay * self.W3 + dW3) / N * lr self.B1 -= dB1 / N * lr self.B2 -= dB2 / N * lr self.B3 
-= dB3 / N * lr def forward_dropout(self, x): """Forward pass WITH dropout (for MC estimation).""" dp = self.drop_p x1, _ = _dropout(x, dp[0]) x2 = x1 @ self.W1 + self.B1 x3 = np.maximum(0, x2) x4, _ = _dropout(x3, dp[1]) x5 = x4 @ self.W2 + self.B2 x6 = np.tanh(x5) x7 = x6 @ self.W3 + self.B3 return x7 def _dropout(x, p): """Inverted dropout: zero with prob p, scale by 1/(1-p).""" mask = (np.random.rand(*x.shape) >= p).astype(float) return x * mask / (1 - p), mask # ============================================================================= # Model Training / Update / Estimation # ============================================================================= def _mapminmax(data): """Normalize each feature to [-1, 1]. Returns normalized data and params.""" mins = data.min(axis=0) maxs = data.max(axis=0) ranges = maxs - mins ranges[ranges < 1e-10] = 1.0 normalized = 2.0 * (data - mins) / ranges - 1.0 return normalized, (mins, maxs, ranges) def _mapminmax_apply(data, params): """Apply normalization using existing params.""" mins, maxs, ranges = params return 2.0 * (data - mins) / ranges - 1.0 def _mapminmax_reverse(data, params): """Reverse normalization.""" mins, maxs, ranges = params return (data + 1.0) * ranges / 2.0 + mins def _train_model(tr_xx, tr_yy, D, M, n_iter=80000): """Train the dropout neural network from scratch.""" net = _DropoutNet(D, M) N = tr_xx.shape[0] batch_size = D for j in range(n_iter): idx = np.random.randint(0, N, size=batch_size) x_batch = tr_xx[idx] y_batch = tr_yy[idx] net.train_step(x_batch, y_batch) return net def _update_model(net, tr_xx, tr_yy, D, n_iter=8000): """Fine-tune the dropout neural network.""" N = tr_xx.shape[0] batch_size = D for j in range(n_iter): idx = np.random.randint(0, N, size=batch_size) x_batch = tr_xx[idx] y_batch = tr_yy[idx] net.train_step(x_batch, y_batch) def _estimate(pop_dec, net, ps, qs, M): """MC dropout estimation: 100 stochastic forward passes.""" x = _mapminmax_apply(pop_dec, ps) N = x.shape[0] n_mc = 
100 sum_y = np.zeros((N, M)) sum_ysq = np.zeros((N, M)) for _ in range(n_mc): y_pred = net.forward_dropout(x) y_orig = _mapminmax_reverse(y_pred, qs) sum_y += y_orig sum_ysq += y_orig ** 2 mu = sum_y / n_mc s2 = sum_ysq / n_mc std = np.sqrt(np.maximum(s2 - mu ** 2, 0)) return mu, std # ============================================================================= # AR-MOEA Selection Components # ============================================================================= def _cal_distance(pop_obj, ref_point): """Calculate distance between each solution and each adjusted reference point.""" N = pop_obj.shape[0] NR = ref_point.shape[0] # Handle zero-norm solutions norms = np.sqrt(np.sum(pop_obj ** 2, axis=1)) zero_idx = norms == 0 if np.any(zero_idx): pop_obj = pop_obj.copy() pop_obj[zero_idx] += 1e-6 * np.random.rand(np.sum(zero_idx), pop_obj.shape[1]) # Cosine similarity norm_p = np.sqrt(np.sum(pop_obj ** 2, axis=1, keepdims=True)) # (N, 1) norm_r = np.sqrt(np.sum(ref_point ** 2, axis=1, keepdims=True)) # (NR, 1) cosine = 1 - cdist(pop_obj, ref_point, 'cosine') # (N, NR) cosine = np.clip(cosine, -1, 1) # d1 and d2 d1 = norm_p * cosine # (N, NR) d2 = norm_p * np.sqrt(np.maximum(1 - cosine ** 2, 0)) # (N, NR) # Adjust reference points nearest = np.argmin(d2, axis=0) # (NR,) scale = d1[nearest, np.arange(NR)] / norm_r.ravel() # (NR,) adjusted_ref = ref_point * scale[:, np.newaxis] # Euclidean distance distance = cdist(pop_obj, adjusted_ref) return distance def _update_ref_point(archive_obj, W, range_val): """Adaptive reference point management.""" # Delete dominated solutions and duplicates if archive_obj.shape[0] > 0: front_no, _ = nd_sort(archive_obj, archive_obj.shape[0]) nd_mask = front_no == 1 archive_obj = archive_obj[nd_mask] # Remove duplicate rows if archive_obj.shape[0] > 1: _, unique_idx = np.unique(archive_obj, axis=0, return_index=True) archive_obj = archive_obj[np.sort(unique_idx)] NA = archive_obj.shape[0] NW = W.shape[0] # Update ideal point if 
range_val is not None: range_val = range_val.copy() range_val[0] = np.minimum(range_val[0], archive_obj.min(axis=0)) if NA > 0 else range_val[0] elif NA > 0: range_val = np.vstack([archive_obj.min(axis=0), archive_obj.max(axis=0)]) if NA <= 1: return archive_obj, W.copy(), range_val, 0.0 # Translate archive t_archive = archive_obj - range_val[0] # Scale weight vectors w_scaled = W * (range_val[1] - range_val[0]) # Find contributing solutions and valid weight vectors distance = _cal_distance(t_archive, w_scaled) # (NA, NW) nearest_p = np.argmin(distance, axis=0) # (NW,) - nearest solution per ref point contributing_s = np.unique(nearest_p) nearest_w = np.argmin(distance, axis=1) # (NA,) - nearest ref point per solution valid_w = np.unique(nearest_w[contributing_s]) # Update archive: expand with diversity choose = np.zeros(NA, dtype=bool) choose[contributing_s] = True cosine = 1 - cdist(t_archive, t_archive, 'cosine') np.fill_diagonal(cosine, 0) target_size = min(3 * NW, NA) while np.sum(choose) < target_size: unselected = np.where(~choose)[0] if len(unselected) == 0: break max_cos = np.max(cosine[np.ix_(unselected, np.where(choose)[0])], axis=1) x = np.argmin(max_cos) choose[unselected[x]] = True archive_obj = archive_obj[choose] t_archive = t_archive[choose] # Update reference points ref_candidates = np.vstack([w_scaled[valid_w], t_archive]) n_valid = len(valid_w) choose_ref = np.zeros(ref_candidates.shape[0], dtype=bool) choose_ref[:n_valid] = True cosine_ref = 1 - cdist(ref_candidates, ref_candidates, 'cosine') np.fill_diagonal(cosine_ref, 0) target_ref = min(NW, ref_candidates.shape[0]) while np.sum(choose_ref) < target_ref: unselected = np.where(~choose_ref)[0] if len(unselected) == 0: break selected = np.where(choose_ref)[0] max_cos = np.max(cosine_ref[np.ix_(unselected, selected)], axis=1) x = np.argmin(max_cos) choose_ref[unselected[x]] = True ref_point = ref_candidates[choose_ref] ratio = len(valid_w) / NW return archive_obj, ref_point, range_val, ratio def 
_environmental_selection(obj, ref_point, range_val, N): """AR-MOEA environmental selection: ND sort + last front selection.""" n_pop = obj.shape[0] front_no, max_fno = nd_sort(obj, N) # Select all solutions in fronts < max front next_mask = front_no < max_fno # Select from last front last = np.where(front_no == max_fno)[0] remain_n = N - np.sum(next_mask) if remain_n > 0 and len(last) > 0: chosen_last = _last_selection(obj[last], ref_point, range_val, remain_n) next_mask[last[chosen_last]] = True indices = np.where(next_mask)[0] # Update range range_val = range_val.copy() range_val[1] = np.max(obj, axis=0) diff = range_val[1] - range_val[0] range_val[1, diff < 1e-6] = 1.0 return indices, range_val def _last_selection(pop_obj, ref_point, range_val, K): """Select K solutions from the last front based on IGD-NS metric contribution.""" N = pop_obj.shape[0] NR = ref_point.shape[0] # Translate by ideal point translated = pop_obj - range_val[0] distance = _cal_distance(translated, ref_point) # (N, NR) convergence = np.min(distance, axis=1) # (N,) # Sort distance per reference point (column-wise) rank = np.argsort(distance, axis=0) # (N, NR) dis = np.sort(distance, axis=0) # (N, NR) remain = np.ones(N, dtype=bool) while np.sum(remain) > K: remain_idx = np.where(remain)[0] n_remain = len(remain_idx) # Identify noncontributing solutions noncontributing = np.ones(N, dtype=bool) noncontributing[rank[0, :]] = False noncontributing &= remain METRIC = np.sum(dis[0, :]) + np.sum(convergence[noncontributing]) metric = np.full(N, np.inf) # Fitness of noncontributing nc_idx = np.where(noncontributing & remain)[0] for p in nc_idx: metric[p] = METRIC - convergence[p] # Fitness of contributing contributing = remain & ~noncontributing for p in np.where(contributing)[0]: temp = rank[0, :] == p nc_new = np.zeros(N, dtype=bool) nc_new[rank[1, temp]] = True nc_new &= noncontributing metric[p] = METRIC - np.sum(dis[0, temp]) + np.sum(dis[1, temp]) - np.sum( convergence[nc_new]) # Delete worst 
metric[~remain] = np.inf delete = np.argmin(metric) remain[delete] = False # Update dis and rank: remove deleted solution keep_mask = rank != delete n_now = np.sum(remain) new_dis = np.zeros((n_now, NR)) new_rank = np.zeros((n_now, NR), dtype=int) for j in range(NR): col_keep = keep_mask[:, j] new_dis[:, j] = dis[col_keep, j][:n_now] new_rank[:, j] = rank[col_keep, j][:n_now] dis = new_dis rank = new_rank return np.where(remain)[0] def _mating_selection(obj, ref_point, range_val): """AR-MOEA mating selection based on IGD-NS fitness.""" N = obj.shape[0] NR = ref_point.shape[0] translated = obj - range_val[0] distance = _cal_distance(translated, ref_point) convergence = np.min(distance, axis=1) rank = np.argsort(distance, axis=0) dis = np.sort(distance, axis=0) noncontributing = np.ones(N, dtype=bool) noncontributing[rank[0, :]] = False METRIC = np.sum(dis[0, :]) + np.sum(convergence[noncontributing]) fitness = np.full(N, np.inf) nc_idx = np.where(noncontributing)[0] for p in nc_idx: fitness[p] = METRIC - convergence[p] for p in np.where(~noncontributing)[0]: temp = rank[0, :] == p nc_new = np.zeros(N, dtype=bool) nc_new[rank[1, temp]] = True nc_new &= noncontributing fitness[p] = METRIC - np.sum(dis[0, temp]) + np.sum(dis[1, temp]) - np.sum( convergence[nc_new]) # Binary tournament selection (higher fitness = worse, so select by -fitness) n_select = (N // 2) * 2 if n_select < 2: n_select = 2 pool = np.zeros(n_select, dtype=int) for k in range(n_select): i1, i2 = np.random.randint(0, N, size=2) pool[k] = i1 if fitness[i1] > fitness[i2] else i2 # Higher metric = better diversity return pool # ============================================================================= # Individual Selection (Infill) # ============================================================================= def _individual_select(pop_dec, pop_obj, pop_mse, ke, flag): """Select ke individuals using k-means clustering. 
flag=0: select by max uncertainty (exploration) flag=1: select by min convergence (exploitation) """ from sklearn.cluster import KMeans N = pop_dec.shape[0] actual_ke = min(ke, N) try: kmeans = KMeans(n_clusters=actual_ke, n_init=10, random_state=None) labels = kmeans.fit_predict(pop_obj) except Exception: # Fallback: random selection idx = np.random.choice(N, size=actual_ke, replace=False) return pop_dec[idx] selected = [] unique_labels = np.unique(labels) if flag == 0: # Max uncertainty per cluster uncertainty = np.mean(pop_mse, axis=1) for lbl in unique_labels: cluster_idx = np.where(labels == lbl)[0] best = cluster_idx[np.argmax(uncertainty[cluster_idx])] selected.append(best) else: # Min convergence per cluster convergence = np.sqrt(np.sum(pop_obj ** 2, axis=1)) for lbl in unique_labels: cluster_idx = np.where(labels == lbl)[0] best = cluster_idx[np.argmin(convergence[cluster_idx])] selected.append(best) return pop_dec[np.array(selected)] # ============================================================================= # Training Data Selection # ============================================================================= def _select_train_data(all_decs, all_objs, N1, N2): """Select diverse training data. Prioritize recent N2 solutions, then greedily fill to N1 by max cosine diversity. 
""" NA = all_objs.shape[0] N1 = min(N1, NA) N2 = min(N2, NA) # Translate objectives tr_y = all_objs - all_objs.min(axis=0) # Cosine similarity matrix norms = np.sqrt(np.sum(tr_y ** 2, axis=1, keepdims=True)) norms[norms < 1e-10] = 1e-10 tr_y_norm = tr_y / norms cosine = tr_y_norm @ tr_y_norm.T np.fill_diagonal(cosine, 0) # Start with recent N2 solutions (last N2 entries) choose = np.zeros(NA, dtype=bool) choose[NA - N2:] = True while np.sum(choose) < N1: unselected = np.where(~choose)[0] selected = np.where(choose)[0] if len(unselected) == 0: break max_cos = np.max(cosine[np.ix_(unselected, selected)], axis=1) x = np.argmin(max_cos) choose[unselected[x]] = True return all_decs[choose], tr_y[choose]