# Source code for ddmtolab.Algorithms.STMO.MOEA_D_EGO

"""
MOEA/D with Efficient Global Optimization (MOEA/D-EGO)

This module implements MOEA/D-EGO for computationally expensive multi-objective optimization.
It uses clustering-based Gaussian Process models, MOEA/D-DE to maximize Expected Tchebycheff
Improvement (ETI) with Moment Matching Approximation, and K-means batch selection.

References
----------
    [1] Q. Zhang, W. Liu, E. Tsang, and B. Virginas. Expensive multiobjective optimization by MOEA/D with Gaussian process model. IEEE Transactions on Evolutionary Computation, 2010, 14(3): 456-474.

Notes
-----
Author: Jiangtao Shen
Date: 2026.02.17
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
from scipy.stats import norm
from scipy.spatial.distance import cdist
from sklearn.cluster import KMeans
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.Algo_Methods.bo_utils import gp_build, gp_predict
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
import warnings

warnings.filterwarnings("ignore")


class MOEA_D_EGO:
    """
    MOEA/D with Efficient Global Optimization for expensive multi-objective optimization.
    """

    # Capability description consumed by the framework's algorithm registry.
    algorithm_information = {
        'n_tasks': '[1, K]',
        'dims': 'unequal',
        'objs': 'unequal',
        'n_objs': '[2, M]',
        'cons': 'equal',
        'n_cons': '0',
        'expensive': 'True',
        'knowledge_transfer': 'False',
        'n_initial': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegates to the module-level helper imported from algo_utils
        # (the global name, not this classmethod, is resolved here).
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n_initial=None, max_nfes=None, batch_size=5,
                 save_data=True, save_path='./Data', name='MOEA-D-EGO',
                 disable_tqdm=True):
        """
        Initialize MOEA/D-EGO algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n_initial : int or List[int], optional
            Number of initial samples per task (default: 11*dim-1)
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task (default: 200)
        batch_size : int, optional
            Number of true evaluations per iteration (default: 5)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'MOEA-D-EGO')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        self.n_initial = n_initial
        self.max_nfes = max_nfes if max_nfes is not None else 200
        self.batch_size = batch_size
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the MOEA/D-EGO algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives, and runtime
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n_objs = problem.n_objs
        n_initial_per_task = par_list(
            self.n_initial if self.n_initial is not None else [11 * d - 1 for d in dims], nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)

        # Generate initial samples using Latin Hypercube Sampling
        decs = initialization(problem, n_initial_per_task, method='lhs')
        objs, _ = evaluation(problem, decs)
        nfes_per_task = n_initial_per_task.copy()

        pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
                    desc=f"{self.name}", disable=self.disable_tqdm)

        while sum(nfes_per_task) < sum(max_nfes_per_task):
            active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
            if not active_tasks:
                break
            for i in active_tasks:
                M = n_objs[i]
                D = dims[i]
                remaining = max_nfes_per_task[i] - nfes_per_task[i]
                actual_batch = min(self.batch_size, remaining)

                # Build clustering-based GP models
                try:
                    models, centers = _build_gp_models_fcm(decs[i], objs[i])
                except Exception:
                    # GP construction failed: fall back to random sampling so
                    # the evaluation budget is still consumed and the loop
                    # keeps making progress.
                    new_dec = np.random.rand(actual_batch, D)
                    new_obj, _ = evaluation_single(problem, new_dec, i)
                    decs[i] = np.vstack([decs[i], new_dec])
                    objs[i] = np.vstack([objs[i], new_obj])
                    nfes_per_task[i] += actual_batch
                    pbar.update(actual_batch)
                    continue

                # Optimize ETI and select batch of candidate points
                new_x = _opt_eti_fcm(M, D, actual_batch, decs[i], objs[i], models, centers)

                # Remove candidates duplicating already-evaluated points
                new_x = remove_duplicates(new_x, decs[i])
                if new_x.shape[0] == 0:
                    # Everything was a duplicate: evaluate one random point instead
                    new_x = np.clip(np.random.rand(1, D), 0, 1)
                if new_x.shape[0] > actual_batch:
                    new_x = new_x[:actual_batch]

                # Expensive evaluation
                new_obj, _ = evaluation_single(problem, new_x, i)
                decs[i] = np.vstack([decs[i], new_x])
                objs[i] = np.vstack([objs[i], new_obj])
                nfes_per_task[i] += new_x.shape[0]
                pbar.update(new_x.shape[0])

        pbar.close()
        runtime = time.time() - start_time
        all_decs, all_objs = build_staircase_history(decs, objs, k=self.batch_size)
        results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime,
                                     max_nfes=nfes_per_task, bounds=problem.bounds,
                                     save_path=self.save_path, filename=self.name,
                                     save_data=self.save_data)
        return results
