"""
Multi-objective Efficient Global Optimization (MultiObjectiveEGO)
This module implements MultiObjectiveEGO for computationally expensive multi-objective optimization.
It uses reference direction-based Augmented Achievement Scalarizing Function (AASF) to decompose
the multi-objective problem into scalar subproblems, builds a single GP model per infill, and
maximizes Standard Expected Improvement to select new evaluation points.
References
----------
[1] R. Hussein and K. Deb. A generative Kriging surrogate model for constrained and unconstrained multi-objective optimization. Proceedings of the Genetic and Evolutionary Computation Conference, 2016, 573-580.
Notes
-----
Author: Jiangtao Shen
Date: 2026.02.17
Version: 1.0
"""
from tqdm import tqdm
import time
import numpy as np
from scipy.stats import norm
from scipy.spatial.distance import cdist
from ddmtolab.Methods.Algo_Methods.algo_utils import *
from ddmtolab.Methods.Algo_Methods.bo_utils import gp_build, gp_predict
from ddmtolab.Methods.Algo_Methods.uniform_point import uniform_point
import warnings
warnings.filterwarnings("ignore")
[docs]
class MultiObjectiveEGO:
"""
Multi-objective Efficient Global Optimization for expensive
multi-objective optimization using reference direction-based
scalarization and Expected Improvement.
"""
algorithm_information = {
'n_tasks': '[1, K]',
'dims': 'unequal',
'objs': 'unequal',
'n_objs': '[2, M]',
'cons': 'equal',
'n_cons': '0',
'expensive': 'True',
'knowledge_transfer': 'False',
'n_initial': 'unequal',
'max_nfes': 'unequal'
}
@classmethod
def get_algorithm_information(cls, print_info=True):
return get_algorithm_information(cls, print_info)
[docs]
def __init__(self, problem, n_initial=None, max_nfes=None, alpha=0.7, num_k=5, H=21,
rho=1e-3, save_data=True, save_path='./Data', name='MultiObjectiveEGO',
disable_tqdm=True):
"""
Initialize MultiObjectiveEGO algorithm.
Parameters
----------
problem : MTOP
Multi-task optimization problem instance
n_initial : int or List[int], optional
Number of initial samples per task (default: 11*dim-1)
max_nfes : int or List[int], optional
Maximum number of function evaluations per task (default: 200)
alpha : float, optional
Portion of samples for Kriging construction (default: 0.7)
num_k : int, optional
Number of infill points per reference direction (default: 5)
H : int, optional
Number of reference directions (default: 21)
rho : float, optional
Parameter for AASF scalarization (default: 1e-3)
save_data : bool, optional
Whether to save optimization data (default: True)
save_path : str, optional
Path to save results (default: './Data')
name : str, optional
Name for the experiment (default: 'MultiObjectiveEGO')
disable_tqdm : bool, optional
Whether to disable progress bar (default: True)
"""
self.problem = problem
self.n_initial = n_initial
self.max_nfes = max_nfes if max_nfes is not None else 200
self.alpha = alpha
self.num_k = num_k
self.H = H
self.rho = rho
self.save_data = save_data
self.save_path = save_path
self.name = name
self.disable_tqdm = disable_tqdm
[docs]
def optimize(self):
"""
Execute the MultiObjectiveEGO algorithm.
Returns
-------
Results
Optimization results containing decision variables, objectives, and runtime
"""
start_time = time.time()
problem = self.problem
nt = problem.n_tasks
dims = problem.dims
n_objs = problem.n_objs
n_initial_per_task = par_list(
self.n_initial if self.n_initial is not None else [11 * d - 1 for d in dims], nt)
max_nfes_per_task = par_list(self.max_nfes, nt)
# Generate initial samples using Latin Hypercube Sampling
decs = initialization(problem, n_initial_per_task, method='lhs')
objs, _ = evaluation(problem, decs)
nfes_per_task = n_initial_per_task.copy()
pbar = tqdm(total=sum(max_nfes_per_task), initial=sum(n_initial_per_task),
desc=f"{self.name}", disable=self.disable_tqdm)
while sum(nfes_per_task) < sum(max_nfes_per_task):
active_tasks = [i for i in range(nt) if nfes_per_task[i] < max_nfes_per_task[i]]
if not active_tasks:
break
for i in active_tasks:
M = n_objs[i]
D = dims[i]
# Generate normalized reference directions
R, N_R = uniform_point(self.H, M)
R = R / np.sqrt(np.sum(R ** 2, axis=1, keepdims=True))
# Iterate over each reference direction
for r_idx in range(N_R):
if nfes_per_task[i] >= max_nfes_per_task[i]:
break
for j in range(self.num_k):
if nfes_per_task[i] >= max_nfes_per_task[i]:
break
N_pop = decs[i].shape[0]
r_vec = R[r_idx]
# Points_Selector: select alpha*N_Pop closest points to this direction
norm_p = np.sqrt(np.sum(objs[i] ** 2, axis=1))
norm_p[norm_p < 1e-10] = 1e-10
cosine_p = np.sum(objs[i] * r_vec, axis=1) / norm_p
cosine_p = np.clip(cosine_p, -1, 1)
dist_b = norm_p * np.sqrt(np.maximum(1 - cosine_p ** 2, 0))
sorted_idx = np.argsort(dist_b)
N_sub = max(3, int(np.ceil(self.alpha * N_pop)))
N_sub = min(N_sub, N_pop)
sub_idx = sorted_idx[:N_sub]
pop_dec = decs[i][sub_idx]
pop_obj = objs[i][sub_idx]
# Scale objectives to [0, 1]
obj_min = pop_obj.min(axis=0)
obj_max = pop_obj.max(axis=0)
obj_range = obj_max - obj_min
obj_range[obj_range < 1e-10] = 1.0
pop_obj_scaled = (pop_obj - obj_min) / obj_range
# Compute AASF scalarized value
# S = max(f_scaled / w) + rho * sum(f_scaled / w)
r_safe = np.maximum(r_vec, 1e-10)
weighted = pop_obj_scaled / r_safe
pop_smetric = np.max(weighted, axis=1) + self.rho * np.sum(weighted, axis=1)
# Build single GP model on scalarized values
try:
kriging_model = gp_build(pop_dec, pop_smetric.reshape(-1, 1))
except Exception:
continue
f_min = pop_smetric.min()
# Maximize EI using GA
best_x = _rga_ei(kriging_model, f_min, D)
# If too close to existing points, maximize distance
if np.min(cdist(best_x.reshape(1, -1), decs[i])) < 1e-8:
best_x = _rga_max_distance(decs[i], D)
# Evaluate
new_dec = best_x.reshape(1, -1)
new_obj, _ = evaluation_single(problem, new_dec, i)
decs[i] = np.vstack([decs[i], new_dec])
objs[i] = np.vstack([objs[i], new_obj])
nfes_per_task[i] += 1
pbar.update(1)
pbar.close()
runtime = time.time() - start_time
# Convert database to staircase history structure for results
db_decs = [decs[i].copy() for i in range(nt)]
db_objs = [objs[i].copy() for i in range(nt)]
all_decs, all_objs = build_staircase_history(db_decs, db_objs, k=1)
results = build_save_results(all_decs=all_decs, all_objs=all_objs, runtime=runtime,
max_nfes=nfes_per_task, bounds=problem.bounds,
save_path=self.save_path, filename=self.name,
save_data=self.save_data)
return results
# =============================================================================
# GA-based Infill Optimization
# =============================================================================
def _rga_ei(kriging_model, f_min, D, ga_pop_size=None, ga_generations=100):
"""Maximize Expected Improvement using real-coded GA.
Parameters
----------
kriging_model : SingleTaskGP
Trained GP model for the scalarized objective
f_min : float
Minimum observed scalarized value (best so far)
D : int
Number of decision variables
ga_pop_size : int, optional
GA population size (default: 10*D)
ga_generations : int, optional
Number of GA generations (default: 100)
Returns
-------
best : np.ndarray, shape (D,)
Best candidate decision vector
"""
if ga_pop_size is None:
ga_pop_size = max(20, 10 * D)
best = None
obj_max = np.inf
# Initial population (LHS)
from scipy.stats import qmc
sampler = qmc.LatinHypercube(d=D)
offspring = sampler.random(n=ga_pop_size)
for gen in range(ga_generations):
# Predict and compute EI
pred, std = gp_predict(kriging_model, offspring)
pred = pred.flatten()
std = std.flatten()
s = np.maximum(std, 1e-10)
# Standard EI: (f_min - y) * Phi((f_min-y)/s) + s * phi((f_min-y)/s)
diff = f_min - pred
z = diff / s
ei = diff * norm.cdf(z) + s * norm.pdf(z)
neg_ei = -ei # Minimize negative EI
# Track best
sorted_idx = np.argsort(neg_ei)
if neg_ei[sorted_idx[0]] < obj_max:
best = offspring[sorted_idx[0]].copy()
obj_max = neg_ei[sorted_idx[0]]
# Select top half as parents
half = max(2, int(np.ceil(ga_pop_size / 2)))
parents = offspring[sorted_idx[:half]]
parent_fitness = neg_ei[sorted_idx[:half]]
# First half: tournament selection + SBX(muc=20) + mutation(mum=20)
t_idx = _tournament_selection(parent_fitness, parents.shape[0])
offspring1 = ga_generation(parents[t_idx], muc=20, mum=20)
# Second half: SBX(muc=2) + mutation(mum=20, prob=1/D)
offspring2 = ga_generation(parents, muc=2, mum=20)
offspring = np.vstack([offspring1, offspring2])[:ga_pop_size]
offspring = np.clip(offspring, 0, 1)
return best
def _rga_max_distance(existing_decs, D, ga_pop_size=None, ga_generations=100):
"""Maximize minimum distance to existing points using GA.
Parameters
----------
existing_decs : np.ndarray
Existing evaluated decision variables, shape (N, D)
D : int
Number of decision variables
Returns
-------
best : np.ndarray, shape (D,)
Candidate maximizing distance to existing points
"""
if ga_pop_size is None:
ga_pop_size = max(20, 10 * D)
best = None
obj_max = np.inf
from scipy.stats import qmc
sampler = qmc.LatinHypercube(d=D)
offspring = sampler.random(n=ga_pop_size)
for gen in range(ga_generations):
# Compute negative minimum distance (to minimize)
dists = cdist(offspring, existing_decs)
neg_min_dist = -np.min(dists, axis=1)
sorted_idx = np.argsort(neg_min_dist)
if neg_min_dist[sorted_idx[0]] < obj_max:
best = offspring[sorted_idx[0]].copy()
obj_max = neg_min_dist[sorted_idx[0]]
half = max(2, int(np.ceil(ga_pop_size / 2)))
parents = offspring[sorted_idx[:half]]
parent_fitness = neg_min_dist[sorted_idx[:half]]
t_idx = _tournament_selection(parent_fitness, parents.shape[0])
offspring1 = ga_generation(parents[t_idx], muc=20, mum=20)
offspring2 = ga_generation(parents, muc=2, mum=20)
offspring = np.vstack([offspring1, offspring2])[:ga_pop_size]
offspring = np.clip(offspring, 0, 1)
return best
def _tournament_selection(fitness, n_select, n_tournament=2):
"""Binary tournament selection (lower fitness is better)."""
N = len(fitness)
selected = np.zeros(n_select, dtype=int)
for k in range(n_select):
candidates = np.random.randint(0, N, size=n_tournament)
selected[k] = candidates[np.argmin(fitness[candidates])]
return selected