Source code for ddmtolab.Algorithms.STMO.EM_SAEA

"""
Ensemble-based Surrogate Model-Assisted Evolutionary Algorithm (EM-SAEA)

This module implements EM-SAEA for computationally expensive constrained/unconstrained
multi/many-objective optimization. It uses a two-stage approach:
- Stage 1 (objective-oriented, FE < 50% budget): RVMM-based search with two sub-populations
- Stage 2 (constraint-oriented, FE >= 50% budget): MOEA/D with ensemble constraint models

References
----------
    [1] Y. Li, X. Feng, and H. Yu. Enhancing landscape approximation with ensemble-based surrogate model for expensive constrained multiobjective optimization. IEEE Transactions on Evolutionary Computation, 2025.

Notes
-----
Author: Jiangtao Shen
Date: 2026.02.17
Version: 1.0
"""
from tqdm import tqdm
import time
import torch
import numpy as np
from scipy.spatial.distance import cdist
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.Algo_Methods.bo_utils import mo_gp_build, mo_gp_predict, gp_build, gp_predict
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
import warnings

warnings.filterwarnings("ignore")


[docs] class EM_SAEA: """ Ensemble-based Surrogate Model-Assisted Evolutionary Algorithm for expensive constrained/unconstrained multi/many-objective optimization. Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[1, K]', 'dims': 'unequal', 'objs': 'unequal', 'n_objs': '[2, M]', 'cons': 'unequal', 'n_cons': '[0, C]', 'expensive': 'True', 'knowledge_transfer': 'False', 'n_initial': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n_initial=None, max_nfes=None, n=100, wmax=20, lc_num=5, mu=5, alpha=2.0, kk=0.5, save_data=True, save_path='./Data', name='EM-SAEA', disable_tqdm=True): """ Initialize EM-SAEA algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n_initial : int or List[int], optional Number of initial samples per task (default: 11*dim-1) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 200) n : int or List[int], optional Population size (number of reference vectors) per task (default: 100) wmax : int, optional Number of generations before updating surrogate models (default: 20) lc_num : int, optional Number of local constraint model clusters (default: 5) mu : int, optional Number of re-evaluated solutions per iteration in stage 2 (default: 5) alpha : float, optional Parameter controlling APD penalty rate (default: 2.0) kk : float, optional Uncertainty weighting factor for MSE augmentation (default: 0.5) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './Data') name : str, optional Name for the experiment (default: 'EM-SAEA') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n_initial = n_initial self.max_nfes = max_nfes if max_nfes is not None else 200 self.n = n self.wmax = wmax self.lc_num = lc_num self.mu = mu self.alpha = alpha self.kk = kk self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the EM-SAEA algorithm. Returns ------- Results Optimization results containing decision variables, objectives, and runtime """ data_type = torch.float start_time = time.time() problem = self.problem nt = problem.n_tasks dims = problem.dims n_objs = problem.n_objs n_cons = problem.n_cons if self.n_initial is None: n_initial_per_task = [11 * dims[i] - 1 for i in range(nt)] else: n_initial_per_task = par_list(self.n_initial, nt) max_nfes_per_task = par_list(self.max_nfes, nt) n_per_task = par_list(self.n, nt) # Reference/weight vectors W_list, V0_list = [], [] for i in range(nt): w_i, actual_n = uniform_point(n_per_task[i], n_objs[i]) W_list.append(w_i) V0_list.append(w_i.copy()) n_per_task[i] = actual_n # Cluster weight vectors for local constraint models ClW_list, lc_num_list = [], [] for i in range(nt): clw_i, actual_lc = uniform_point(self.lc_num, n_objs[i]) ClW_list.append(clw_i) lc_num_list.append(actual_lc) # Initialize by LHS decs = initialization(problem, n_initial_per_task, method='lhs') objs, cons = evaluation(problem, decs) nfes_per_task = n_initial_per_task.copy() # History has_cons = any(c > 0 for c in n_cons) pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task), desc=f"{self.name}", disable=self.disable_tqdm) stages = [2] * nt # Initial stage = 2 (constraint-oriented) while sum(nfes_per_task) < sum(max_nfes_per_task): active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for i in active_tasks: M = n_objs[i] dim = dims[i] n_con = n_cons[i] NI = n_initial_per_task[i] N = n_per_task[i] # Deduplicate training data _, unique_idx = np.unique(decs[i], axis=0, return_index=True) unique_idx = np.sort(unique_idx) train_decs = decs[i][unique_idx] train_objs = objs[i][unique_idx] train_cons = cons[i][unique_idx] if n_con > 0 else None # Build objective GP models obj_models = mo_gp_build(train_decs, train_objs, data_type) if stages[i] == 1: new_decs = _stage1_objective( train_decs, train_objs, obj_models, V0_list[i], N, M, self.wmax, self.alpha, self.kk, data_type ) else: new_decs = _stage2_constraint( train_decs, train_objs, train_cons, obj_models, W_list[i], V0_list[i], N, M, n_con, self.wmax, self.mu, self.kk, self.alpha, lc_num_list[i], ClW_list[i], nfes_per_task[i], max_nfes_per_task[i], data_type ) if new_decs is not None and len(new_decs) > 0: new_decs = remove_duplicates(new_decs, decs[i]) if new_decs is not None and len(new_decs) > 0: new_objs, new_cons = evaluation_single(problem, new_decs, i) decs[i] = np.vstack([decs[i], new_decs]) objs[i] = np.vstack([objs[i], new_objs]) if n_con > 0: cons[i] = np.vstack([cons[i], new_cons]) nfes_per_task[i] += new_decs.shape[0] pbar.update(new_decs.shape[0]) # Stage switch threshold = int(np.ceil(NI + 0.5 * (max_nfes_per_task[i] - NI))) stages[i] = 1 if nfes_per_task[i] < threshold else 2 pbar.close() runtime = time.time() - start_time if has_cons: all_decs, all_objs, all_cons = build_staircase_history(decs, objs, k=self.mu, db_cons=cons) else: all_decs, all_objs = build_staircase_history(decs, objs, k=self.mu) all_cons = None kwargs = {} if has_cons: kwargs['all_cons'] = all_cons results = build_save_results( all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data, **kwargs ) return results
# ============================================================================= # Stage 1: Objective-oriented optimization with RVMM # ============================================================================= def _stage1_objective(train_decs, train_objs, obj_models, V0, N, M, wmax, alpha, kk, data_type): """ Objective-oriented optimization stage using RVMM. Two sub-populations evolved on surrogates: 1. Sub-pop with clustered reference vectors (V1) - convergence 2. Sub-pop with full reference vectors (V) - diversity RVMM selects between convergence and diversity contributions. Returns ------- new_decs : np.ndarray or None Selected decision variables for expensive re-evaluation """ # Get non-dominated archive objectives front_no, _ = nd_sort(train_objs, train_objs.shape[0]) A2Obj = train_objs[front_no == 1] if A2Obj.shape[0] == 0: A2Obj = train_objs.copy() # Scale for reference vector adaptation if A2Obj.shape[0] >= 2: scale = np.maximum(A2Obj.max(axis=0) - A2Obj.min(axis=0), 1e-6) else: scale = np.ones(M) # --- Sub-population 1: V1 (clustered active reference vectors) --- V_1 = V0 * scale A2Obj_temp = A2Obj - A2Obj.min(axis=0) angle_a2 = np.arccos(np.clip(1 - cdist(A2Obj_temp, V_1, 'cosine'), -1, 1)) associate_a2 = np.argmin(angle_a2, axis=1) active = np.unique(associate_a2) Va = V_1[active] # Cluster active vectors into min(5, |Va|) groups n_cluster = min(5, Va.shape[0]) if n_cluster > 1 and Va.shape[0] > n_cluster: labels = kmeans_clustering(V0[active], n_cluster) else: labels = np.arange(Va.shape[0]) % max(n_cluster, 1) # Select one random reference vector per cluster V1_list = [] selected_active_ids = [] for c in range(n_cluster): cluster_idx = np.where(labels == c)[0] if len(cluster_idx) > 0: idx = cluster_idx[np.random.randint(len(cluster_idx))] V1_list.append(Va[idx]) selected_active_ids.append(active[idx]) V1 = np.array(V1_list) if V1_list else V_1[:min(5, V_1.shape[0])] # Pad to 5 if needed if V1.shape[0] < 5: not_selected = np.setdiff1d(np.arange(V_1.shape[0]), np.array(selected_active_ids) if selected_active_ids else []) n_add = min(5 - V1.shape[0], len(not_selected)) if n_add > 0: add_idx = not_selected[np.random.permutation(len(not_selected))[:n_add]] V1 = np.vstack([V1, V_1[add_idx]]) # Evolve sub-population 1 with V1 pop1_decs, pop1_objs, pop1_mse = _surrogate_evolve( train_decs, obj_models, V1, N, M, wmax, alpha, kk, data_type, scale.copy(), train_decs, accumulate=False ) # --- Sub-population 2: V (full reference vectors) --- V = V0 * scale pop2_decs, pop2_objs, pop2_mse = _surrogate_evolve( train_decs, obj_models, V, N, M, wmax, alpha, kk, data_type, scale.copy(), train_decs, accumulate=True, V0=V0 ) # --- RVMM Selection --- return _rvmm_select(pop2_decs, pop2_objs, pop1_decs, pop1_objs, A2Obj, scale, V0) def _surrogate_evolve(init_decs, obj_models, V, N, M, wmax, alpha, kk, data_type, scale, archive_decs, accumulate=False, V0=None): """ Run GA evolution on surrogate models with K-RVEA environmental selection. Parameters ---------- init_decs : np.ndarray Initial population decisions obj_models : list Trained GP models for objectives V : np.ndarray Reference vectors N : int Population size M : int Number of objectives wmax : int Number of generations alpha : float APD penalty parameter kk : float MSE weighting factor data_type : torch.dtype Data type for GP scale : np.ndarray Scale for reference vector adaptation archive_decs : np.ndarray Already-evaluated solutions (for duplicate checking) accumulate : bool If True, accumulate solutions from all generations V0 : np.ndarray or None Original reference vectors (needed if accumulate=True for V init) Returns ------- final_decs, final_objs, final_mse : np.ndarray Evolved population (filtered: unique, not in archive) """ pop_decs = init_decs.copy() all_gen_decs, all_gen_objs, all_gen_mse = [], [], [] for w in range(1, wmax + 1): # Generate offspring if pop_decs.shape[0] < N: pool = np.random.randint(0, pop_decs.shape[0], N) off_decs = ga_generation(pop_decs[pool], muc=20, mum=20) else: off_decs = ga_generation(pop_decs, muc=20, mum=20) # Remove near-duplicates against archive off_decs = remove_duplicates(off_decs, init_decs) if off_decs.shape[0] == 0: if accumulate: all_gen_decs.append(pop_decs.copy()) pred_o, pred_m = mo_gp_predict(obj_models, pop_decs, data_type, mse=True) all_gen_objs.append(pred_o) all_gen_mse.append(pred_m) continue pop_decs = np.vstack([pop_decs, off_decs]) # Predict objectives and MSE pop_objs, pop_mse = mo_gp_predict(obj_models, pop_decs, data_type, mse=True) pop_mse = _process_mse(pop_mse) pop_objs_orig = pop_objs.copy() pop_mse_orig = pop_mse.copy() # Augment objectives with uncertainty pop_objs_aug = pop_objs + kk * pop_mse # Initialize V on first generation for full-vector mode if accumulate and V0 is not None and w == 1: fno, _ = nd_sort(pop_objs_aug, pop_objs_aug.shape[0]) nd_objs = pop_objs_aug[fno == 1] if nd_objs.shape[0] > 0: w_scale = np.maximum(nd_objs.max(axis=0) - nd_objs.min(axis=0), 1e-6) V = V0 * w_scale scale = w_scale # K-RVEA environmental selection cons_zero = np.zeros((pop_decs.shape[0], 1)) index = _k_env_selection(pop_objs_aug, V, (w / wmax) ** alpha) if len(index) == 0: if accumulate: all_gen_decs.append(pop_decs.copy()) all_gen_objs.append(pop_objs_aug) all_gen_mse.append(pop_mse) continue pop_decs = pop_decs[index] pop_objs_aug = pop_objs_aug[index] pop_mse = pop_mse[index] # Fallback: if all selected are already in archive, add ND offspring new_unique = remove_duplicates(pop_decs, archive_decs) if new_unique.shape[0] == 0 and off_decs.shape[0] > 0: n_total = pop_objs_orig.shape[0] off_start = n_total - off_decs.shape[0] off_obj = pop_objs_orig[off_start:] off_mse_v = pop_mse_orig[off_start:] fno_off, _ = nd_sort(off_obj, off_obj.shape[0]) nd_idx = np.where(fno_off == 1)[0] if len(nd_idx) > 0: pop_decs = np.vstack([pop_decs, off_decs[nd_idx]]) pop_objs_aug = np.vstack([pop_objs_aug, off_obj[nd_idx] + kk * off_mse_v[nd_idx]]) pop_mse = np.vstack([pop_mse, off_mse_v[nd_idx]]) # Adapt reference vectors adapt_interval = max(1, int(np.ceil(wmax * 0.1))) if w % adapt_interval == 0 and len(np.unique(pop_objs_aug, axis=0)) > 2: if accumulate and V0 is not None: V = V0 * np.maximum(pop_objs_aug.max(axis=0) - pop_objs_aug.min(axis=0), 1e-6) else: V = V / np.maximum(scale, 1e-6) scale = np.maximum(pop_objs_aug.max(axis=0) - pop_objs_aug.min(axis=0), 1e-6) V = V * scale if accumulate: all_gen_decs.append(pop_decs.copy()) all_gen_objs.append(pop_objs_aug.copy()) all_gen_mse.append(pop_mse.copy()) # Finalize if accumulate and all_gen_decs: pop_decs = np.vstack(all_gen_decs) pop_objs_aug = np.vstack(all_gen_objs) pop_mse = np.vstack(all_gen_mse) # Remove MSE augmentation pop_objs = pop_objs_aug - kk * pop_mse # Deduplicate _, uidx = np.unique(pop_objs, axis=0, return_index=True) uidx = np.sort(uidx) pop_decs = pop_decs[uidx] pop_objs = pop_objs[uidx] pop_mse = pop_mse[uidx] # Remove already-evaluated keep = _not_in_archive(pop_decs, archive_decs) pop_decs = pop_decs[keep] pop_objs = pop_objs[keep] pop_mse = pop_mse[keep] # Keep only non-dominated if pop_decs.shape[0] > 0: fno, _ = nd_sort(pop_objs, pop_objs.shape[0]) nd_mask = fno == 1 pop_decs = pop_decs[nd_mask] pop_objs = pop_objs[nd_mask] pop_mse = pop_mse[nd_mask] return pop_decs, pop_objs, pop_mse # ============================================================================= # Stage 2: Constraint-oriented optimization with ensemble models # ============================================================================= def _stage2_constraint(train_decs, train_objs, train_cons, obj_models, W, V0, N, M, n_con, wmax, mu, kk, alpha, lc_num, ClW, nfes, max_nfes, data_type): """ Constraint-oriented optimization stage with MOEA/D and ensemble constraint models. Returns ------- new_decs : np.ndarray or None Selected decision variables for expensive re-evaluation """ n_train = train_decs.shape[0] # --- CDP environmental selection to get initial population --- if n_train <= N: pop_decs = train_decs.copy() pop_idx = np.arange(n_train) else: if train_cons is not None: front_no, max_fno = nd_sort(train_objs, train_cons, N) else: front_no, max_fno = nd_sort(train_objs, N) mask = front_no < max_fno last_front = np.where(front_no == max_fno)[0] n_needed = N - np.sum(mask) if n_needed > 0 and len(last_front) > 0: cd = crowding_distance(train_objs[last_front]) sorted_idx = np.argsort(-cd) selected_last = last_front[sorted_idx[:n_needed]] mask[selected_last] = True pop_idx = np.where(mask)[0][:N] pop_decs = train_decs[pop_idx] # --- Build constraint models --- con_models_global = [] con_models_local = [] # [cluster][constraint] if n_con > 0 and train_cons is not None: # Cluster training data for local models z_min_obj = train_objs.min(axis=0) norm_objs = train_objs - z_min_obj cluster_size = int(np.ceil(n_train / lc_num)) # Assign clusters: angle-based to ClW cluster_assign = np.full((n_train, lc_num), -1, dtype=int) for c in range(lc_num): angles_c = np.arccos(np.clip(1 - cdist(norm_objs, ClW[c:c + 1], 'cosine'), -1, 1)).flatten() sorted_idx = np.argsort(angles_c) cluster_assign[sorted_idx[:cluster_size], c] = c # Round-robin for remaining norm_objs_rr = norm_objs.copy() remaining = n_train while remaining > 0: for c in range(lc_num): angles_c = np.arccos(np.clip(1 - cdist(norm_objs_rr, ClW[c:c + 1], 'cosine'), -1, 1)).flatten() loc = np.argmin(angles_c) cluster_assign[loc, c] = c norm_objs_rr[loc] = np.inf remaining -= 1 if remaining == 0: break # Final cluster: min of two assignments cluster_final = np.min(cluster_assign, axis=1) # Train local constraint models for c in range(lc_num): c_models = [] c_idx = np.where(cluster_final == c)[0] if len(c_idx) < 2: c_idx = np.arange(n_train) # fallback to global for j in range(n_con): try: model = gp_build(train_decs[c_idx], train_cons[c_idx, j:j + 1], data_type) except Exception: model = gp_build(train_decs, train_cons[:, j:j + 1], data_type) c_models.append(model) con_models_local.append(c_models) # Train global constraint models for j in range(n_con): model = gp_build(train_decs, train_cons[:, j:j + 1], data_type) con_models_global.append(model) # --- Predict initial population's objectives and constraints --- pop_objs = mo_gp_predict(obj_models, pop_decs, data_type, mse=False) pop_N = pop_decs.shape[0] if n_con > 0 and con_models_global: pop_cons = np.zeros((pop_N, n_con)) for j in range(n_con): pred, _ = gp_predict(con_models_global[j], pop_decs, data_type) pop_cons[:, j] = pred.flatten() else: pop_cons = np.zeros((pop_N, 0)) pop_cv = np.sum(np.maximum(0, pop_cons), axis=1) if n_con > 0 else np.zeros(pop_N) pf = np.mean(pop_cv == 0) # fraction of feasible solutions # --- Neighbor structure for MOEA/D --- T = max(2, int(np.ceil(pop_N / 10))) nr = max(1, int(np.ceil(pop_N / 100))) B_dist = cdist(W[:pop_N], W[:pop_N]) B = np.argsort(B_dist, axis=1)[:, :T] # Angle thresholds for VCDP angle_ww = np.arccos(np.clip(1 - cdist(W[:pop_N], W[:pop_N], 'cosine'), -1, 1)) np.fill_diagonal(angle_ww, np.inf) theta_min = np.min(angle_ww, axis=1) theta_angle = theta_min * 0.5 Z = train_objs.min(axis=0) # ideal point # --- MOEA/D evolution --- for _gen in range(wmax): for ii in range(pop_N): # Choose parents if np.random.rand() < 0.9: P = B[ii, np.random.permutation(B.shape[1])] else: P = np.random.permutation(pop_N) # Generate offspring from first 2 parents p1, p2 = pop_decs[P[0]], pop_decs[P[1 % len(P)]] off_dec1, _ = crossover(p1, p2, mu=20) off_dec = mutation(off_dec1, mu=20) off_dec = np.clip(off_dec, 0, 1) # Predict offspring objectives off_dec_2d = off_dec.reshape(1, -1) off_obj = mo_gp_predict(obj_models, off_dec_2d, data_type, mse=False) off_obj = off_obj.flatten() # Update ideal point Z = np.minimum(Z, off_obj) # Predict offspring constraints with ensemble if n_con > 0 and con_models_global: norm_off = off_obj - Z angles_to_clw = np.arccos(np.clip( 1 - cdist(norm_off.reshape(1, -1), ClW, 'cosine'), -1, 1)).flatten() off_cluster = np.argmin(angles_to_clw) off_con_gc = np.zeros(n_con) off_mse_gc = np.zeros(n_con) off_con_lc = np.zeros(n_con) off_mse_lc = np.zeros(n_con) for j in range(n_con): pred_gc, std_gc = gp_predict(con_models_global[j], off_dec_2d, data_type) off_con_gc[j] = pred_gc.flatten()[0] off_mse_gc[j] = (std_gc.flatten()[0]) ** 2 if off_cluster < len(con_models_local): pred_lc, std_lc = gp_predict( con_models_local[off_cluster][j], off_dec_2d, data_type) off_con_lc[j] = pred_lc.flatten()[0] off_mse_lc[j] = (std_lc.flatten()[0]) ** 2 else: off_con_lc[j] = off_con_gc[j] off_mse_lc[j] = off_mse_gc[j] # Choose model with lower MSE off_con = off_con_gc if np.mean(off_mse_gc) < np.mean(off_mse_lc) else off_con_lc off_cv = np.sum(np.maximum(0, off_con)) else: off_con = np.zeros(0) off_cv = 0.0 # PBI values for neighbors P_valid = P[P < pop_N] if len(P_valid) == 0: continue W_p = W[P_valid] obj_p = pop_objs[P_valid] cv_p = pop_cv[P_valid] # PBI decomposition: g = d1 + 5*d2 norm_w = np.sqrt(np.sum(W_p ** 2, axis=1)) diff_p = obj_p - Z norm_p = np.sqrt(np.sum(diff_p ** 2, axis=1)) cos_p = np.sum(diff_p * W_p, axis=1) / (norm_w * np.maximum(norm_p, 1e-10)) cos_p = np.clip(cos_p, -1, 1) g_old = norm_p * cos_p + 5 * norm_p * np.sqrt(np.maximum(1 - cos_p ** 2, 0)) diff_o = off_obj - Z norm_o = np.sqrt(np.sum(diff_o ** 2)) cos_o = np.sum(diff_o * W_p, axis=1) / (norm_w * max(norm_o, 1e-10)) cos_o = np.clip(cos_o, -1, 1) g_new = norm_o * cos_o + 5 * norm_o * np.sqrt(np.maximum(1 - cos_o ** 2, 0)) # VCDP replacement if off_cv + np.sum(cv_p) == 0: # All feasible: replace by PBI replace_mask = g_old >= g_new else: # CDP: replace if (better PBI and same CV) or (lower CV) replace_mask = ((g_old >= g_new) & (cv_p == off_cv)) | (cv_p > off_cv) replace_idx = np.where(replace_mask)[0][:nr] for r in replace_idx: idx = P_valid[r] pop_decs[idx] = off_dec pop_objs[idx] = off_obj if n_con > 0: pop_cons[idx] = off_con pop_cv[idx] = off_cv # --- Select solutions for re-evaluation --- # Try ArchiveUpdate: select feasible non-dominated new_decs = _archive_update(pop_decs, pop_objs, pop_cons if n_con > 0 else None, mu) if new_decs is None or len(new_decs) == 0: # Fallback: KrigingSelect (APD-based with constraints) theta_ks = (nfes / max(max_nfes, 1)) ** 2 new_decs = _kriging_select_con(pop_decs, pop_objs, W[:pop_N], mu, theta_ks, pop_cons if n_con > 0 else None) return new_decs # ============================================================================= # Helper Functions # ============================================================================= def _k_env_selection(pop_obj, V, theta): """ K-RVEA environmental selection using Angle-Penalized Distance (APD). Parameters ---------- pop_obj : np.ndarray Objective values, shape (N, M) V : np.ndarray Reference vectors, shape (NV, M) theta : float APD penalty parameter Returns ------- index : np.ndarray Selected solution indices """ N, M = pop_obj.shape NV = V.shape[0] # Translate pop_obj = pop_obj - pop_obj.min(axis=0) # Gamma: smallest angle between each reference vector and others cosine = 1 - cdist(V, V, 'cosine') np.fill_diagonal(cosine, 0) gamma = np.min(np.arccos(np.clip(cosine, -1, 1)), axis=1) gamma = np.maximum(gamma, 1e-6) # Associate each solution to nearest reference vector angle = np.arccos(np.clip(1 - cdist(pop_obj, V, 'cosine'), -1, 1)) associate = np.argmin(angle, axis=1) # Select one solution per reference vector by minimum APD Next = np.full(NV, -1, dtype=int) for i in np.unique(associate): current = np.where(associate == i)[0] APD = (1 + M * theta * angle[current, i] / gamma[i]) * \ np.sqrt(np.sum(pop_obj[current] ** 2, axis=1)) best = np.argmin(APD) Next[i] = current[best] return Next[Next != -1] def _process_mse(mse): """Transform MSE: sqrt for values <= 1, keep raw for values > 1.""" mse = np.maximum(mse, 0) s = np.sqrt(mse) return np.where(mse <= 1, s, mse) def _not_in_archive(decs, archive, tol=1e-6): """Return boolean mask of solutions NOT in archive.""" if archive.shape[0] == 0: return np.ones(decs.shape[0], dtype=bool) dists = cdist(decs, archive) min_dists = np.min(dists, axis=1) return min_dists > tol def _archive_update(pop_dec, pop_obj, pop_con, n): """ Select feasible non-dominated solutions with niche-based diversity. Parameters ---------- pop_dec : np.ndarray Decision variables pop_obj : np.ndarray Objective values pop_con : np.ndarray or None Constraint values n : int Number of solutions to select Returns ------- selected_decs : np.ndarray or None Selected decision variables """ # Select feasible solutions if pop_con is not None and pop_con.shape[1] > 0: feasible = np.all(pop_con <= 0, axis=1) else: feasible = np.ones(pop_dec.shape[0], dtype=bool) f_dec = pop_dec[feasible] f_obj = pop_obj[feasible] if f_dec.shape[0] == 0: return None # Select non-dominated front_no, _ = nd_sort(f_obj, f_obj.shape[0]) nd_mask = front_no == 1 nd_dec = f_dec[nd_mask] nd_obj = f_obj[nd_mask] if nd_dec.shape[0] == 0: return None # Random permutation perm = np.random.permutation(nd_dec.shape[0]) nd_dec = nd_dec[perm] nd_obj = nd_obj[perm] if nd_dec.shape[0] <= n: return nd_dec # Niche-based truncation fmax = nd_obj.max(axis=0) fmin = nd_obj.min(axis=0) rng = fmax - fmin rng[rng == 0] = 1.0 norm_obj = (nd_obj - fmin) / rng d = cdist(norm_obj, norm_obj) np.fill_diagonal(d, np.inf) sd = np.sort(d, axis=1) r = np.median(sd[:, min(norm_obj.shape[1] - 1, sd.shape[1] - 1)]) r = max(r, 1e-10) R = np.minimum(d / r, 1.0) # Delete worst one by one remaining = list(range(nd_dec.shape[0])) while len(remaining) > n: niche_vals = 1 - np.prod(R[np.ix_(remaining, remaining)], axis=1) worst = np.argmax(niche_vals) remaining.pop(worst) return nd_dec[remaining] def _kriging_select_con(pop_dec, pop_obj, V, mu, theta, pop_con=None): """ Constraint-aware Kriging selection using APD. Parameters ---------- pop_dec : np.ndarray Population decisions pop_obj : np.ndarray Predicted objectives V : np.ndarray Weight/reference vectors mu : int Number of solutions to select theta : float APD penalty parameter pop_con : np.ndarray or None Predicted constraints Returns ------- selected_decs : np.ndarray Selected decision variables """ N = pop_obj.shape[0] M = pop_obj.shape[1] CV = np.sum(np.maximum(0, pop_con), axis=1) if pop_con is not None and pop_con.shape[1] > 0 else np.zeros(N) # Active reference vectors pop_obj_t = pop_obj - pop_obj.min(axis=0) angle_all = np.arccos(np.clip(1 - cdist(pop_obj_t, V, 'cosine'), -1, 1)) associate = np.argmin(angle_all, axis=1) active = np.unique(associate) Va = V[active] n_cluster = min(mu, len(active)) if n_cluster == 0: idx = np.random.choice(N, size=min(mu, N), replace=False) return pop_dec[idx] if n_cluster < len(active): labels = kmeans_clustering(Va, n_cluster) else: labels = np.arange(len(active)) # Gamma and APD if Va.shape[0] > 1: cosine = 1 - cdist(Va, Va, 'cosine') np.fill_diagonal(cosine, 0) gamma = np.min(np.arccos(np.clip(cosine, -1, 1)), axis=1) gamma = np.maximum(gamma, 1e-6) else: gamma = np.array([1.0]) angle_va = np.arccos(np.clip(1 - cdist(pop_obj_t, Va, 'cosine'), -1, 1)) assoc_va = np.argmin(angle_va, axis=1) APD = np.ones(N) for j in np.unique(assoc_va): cur = np.where(assoc_va == j)[0] if len(cur) > 0: APD[cur] = (1 + M * theta * angle_va[cur, j] / gamma[j]) * \ np.sqrt(np.sum(pop_obj_t[cur] ** 2, axis=1)) cindex = labels[assoc_va] selected = [] for c in np.unique(cindex): cur = np.where(cindex == c)[0] if len(cur) == 0: continue # Prefer feasible solutions feas = cur[CV[cur] == 0] if len(feas) > 0: t = np.unique(assoc_va[feas]) best_per_vec = [] for j in t: cur_j = np.where((assoc_va == j) & (CV == 0))[0] if len(cur_j) > 0: best_per_vec.append(cur_j[np.argmin(APD[cur_j])]) if best_per_vec: best_arr = np.array(best_per_vec) selected.append(best_arr[np.argmin(APD[best_arr])]) else: # Select by minimum CV selected.append(cur[np.argmin(CV[cur])]) if len(selected) == 0: idx = np.random.choice(N, size=min(mu, N), replace=False) return pop_dec[idx] return pop_dec[selected] def _rvmm_select(pop2_dec, pop2_obj, pop1_dec, pop1_obj, A2Obj, scale, V0): """ RVMM-based infill selection combining convergence and diversity. Parameters ---------- pop2_dec, pop2_obj : np.ndarray Sub-population 2 (full vectors) - for diversity pop1_dec, pop1_obj : np.ndarray Sub-population 1 (clustered vectors) - for convergence A2Obj : np.ndarray Archive (real-evaluated) non-dominated objectives scale : np.ndarray Objective scale V0 : np.ndarray Original reference vectors Returns ------- new_decs : np.ndarray or None Selected solution(s) for evaluation """ if pop1_dec.shape[0] == 0 and pop2_dec.shape[0] == 0: return None # --- Convergence metric (cd) from sub-pop 1 --- cbest_dec = None if pop1_obj.shape[0] > 0: zmin = np.minimum(A2Obj.min(axis=0), pop1_obj.min(axis=0)) cd = np.zeros(pop1_obj.shape[0]) for idx in range(pop1_obj.shape[0]): sol = pop1_obj[idx] dist_to_archive = cdist((sol - zmin).reshape(1, -1), A2Obj - zmin, 'euclidean').flatten() nearest_id = np.argmin(dist_to_archive) # Check if solution dominates nearest archive member dominates = np.any(sol < A2Obj[nearest_id]) and not np.any(sol > A2Obj[nearest_id]) cd[idx] = dist_to_archive[nearest_id] if dominates else 0.0 if np.max(cd) > 0: cbest = np.argmax(cd) cbest_dec = pop1_dec[cbest:cbest + 1] cbest_obj = pop1_obj[cbest] # --- Diversity metric (dd) from sub-pop 2 --- dbest_dec = None if pop2_obj.shape[0] > 0: zmin_all = A2Obj.min(axis=0) if pop1_obj.shape[0] > 0: zmin_all = np.minimum(zmin_all, pop1_obj.min(axis=0)) if pop2_obj.shape[0] > 0: zmin_all = np.minimum(zmin_all, pop2_obj.min(axis=0)) scale_safe = np.maximum(scale, 1e-6) pop2_norm = np.maximum(pop2_obj - zmin_all, 0) / scale_safe A2_norm1 = np.maximum(A2Obj - zmin_all, 0) / scale_safe A2_norm2 = np.maximum(A2Obj - A2Obj.min(axis=0), 0) / scale_safe # Choose reference based on IGD igd1 = _rvmm_igd(pop2_norm, A2_norm1) igd2 = _rvmm_igd(pop2_norm, A2_norm2) A2_ref = A2_norm1 if igd1 < igd2 else A2_norm2 # Compute angle-based diversity if A2_ref.shape[0] > 0 and pop2_norm.shape[0] > 0: angle = np.arccos(np.clip(1 - cdist(pop2_norm, A2_ref, 'cosine'), -1, 1)) min_angle = np.min(angle, axis=1) dd = min_angle / max(np.max(min_angle), 1e-10) dbest = np.argmax(dd) dbest_dec = pop2_dec[dbest:dbest + 1] # --- Final selection: convergence or diversity --- if cbest_dec is not None: # Check if convergence solution is on Pareto front 1 w.r.t. archive combined = np.vstack([cbest_obj.reshape(1, -1), A2Obj]) front_no, _ = nd_sort(combined, combined.shape[0]) if front_no[0] == 1: return cbest_dec if dbest_dec is not None: return dbest_dec if cbest_dec is not None: return cbest_dec return None def _rvmm_igd(pop_obj, optimum): """Compute Inverted Generational Distance.""" if pop_obj.shape[1] != optimum.shape[1] or pop_obj.shape[0] == 0 or optimum.shape[0] == 0: return np.inf return np.mean(np.min(cdist(optimum, pop_obj), axis=1))