# =============================================================================
# GP Model Building with Clustering
# =============================================================================

_L1 = 80  # Max cluster size for GP
_L2 = 20  # Step size for clustering: each extra L2 points beyond L1 adds a cluster


def _build_gp_models_fcm(train_x, train_y, L1=_L1, L2=_L2):
    """Build GP models with K-Means clustering (approximation of Fuzzy C-Means).

    When N <= L1, build a single set of M GP models on all data.
    When N > L1, cluster data and build separate GP models per cluster.

    Parameters
    ----------
    train_x : np.ndarray, shape (K, D)
        Training decision variables.
    train_y : np.ndarray, shape (K, M)
        Training objective values.
    L1 : int, optional
        Maximum number of points used to train one GP model.
    L2 : int, optional
        Step size controlling how many clusters are created.

    Returns
    -------
    models : list of list
        models[cluster_i][obj_j] is a GP model
    centers : np.ndarray
        Cluster centers, shape (csize, D)
    """
    K, M = train_y.shape
    if K <= L1:
        # Small data set: one GP per objective trained on all points.
        cluster_models = [gp_build(train_x, train_y[:, j:j + 1]) for j in range(M)]
        centers = train_x.mean(axis=0, keepdims=True)
        return [cluster_models], centers

    # Large data set: partition into csize clusters, each modelled separately.
    csize = 1 + int(np.ceil((K - L1) / L2))
    kmeans = KMeans(n_clusters=csize, n_init=10, random_state=0)
    kmeans.fit(train_x)
    centers = kmeans.cluster_centers_

    # For each cluster center, train on its L1 nearest training points.
    dis = cdist(train_x, centers)
    sorted_idx = np.argsort(dis, axis=0)  # (K, csize)
    models = []
    for ci in range(csize):
        temp_idx = sorted_idx[:L1, ci]
        cluster_models = [gp_build(train_x[temp_idx], train_y[temp_idx, j:j + 1])
                          for j in range(M)]
        models.append(cluster_models)
    return models, centers


def _gp_evaluate_fcm(X, models, centers):
    """Predict using cluster-based GP models.

    Each point is predicted by the model set of its nearest cluster center.

    Returns
    -------
    u : np.ndarray, shape (N, M) - predicted means
    s : np.ndarray, shape (N, M) - predicted stds
    """
    dis = cdist(X, centers)
    nearest = np.argmin(dis, axis=1)
    N = X.shape[0]
    M = len(models[0])
    u = np.zeros((N, M))
    s = np.zeros((N, M))
    for ci in range(len(models)):
        mask = nearest == ci
        if not np.any(mask):
            continue
        X_ci = X[mask]
        for j in range(M):
            pred, std = gp_predict(models[ci][j], X_ci)
            u[mask, j] = pred.flatten()
            s[mask, j] = std.flatten()
    s = np.maximum(s, 0)  # guard against tiny negative stds from the GP backend
    return u, s


def _gp_evaluate_mean_fcm(X, models, centers):
    """Predict mean only using cluster-based GP models."""
    u, _ = _gp_evaluate_fcm(X, models, centers)
    return u


# =============================================================================
# Main Infill Procedure
# =============================================================================

