Source code for ddmtolab.Algorithms.STMO.CSEA

"""
Classification-based Surrogate-assisted Evolutionary Algorithm (CSEA)

This module implements CSEA for computationally expensive multi/many-objective optimization.
It uses a neural network classifier to distinguish promising solutions from non-promising ones
relative to reference solutions, with adaptive surrogate-assisted selection strategies.

References
----------
    [1] L. Pan, C. He, Y. Tian, H. Wang, X. Zhang, and Y. Jin. A classification based surrogate-assisted evolutionary algorithm for expensive many-objective optimization. IEEE Transactions on Evolutionary Computation, 2019, 23(1): 74-88.

Notes
-----
Author: Jiangtao Shen
Date: 2026.02.16
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from scipy.spatial.distance import pdist, squareform, cdist
from ddmtolab.Methods.Algo_Methods.algo_utils import *
import warnings

warnings.filterwarnings("ignore")


[docs] class CSEA: """ Classification-based Surrogate-assisted Evolutionary Algorithm for expensive multi/many-objective optimization. Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[1, K]', 'dims': 'unequal', 'objs': 'unequal', 'n_objs': '[2, M]', 'cons': 'equal', 'n_cons': '0', 'expensive': 'True', 'knowledge_transfer': 'False', 'n_initial': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n_initial=None, max_nfes=None, n=None, k=6, gmax=3000, save_data=True, save_path='./Data', name='CSEA', disable_tqdm=True): """ Initialize CSEA algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n_initial : int or List[int], optional Number of initial samples per task (default: min(11*dim-1, 109)) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 200) n : int or List[int], optional Population size per task (default: same as n_initial) k : int, optional Number of reference solutions (default: 6) gmax : int, optional Number of solutions evaluated by surrogate model (default: 3000) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './Data') name : str, optional Name for the experiment (default: 'CSEA') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n_initial = n_initial self.max_nfes = max_nfes if max_nfes is not None else 200 self.n = n self.k = k self.gmax = gmax self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the CSEA algorithm. Returns ------- Results Optimization results containing decision variables, objectives, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks dims = problem.dims n_objs = problem.n_objs # Set default initial samples: min(11*dim - 1, 109) if self.n_initial is None: n_initial_per_task = [min(11 * dims[i] - 1, 109) for i in range(nt)] else: n_initial_per_task = par_list(self.n_initial, nt) max_nfes_per_task = par_list(self.max_nfes, nt) # Population size defaults to n_initial if self.n is None: n_per_task = n_initial_per_task.copy() else: n_per_task = par_list(self.n, nt) # Generate initial samples using Latin Hypercube Sampling decs = initialization(problem, n_initial_per_task, method='lhs') objs, _ = evaluation(problem, decs) nfes_per_task = n_initial_per_task.copy() # Archive: all evaluated solutions arc_decs = [d.copy() for d in decs] arc_objs = [o.copy() for o in objs] pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < sum(max_nfes_per_task): active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for i in active_tasks: dim = dims[i] k = min(self.k, len(arc_decs[i])) n_pop = n_per_task[i] # Current population: select best n_pop from archive via RefSelect pop_decs, pop_objs = _ref_select(arc_decs[i], arc_objs[i], n_pop) # Select k reference solutions ref_decs, ref_objs = _ref_select(pop_decs, pop_objs, k) # Classify: 1 = not dominated by all ref points, 0 = dominated labels = _get_output(pop_objs, ref_objs) rr = np.sum(labels) / len(labels) tr = min(rr, 1 - rr) * 0.5 # Stratified train/test split train_X, train_Y, test_X, test_Y = _data_process(pop_decs, labels) # Train neural network classifier net = _build_network(dim) _train_network(net, train_X, train_Y, epochs=100, batch_size=32) # Calculate error rates on test set p0, p1 = _calc_error_rates(net, test_X, test_Y) # Surrogate-assisted selection next_decs = _surrogate_assisted_selection( net, p0, p1, ref_decs, pop_decs, self.gmax, tr ) if next_decs is not None and len(next_decs) > 0: # Remove duplicates next_decs = remove_duplicates(next_decs, decs[i]) if next_decs.shape[0] > 0: # Limit evaluations to remaining budget remaining = max_nfes_per_task[i] - nfes_per_task[i] if next_decs.shape[0] > remaining: next_decs = next_decs[:remaining] # Evaluate with expensive function next_objs, _ = evaluation_single(problem, next_decs, i) # Update archive arc_decs[i] = np.vstack([arc_decs[i], next_decs]) arc_objs[i] = np.vstack([arc_objs[i], next_objs]) decs[i] = np.vstack([decs[i], next_decs]) objs[i] = np.vstack([objs[i], next_objs]) nfes_per_task[i] += next_decs.shape[0] pbar.update(next_decs.shape[0]) pbar.close() runtime = time.time() - start_time # Convert database to staircase history structure for results db_decs = [decs[i].copy() for i in range(nt)] db_objs = [objs[i].copy() for i in range(nt)] all_decs, all_objs = build_staircase_history(db_decs, db_objs, k=1) results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) return results
# ============================================================================= # Neural Network Classifier # ============================================================================= class _CSEANet(nn.Module): """Neural network classifier for CSEA.""" def __init__(self, input_dim): super().__init__() hidden_size = int(np.ceil(input_dim * 2)) self.net = nn.Sequential( nn.Linear(input_dim, hidden_size), nn.BatchNorm1d(hidden_size), nn.ReLU(), nn.Linear(hidden_size, 1), nn.Sigmoid() ) def forward(self, x): return self.net(x) def _build_network(input_dim): """Build CSEA classifier network.""" return _CSEANet(input_dim) def _train_network(net, train_X, train_Y, epochs=100, batch_size=32): """ Train the classifier with ADAM optimizer and MSE loss (regression layer in MATLAB). Parameters ---------- net : _CSEANet Neural network model train_X : np.ndarray Training inputs, shape (n, d) train_Y : np.ndarray Training labels, shape (n,) epochs : int Number of training epochs batch_size : int Mini-batch size """ device = torch.device('cpu') net = net.to(device) net.train() # Z-score normalization (matching MATLAB featureInputLayer 'zscore') mean = train_X.mean(axis=0) std = train_X.std(axis=0) std[std < 1e-10] = 1.0 train_X_norm = (train_X - mean) / std # Store normalization parameters net._input_mean = mean net._input_std = std X_tensor = torch.tensor(train_X_norm, dtype=torch.float32, device=device) Y_tensor = torch.tensor(train_Y, dtype=torch.float32, device=device).unsqueeze(1) dataset = TensorDataset(X_tensor, Y_tensor) loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, drop_last=False) optimizer = torch.optim.Adam(net.parameters()) criterion = nn.MSELoss() for _ in range(epochs): for batch_X, batch_Y in loader: optimizer.zero_grad() pred = net(batch_X) loss = criterion(pred, batch_Y) loss.backward() optimizer.step() def _predict(net, X): """ Predict labels using the trained classifier. Parameters ---------- net : _CSEANet Trained neural network X : np.ndarray Input data, shape (n, d) Returns ------- predictions : np.ndarray Predicted labels, shape (n,) """ device = torch.device('cpu') net.eval() # Apply same normalization as training X_norm = (X - net._input_mean) / net._input_std with torch.no_grad(): X_tensor = torch.tensor(X_norm, dtype=torch.float32, device=device) pred = net(X_tensor).cpu().numpy().flatten() return pred def _calc_error_rates(net, test_X, test_Y): """ Calculate classification error rates for class 0 and class 1. Parameters ---------- net : _CSEANet Trained neural network test_X : np.ndarray Test inputs, shape (n, d) test_Y : np.ndarray Test labels, shape (n,) Returns ------- p0 : float Error rate for class 1 (good solutions misclassified) p1 : float Error rate for class 0 (bad solutions misclassified) """ test_pred = _predict(net, test_X) # p0: error for class 1 (IndexGood = TestOut==1) good_mask = test_Y == 1 if np.sum(good_mask) > 0: p0 = np.sum(np.abs(test_Y[good_mask] - test_pred[good_mask])) / np.sum(good_mask) else: p0 = 0.0 # p1: error for class 0 (IndexGood = ~IndexGood) bad_mask = ~good_mask if np.sum(bad_mask) > 0: p1 = np.sum(np.abs(test_Y[bad_mask] - test_pred[bad_mask])) / np.sum(bad_mask) else: p1 = 0.0 return p0, p1 # ============================================================================= # Reference Solution Selection (RefSelect) # ============================================================================= def _ref_select(pop_decs, pop_objs, k): """ Select k reference solutions using RSEA strategy. Parameters ---------- pop_decs : np.ndarray Decision variables, shape (N, D) pop_objs : np.ndarray Objective values, shape (N, M) k : int Number of reference solutions to select Returns ------- ref_decs : np.ndarray Selected reference decision variables ref_objs : np.ndarray Selected reference objective values """ N = pop_objs.shape[0] k = min(k, N) # Non-dominated sorting to select candidate set front_no, max_fno = nd_sort(pop_objs, k) next_idx = np.where(front_no <= max_fno)[0] # Normalize objectives pmin = pop_objs.min(axis=0) + 1e-6 pmax = pop_objs.max(axis=0) if np.all(pmax > pmin): norm_objs = (pop_objs - pmin) / (pmax - pmin) else: norm_objs = pop_objs.copy() # Environmental selection is_chosen = np.isin(next_idx, np.where(front_no < max_fno)[0]) div = int(np.ceil(np.sqrt(k))) choose = _last_selection(norm_objs[next_idx], is_chosen, div, k) selected = next_idx[choose] return pop_decs[selected], pop_objs[selected] def _last_selection(pop_obj, choose, div, k): """ Select solutions based on radar grid for convergence-diversity balance. Parameters ---------- pop_obj : np.ndarray Normalized objective values, shape (N, M) choose : np.ndarray Boolean mask of already chosen solutions div : int Grid divisions k : int Total number to select Returns ------- choose : np.ndarray Boolean mask of selected solutions """ N, M = pop_obj.shape choose = choose.copy().astype(bool) # Identify extreme solutions using PBI metric ones_vec = np.ones((1, M)) # PBI: perpendicular distance from solution to weight vector (1,...,1) norm = np.sqrt(np.sum(pop_obj ** 2, axis=1)) cosine = 1 - cdist(pop_obj, ones_vec, metric='cosine').flatten() pbi = norm * np.sqrt(np.clip(1 - cosine ** 2, 0, None)) extreme = np.argmin(pbi) choose[extreme] = True # Calculate convergence (sum of objectives) con = np.sum(pop_obj, axis=1) max_con = np.max(con) if max_con > 0: con = con / max_con # Calculate radar grid site, rloc = _radar_grid(pop_obj, div) rdis = squareform(pdist(rloc)) np.fill_diagonal(rdis, np.inf) # Grid crowding max_site = np.max(site) + 1 crowd_g = np.zeros(max_site) if np.any(choose): chosen_sites = site[choose] unique_sites, counts = np.unique(chosen_sites, return_counts=True) crowd_g[unique_sites] = counts # Select k solutions while np.sum(choose) < k: remain_s = np.where(~choose)[0] if len(remain_s) == 0: break remain_g = np.unique(site[remain_s]) min_crowd = np.min(crowd_g[remain_g]) best_g = remain_g[crowd_g[remain_g] == min_crowd] current = remain_s[np.isin(site[remain_s], best_g)] # Fitness: balance convergence and diversity chosen_idx = np.where(choose)[0] if len(chosen_idx) > 0: min_rdis = np.min(rdis[np.ix_(current, chosen_idx)], axis=1) else: min_rdis = np.zeros(len(current)) fitness = 0.1 * M * con[current] - min_rdis best = np.argmin(fitness) choose[current[best]] = True crowd_g[site[current[best]]] += 1 return choose # ============================================================================= # Radar Grid # ============================================================================= def _radar_grid(pop_obj, div): """ Calculate radar grid index of each solution. Parameters ---------- pop_obj : np.ndarray Objective values, shape (N, M) div : int Number of grid divisions Returns ------- site : np.ndarray Grid site index for each solution, shape (N,) rloc : np.ndarray Radar coordinates, shape (N, 2) """ N, M = pop_obj.shape # Calculate radar coordinates theta = np.linspace(0, 2 * np.pi * (M - 1) / M, M) row_sum = np.sum(pop_obj, axis=1, keepdims=True) row_sum = np.maximum(row_sum, 1e-10) rloc = np.zeros((N, 2)) rloc[:, 0] = np.sum(pop_obj * np.cos(theta), axis=1) / row_sum.flatten() rloc[:, 1] = np.sum(pop_obj * np.sin(theta), axis=1) / row_sum.flatten() rloc = (rloc + 1) / 2 # Normalize radar locations yl = np.min(rloc, axis=0) yu = np.max(rloc, axis=0) denom = yu - yl denom[denom < 1e-10] = 1.0 nrloc = (rloc - yl) / denom # Grid location gloc = np.floor(nrloc * div).astype(int) gloc = np.clip(gloc, 0, div - 1) # Map grid locations to unique site indices unique_gloc, inverse = np.unique(gloc, axis=0, return_inverse=True) site = inverse return site, rloc # ============================================================================= # Classification Output # ============================================================================= def _get_output(pop_obj, ref_obj): """ Classify solutions: 1 if not dominated by all reference points, 0 otherwise. A solution gets label 1 if for EVERY reference point, the solution is better in at least one objective. Parameters ---------- pop_obj : np.ndarray Population objectives, shape (N, M) ref_obj : np.ndarray Reference objectives, shape (k, M) Returns ------- output : np.ndarray Binary labels, shape (N,) """ N = pop_obj.shape[0] output = np.ones(N, dtype=float) for j in range(ref_obj.shape[0]): # For each ref point, check if solution has at least one objective <= ref output = output * np.any(pop_obj <= ref_obj[j], axis=1).astype(float) return output # ============================================================================= # Data Processing # ============================================================================= def _data_process(inputs, outputs): """ Stratified train/test split (75/25) preserving class balance. Parameters ---------- inputs : np.ndarray Input features, shape (N, D) outputs : np.ndarray Binary labels, shape (N,) Returns ------- train_in, train_out, test_in, test_out : np.ndarray Split data """ idx1 = np.where(outputs > 0.5)[0] idx0 = np.where(outputs <= 0.5)[0] # Select 75% of each class for training n_train1 = int(np.ceil(0.75 * len(idx1))) n_train0 = int(np.ceil(0.75 * len(idx0))) perm1 = np.random.permutation(len(idx1)) perm0 = np.random.permutation(len(idx0)) train_idx1 = idx1[perm1[:n_train1]] train_idx0 = idx0[perm0[:n_train0]] train_idx = np.concatenate([train_idx1, train_idx0]) test_idx = np.setdiff1d(np.arange(len(outputs)), train_idx) train_in = inputs[train_idx] train_out = outputs[train_idx] test_in = inputs[test_idx] test_out = outputs[test_idx] return train_in, train_out, test_in, test_out # ============================================================================= # Surrogate-Assisted Selection # ============================================================================= def _surrogate_assisted_selection(net, p0, p1, ref_decs, pop_decs, gmax, tr): """ Select promising solutions using surrogate predictions with adaptive strategies. Parameters ---------- net : _CSEANet Trained classifier p0 : float Error rate for class 1 p1 : float Error rate for class 0 ref_decs : np.ndarray Reference solution decisions, shape (k, D) pop_decs : np.ndarray Population decisions, shape (N, D) gmax : int Maximum surrogate evaluations tr : float Classification threshold Returns ------- next_decs : np.ndarray or None Selected solutions for expensive evaluation """ a = tr b = 1 - tr # Generate initial candidates via GA operators combined = np.vstack([pop_decs, ref_decs]) next_decs = ga_generation(combined, muc=15, mum=5) label = _predict(net, next_decs) i = 0 n_ref = ref_decs.shape[0] if p0 < 0.4 or (p1 < a and p0 < b): # Strategy 1: Focus on high-prediction solutions (classifier reliable for good class) while i < gmax: sorted_idx = np.argsort(label)[::-1] # descending input_decs = next_decs[sorted_idx[:n_ref]] combined = np.vstack([input_decs, ref_decs]) next_decs = ga_generation(combined, muc=15, mum=5) label = _predict(net, next_decs) i += next_decs.shape[0] # Select solutions predicted as good next_decs = next_decs[label > 0.9] elif p0 > b and p1 < a: # Strategy 2: Classifier unreliable, random selection rand_idx = np.random.randint(0, len(next_decs)) next_decs = next_decs[rand_idx:rand_idx + 1] elif p1 > b: # Strategy 3: Focus on low-prediction solutions (classifier reverses labels) while i < gmax: sorted_idx = np.argsort(label) # ascending input_decs = next_decs[sorted_idx[:n_ref]] combined = np.vstack([input_decs, ref_decs]) next_decs = ga_generation(combined, muc=15, mum=5) label = _predict(net, next_decs) i += next_decs.shape[0] # Select solutions predicted as bad (but likely good due to reversed classifier) next_decs = next_decs[label < 0.1] else: # Fallback: random selection rand_idx = np.random.randint(0, len(next_decs)) next_decs = next_decs[rand_idx:rand_idx + 1] if len(next_decs) == 0: return None return next_decs