Source code for ddmtolab.Algorithms.STMO.DirHV_EI

"""
Expected Direction-based Hypervolume Improvement (DirHV-EI)

This module implements DirHV-EI for parallel expensive multi/many-objective optimization.
It uses GP surrogates with a MOEA/D-GR framework to maximize direction-based hypervolume
expected improvement, and a greedy batch selection strategy.

References
----------
    [1] L. Zhao and Q. Zhang. Hypervolume-guided decomposition for parallel expensive multiobjective optimization. IEEE Transactions on Evolutionary Computation, 2024, 28(2): 432-444.

Notes
-----
Author: Jiangtao Shen
Date: 2026.02.16
Version: 1.0
"""
from tqdm import tqdm
import time
import torch
import numpy as np
from scipy.stats import norm
from scipy.spatial.distance import pdist, squareform
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.Algo_Methods.bo_utils import mo_gp_build, mo_gp_predict
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
import warnings

warnings.filterwarnings("ignore")


[docs] class DirHV_EI: """ Expected Direction-based Hypervolume Improvement for parallel expensive multi/many-objective optimization. Attributes ---------- algorithm_information : dict Dictionary containing algorithm capabilities and requirements """ algorithm_information = { 'n_tasks': '[1, K]', 'dims': 'unequal', 'objs': 'unequal', 'n_objs': '[2, M]', 'cons': 'equal', 'n_cons': '0', 'expensive': 'True', 'knowledge_transfer': 'False', 'n_initial': 'unequal', 'max_nfes': 'unequal' } @classmethod def get_algorithm_information(cls, print_info=True): return get_algorithm_information(cls, print_info)
[docs] def __init__(self, problem, n_initial=None, max_nfes=None, batch_size=5, save_data=True, save_path='./Data', name='DirHV-EI', disable_tqdm=True): """ Initialize DirHV-EI algorithm. Parameters ---------- problem : MTOP Multi-task optimization problem instance n_initial : int or List[int], optional Number of initial samples per task (default: 11*dim-1) max_nfes : int or List[int], optional Maximum number of function evaluations per task (default: 200) batch_size : int, optional Number of true function evaluations per iteration (default: 5) save_data : bool, optional Whether to save optimization data (default: True) save_path : str, optional Path to save results (default: './Data') name : str, optional Name for the experiment (default: 'DirHV-EI') disable_tqdm : bool, optional Whether to disable progress bar (default: True) """ self.problem = problem self.n_initial = n_initial self.max_nfes = max_nfes if max_nfes is not None else 200 self.batch_size = batch_size self.save_data = save_data self.save_path = save_path self.name = name self.disable_tqdm = disable_tqdm
[docs] def optimize(self): """ Execute the DirHV-EI algorithm. Returns ------- Results Optimization results containing decision variables, objectives, and runtime """ data_type = torch.float start_time = time.time() problem = self.problem nt = problem.n_tasks dims = problem.dims n_objs = problem.n_objs # Set default initial samples: 11*dim - 1 if self.n_initial is None: n_initial_per_task = [11 * dims[i] - 1 for i in range(nt)] else: n_initial_per_task = par_list(self.n_initial, nt) max_nfes_per_task = par_list(self.max_nfes, nt) # Generate initial samples using Latin Hypercube Sampling decs = initialization(problem, n_initial_per_task, method='lhs') objs, _ = evaluation(problem, decs) nfes_per_task = n_initial_per_task.copy() pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task), desc=f"{self.name}", disable=self.disable_tqdm) while sum(nfes_per_task) < sum(max_nfes_per_task): active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]] if not active_tasks: break for i in active_tasks: m = n_objs[i] dim = dims[i] # Determine actual batch size (don't exceed remaining budget) remaining = max_nfes_per_task[i] - nfes_per_task[i] actual_batch = min(self.batch_size, remaining) # Scale objectives to [0, 1] ori_objs = objs[i] ymin = ori_objs.min(axis=0) ymax = ori_objs.max(axis=0) yrange = ymax - ymin yrange[yrange < 1e-12] = 1e-12 train_y = (ori_objs - ymin) / yrange # Non-dominated solutions (scaled) front_no, _ = nd_sort(train_y, 1) train_y_nds = train_y[front_no == 1] # Build GP models for each objective on scaled data models = mo_gp_build(decs[i], train_y, data_type) # Optimize DirHV-EI and select batch of candidate points new_decs = _opt_dirhvei( m, dim, models, train_y_nds, actual_batch, data_type ) # Remove duplicates new_decs = remove_duplicates(new_decs, decs[i]) if new_decs.shape[0] > 0: # Expensive evaluation new_objs, _ = evaluation_single(problem, new_decs, i) # Update dataset decs[i] = np.vstack([decs[i], new_decs]) objs[i] = np.vstack([objs[i], new_objs]) nfes_per_task[i] += new_decs.shape[0] pbar.update(new_decs.shape[0]) pbar.close() runtime = time.time() - start_time all_decs, all_objs = build_staircase_history(decs, objs, k=self.batch_size) results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime, max_nfes=nfes_per_task, bounds=problem.bounds, save_path=self.save_path, filename=self.name, save_data=self.save_data) return results
# ==================== Core DirHV-EI Functions ==================== def _opt_dirhvei(M, D, models, train_y_nds, batch_size, data_type): """ Maximize DirHV-EI and select a batch of query points. Parameters ---------- M : int Number of objectives D : int Number of decision variables models : list List of trained GP models (one per objective) train_y_nds : np.ndarray Scaled non-dominated objective values, shape (n_nds, M) batch_size : int Number of points to select data_type : torch.dtype Data type for GP prediction Returns ------- new_x : np.ndarray Selected candidate points, shape (batch_size, D) """ # Generate reference vectors num_weights = [200, 210, 295, 456, 462] if M <= 3: ref_vecs, _ = uniform_point(num_weights[M - 2], M) elif M <= 6: ref_vecs, _ = uniform_point(num_weights[M - 2], M, method='ILD') else: ref_vecs, _ = uniform_point(500, M) # Utopian point z = -0.01 * np.ones(M) # Calculate intersection points and direction vectors xis, dir_vecs = _get_xis(train_y_nds, ref_vecs, z) # Use MOEA/D-GR to maximize DirHV-EI candidate_x, candidate_mean, candidate_std = _moead_gr( D, models, dir_vecs, xis, data_type ) # Discard duplicate candidates _, unique_idx = np.unique(candidate_x, axis=0, return_index=True) unique_idx = np.sort(unique_idx) candidate_x = candidate_x[unique_idx] candidate_mean = candidate_mean[unique_idx] candidate_std = candidate_std[unique_idx] # Compute DirHV-EI for all candidates against all directions n_candidates = candidate_x.shape[0] n_dirs = dir_vecs.shape[0] DirHVEIs = np.zeros((n_candidates, n_dirs)) for j in range(n_candidates): u_rep = np.tile(candidate_mean[j], (n_dirs, 1)) s_rep = np.tile(candidate_std[j], (n_dirs, 1)) DirHVEIs[j, :] = _get_dirhvei(u_rep, s_rep, xis) # Greedy batch selection actual_batch = min(batch_size, n_candidates) Qb = _subset_selection(DirHVEIs, actual_batch) new_x = candidate_x[Qb] return new_x def _get_xis(train_y_nds, ref_vecs, z): """ Calculate intersection points and direction vectors. Parameters ---------- train_y_nds : np.ndarray Non-dominated objectives (scaled), shape (n, M) ref_vecs : np.ndarray Reference vectors, shape (N, M) z : np.ndarray Utopian point, shape (M,) Returns ------- xis : np.ndarray Intersection points, shape (N, M) dir_vecs : np.ndarray Direction vectors (unit L2 norm), shape (N, M) """ M = ref_vecs.shape[1] # Eq. 25: direction vectors with unit L2 norm temp = 1.1 * ref_vecs - z norm_temp = np.sqrt(np.sum(temp ** 2, axis=1, keepdims=True)) dir_vecs = temp / norm_temp # Eq. 11: intersection points div_dir = 1.0 / dir_vecs # (N, M) train_y_translated = train_y_nds - z # (n, M) # G[i,k] = max_j ( (train_y_translated[k,j]) / dir_vecs[i,j] ) G = div_dir[:, 0:1] * train_y_translated[:, 0:1].T # (N, n) for j in range(1, M): G = np.maximum(G, div_dir[:, j:j + 1] * train_y_translated[:, j:j + 1].T) # Minimum of mTch for each direction vector Lmin = G.min(axis=1, keepdims=True) # (N, 1) # Intersection points xis = z + Lmin * dir_vecs # (N, M) return xis, dir_vecs def _moead_gr(D, models, dir_vecs, xis, data_type): """ MOEA/D with Global Replacement to maximize DirHV-EI over direction vectors. Parameters ---------- D : int Number of decision variables models : list List of GP models dir_vecs : np.ndarray Direction vectors, shape (N, M) xis : np.ndarray Intersection points, shape (N, M) data_type : torch.dtype Data type for GP prediction Returns ------- pop_x : np.ndarray Candidate solutions, shape (N, D) pop_mean : np.ndarray GP predicted means, shape (N, M) pop_std : np.ndarray GP predicted standard deviations, shape (N, M) """ max_iter = 50 pop_size = dir_vecs.shape[0] # Neighbourhood T = max(2, int(np.ceil(pop_size / 10))) dist_mat = squareform(pdist(dir_vecs)) B = np.argsort(dist_mat, axis=1)[:, :T] # Initial population via LHS in [0, 1]^D pop_x = np.random.rand(pop_size, D) # LHS approx # GP prediction for initial population pop_mean, pop_std = _gp_evaluate(pop_x, models, data_type) # DirHV-EI for each individual on its own direction pop_dirhvei = _get_dirhvei(pop_mean, pop_std, xis) # Main optimization loop for gen in range(max_iter - 1): for i in range(pop_size): # Select parents if np.random.rand() < 0.8: P = B[i, np.random.permutation(B.shape[1])] else: P = np.random.permutation(pop_size) # Generate offspring via DE + polynomial mutation off_x = _operator_de(pop_x[i:i + 1], pop_x[P[0]:P[0] + 1], pop_x[P[1]:P[1] + 1]) off_x = off_x.reshape(1, -1) # GP prediction for offspring off_mean, off_std = _gp_evaluate(off_x, models, data_type) # Compute DirHV-EI of offspring for all directions off_mean_rep = np.tile(off_mean, (pop_size, 1)) off_std_rep = np.tile(off_std, (pop_size, 1)) off_dirhveis = _get_dirhvei(off_mean_rep, off_std_rep, xis) # Global Replacement: find best matching subproblem best_index = np.argmax(off_dirhveis) # Replacement neighbourhood P_replace = B[best_index] # Update solutions where offspring is better improve_mask = pop_dirhvei[P_replace] < off_dirhveis[P_replace] off_indices = P_replace[improve_mask] if len(off_indices) > 0: pop_x[off_indices] = off_x pop_mean[off_indices] = off_mean pop_std[off_indices] = off_std pop_dirhvei[off_indices] = off_dirhveis[off_indices] return pop_x, pop_mean, pop_std def _gp_evaluate(X, models, data_type): """ Predict GP posterior mean and std for candidate solutions. gp_predict returns original objectives (it negates internally to undo the training-time negation). We use those directly as the GP mean predictions. Parameters ---------- X : np.ndarray Candidate solutions, shape (N, D) models : list List of GP models data_type : torch.dtype Data type Returns ------- u : np.ndarray Predicted means, shape (N, M) s : np.ndarray Predicted standard deviations, shape (N, M) """ from ddmtolab.Methods.Algo_Methods.bo_utils import gp_predict N = X.shape[0] M = len(models) u = np.zeros((N, M)) s = np.zeros((N, M)) for j in range(M): pred_obj, pred_std = gp_predict(models[j], X, data_type) # gp_predict already returns original objectives (negated back) u[:, j] = pred_obj.flatten() s[:, j] = pred_std.flatten() # Ensure non-negative variance s = np.maximum(s, 0.0) return u, s def _get_dirhvei(u, sigma, xis): """ Calculate direction-based hypervolume expected improvement (Eq. 23). Parameters ---------- u : np.ndarray Predicted means, shape (N, M) sigma : np.ndarray Predicted standard deviations, shape (N, M) xis : np.ndarray Intersection points, shape (N, M) Returns ------- dirhvei : np.ndarray DirHV-EI values, shape (N,) """ # Avoid division by zero sigma_safe = np.maximum(sigma, 1e-10) xi_minus_u = xis - u tau = xi_minus_u / sigma_safe # Precompute normal CDF and PDF normcdf_tau = norm.cdf(tau) normpdf_tau = norm.pdf(tau) temp = xi_minus_u * normcdf_tau + sigma_safe * normpdf_tau # (N, M) dirhvei = np.prod(temp, axis=1) # (N,) return dirhvei def _subset_selection(DirHVEIs, batch_size): """ Greedy batch selection via submodularity (Algorithm 3). Parameters ---------- DirHVEIs : np.ndarray DirHV-EI matrix, shape (L, N) where L = candidates, N = directions batch_size : int Number of points to select Returns ------- Qb : list Selected candidate indices """ L, N = DirHVEIs.shape Qb = [] temp = DirHVEIs.copy() beta = np.zeros(N) for _ in range(batch_size): # Select candidate with max sum of DirHV-EI across all directions sums = temp.sum(axis=1) index = np.argmax(sums) Qb.append(index) beta += temp[index] # Update: [EI_D(x|lambda) - beta]_+ temp = DirHVEIs - beta temp[temp < 0] = 0 return Qb def _operator_de(parent1, parent2, parent3): """ Differential Evolution operator with polynomial mutation. Operates in [0, 1]^D space (DDMTOLab convention). Parameters ---------- parent1, parent2, parent3 : np.ndarray Parent solutions, shape (1, D) Returns ------- offspring : np.ndarray Generated offspring, shape (1, D) """ CR, F, proM, disM = 1.0, 0.5, 1.0, 20.0 N, D = parent1.shape # DE mutation site = np.random.rand(N, D) < CR offspring = parent1.copy() offspring[site] = offspring[site] + F * (parent2[site] - parent3[site]) # Clip to bounds [0, 1] offspring = np.clip(offspring, 0, 1) # Polynomial mutation site = np.random.rand(N, D) < proM / D mu = np.random.rand(N, D) temp = site & (mu <= 0.5) if np.any(temp): val = 2.0 * mu[temp] + (1.0 - 2.0 * mu[temp]) * \ (1.0 - offspring[temp]) ** (disM + 1) delta = val ** (1.0 / (disM + 1)) - 1.0 offspring[temp] = offspring[temp] + delta temp = site & (mu > 0.5) if np.any(temp): val = 2.0 * (1.0 - mu[temp]) + 2.0 * (mu[temp] - 0.5) * \ (1.0 - (1.0 - offspring[temp])) ** (disM + 1) delta = 1.0 - val ** (1.0 / (disM + 1)) offspring[temp] = offspring[temp] + delta offspring = np.clip(offspring, 0, 1) return offspring