def _opt_eti_fcm(M, D, batch_size, train_x, train_y, models, centers):
    """Maximize ETI using MOEA/D and select batch of candidate points."""
    # Generate reference vectors; the per-M counts follow common MOEA/D settings.
    num_weights = {2: 200, 3: 210, 4: 295, 5: 456, 6: 462}
    if M <= 3:
        ref_vecs, _ = uniform_point(num_weights.get(M, 200), M, method='NBI')
    elif M <= 6:
        ref_vecs, _ = uniform_point(num_weights.get(M, 295), M, method='ILD')
    else:
        ref_vecs, _ = uniform_point(500, M)

    # Estimate utopian point z
    z = _get_estimation_z(D, models, centers, ref_vecs, train_y.min(axis=0))

    # Calculate gmin for each weight vector
    gmin = _get_gmin(train_y, ref_vecs, z)

    # Maximize ETI using MOEA/D-DE
    pop_eti, candidate_x, _, _ = _moead_eti(D, models, centers, ref_vecs, gmin, z)

    # Filter: remove candidates too close to evaluated/kept points or with ETI <= 0
    Q = []
    Q_eti = []
    temp = train_x.copy()
    for idx in range(candidate_x.shape[0]):
        if candidate_x.shape[0] > 0 and temp.shape[0] > 0:
            min_dist = np.min(cdist(candidate_x[idx:idx + 1], temp))
        else:
            min_dist = np.inf
        if min_dist > 1e-5 and pop_eti[idx] > 0:
            Q.append(candidate_x[idx])
            Q_eti.append(pop_eti[idx])
            # Accepted candidates also repel subsequent ones
            temp = np.vstack([temp, candidate_x[idx:idx + 1]])
    if len(Q) > 0:
        Q = np.array(Q)
        Q_eti = np.array(Q_eti)
    else:
        Q = np.empty((0, D))
        Q_eti = np.empty(0)

    # K-means batch selection
    new_x = _kmeans_batch_select(Q, batch_size, candidate_x, Q_eti)
    return new_x


# =============================================================================
# Utopian Point Estimation
# =============================================================================

def _get_estimation_z(D, models, centers, ref_vecs, z_init):
    """Estimate utopian point using MOEA/D to minimize GP posterior mean.

    Returns
    -------
    z : np.ndarray, shape (M,)
        Component-wise minimum of GP means seen during the run and z_init.
    """
    delta = 0.9    # probability of mating within the neighborhood
    nr = 2         # max number of neighbors replaced per offspring
    max_iter = 100
    pop_size = ref_vecs.shape[0]

    # Neighborhood: T nearest weight vectors per subproblem
    T = max(2, int(np.ceil(pop_size / 10)))
    B_dist = cdist(ref_vecs, ref_vecs)
    B = np.argsort(B_dist, axis=1)[:, :T]

    # Initial population
    pop_x = np.random.rand(pop_size, D)
    pop_mean = _gp_evaluate_mean_fcm(pop_x, models, centers)
    z = np.minimum(pop_mean.min(axis=0), z_init)

    for gen in range(max_iter - 1):
        for i in range(pop_size):
            if np.random.rand() < delta:
                P = B[i, np.random.permutation(T)]
            else:
                P = np.random.permutation(pop_size)

            # Generate offspring using DE
            off_x = _operator_de(pop_x[i], pop_x[P[0]], pop_x[P[1]])
            off_mean = _gp_evaluate_mean_fcm(off_x.reshape(1, -1), models, centers).flatten()

            # Update z
            z = np.minimum(z, off_mean)

            # Tchebycheff scalarization
            g_old = np.max((pop_mean[P] - z) * ref_vecs[P], axis=1)
            g_new = np.max(np.tile(off_mean - z, (len(P), 1)) * ref_vecs[P], axis=1)

            # Replace at most nr neighbors
            improve_idx = np.where(g_old > g_new)[0][:nr]
            if len(improve_idx) > 0:
                replace_idx = P[improve_idx]
                pop_x[replace_idx] = off_x
                pop_mean[replace_idx] = off_mean
    return z


# =============================================================================
# ETI Maximization with MOEA/D-DE
# =============================================================================

def _get_gmin(objs, ref_vecs, z):
    """Calculate minimum Tchebycheff value for each weight vector.

    Parameters
    ----------
    objs : (N, M) objective values
    ref_vecs : (NW, M) weight vectors
    z : (M,) utopian point

    Returns
    -------
    gmin : (NW,) best (minimum) Tchebycheff value per weight vector
    """
    translated = objs - z  # (N, M)
    # Build the (NW, N) matrix of Tchebycheff values one objective at a time.
    G = ref_vecs[:, 0:1] * translated[:, 0:1].T  # (NW, N)
    for j in range(1, ref_vecs.shape[1]):
        G = np.maximum(G, ref_vecs[:, j:j + 1] * translated[:, j:j + 1].T)
    return G.min(axis=1)  # (NW,)


