# Source file: ddmtolab.Algorithms.STMO.EIM_EGO

"""
Expected Improvement Matrix based Efficient Global Optimization (EIM-EGO)

This module implements EIM-EGO for computationally expensive multi-objective optimization.
It builds Kriging (Gaussian Process) models for each objective and uses the Expected Improvement
Matrix (EIM) criterion to select one promising candidate per iteration via GA optimization.

Three EIM infill criteria are supported:
    1. Euclidean distance-based EIM criterion (default)
    2. Maximin distance-based EIM criterion
    3. Hypervolume-based EIM criterion

References
----------
    [1] D. Zhan, Y. Cheng, and J. Liu. Expected improvement matrix-based infill criteria for expensive multiobjective optimization. IEEE Transactions on Evolutionary Computation, 2017, 21(6): 956-975.

Notes
-----
Author: Jiangtao Shen
Date: 2026.02.17
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
from scipy.stats import norm
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.Algo_Methods.bo_utils import mo_gp_build, mo_gp_predict
import warnings

warnings.filterwarnings("ignore")


class EIM_EGO:
    """
    Expected Improvement Matrix based Efficient Global Optimization for
    expensive multi-objective optimization.

    Per iteration and per task: scale the observed objectives to [0, 1],
    fit one GP model per objective, maximize the EIM infill criterion with
    an internal GA, and evaluate the single selected candidate.
    """

    # Static capability descriptor; presumably consumed by the framework's
    # algorithm registry / information printer — TODO confirm against
    # get_algorithm_information in algo_utils.
    algorithm_information = {
        'n_tasks': '[1, K]',
        'dims': 'unequal',
        'objs': 'unequal',
        'n_objs': '[2, M]',
        'cons': 'equal',
        'n_cons': '0',
        'expensive': 'True',
        'knowledge_transfer': 'False',
        'n_initial': 'unequal',
        'max_nfes': 'unequal'
    }

    @classmethod
    def get_algorithm_information(cls, print_info=True):
        # Delegates to the module-level helper of the same name
        # (star-imported from algo_utils).
        return get_algorithm_information(cls, print_info)

    def __init__(self, problem, n_initial=None, max_nfes=None, eim_type=1,
                 ga_pop_size=100, ga_generations=100, save_data=True,
                 save_path='./Data', name='EIM-EGO', disable_tqdm=True):
        """
        Initialize EIM-EGO algorithm.

        Parameters
        ----------
        problem : MTOP
            Multi-task optimization problem instance
        n_initial : int or List[int], optional
            Number of initial samples per task (default: 11*dim-1)
        max_nfes : int or List[int], optional
            Maximum number of function evaluations per task (default: 200)
        eim_type : int, optional
            EIM infill criterion type (default: 1)
            1 = Euclidean distance-based, 2 = Maximin distance-based,
            3 = Hypervolume-based
        ga_pop_size : int, optional
            Population size for internal GA optimizer (default: 100)
        ga_generations : int, optional
            Number of generations for internal GA optimizer (default: 100)
        save_data : bool, optional
            Whether to save optimization data (default: True)
        save_path : str, optional
            Path to save results (default: './Data')
        name : str, optional
            Name for the experiment (default: 'EIM-EGO')
        disable_tqdm : bool, optional
            Whether to disable progress bar (default: True)
        """
        self.problem = problem
        # n_initial stays None here; the 11*dim-1 default is resolved in
        # optimize() where the per-task dimensions are known.
        self.n_initial = n_initial
        self.max_nfes = max_nfes if max_nfes is not None else 200
        self.eim_type = eim_type
        self.ga_pop_size = ga_pop_size
        self.ga_generations = ga_generations
        self.save_data = save_data
        self.save_path = save_path
        self.name = name
        self.disable_tqdm = disable_tqdm

    def optimize(self):
        """
        Execute the EIM-EGO algorithm.

        Returns
        -------
        Results
            Optimization results containing decision variables, objectives,
            and runtime
        """
        start_time = time.time()
        problem = self.problem
        nt = problem.n_tasks
        dims = problem.dims
        n_objs = problem.n_objs
        # par_list presumably broadcasts a scalar (or validates a list) to
        # one entry per task — TODO confirm against algo_utils.
        n_initial_per_task = par_list(
            self.n_initial if self.n_initial is not None else [11 * d - 1 for d in dims], nt)
        max_nfes_per_task = par_list(self.max_nfes, nt)

        # Generate initial samples using Latin Hypercube Sampling
        decs = initialization(problem, n_initial_per_task, method='lhs')
        objs, _ = evaluation(problem, decs)
        nfes_per_task = n_initial_per_task.copy()

        pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
                    desc=f"{self.name}", disable=self.disable_tqdm)

        # Outer loop: one expensive evaluation per active task per pass,
        # until every task exhausts its budget.
        while sum(nfes_per_task) < sum(max_nfes_per_task):
            active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
            if not active_tasks:
                break
            for i in active_tasks:
                M = n_objs[i]
                D = dims[i]

                # Scale objectives to [0, 1]; degenerate (constant) objective
                # ranges are clamped to 1.0 to avoid division by zero.
                obj_min = objs[i].min(axis=0)
                obj_max = objs[i].max(axis=0)
                obj_range = obj_max - obj_min
                obj_range[obj_range < 1e-10] = 1.0
                pop_obj_scaled = (objs[i] - obj_min) / obj_range

                # Build GP models for each objective on scaled objectives
                try:
                    models = mo_gp_build(decs[i], pop_obj_scaled)
                except Exception:
                    # If GP fitting fails, spend this iteration's evaluation
                    # on a uniformly random point instead of stalling.
                    new_dec = np.random.rand(1, D)
                    new_obj, _ = evaluation_single(problem, new_dec, i)
                    decs[i] = np.vstack([decs[i], new_dec])
                    objs[i] = np.vstack([objs[i], new_obj])
                    nfes_per_task[i] += 1
                    pbar.update(1)
                    continue

                # Select one candidate using EIM criterion via GA
                best_dec = _infill_sampling_eim(
                    models, pop_obj_scaled, D, M, self.eim_type,
                    self.ga_pop_size, self.ga_generations
                )

                # Remove duplicates against the archive of evaluated points
                new_dec = remove_duplicates(best_dec.reshape(1, -1), decs[i])
                if new_dec.shape[0] == 0:
                    # If duplicate, perturb slightly (clipped to [0, 1])
                    new_dec = np.clip(best_dec + 1e-4 * np.random.randn(D), 0, 1).reshape(1, -1)

                # Evaluate the candidate and grow the per-task archive
                new_obj, _ = evaluation_single(problem, new_dec, i)
                decs[i] = np.vstack([decs[i], new_dec])
                objs[i] = np.vstack([objs[i], new_obj])
                nfes_per_task[i] += 1
                pbar.update(1)

        pbar.close()
        runtime = time.time() - start_time

        # Convert database to staircase history structure for results.
        # build_staircase_history presumably reshapes the archives into the
        # framework's per-evaluation history format — TODO confirm.
        db_decs = [decs[i].copy() for i in range(nt)]
        db_objs = [objs[i].copy() for i in range(nt)]
        all_decs, all_objs = build_staircase_history(db_decs, db_objs, k=1)
        results = build_save_results(all_decs=all_decs, all_objs=all_objs,
                                     runtime=runtime, max_nfes=nfes_per_task,
                                     bounds=problem.bounds, save_path=self.save_path,
                                     filename=self.name, save_data=self.save_data)
        return results
# =============================================================================
# EIM Infill Criterion
# =============================================================================


def _infill_sampling_eim(models, pop_obj_scaled, D, M, eim_type, ga_pop_size,
                         ga_generations):
    """Pick one candidate point by maximizing the EIM criterion with a GA.

    Parameters
    ----------
    models : list
        Trained GP models, one per objective
    pop_obj_scaled : np.ndarray
        Scaled objective values, shape (N, M)
    D : int
        Number of decision variables
    M : int
        Number of objectives
    eim_type : int
        EIM criterion type (1=Euclidean, 2=Maximin, 3=Hypervolume)
    ga_pop_size : int
        GA population size
    ga_generations : int
        Number of GA generations

    Returns
    -------
    best : np.ndarray
        Best candidate decision vector, shape (D,)
    """
    # Current non-dominated front in the scaled objective space.
    ranks, _ = nd_sort(pop_obj_scaled, pop_obj_scaled.shape[0])
    front = pop_obj_scaled[ranks == 1]  # (n_front, M)
    n_front = front.shape[0]

    # EIM values are negated, so the incumbent is the smallest score seen.
    incumbent = None
    incumbent_score = np.inf

    population = initialization_single(D, ga_pop_size, method='lhs')
    n_elite = max(2, int(np.ceil(ga_pop_size / 2)))  # loop-invariant

    for _ in range(ga_generations):
        # GP posterior mean and standard deviation for every candidate.
        mu, mse = mo_gp_predict(models, population, mse=True)
        sigma = np.sqrt(np.maximum(0, mse))  # (ga_pop_size, M)

        # Negated EIM criterion; lower is better.
        scores = _compute_eim(front, mu, sigma, M, n_front, eim_type)

        # Track the best candidate seen across all generations.
        leader = np.argmin(scores)
        if scores[leader] < incumbent_score:
            incumbent_score = scores[leader]
            incumbent = population[leader].copy()

        # Truncate to the elite half, then breed two offspring groups:
        # tournament + SBX/polynomial on one side, pure mutation on the other.
        order = np.argsort(scores)
        elite = population[order[:n_elite]]
        elite_scores = scores[order[:n_elite]]

        mating_pool = _tournament_selection(elite_scores, elite.shape[0])
        children_crossed = ga_generation(elite[mating_pool], muc=20, mum=20)
        children_mutated = np.array([mutation(ind, mu=20) for ind in elite])

        population = np.clip(
            np.vstack([children_crossed, children_mutated])[:ga_pop_size], 0, 1)

    return incumbent


def _compute_eim(f, u, s, M, p, eim_type):
    """Evaluate the (negated) EIM criterion for a batch of candidates.

    Parameters
    ----------
    f : np.ndarray
        Non-dominated front (scaled), shape (p, M)
    u : np.ndarray
        Predicted means, shape (N, M)
    s : np.ndarray
        Predicted stds, shape (N, M)
    M : int
        Number of objectives
    p : int
        Number of non-dominated solutions
    eim_type : int
        EIM type (1=Euclidean, 2=Maximin, 3=Hypervolume)

    Returns
    -------
    eim : np.ndarray
        Negated EIM values, shape (N,), lower is better
    """
    # Broadcast layout: front (p, M, 1) against candidates (1, M, N).
    front = f[:, :, np.newaxis]
    mean = u.T[np.newaxis, :, :]
    std = s.T[np.newaxis, :, :]

    # Elementwise expected improvement of each front point over each
    # candidate in each objective:
    #   EI = (f - u) * Phi(z) + s * phi(z),   z = (f - u) / max(s, eps)
    std_floor = np.maximum(std, 1e-10)  # guard z against zero variance
    gap = front - mean
    z = gap / std_floor
    ei = gap * norm.cdf(z) + std * norm.pdf(z)  # (p, M, N)

    if eim_type == 1:
        # Euclidean: min over front points of the EI-vector norm.
        return -np.sqrt(np.sum(ei ** 2, axis=1)).min(axis=0)
    if eim_type == 2:
        # Maximin: min over front points of the best single-objective EI.
        return -np.max(ei, axis=1).min(axis=0)
    if eim_type == 3:
        # Hypervolume: gain w.r.t. reference point 1.1 per objective.
        ref = np.full(M, 1.1)
        vol_with_ei = np.prod(ref[np.newaxis, :, np.newaxis] - front + ei, axis=1)
        vol_plain = np.prod(ref[np.newaxis, :] - f, axis=1)
        return -(vol_with_ei - vol_plain[:, np.newaxis]).min(axis=0)

    raise ValueError(f"Unknown EIM type: {eim_type}. Must be 1, 2, or 3.")


def _tournament_selection(fitness, n_select, n_tournament=2):
    """Tournament selection; the contender with the lowest fitness wins."""
    pool_size = len(fitness)
    winners = np.zeros(n_select, dtype=int)
    for slot in range(n_select):
        contenders = np.random.randint(0, pool_size, size=n_tournament)
        winners[slot] = contenders[np.argmin(fitness[contenders])]
    return winners


def initialization_single(D, n, method='lhs'):
    """Sample n points in the unit hypercube [0, 1]^D for a single task."""
    if method != 'lhs':
        return np.random.rand(n, D)
    from scipy.stats import qmc
    return qmc.LatinHypercube(d=D).random(n=n)