Source code for ddmtolab.Algorithms.STMO.MMRAEA

"""
Multi-Model-based Ranking Aggregation Evolutionary Algorithm (MMRAEA)

This module implements MMRAEA for computationally expensive multi/many-objective optimization.
It uses three RBF surrogate models (objective approximation, dominance prediction, and fitness
prediction) combined with a ranking aggregation infill strategy and dual evolutionary
optimization (CSO + GA) for balanced convergence and diversity.

References
----------
    [1] J. Shen, X. Wang, R. He, Y. Tian, W. Wang, P. Wang, and Z. Wen. Optimization of high-dimensional expensive multi-objective problems using multi-mode radial basis functions. Complex & Intelligent Systems, 2025, 11(2): 147.

Notes
-----
Author: Jiangtao Shen
Date: 2026.02.18
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
from scipy.spatial.distance import cdist
from ddmtolab.Methods.Algo_Methods.algo_utils import *
import warnings

warnings.filterwarnings("ignore")


[docs] class MMRAEA: """ Multi-Model-based Ranking Aggregation Evolutionary Algorithm for expensive multi/many-objective optimization. Uses three RBF surrogate models (objective, dominance, fitness) with ranking aggregation infill strategy and dual evolutionary optimization (CSO + GA). Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[1, K]', 'dims': 'unequal', 'objs': 'unequal', 'n_objs': '[2, M]', 'cons': 'equal', 'n_cons': '0', 'expensive': 'True', 'knowledge_transfer': 'False', 'n_initial': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n_initial=None, max_nfes=None, n=100, wmax=20, save_data=True, save_path='./Data', name='MMRAEA', disable_tqdm=True): """ Initialize MMRAEA algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n_initial : int or List[int], optional Number of initial samples per task (default: 11*dim-1) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 200) n : int or List[int], optional Population size per task (default: 100) wmax : int, optional Number of inner surrogate evolution generations (default: 20) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './Data') name : str, optional Name for the experiment (default: 'MMRAEA') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n_initial = n_initial self.max_nfes = max_nfes if max_nfes is not None else 200 self.n = n self.wmax = wmax self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the MMRAEA algorithm. Returns ------- Results Optimization results containing decision variables, objectives, and runtime """ start_time = time.time() problem = self.problem nt = problem.n_tasks dims = problem.dims n_objs = problem.n_objs # Set default initial samples: 11*dim - 1 if self.n_initial is None: n_initial_per_task = [11 * dims[i] - 1 for i in range(nt)] else: n_initial_per_task = par_list(self.n_initial, nt) max_nfes_per_task = par_list(self.max_nfes, nt) n_per_task = par_list(self.n, nt) # Initialize with LHS decs = initialization(problem, n_initial_per_task, method='lhs') objs, _ = evaluation(problem, decs) nfes_per_task = n_initial_per_task.copy() # A1: archive of all evaluated solutions arc_decs = [d.copy() for d in decs] arc_objs = [o.copy() for o in objs] pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < sum(max_nfes_per_task): active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for i in active_tasks: M = n_objs[i] dim = dims[i] N = n_per_task[i] A1Dec = arc_decs[i].copy() A1Obj = arc_objs[i].copy() # === Build dominance prediction model === front_no_a1, _ = nd_sort(A1Obj, len(A1Dec)) DS, DY = dsmerge(A1Dec, front_no_a1.astype(float)) Dmodel = rbf_build(DS, DY) # === Build RBF approximation models for each objective === RModels = [] mS_per_obj = [] for j in range(M): mS_j, mY_j = dsmerge(A1Dec, A1Obj[:, j]) rmodel = rbf_build(mS_j, mY_j) RModels.append(rmodel) mS_per_obj.append(mS_j) # === Build fitness prediction model === fitness_vals = _cal_fitness(A1Obj) FS, FY = dsmerge(A1Dec, fitness_vals) Fmodel = rbf_build(FS, FY) # === Evolutionary Optimization (dual: CSO + GA) === PopDec, PopObj = _ea_optimization( A1Dec, A1Obj, self.wmax, N, M, RModels, Fmodel, mS_per_obj, FS ) # === Infill Strategy === PopNew = _infill_strategy(PopDec, PopObj, Dmodel, DS, Fmodel, FS, A1Obj) # Remove duplicates PopNew = remove_duplicates(PopNew, decs[i]) if PopNew.shape[0] > 0: # Limit to remaining budget remaining = max_nfes_per_task[i] - nfes_per_task[i] if PopNew.shape[0] > remaining: PopNew = PopNew[:remaining] # Re-evaluate with expensive function new_objs, _ = evaluation_single(problem, PopNew, i) # Update archive (merge and deduplicate) arc_decs[i], arc_objs[i] = merge_archive( arc_decs[i], arc_objs[i], PopNew, new_objs ) # Update cumulative dataset decs[i] = np.vstack([decs[i], PopNew]) objs[i] = np.vstack([objs[i], new_objs]) nfes_per_task[i] += PopNew.shape[0] pbar.update(PopNew.shape[0]) pbar.close() runtime = time.time() - start_time # Convert database to staircase history structure for results db_decs = [decs[i].copy() for i in range(nt)] db_objs = [objs[i].copy() for i in range(nt)] all_decs, all_objs = build_staircase_history(db_decs, db_objs, k=1) results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) return results
# ============================================================================= # Shift-Based Density Fitness (calFitness) # ============================================================================= def _cal_fitness(pop_obj): """ Calculate shift-based density fitness for each solution. For each solution i, compute the shifted objective of every other solution j as max(obj_j, obj_i) element-wise. The fitness is the minimum distance from solution i to any shifted solution j. Parameters ---------- pop_obj : np.ndarray Objective values, shape (N, M) Returns ------- fitness : np.ndarray Fitness values, shape (N,). Higher is better (more isolated). """ N = pop_obj.shape[0] if N <= 1: return np.ones(N) # Normalize objectives to [0, 1] fmax = pop_obj.max(axis=0) fmin = pop_obj.min(axis=0) obj_range = fmax - fmin obj_range[obj_range == 0] = 1.0 norm_obj = (pop_obj - fmin) / obj_range # Compute shift-based distance matrix dis = np.full((N, N), np.inf) for i in range(N): # Shifted objectives: max(norm_obj[j], norm_obj[i]) for all j shifted = np.maximum(norm_obj, norm_obj[i]) for j in range(N): if j != i: dis[i, j] = np.linalg.norm(norm_obj[i] - shifted[j]) fitness = np.min(dis, axis=1) return fitness # ============================================================================= # Competitive Swarm Optimizer (CSO) # ============================================================================= def _cso(pop_decs, fitness, pop_vel): """ Competitive Swarm Optimization operator. Pairs solutions randomly, losers learn from winners, and polynomial mutation is applied. Parameters ---------- pop_decs : np.ndarray Population decisions, shape (N, D), values in [0, 1] fitness : np.ndarray Fitness values, shape (N,). Higher is better. pop_vel : np.ndarray Velocity matrix, shape (N, D) Returns ------- off_decs : np.ndarray Offspring decisions, shape (2*floor(N/2), D) off_vel : np.ndarray Offspring velocities, shape (2*floor(N/2), D) """ N, D = pop_decs.shape if N < 2: return pop_decs.copy(), pop_vel.copy() # Random pairing half = N // 2 rank = np.random.permutation(N)[:half * 2] idx1 = rank[:half] idx2 = rank[half:] # Determine winners and losers (higher fitness wins) swap = fitness[idx1] >= fitness[idx2] loser = idx1.copy() winner = idx2.copy() loser[swap] = idx2[swap] winner[swap] = idx1[swap] loser_dec = pop_decs[loser] winner_dec = pop_decs[winner] loser_vel = pop_vel[loser] winner_vel = pop_vel[winner] # Update velocity and position r1 = np.random.rand(half, 1) * np.ones((1, D)) r2 = np.random.rand(half, 1) * np.ones((1, D)) off_vel = r1 * loser_vel + r2 * (winner_dec - loser_dec) off_dec = loser_dec + off_vel + r1 * (off_vel - loser_vel) # Combine with winners off_dec = np.vstack([off_dec, winner_dec]) off_vel = np.vstack([off_vel, winner_vel]) # Polynomial mutation (in [0,1] space) n_off = off_dec.shape[0] dis_m = 20 site = np.random.rand(n_off, D) < 1.0 / D mu = np.random.rand(n_off, D) off_dec = np.clip(off_dec, 0, 1) temp1 = site & (mu <= 0.5) off_dec[temp1] = off_dec[temp1] + (1.0 - 0.0) * ( (2.0 * mu[temp1] + (1 - 2.0 * mu[temp1]) * (1 - (off_dec[temp1] - 0.0) / (1.0 - 0.0)) ** (dis_m + 1)) ** (1.0 / (dis_m + 1)) - 1 ) temp2 = site & (mu > 0.5) off_dec[temp2] = off_dec[temp2] + (1.0 - 0.0) * ( 1 - (2.0 * (1 - mu[temp2]) + 2.0 * (mu[temp2] - 0.5) * (1 - (1.0 - off_dec[temp2]) / (1.0 - 0.0)) ** (dis_m + 1)) ** (1.0 / (dis_m + 1)) ) off_dec = np.clip(off_dec, 0, 1) return off_dec, off_vel # ============================================================================= # PDR Environmental Selection (NSGA-II style: ND-sort + crowding distance) # ============================================================================= def _es_pdr(pop_obj, N): """ Environmental selection using non-dominated sorting and crowding distance. Parameters ---------- pop_obj : np.ndarray Objective values, shape (n, M) N : int Number to select Returns ------- index : np.ndarray Selected indices """ n = pop_obj.shape[0] if n <= N: return np.arange(n) front_no, max_fno = nd_sort(pop_obj, N) Next = front_no < max_fno remaining = N - np.sum(Next) if remaining > 0: last_front = np.where(front_no == max_fno)[0] cd = crowding_distance(pop_obj, front_no) sorted_last = last_front[np.argsort(-cd[last_front])] Next[sorted_last[:remaining]] = True return np.where(Next)[0] # ============================================================================= # EA Optimization (Dual: CSO + GA) # ============================================================================= def _ea_optimization(A1Dec, A1Obj, wmax, N, M, RModels, Fmodel, mS_per_obj, FS): """ Dual inner evolutionary optimization on surrogates. Branch 1: CSO (Competitive Swarm Optimizer) guided by fitness model. Branch 2: GA (Genetic Algorithm) with standard operators. Both branches use RBF objective models for prediction and PDR for selection. Parameters ---------- A1Dec : np.ndarray Archive decisions, shape (n, D) A1Obj : np.ndarray Archive objectives, shape (n, M) wmax : int Number of inner generations N : int Population size M : int Number of objectives RModels : list List of M RBF models for objective prediction Fmodel : dict RBF model for fitness prediction mS_per_obj : list Training data for each RBF objective model, list of M arrays FS : np.ndarray Training data for fitness model Returns ------- PopDec : np.ndarray Combined population decisions PopObj : np.ndarray Combined predicted objectives """ n_archive = A1Dec.shape[0] D = A1Dec.shape[1] # --- Branch 1: CSO --- pop_dec1 = A1Dec.copy() pop_vel = np.zeros((n_archive, D)) for w in range(wmax): # Predict fitness for CSO fit = rbf_predict(Fmodel, FS, pop_dec1) # CSO operator off_dec1, off_vel = _cso(pop_dec1, fit, pop_vel) pop_vel = np.vstack([pop_vel, off_vel]) pop_dec1 = np.vstack([pop_dec1, off_dec1]) # Predict objectives pop_obj1 = _rbf_predict_multi(RModels, mS_per_obj, pop_dec1, M) # PDR selection idx1 = _es_pdr(pop_obj1, N) pop_dec1 = pop_dec1[idx1] pop_vel = pop_vel[idx1] pop_obj1 = pop_obj1[idx1] # --- Branch 2: GA --- pop_dec2 = A1Dec.copy() for w in range(wmax): off_dec2 = ga_generation(pop_dec2, muc=20.0, mum=20.0) pop_dec2 = np.vstack([pop_dec2, off_dec2]) pop_obj2 = _rbf_predict_multi(RModels, mS_per_obj, pop_dec2, M) idx2 = _es_pdr(pop_obj2, N) pop_dec2 = pop_dec2[idx2] pop_obj2 = pop_obj2[idx2] # Combine both branches PopDec = np.vstack([pop_dec1, pop_dec2]) PopObj = np.vstack([pop_obj1, pop_obj2]) return PopDec, PopObj # ============================================================================= # Infill Strategy # ============================================================================= def _infill_strategy(PopDec, PopObj, Dmodel, DS, Fmodel, FS, A1Obj): """ Ranking aggregation infill strategy for selecting new evaluation points. 1. Combine predicted population with archive, keep non-dominated from predicted set. 2. Compute 3 criteria: shift-based density fitness, dominance prediction, fitness prediction. 3. ND-sort on [-Fit1, Fit2, -Fit3] to extract front-1 candidates. 4. Rank each criterion, compute Quality (sum of ranks) and Uncertainty (rank differences). 5. Select front-1 of [Quality, Uncertainty] plus the most uncertain solution. Parameters ---------- PopDec : np.ndarray Predicted population decisions, shape (NP, D) PopObj : np.ndarray Predicted population objectives, shape (NP, M) Dmodel : dict RBF model for dominance prediction DS : np.ndarray Training data for dominance model Fmodel : dict RBF model for fitness prediction FS : np.ndarray Training data for fitness model A1Obj : np.ndarray Archive objectives, shape (NA, M) Returns ------- PopNew : np.ndarray Selected decision variables for re-evaluation """ N_pop = PopDec.shape[0] # Combine predicted population with archive objectives for ND-sorting CPopObj = np.vstack([PopObj, A1Obj]) FN1, _ = nd_sort(CPopObj, CPopObj.shape[0]) # Find front-1 solutions that are from the predicted population (not archive) f1_mask = FN1 == 1 f1_from_pop = np.where(f1_mask[:N_pop])[0] if len(f1_from_pop) == 0: # Fallback: return the best predicted solution return PopDec[:1] PopDec = PopDec[f1_from_pop] PopObj = PopObj[f1_from_pop] if len(f1_from_pop) <= 1: return PopDec N = PopDec.shape[0] # Compute 3 fitness criteria Fit1 = _cal_fitness(PopObj) # Shift-based density (higher = better) Fit2 = rbf_predict(Dmodel, DS, PopDec) # Dominance prediction (lower = better) Fit3 = rbf_predict(Fmodel, FS, PopDec) # Fitness prediction (higher = better) # ND-sort on [-Fit1, Fit2, -Fit3] (all minimized) tri_obj = np.column_stack([-Fit1, Fit2, -Fit3]) FN, _ = nd_sort(tri_obj, N) # Keep only front-1 f1_mask2 = FN == 1 PopDec = PopDec[f1_mask2] PopObj = PopObj[f1_mask2] Fit1 = Fit1[f1_mask2] Fit2 = Fit2[f1_mask2] Fit3 = Fit3[f1_mask2] if PopDec.shape[0] <= 1: return PopDec N = PopDec.shape[0] # Rank-based scoring (matching MATLAB implementation) # Fit1: descending (higher is better) -> rank 1 for highest s1 = np.argsort(-Fit1) # Fit2: ascending (lower is better) -> rank 1 for lowest s2 = np.argsort(Fit2) # Fit3: descending (higher is better) -> rank 1 for highest s3 = np.argsort(-Fit3) Q1 = np.empty(N) Q2 = np.empty(N) Q3 = np.empty(N) for rank, idx in enumerate(s1): Q1[idx] = rank + 1 for rank, idx in enumerate(s2): Q2[idx] = rank + 1 for rank, idx in enumerate(s3): Q3[idx] = rank + 1 Quality = Q1 + Q2 + Q3 Uncertainty = np.abs(Q1 - Q2) + np.abs(Q1 - Q3) + np.abs(Q2 - Q3) # ND-sort on [Quality, Uncertainty] qu_obj = np.column_stack([Quality, Uncertainty]) FN_qu, _ = nd_sort(qu_obj, N) f1_indices = np.where(FN_qu == 1)[0] # Also add the most uncertain solution max_unc_idx = np.argmax(Uncertainty) selected = np.unique(np.concatenate([f1_indices, [max_unc_idx]])) return PopDec[selected] def _rbf_predict_multi(RModels, mS_per_obj, X_query, M): """ Predict multiple objectives using RBF models. Parameters ---------- RModels : list List of M RBF models mS_per_obj : list Training data for each RBF model, list of M arrays X_query : np.ndarray Query points, shape (nq, d) M : int Number of objectives Returns ------- pred_objs : np.ndarray Predicted objectives, shape (nq, M) """ nq = X_query.shape[0] pred_objs = np.zeros((nq, M)) for j in range(M): pred_objs[:, j] = rbf_predict(RModels[j], mS_per_obj[j], X_query) return pred_objs