def _moead_eti(D, models, centers, ref_vecs, gmin, z):
    """Use MOEA/D-DE to maximize ETI for all subproblems.

    Returns
    -------
    pop_eti : (pop_size,) final ETI values
    pop_x : (pop_size, D) final population
    pop_mean, pop_std : (pop_size, M) GP predictions for the population
    """
    delta = 0.9    # probability of mating within the neighborhood
    nr = 2         # max number of neighbors replaced per offspring
    max_iter = 50
    pop_size = ref_vecs.shape[0]

    # Neighborhood: T nearest weight vectors per subproblem
    T = max(2, int(np.ceil(pop_size / 10)))
    B_dist = cdist(ref_vecs, ref_vecs)
    B = np.argsort(B_dist, axis=1)[:, :T]

    # Initial population
    pop_x = np.random.rand(pop_size, D)
    pop_mean, pop_std = _gp_evaluate_fcm(pop_x, models, centers)
    pop_eti = _get_eti(pop_mean, pop_std, ref_vecs, gmin, z)

    for gen in range(max_iter - 1):
        for i in range(pop_size):
            if np.random.rand() < delta:
                P = B[i, np.random.permutation(T)]
            else:
                P = np.random.permutation(pop_size)

            # Generate offspring using DE
            off_x = _operator_de(pop_x[i], pop_x[P[0]], pop_x[P[1]])
            off_mean, off_std = _gp_evaluate_fcm(off_x.reshape(1, -1), models, centers)
            off_mean = off_mean.flatten()
            off_std = off_std.flatten()

            # Compute ETI for offspring on neighbors' subproblems
            eti_new = _get_eti(
                np.tile(off_mean, (len(P), 1)),
                np.tile(off_std, (len(P), 1)),
                ref_vecs[P], gmin[P], z
            )

            # Replace at most nr neighbors where offspring has better (larger) ETI
            improve_idx = np.where(pop_eti[P] < eti_new)[0][:nr]
            if len(improve_idx) > 0:
                replace_idx = P[improve_idx]
                pop_x[replace_idx] = off_x
                pop_mean[replace_idx] = off_mean
                pop_std[replace_idx] = off_std
                pop_eti[replace_idx] = eti_new[improve_idx]
    return pop_eti, pop_x, pop_mean, pop_std


# =============================================================================
# Expected Tchebycheff Improvement (ETI)
# =============================================================================

def _get_eti(u, sigma, ref_vecs, gbest, z):
    """Compute Expected Tchebycheff Improvement.

    Uses Moment Matching Approximation to approximate max of weighted Gaussians.

    Parameters
    ----------
    u : (N, M) predicted means
    sigma : (N, M) predicted stds
    ref_vecs : (N, M) weight vectors
    gbest : (N,) best Tchebycheff values per subproblem
    z : (M,) utopian point

    Returns
    -------
    eti : (N,) expected improvement of the Tchebycheff scalarization
    """
    # Weighted Gaussian: g_j ~ N(w_j*(u_j-z_j), (w_j*sigma_j)^2)
    g_mu = ref_vecs * (u - z)   # (N, M)
    g_sig = ref_vecs * sigma    # (N, M)
    g_sig = np.maximum(g_sig, 0)
    g_sig2 = g_sig ** 2         # (N, M)

    # MMA: approximate max of M Gaussians recursively, pairing the running
    # approximation with the next component.
    mma_mean = g_mu[:, 0].copy()
    mma_sigma2 = g_sig2[:, 0].copy()
    for j in range(1, g_mu.shape[1]):
        mu_pair = np.column_stack([mma_mean, g_mu[:, j]])
        sig2_pair = np.column_stack([mma_sigma2, g_sig2[:, j]])
        mma_mean, mma_sigma2 = _app_max_of_2_gaussian(mu_pair, sig2_pair)

    # Floor the variance to avoid division by zero below.
    mma_std = np.sqrt(np.maximum(mma_sigma2, 1e-20))
    tau = (gbest - mma_mean) / mma_std

    # ETI = (gbest - mma_mean) * Phi(tau) + mma_std * phi(tau)
    eti = (gbest - mma_mean) * norm.cdf(tau) + mma_std * norm.pdf(tau)
    return eti


def _app_max_of_2_gaussian(mu, sig2):
    """Moment Matching Approximation for max of two Gaussians (Eq. 18 & 19).

    Parameters
    ----------
    mu : (N, 2) means
    sig2 : (N, 2) variances

    Returns
    -------
    y : (N,) approximated mean of max
    s2 : (N,) approximated variance of max
    """
    # Floor avoids a zero denominator when both variances vanish.
    tau = np.sqrt(np.maximum(sig2[:, 0] + sig2[:, 1], 1e-20))
    alpha = (mu[:, 0] - mu[:, 1]) / tau
    phi_alpha = norm.pdf(alpha)
    Phi_alpha = norm.cdf(alpha)
    Phi_neg_alpha = norm.cdf(-alpha)

    # Eq. 18: mean of max
    y = mu[:, 0] * Phi_alpha + mu[:, 1] * Phi_neg_alpha + tau * phi_alpha

    # Eq. 19: second moment (corrected as per Appendix B), then variance
    s2 = ((mu[:, 0] ** 2 + sig2[:, 0]) * Phi_alpha
          + (mu[:, 1] ** 2 + sig2[:, 1]) * Phi_neg_alpha
          + (mu[:, 0] + mu[:, 1]) * tau * phi_alpha)
    s2 = s2 - y ** 2
    s2 = np.maximum(s2, 0)
    return y, s2


# =============================================================================
# DE Operator
# =============================================================================

def _operator_de(parent1, parent2, parent3, CR=1.0, F=0.5, mum=20):
    """DE/rand/1/bin with polynomial mutation in [0,1] space.

    Parameters
    ----------
    parent1, parent2, parent3 : (D,) parents in normalized [0, 1] space
    CR : float, crossover rate
    F : float, differential weight
    mum : float, polynomial mutation distribution index

    Returns
    -------
    offspring : (D,) mutated offspring clipped to [0, 1]
    """
    D = len(parent1)

    # Differential evolution
    site = np.random.rand(D) < CR
    offspring = parent1.copy()
    offspring[site] = offspring[site] + F * (parent2[site] - parent3[site])

    # Polynomial mutation (prob = 1/D per dimension); (ub - lb) = 1 here
    site = np.random.rand(D) < 1.0 / D
    offspring = np.clip(offspring, 0, 1)
    mu = np.random.rand(D)
    for j in np.where(site)[0]:
        if mu[j] <= 0.5:
            offspring[j] = offspring[j] + (1 - 0) * (
                (2 * mu[j] + (1 - 2 * mu[j]) *
                 (1 - offspring[j]) ** (mum + 1)) ** (1 / (mum + 1)) - 1)
        else:
            offspring[j] = offspring[j] + (1 - 0) * (
                1 - (2 * (1 - mu[j]) + 2 * (mu[j] - 0.5) *
                     offspring[j] ** (mum + 1)) ** (1 / (mum + 1)))
    return np.clip(offspring, 0, 1)


# =============================================================================
# Batch Selection
# =============================================================================

def _kmeans_batch_select(Q, batch_size, candidate_x, Q_eti):
    """Select batch of solutions using K-means clustering on filtered candidates.

    One representative (highest ETI) is taken per cluster; random candidates
    pad the batch when too few clusters/candidates survive filtering.
    """
    if Q.shape[0] == 0:
        # Fallback: random from candidates
        idx = np.random.choice(candidate_x.shape[0],
                               size=min(batch_size, candidate_x.shape[0]),
                               replace=False)
        return candidate_x[idx]

    actual_batch = min(batch_size, Q.shape[0])
    try:
        kmeans = KMeans(n_clusters=actual_batch, n_init=10, random_state=0)
        labels = kmeans.fit_predict(Q)
    except Exception:
        # Clustering failed: fall back to a random subset of Q
        idx = np.random.choice(Q.shape[0], size=actual_batch, replace=False)
        return Q[idx]

    # Pick the best-ETI member of each non-empty cluster
    selected = []
    for ci in range(actual_batch):
        cluster_idx = np.where(labels == ci)[0]
        if len(cluster_idx) > 0:
            best = cluster_idx[np.argmax(Q_eti[cluster_idx])]
            selected.append(best)
    result = Q[np.array(selected)] if selected else np.empty((0, Q.shape[1]))

    # Pad with random candidates if not enough
    if result.shape[0] < batch_size:
        n_pad = batch_size - result.shape[0]
        pad_idx = np.random.choice(candidate_x.shape[0],
                                   size=min(n_pad, candidate_x.shape[0]),
                                   replace=False)
        result = np.vstack([result, candidate_x[pad_idx]]) if result.shape[0] > 0 \
            else candidate_x[pad_idx]
    return result